diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index f473e9427ba5d..bbfb2583f8874 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -710,4 +710,118 @@ private OpenFileOptions() {
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}
+
+ /**
+ * The standard {@code createFile()} options.
+ *
+ * If an option is not supported during file creation and it is considered
+ * part of a commit protocol, then, when supplied in a must() option,
+ * it MUST be rejected.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static final class CreateFileOptionKeys {
+
+ private CreateFileOptionKeys() {
+ }
+
+ /**
+ * {@code createFile()} option to write a file in the close() operation iff
+ * there is nothing at the destination.
+ * this is the equivalent of {@code create(path, overwrite=true)}
+ * except that the existence check is postponed to the end of the write.
+ *
+ * Value {@value}.
+ *
+ *
+ * This can be set in the builder.
+ *
+ *
+ *
It is for object stores stores which only upload/manifest files
+ * at the end of the stream write.
+ *
Streams which support it SHALL not manifest any object to
+ * the destination path until close()
+ *
It MUST be declared as a stream capability in streams for which
+ * this overwrite is enabled.
+ *
It MUST be exported as a path capability for all stores where
+ * the feature is available and enabled
+ *
If passed to a filesystem as a {@code must()} parameter where
+ * the option value is {@code true}, and it is supported/enabled,
+ * the FS SHALL omit all overwrite checks in {@code create},
+ * including for the existence of an object or a directory underneath.
+ * Instead, during {@code close()} the object will only be manifest
+ * at the target path if there is no object at the destination.
+ *
+ *
The existence check and object creation SHALL be atomic.
+ *
If passed to a filesystem as a {@code must()} parameter where
+ * the option value is {@code true}, and the FS does not recognise
+ * the feature, or it is recognized but disabled on this FS instance,
+ * the filesystem SHALL reject the request.
+ *
+ *
If passed to a filesystem as a {@code opt()} parameter where
+ * the option value is {@code true}, the filesystem MAY ignore
+ * the request, or it MAY enable the feature.
+ * Any filesystem which does not support the feature, including
+ * from older releases, SHALL ignore it.
+ *
+ *
+ */
+ public static final String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE =
+ "fs.option.create.conditional.overwrite";
+
+ /**
+ * Overwrite a file only if there is an Etag match. This option takes a string,
+ *
+ * Value {@value}.
+ *
+ * This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
+ *
+ *
If supported and enabled, it SHALL be declared as a capability of the filesystem
+ *
If supported and enabled, it SHALL be declared as a capability of the stream
+ *
The string passed as the value SHALL be the etag value as returned by
+ * {@code EtagSource.getEtag()}
+ *
This value MUST NOT be empty
+ *
If passed to a filesystem which supports it, then when the file is created,
+ * the store SHALL check for the existence of a file/object at the destination
+ * path.
+ *
+ *
If there is no object there, the operation SHALL be rejected by raising
+ * either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
+ * exception, or a{@code java.nio.file.FileAlreadyExistsException}
+ *
+ *
If there is an object there, its Etag SHALL be compared to the
+ * value passed here.
+ *
If there is no match, the operation SHALL be rejected by raising
+ * either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
+ * exception, or a{@code java.nio.file.FileAlreadyExistsException}
+ *
+ *
If the etag does match, the file SHALL be created.
+ *
The check and create SHALL be atomic
+ *
The check and create MAY be at the end of the write, in {@code close()},
+ * or it MAY be in the {@code create()} operation. That is: some stores
+ * MAY perform the check early
+ *
If supported and enabled, stores MAY check for the existence of subdirectories;
+ * this behavior is implementation-specific.
+ *
+ */
+ public static final String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
+ "fs.option.create.conditional.overwrite.etag";
+
+ /**
+ * A flag which requires the filesystem to create files/objects in close(),
+ * rather than create/createFile.
+ *
+ * Object stores with this behavior should also export it as a path capability.
+ *
+ * Value {@value}.
+ */
+ public static final String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";
+
+ /**
+ * String to define the content filetype.
+ * Value {@value}.
+ */
+ public static final String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";
+
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index e3deda775286a..13832f8f4adc9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -467,6 +467,13 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";
+ public static final String CONDITIONAL_CREATE
+ = "conditional_create";
+
+ public static final String CONDITIONAL_CREATE_FAILED
+ = "conditional_create_failed";
+
+
private StoreStatisticNames() {
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md
index 7dd3170036ce9..e31208eb5c187 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md
@@ -192,6 +192,7 @@ Here are the custom options which the S3A Connector supports.
|-----------------------------|-----------|----------------------------------------|
| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
| `fs.s3a.create.header` | `string` | prefix for user supplied headers |
+| `fs.s3a.create.multipart` | `boolean` | create a multipart file |
### `fs.s3a.create.performance`
@@ -200,9 +201,11 @@ Prioritize file creation performance over safety checks for filesystem consisten
This:
1. Skips the `LIST` call which makes sure a file is being created over a directory.
Risk: a file is created over a directory.
-2. Ignores the overwrite flag.
+2. If the overwrite flag is false, uses conditional creation to prevent the overwrtie
3. Never issues a `DELETE` call to delete parent directory markers.
+
+
It is possible to probe an S3A Filesystem instance for this capability through
the `hasPathCapability(path, "fs.s3a.create.performance")` check.
@@ -243,3 +246,17 @@ When an object is renamed, the metadata is propagated the copy created.
It is possible to probe an S3A Filesystem instance for this capability through
the `hasPathCapability(path, "fs.s3a.create.header")` check.
+
+### `fs.s3a.create.multipart` Create a multipart file
+
+Initiate a multipart upload when a file is created, rather
+than only when the amount of data buffered reaches the threshold
+set in `fs.s3a.multipart.size`.
+
+This is only relevant during testing, as it allows for multipart
+operation to be initiated without writing any data, so
+reducing test time.
+
+It is not recommended for production use, because as well as adding
+more network IO, it is not compatible with third-party stores which
+do not supprt multipart uploads.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 822f682240437..d29aef6cdb8ee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1522,6 +1522,29 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";
+
+ /**
+ * Is the create overwrite feature enabled or not?
+ * A configuration option and a path status probe.
+ * Value {@value}.
+ */
+ public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED =
+ "fs.s3a.conditional.create.enabled";
+
+ /**
+ * Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}.
+ * Value {@value}.
+ */
+ public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true;
+
+ /**
+ * createFile() boolean option toreate a multipart file, always: {@value}.
+ *
+ * This is inefficient and will not work on a store which doesn't support that feature,
+ * so is primarily for testing.
+ */
+ public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";
+
/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
@@ -1845,4 +1868,11 @@ private Constants() {
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";
+ /**
+ * Value for the {@code If-None-Match} HTTP header in S3 requests.
+ * Value: {@value}.
+ * More information:
+ * AWS S3 PutObject API Documentation
+ */
+ public static final String IF_NONE_MATCH_STAR = "*";
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index db1134dc5a2e2..9574485eb9dce 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -52,6 +53,7 @@
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;
+ /**
+ * Object write option flags.
+ */
+ private final EnumSet writeObjectFlags;
+
/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
+ this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
@@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements
? builder.blockSize
: -1;
+ // if required to be multipart by the committer put tracker or
+ // write flags (i.e createFile() options, initiate multipart uploads.
+ // this will fail fast if the store doesn't support multipart uploads
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
+ } else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
+ // this not merged simply to avoid confusion
+ // to what to do it both are set, so as to guarantee
+ // the put tracker initialization always takes priority
+ // over any file flag.
+ LOG.debug("Multipart initiated from createFile() options");
+ initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -772,7 +790,8 @@ BlockOutputStreamStatistics getStatistics() {
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
- switch (capability.toLowerCase(Locale.ENGLISH)) {
+ final String cap = capability.toLowerCase(Locale.ENGLISH);
+ switch (cap) {
// does the output stream have delayed visibility
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
@@ -797,6 +816,12 @@ public boolean hasCapability(String capability) {
return true;
default:
+ // scan flags for the capability
+ for (WriteObjectFlags flag : writeObjectFlags) {
+ if (flag.hasKey(cap)) {
+ return true;
+ }
+ }
return false;
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 2e0d00fd74244..95659766876b1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -153,6 +153,7 @@
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -226,6 +227,10 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE;
import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -512,6 +517,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;
+ /**
+ * Are the conditional create operations enabled?
+ */
+ private boolean conditionalCreateEnabled;
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -694,6 +704,9 @@ public void initialize(URI name, Configuration originalConf)
" access points. Upgrading to V2");
useListV1 = false;
}
+ conditionalCreateEnabled = conf.getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED,
+ DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED);
+
signerManager = new SignerManager(bucket, this, conf, owner);
signerManager.initCustomSigners();
@@ -2081,20 +2094,66 @@ private FSDataOutputStream innerCreateFile(
}
EnumSet flags = options.getFlags();
- boolean skipProbes = options.isPerformance() || isUnderMagicCommitPath(path);
- if (skipProbes) {
- LOG.debug("Skipping existence/overwrite checks");
- } else {
+ /*
+ Calculate whether to perform HEAD/LIST checks,
+ and whether the conditional create option should be set.
+ This seems complicated, but comes down to
+ "if explicitly requested and the FS enables it, use".
+ */
+ // create file attributes
+ boolean cCreate = options.isConditionalOverwrite();
+ boolean cEtag = options.isConditionalOverwriteEtag();
+ boolean createPerf = options.isPerformance();
+ boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
+
+ // path attributes
+ boolean magic = isUnderMagicCommitPath(path);
+
+ // store options
+ // is CC available.
+ boolean ccAvailable = conditionalCreateEnabled;
+
+ if (!ccAvailable && (cCreate || cEtag)) {
+ // fail fast if conditional creation is requested on an FS without it.
+ throw new PathIOException(path.toString(), "Conditional Writes Unavailable");
+ }
+
+ // probes to evaluate.
+ Set probes = EnumSet.of(
+ StatusProbeEnum.List, StatusProbeEnum.Head);
+
+
+ // the PUT is conditional if requested, or if one of the
+ // this is a performance creation, overwrite has not been requested,
+ // this is not and etag write *and* conditional creation is available.
+ // write is NOT conditional etag write.
+ boolean conditionalPut = cCreate
+ || !(overwrite || cEtag) && ccAvailable && createPerf;
+
+ // skip the HEAD check for many reasons
+ // old: the path is magic, it's an overwrite or the "create" performance is set.
+ // new: also skip if any conditional create operation is in progress
+
+ boolean skipHead =
+ createPerf || magic || overwrite // classic reasons to skip HEAD
+ || cCreate || cEtag; // conditional creation
+
+ if (skipHead) {
+ probes.remove(StatusProbeEnum.Head);
+ }
+
+ // list logic
+ boolean skipList = createPerf || magic || cCreate || cEtag;
+ if (skipList) {
+ probes.remove(StatusProbeEnum.List);
+ }
+
+ // if probes are required -request them and evaluate the result.
+ if (!probes.isEmpty()) {
try {
- boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
// get the status or throw an FNFE.
- // when overwriting, there is no need to look for any existing file,
- // just a directory (for safety)
- FileStatus status = innerGetFileStatus(path, false,
- overwrite
- ? StatusProbeEnum.DIRECTORIES
- : StatusProbeEnum.ALL);
+ FileStatus status = innerGetFileStatus(path, false, probes);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
@@ -2109,6 +2168,10 @@ private FSDataOutputStream innerCreateFile(
} catch (FileNotFoundException e) {
// this means there is nothing at the path; all good.
}
+ } else {
+ LOG.debug("Skipping all probes with flags:"
+ + " createPerf={}, magic={}, ccAvailable={}, cCreate={}, cEtag={}",
+ createPerf, magic, ccAvailable, cCreate, cEtag);
}
instrumentation.fileCreated();
final BlockOutputStreamStatistics outputStreamStatistics
@@ -2117,37 +2180,45 @@ private FSDataOutputStream innerCreateFile(
committerIntegration.createTracker(path, key, outputStreamStatistics);
String destKey = putTracker.getDestKey();
+ EnumSet putFlags = options.writeObjectFlags();
+ if (conditionalPut) {
+ putFlags.add(WriteObjectFlags.ConditionalOverwrite);
+ }
+
// put options are derived from the option builder.
final PutObjectOptions putOptions =
- new PutObjectOptions(null, options.getHeaders());
+ new PutObjectOptions(null,
+ options.getHeaders(),
+ putFlags,
+ options.etag());
validateOutputStreamConfiguration(path, getConf());
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
- .withKey(destKey)
- .withBlockFactory(blockFactory)
- .withBlockSize(partSize)
- .withStatistics(outputStreamStatistics)
- .withProgress(progress)
- .withPutTracker(putTracker)
- .withWriteOperations(
- createWriteOperationHelper(auditSpan))
- .withExecutorService(
- new SemaphoredDelegatingExecutor(
- boundedThreadPool,
- blockOutputActiveBlocks,
- true,
- outputStreamStatistics))
- .withDowngradeSyncableExceptions(
+ .withKey(destKey)
+ .withBlockFactory(blockFactory)
+ .withBlockSize(partSize)
+ .withStatistics(outputStreamStatistics)
+ .withProgress(progress)
+ .withPutTracker(putTracker)
+ .withWriteOperations(
+ createWriteOperationHelper(auditSpan))
+ .withExecutorService(
+ new SemaphoredDelegatingExecutor(
+ boundedThreadPool,
+ blockOutputActiveBlocks,
+ true,
+ outputStreamStatistics))
+ .withDowngradeSyncableExceptions(
getConf().getBoolean(
DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
- .withCSEEnabled(isCSEEnabled)
- .withPutOptions(putOptions)
- .withIOStatisticsAggregator(
- IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
- .withMultipartEnabled(isMultipartUploadEnabled);
+ .withCSEEnabled(isCSEEnabled)
+ .withPutOptions(putOptions)
+ .withIOStatisticsAggregator(
+ IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+ .withMultipartEnabled(isMultipartUploadEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
@@ -3192,7 +3263,10 @@ private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
long length,
boolean isDirectoryMarker) {
- return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker);
+ return requestFactory.newPutObjectRequestBuilder(key,
+ PutObjectOptions.defaultOptions(),
+ length,
+ isDirectoryMarker);
}
/**
@@ -5362,11 +5436,20 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED:
return isMultipartUploadEnabled();
- // create file options
+ // create file options which are always true
+
+ case FS_OPTION_CREATE_IN_CLOSE:
+ case FS_OPTION_CREATE_CONTENT_TYPE:
case FS_S3A_CREATE_PERFORMANCE:
case FS_S3A_CREATE_HEADER:
return true;
+ // conditional create requires it to be enabled in the FS.
+ case FS_S3A_CONDITIONAL_CREATE_ENABLED:
+ case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE:
+ case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG:
+ return conditionalCreateEnabled;
+
// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceFlags.enabled(PerformanceFlagEnum.Create);
@@ -5390,7 +5473,7 @@ public boolean hasPathCapability(final Path path, final String capability)
}
// ask the store for what capabilities it offers
- // this may include input and output capabilites -and more
+ // this includes, store configuration flags, IO capabilites...etc.
if (getStore() != null && getStore().hasPathCapability(path, capability)) {
return true;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 1d26eb6275021..b3c907428ac4e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -1530,7 +1530,9 @@ private OutputStreamStatistics(
STREAM_WRITE_TOTAL_DATA.getSymbol(),
STREAM_WRITE_TOTAL_TIME.getSymbol(),
INVOCATION_HFLUSH.getSymbol(),
- INVOCATION_HSYNC.getSymbol())
+ INVOCATION_HSYNC.getSymbol(),
+ CONDITIONAL_CREATE.getSymbol(),
+ CONDITIONAL_CREATE_FAILED.getSymbol())
.withGauges(
STREAM_WRITE_BLOCK_UPLOADS_ACTIVE.getSymbol(),
STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
@@ -1688,6 +1690,15 @@ public void hsyncInvoked() {
incCounter(INVOCATION_HSYNC.getSymbol(), 1);
}
+ @Override
+ public void conditionalCreateOutcome(boolean success) {
+ if (success) {
+ incCounter(CONDITIONAL_CREATE.getSymbol(), 1);
+ } else {
+ incCounter(CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
+ }
+ }
+
@Override
public void close() {
if (getBytesPendingUpload() > 0) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index ffd3f5e115519..ee98693e6963a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -108,6 +108,12 @@ public enum Statistic {
"Filesystem close",
TYPE_DURATION),
+ CONDITIONAL_CREATE(StoreStatisticNames.CONDITIONAL_CREATE,
+ "Count of successful conditional create operations.",
+ TYPE_COUNTER),
+ CONDITIONAL_CREATE_FAILED(StoreStatisticNames.CONDITIONAL_CREATE_FAILED,
+ "Count of failed conditional create operations.",
+ TYPE_COUNTER),
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store.",
TYPE_COUNTER),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 969c1023d7347..d0d1f0f833c1e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -318,8 +318,10 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
retrying,
() -> {
final CompleteMultipartUploadRequest.Builder requestBuilder =
- getRequestFactory().newCompleteMultipartUploadRequestBuilder(
- destKey, uploadId, partETags);
+ getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey,
+ uploadId,
+ partETags,
+ putOptions);
return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build());
});
return uploadResult;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index c69e3394c3dd3..294f0a08f9257 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
@@ -168,12 +168,14 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
+ * @param putOptions options for the request
* @return the request builder.
*/
CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
- List partETags);
+ List partETags,
+ PutObjectOptions putOptions);
/**
* Create a HEAD object request builder.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
index ecc3496ce8f3a..dafcbe45ac5c9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
@@ -40,6 +42,7 @@
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
/**
@@ -79,7 +82,10 @@ public boolean aboutToComplete(String uploadId,
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
getOriginalDestKey(),
0,
- new PutObjectOptions(null, headers));
+ new PutObjectOptions(null,
+ headers,
+ EnumSet.noneOf(WriteObjectFlags.class),
+ ""));
upload(originalDestPut, EMPTY);
// build the commit summary
@@ -103,7 +109,8 @@ public boolean aboutToComplete(String uploadId,
getPath(), getPendingPartKey(), commitData);
PutObjectRequest put = getWriter().createPutObjectRequest(
getPendingPartKey(),
- bytes.length, null);
+ bytes.length,
+ defaultOptions());
upload(put, bytes);
return false;
}
@@ -117,7 +124,7 @@ public boolean aboutToComplete(String uploadId,
@Retries.RetryTranslated
private void upload(PutObjectRequest request, byte[] bytes) throws IOException {
trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
- () -> getWriter().putObject(request, PutObjectOptions.defaultOptions(),
+ () -> getWriter().putObject(request, defaultOptions(),
new S3ADataBlocks.BlockUploadData(bytes, null), null));
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
index fe7b45147986f..8a7dd3b173cdb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
@@ -38,6 +38,8 @@ public interface AWSHeaders {
String DATE = "Date";
String ETAG = "ETag";
String LAST_MODIFIED = "Last-Modified";
+ String IF_NONE_MATCH = "If-None-Match";
+ String IF_MATCH = "If-Match";
/*
* Amazon HTTP Headers used by S3A.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
index ae2945989ddd3..3706937389135 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
@@ -21,9 +21,9 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
@@ -33,11 +33,22 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.util.Progressable;
+import static java.util.Objects.requireNonNull;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CONTENT_TYPE;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.CreateMultipart;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Performance;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Recursive;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CREATE_FILE_KEYS;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Builder used in create file; takes a callback to the operation
@@ -63,19 +74,25 @@ public class CreateFileBuilder extends
* Classic create file option set: overwriting.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE =
- new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null);
+ new CreateFileOptions(CREATE_OVERWRITE_FLAGS,
+ EnumSet.of(Recursive),
+ null, null);
/**
* Classic create file option set: no overwrite.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
- new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);
+ new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS,
+ EnumSet.of(Recursive),
+ null, null);
/**
* Performance create options.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
- new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);
+ new CreateFileOptions(CREATE_OVERWRITE_FLAGS,
+ EnumSet.of(Performance, Recursive),
+ null, null);
/**
* Callback interface.
@@ -109,27 +126,36 @@ public FSDataOutputStream build() throws IOException {
final Configuration options = getOptions();
final Map headers = new HashMap<>();
final Set mandatoryKeys = getMandatoryKeys();
- final Set keysToValidate = new HashSet<>();
+ final EnumSet createFileSwitches = EnumSet.noneOf(
+ WriteObjectFlags.class);
// pick up all headers from the mandatory list and strip them before
// validating the keys
+
+ // merge the config lists
+
String headerPrefix = FS_S3A_CREATE_HEADER + ".";
final int prefixLen = headerPrefix.length();
- mandatoryKeys.stream().forEach(key -> {
- if (key.startsWith(headerPrefix) && key.length() > prefixLen) {
- headers.put(key.substring(prefixLen), options.get(key));
- } else {
- keysToValidate.add(key);
- }
- });
+
+ final Set keysToValidate = mandatoryKeys.stream()
+ .filter(key -> !key.startsWith(headerPrefix))
+ .collect(Collectors.toSet());
rejectUnknownMandatoryKeys(keysToValidate, CREATE_FILE_KEYS, "for " + path);
- // and add any optional headers
- getOptionalKeys().stream()
- .filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen)
- .forEach(key -> headers.put(key.substring(prefixLen), options.get(key)));
+ // look for headers
+ for (Map.Entry option : options) {
+ String key = option.getKey();
+ if (key.startsWith(headerPrefix) && key.length() > prefixLen) {
+ headers.put(key.substring(prefixLen), option.getValue());
+ }
+ }
+
+ // and add the mimetype
+ if (options.get(FS_OPTION_CREATE_CONTENT_TYPE, null) != null) {
+ headers.put(CONTENT_TYPE, options.get(FS_OPTION_CREATE_CONTENT_TYPE, null));
+ }
EnumSet flags = getFlags();
if (flags.contains(CreateFlag.APPEND)) {
@@ -142,13 +168,32 @@ public FSDataOutputStream build() throws IOException {
"Must specify either create or overwrite");
}
- final boolean performance =
- options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false);
+ // build the other switches
+ if (isRecursive()) {
+ createFileSwitches.add(Recursive);
+ }
+ if (Performance.isEnabled(options)) {
+ createFileSwitches.add(Performance);
+ }
+ if (CreateMultipart.isEnabled(options)) {
+ createFileSwitches.add(CreateMultipart);
+ }
+ if (ConditionalOverwrite.isEnabled(options)) {
+ createFileSwitches.add(ConditionalOverwrite);
+ }
+ // etag is a string so is checked for then extracted.
+ final String etag = options.get(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, null);
+ if (etag != null) {
+ createFileSwitches.add(ConditionalOverwriteEtag);
+ }
+
return callbacks.createFileFromBuilder(
path,
getProgress(),
- new CreateFileOptions(flags, isRecursive(), performance, headers));
-
+ new CreateFileOptions(flags,
+ createFileSwitches,
+ etag,
+ headers));
}
/**
@@ -209,14 +254,14 @@ public static final class CreateFileOptions {
private final EnumSet flags;
/**
- * create parent dirs?
+ * Create File switches.
*/
- private final boolean recursive;
+ private final EnumSet writeObjectFlags;
/**
- * performance flag.
+ * Etag. Only used if the create file switches enable it.
*/
- private final boolean performance;
+ private final String etag;
/**
* Headers; may be null.
@@ -225,18 +270,22 @@ public static final class CreateFileOptions {
/**
* @param flags creation flags
- * @param recursive create parent dirs?
- * @param performance performance flag
+ * @param writeObjectFlags Create File switches.
+ * @param etag ETag, used only if enabled by switches
* @param headers nullable header map.
*/
public CreateFileOptions(
final EnumSet flags,
- final boolean recursive,
- final boolean performance,
+ final EnumSet writeObjectFlags,
+ final String etag,
final Map headers) {
- this.flags = flags;
- this.recursive = recursive;
- this.performance = performance;
+ this.flags = requireNonNull(flags);
+ this.writeObjectFlags = requireNonNull(writeObjectFlags);
+ if (writeObjectFlags().contains(ConditionalOverwriteEtag)) {
+ checkArgument(!isEmpty(etag),
+ "etag overwrite is enabled but the etag string is null/empty");
+ }
+ this.etag = etag;
this.headers = headers;
}
@@ -244,8 +293,7 @@ public CreateFileOptions(
public String toString() {
return "CreateFileOptions{" +
"flags=" + flags +
- ", recursive=" + recursive +
- ", performance=" + performance +
+ ", writeObjectFlags=" + writeObjectFlags +
", headers=" + headers +
'}';
}
@@ -255,16 +303,36 @@ public EnumSet getFlags() {
}
public boolean isRecursive() {
- return recursive;
+ return isSet(Recursive);
}
public boolean isPerformance() {
- return performance;
+ return isSet(Performance);
+ }
+
+ public boolean isConditionalOverwrite() {
+ return isSet(ConditionalOverwrite);
+ }
+
+ public boolean isConditionalOverwriteEtag() {
+ return isSet(ConditionalOverwriteEtag);
+ }
+
+ public boolean isSet(WriteObjectFlags val) {
+ return writeObjectFlags().contains(val);
}
public Map getHeaders() {
return headers;
}
+
+ public String etag() {
+ return etag;
+ }
+
+ public EnumSet writeObjectFlags() {
+ return writeObjectFlags;
+ }
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 5bb64ddc28920..4422231c1b1e3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -35,15 +35,19 @@
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
-import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
@@ -51,6 +55,7 @@
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER;
+import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
/**
* Internal constants private only to the S3A codebase.
@@ -260,7 +265,14 @@ private InternalConstants() {
*/
public static final Set CREATE_FILE_KEYS =
Collections.unmodifiableSet(
- new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
+ new HashSet<>(Arrays.asList(
+ FS_OPTION_CREATE_CONDITIONAL_OVERWRITE,
+ FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG,
+ FS_OPTION_CREATE_IN_CLOSE,
+ FS_OPTION_CREATE_CONTENT_TYPE,
+ FS_S3A_CREATE_PERFORMANCE,
+ FS_S3A_CREATE_MULTIPART
+ )));
/**
* Dynamic Path capabilities to be evaluated
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
index 1ca502c44cde6..c48a0e03c6bcf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
@@ -18,9 +18,17 @@
package org.apache.hadoop.fs.s3a.impl;
+import java.util.EnumSet;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
/**
* Extensible structure for options when putting/writing objects.
*/
@@ -36,16 +44,71 @@ public final class PutObjectOptions {
*/
private final Map headers;
+ /**
+ * Flags to control the write process.
+ */
+ private final EnumSet writeObjectFlags;
+
+ /**
+ * If set, allows overwriting an object only if the object's ETag matches this value.
+ */
+ private final String etagOverwrite;
+
/**
* Constructor.
* @param storageClass Storage class, if not null.
* @param headers Headers; may be null.
+ * @param writeObjectFlags flags for writing
+ * @param etagOverwrite etag for etag writes.
+ * MUST not be empty if etag overwrite flag is set.
*/
public PutObjectOptions(
@Nullable final String storageClass,
- @Nullable final Map headers) {
+ @Nullable final Map headers,
+ final EnumSet writeObjectFlags,
+ @Nullable final String etagOverwrite) {
this.storageClass = storageClass;
this.headers = headers;
+ this.writeObjectFlags = writeObjectFlags;
+ this.etagOverwrite = etagOverwrite;
+ if (isEtagOverwrite()) {
+ checkArgument(!isEmpty(etagOverwrite),
+ "etag overwrite is enabled but the etag string is null/empty");
+ }
+ }
+
+ /**
+ * Get the noObjectOverwrite flag.
+ * @return true if object override not allowed.
+ */
+ public boolean isNoObjectOverwrite() {
+ return hasFlag(ConditionalOverwrite);
+ }
+
+ /**
+ * Get the isEtagOverwrite flag.
+ * @return true if the write MUST overwrite an object with the
+ * supplied etag.
+ */
+ public boolean isEtagOverwrite() {
+ return hasFlag(ConditionalOverwriteEtag);
+ }
+
+ /**
+ * Does the flag set contain the specific flag.
+ * @param flag flag to look for
+ * @return true if the flag is set.
+ */
+ public boolean hasFlag(WriteObjectFlags flag) {
+ return writeObjectFlags.contains(flag);
+ }
+
+ /**
+ * Get the ETag that must match for an overwrite operation to proceed.
+ * @return The ETag required for overwrite, or {@code null} if no ETag match is required.
+ */
+ public String getEtagOverwrite() {
+ return etagOverwrite;
}
/**
@@ -56,10 +119,17 @@ public Map getHeaders() {
return headers;
}
+ public EnumSet getWriteObjectFlags() {
+ return writeObjectFlags;
+ }
+
@Override
public String toString() {
return "PutObjectOptions{" +
- ", storageClass='" + storageClass + '\'' +
+ "storageClass='" + storageClass + '\'' +
+ ", headers=" + headers +
+ ", writeObjectFlags=" + writeObjectFlags +
+ ", etagOverwrite='" + etagOverwrite + '\'' +
'}';
}
@@ -67,9 +137,12 @@ public String toString() {
* Empty options.
*/
private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(
- null, null);
+ null,
+ null,
+ EnumSet.noneOf(WriteObjectFlags.class),
+ null);
- /**
+ /**
* Get the default options.
* @return an instance with no storage class or headers.
*/
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index f03b83764b8bd..5754e4e3feb83 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -60,12 +60,17 @@
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_MATCH;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -372,6 +377,19 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
}
+ if (options != null) {
+ if (options.isNoObjectOverwrite()) {
+ LOG.debug("setting If-None-Match");
+ putObjectRequestBuilder.overrideConfiguration(
+ override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
+ }
+ if (options.hasFlag(WriteObjectFlags.ConditionalOverwriteEtag)) {
+ LOG.debug("setting If-Match");
+ putObjectRequestBuilder.overrideConfiguration(
+ override -> override.putHeader(IF_MATCH, options.getEtagOverwrite()));
+ }
+ }
+
return prepareRequest(putObjectRequestBuilder);
}
@@ -553,12 +571,29 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
- List partETags) {
+ List partETags,
+ PutObjectOptions putOptions) {
+
// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
- CompleteMultipartUploadRequest.Builder requestBuilder =
- CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
- .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+ CompleteMultipartUploadRequest.Builder requestBuilder;
+ requestBuilder = CompleteMultipartUploadRequest.builder()
+ .bucket(bucket)
+ .key(destKey)
+ .uploadId(uploadId)
+ .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+
+ if (putOptions.isNoObjectOverwrite()) {
+ LOG.debug("setting If-None-Match");
+ requestBuilder.overrideConfiguration(
+ override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
+ }
+ if (!isEmpty(putOptions.getEtagOverwrite())) {
+ LOG.debug("setting if If-Match");
+ requestBuilder.overrideConfiguration(
+ override -> override.putHeader(IF_MATCH, putOptions.getEtagOverwrite()));
+ }
+
// Correct SSE-C request parameters are required for this request when
// specifying checksums for each part
if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java
new file mode 100644
index 0000000000000..5918316c810af
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.write;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+
+/**
+ * Flags to use when creating/writing objects.
+ * The configuration key is used in two places:
+ *
+ *
Parsing builder options
+ *
hasCapability() probes of the output stream.
+ *
+ */
+public enum WriteObjectFlags {
+ ConditionalOverwrite(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE),
+ ConditionalOverwriteEtag(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG),
+ CreateMultipart(FS_S3A_CREATE_MULTIPART),
+ Performance(FS_S3A_CREATE_PERFORMANCE),
+ Recursive("");
+
+ /** Configuration key, or "" if not configurable. */
+ private final String key;
+
+ /**
+ * Constructor.
+ * @param key key configuration key, or "" if not configurable.
+ */
+ WriteObjectFlags(final String key) {
+ this.key = key;
+ }
+
+ /**
+ * does the configuration contain this option as a boolean?
+ * @param options options to scan
+ * @return true if this is defined as a boolean
+ */
+ public boolean isEnabled(Configuration options) {
+ return options.getBoolean(key, false);
+ }
+
+ /**
+ * Does the key of this option match the parameter?
+ * @param k key
+ * @return true if there is a match.
+ */
+ public boolean hasKey(String k) {
+ return !key.isEmpty() && key.equals(k);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java
new file mode 100644
index 0000000000000..6bde1caf79592
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to writing objects.
+ */
+package org.apache.hadoop.fs.s3a.impl.write;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
index 6bf2354a83ede..db1b7def47482 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
@@ -146,4 +146,14 @@ public interface BlockOutputStreamStatistics extends Closeable,
* Syncable.hsync() has been invoked.
*/
void hsyncInvoked();
+
+ /**
+ * Record the outcome of a conditional create operation.
+ *
+ * This method increments the appropriate counter based on whether
+ * the conditional create operation was successful or failed.
+ * @param success {@code true} if the conditional create operation succeeded,
+ * {@code false} if it failed.
+ */
+ void conditionalCreateOutcome(boolean success);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index a5d20095ba5cc..26b9f2b1568ca 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -549,6 +549,10 @@ public void hflushInvoked() {
public void hsyncInvoked() {
}
+ @Override
+ public void conditionalCreateOutcome(boolean success) {
+ }
+
@Override
public void close() throws IOException {
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
index 508e1a38356ec..396a9f60a054d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -332,7 +332,9 @@ public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exce
.build();
PutObjectRequest.Builder putObjectRequestBuilder =
factory.newPutObjectRequestBuilder(key,
- null, SMALL_FILE_SIZE, false);
+ PutObjectOptions.defaultOptions(),
+ SMALL_FILE_SIZE,
+ false);
putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE)));
putObjectRequestBuilder.metadata(metadata);
fs.putObjectDirect(putObjectRequestBuilder.build(),
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index ecda6fd2acee6..3b30a8e05dca5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -107,7 +107,9 @@ public void testPutObjectDirect() throws Throwable {
.build();
Path path = path("putDirect");
PutObjectRequest.Builder putObjectRequestBuilder =
- factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
+ factory.newPutObjectRequestBuilder(path.toUri().getPath(),
+ PutObjectOptions.defaultOptions(),
+ -1, false);
putObjectRequestBuilder.contentLength(-1L);
LambdaTestUtils.intercept(IllegalStateException.class,
() -> fs.putObjectDirect(
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 53e4a68cbb60b..4d97ab2179b63 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -69,6 +69,7 @@
import org.apache.hadoop.util.functional.FutureIO;
import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.AssumptionViolatedException;
@@ -1166,6 +1167,14 @@ public static void assumeStoreAwsHosted(final FileSystem fs) {
.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
}
+ /**
+ * Skip if conditional creation is not enabled.
+ */
+ public static void assumeConditionalCreateEnabled(Configuration conf) {
+ skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_CREATE_ENABLED,
+ "conditional create is disabled");
+ }
+
/**
* Modify the config by setting the performance flags and return the modified config.
*
@@ -1466,7 +1475,9 @@ public static void assume(String message, boolean condition) {
if (!condition) {
LOG.warn(message);
}
- Assume.assumeTrue(message, condition);
+ Assumptions.assumeThat(condition).
+ describedAs(message)
+ .isTrue();
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java
new file mode 100644
index 0000000000000..1b740f7b9f845
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+@RunWith(Parameterized.class)
+public class ITestS3AConditionalCreateBehavior extends AbstractS3ATestBase {
+
+ private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255);
+
+ private final boolean conditionalCreateEnabled;
+
+ public ITestS3AConditionalCreateBehavior(boolean conditionalCreateEnabled) {
+ this.conditionalCreateEnabled = conditionalCreateEnabled;
+ }
+
+ @Parameterized.Parameters
+ public static Collection