Skip to content

Commit

Permalink
remove metdata changes for asyn download flow
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed Apr 11, 2024
1 parent b528cfa commit 886f1ff
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.InputStream;
import java.util.Map;

/**
* Model composed of an input stream and the total content length of the stream
Expand All @@ -24,20 +23,17 @@ public class InputStreamContainer {
private final InputStream inputStream;
private final long contentLength;
private final long offset;
private final Map<String, String> metadata;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param metadata The metadata of the blob. This will be same for each part download.
*/
public InputStreamContainer(InputStream inputStream, long contentLength, long offset, Map<String, String> metadata) {
public InputStreamContainer(InputStream inputStream, long contentLength, long offset) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
this.metadata = metadata;
}

/**
Expand All @@ -60,11 +56,4 @@ public long getContentLength() {
public long getOffset() {
return offset;
}

/**
* @return metadata of the source content.
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
Expand Down Expand Up @@ -529,7 +529,7 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
Expand Down Expand Up @@ -651,7 +651,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(
readBlob(blobName, offset, partSize),
partSize,
offset,
null
);
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public long readBlobPreferredLength() {
private void executeWrite(InputStream inputStream, long blobSize, CheckedBiConsumer<InputStream, Long, IOException> writeConsumer)
throws IOException {
T cryptoContext = cryptoHandler.initEncryptionMetadata();
InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0, null);
InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0);
InputStreamContainer encryptedStream = cryptoHandler.createEncryptingStream(cryptoContext, streamContainer);
long cryptoLength = cryptoHandler.estimateEncryptedLengthOfEntireContent(cryptoContext, blobSize);
writeConsumer.accept(encryptedStream.getInputStream(), cryptoLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
inputStream = offsetRangeInputStream;
}

return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
} catch (IOException e) {
log.error("Failed to create input stream", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testReadBlobAsync() throws Exception {
final byte[] data = new byte[size];
Randomness.get().nextBytes(data);

final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
Expand Down Expand Up @@ -99,7 +99,7 @@ public void testReadBlobAsyncException() throws Exception {
// Objects needed for API call
final byte[] data = new byte[size];
Randomness.get().nextBytes(data);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testFilePartWriter() throws Exception {
Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
int contentLength = 100;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand All @@ -45,7 +45,7 @@ public void testFilePartWriterWithOffset() throws Exception {
int contentLength = 100;
int offset = 10;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand All @@ -57,7 +57,7 @@ public void testFilePartWriterLargeInput() throws Exception {
Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
int contentLength = 20 * 1024 * 1024;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public int available() {
blobPartStreams.add(
NUMBER_OF_PARTS,
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null),
() -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
threadPool.generic()
)
);
Expand Down Expand Up @@ -174,7 +174,7 @@ public int read(byte[] b) throws IOException {
blobPartStreams.add(
NUMBER_OF_PARTS,
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null),
() -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
threadPool.generic()
)
);
Expand Down Expand Up @@ -219,7 +219,7 @@ private List<ReadContext.StreamPartCreator> initializeBlobPartStreams() {
int finalPartNumber = partNumber;
blobPartStreams.add(
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE, null),
() -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE),
threadPool.generic()
)
);
Expand Down

0 comments on commit 886f1ff

Please sign in to comment.