Skip to content

HADOOP-19394. Integrate with AAL's readVectored(). #7720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
Expand All @@ -37,6 +44,11 @@
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.VectoredReadUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;


/**
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
Expand Down Expand Up @@ -128,6 +140,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
return bytesRead;
}

/**
 * Two-argument vectored read: delegates to
 * {@link #readVectored(List, IntFunction, Consumer)}, supplying
 * {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} as the buffer releaser.
 * {@inheritDoc}
 */
@Override
public void readVectored(final List<? extends FileRange> ranges,
    final IntFunction<ByteBuffer> allocate) throws IOException {
  // Delegate with the logging releaser; no other work happens here.
  readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}

/**
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
* {@inheritDoc}
*/
@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
throwIfClosed();

List<ObjectRange> objectRanges = new ArrayList<>();

for (FileRange range : ranges) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
objectRanges.add(objectRange);
range.setData(result);
}

// AAL does not do any range coalescing, so input and combined ranges are the same.
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
inputStream.readVectored(objectRanges, allocate, release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this call release on errors? curious -and hopeful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah no :( release isn't called right now, but will make that change and get it released in the next version

}

@Override
public boolean seekToNewSource(long l) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,28 @@
package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.tags.IntegrationTest;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.function.Consumer;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;

/**
* S3A contract tests for vectored reads with the Analytics stream.
Expand Down Expand Up @@ -64,12 +77,48 @@ protected Configuration createConfiguration() {
// This issue is tracked in:
// https://github.com/awslabs/analytics-accelerator-s3/issues/218
skipForAnyEncryptionExceptSSES3(conf);
conf.set("fs.contract.vector-io-early-eof-check", "false");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is removed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's about whether the stream is doing early or late EOF checks on the read, so presumably the semantics have changed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, before we implemented this we were using the base implementation, which does not do any early checks. Now it's the same as the current implementation, which will validate the ranges before doing any reads, and throw an EoF for any ranges > file size.

return conf;
}

/**
 * Create the filesystem contract under test.
 * @param conf configuration to bind the contract to.
 * @return a new {@code S3AContract}.
 */
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}

/**
* When the offset is negative, AAL returns IllegalArgumentException, whereas the base implementation will return
* an EoF.
*/
@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a javadoc explaining why the override

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public void testNegativeOffsetRange() throws Exception {
verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
}

/**
 * Skipped: AAL currently performs no null check on the release operation;
 * this will be fixed in the next AAL version, after which the override
 * can be removed.
 */
@Override
public void testNullReleaseOperation() {
  // Message typo fixed: "current" -> "currently".
  skip("AAL currently does not do a null check on the release operation");
}

@Test
public void testReadVectoredWithAALStatsCollection() throws Exception {

List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, getAllocate());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not verifying the data after vectored read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should be. lot easy to do faster vector IO if you always return the same array of emtpy bytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in validation


validateVectoredReadResult(fileRanges, DATASET, 0);
IOStatistics st = in.getIOStatistics();

// Statistics such as GET requests will be added after IoStats support.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);

verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected AbstractFSContract createContract(Configuration conf) {
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
"Analytics Accelerator does not support vectored reads");
"AAL with readVectored() is tested in ITestS3AContractAnalyticsStreamVectoredRead");
}

/**
Expand Down