From 3332318484324ec80e5d3d24bef0a3f8ec26270e Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 9 Apr 2024 01:58:22 +0530 Subject: [PATCH] metadata upload (all flows) and download(not async multipart flow) tested Signed-off-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 3 ++ .../s3/async/AsyncTransferManager.java | 2 + .../transfer/BlobStoreTransferService.java | 36 ++++++++++--- .../index/translog/transfer/FileSnapshot.java | 33 ++++++++++++ .../TranslogCheckpointTransferSnapshot.java | 2 +- .../transfer/TranslogTransferManager.java | 52 ++++++++++++++++--- 6 files changed, 113 insertions(+), 15 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 5e3bdf5269cf4..220c840952467 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -203,8 +203,10 @@ public void writeBlobWithMetadata( assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= getLargeBlobThresholdInBytes()) { + logger.info("using executeSingleUpload()..to upload file = {}", blobName); executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } else { + logger.info("using executeMultipartUpload()..to upload file = {}", blobName); executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } return null; @@ -231,6 +233,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp ); InputStreamContainer inputStream = streamContext.provideStream(0); try { + logger.info("Using multipart upload method.."); executeMultipartUpload( blobStore, uploadRequest.getKey(), diff --git 
a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 9d74ecc64f480..ffa2f394314b4 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -106,10 +106,12 @@ public CompletableFuture uploadObject( CompletableFuture returnFuture = new CompletableFuture<>(); try { if (streamContext.getNumberOfParts() == 1) { + log.info("using uploadInOneChunk()..to upload file = {}", uploadRequest.getKey()); log.debug(() -> "Starting the upload as a single upload part request"); uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher); } else { log.debug(() -> "Starting the upload as multipart upload request"); + log.info("using uploadInParts()..to upload file = {}", uploadRequest.getKey()); uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); } } catch (Throwable throwable) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index a1d5041ff9aff..604bd477cb782 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -32,10 +32,7 @@ import java.io.InputStream; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; @@ -81,8 +78,9 @@ public void uploadBlob( 
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, WritePriority writePriority) throws IOException { BlobPath blobPath = (BlobPath) remoteTransferPath; + Map metadata = prepareFileMetadata(fileSnapshot); try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + blobStore.blobContainer(blobPath).writeBlobAtomicWithMetadata(fileSnapshot.getName(), inputStream, metadata, fileSnapshot.getContentLength(), true); } } @@ -98,18 +96,38 @@ public void uploadBlobs( if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); } else { - uploadBlob(fileSnapshot, listener, blobPath, writePriority); + if(!(fileSnapshot instanceof FileSnapshot.CheckpointFileSnapshot)) { + logger.info("uploading file = {}", fileSnapshot.getName()); + try { + uploadBlob(fileSnapshot, listener, blobPath, writePriority); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } }); } + private Map prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException { + Map metadata = new HashMap<>(); + if (fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot){ + FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot; + String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString(); + metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_OBJECT_METADATA_KEY, ckpAsString); + return metadata; + } + return null; + } + private void uploadBlob( TransferFileSnapshot fileSnapshot, ActionListener listener, BlobPath blobPath, WritePriority writePriority - ) { + ) throws IOException { + + Map metadata = prepareFileMetadata(fileSnapshot); try { ChannelFactory channelFactory = FileChannel::open; @@ -130,7 +148,8 @@ private void uploadBlob( 
writePriority, (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), Objects.requireNonNull(fileSnapshot.getChecksum()), - remoteIntegrityEnabled + remoteIntegrityEnabled, + metadata ); ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); @@ -149,6 +168,7 @@ private void uploadBlob( ((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener); } catch (Exception e) { + logger.info("Exception while uploading file = {} with metadata", fileSnapshot.getName()); logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); listener.onFailure(new FileTransferException(fileSnapshot, e)); } finally { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index dcec94edd694f..77449c12642df 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -17,11 +17,13 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Base64; import java.util.Objects; /** @@ -153,12 +155,43 @@ public boolean equals(Object o) { public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; + private Path checkpointFilePath; + private String checkpointDataAsString; + public final static String CHECKPOINT_OBJECT_METADATA_KEY = "ckpfile"; public TranslogFileSnapshot(long primaryTerm, long 
generation, Path path, Long checksum) throws IOException { super(path, primaryTerm, checksum); this.generation = generation; } + public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Path checkpointPath) throws IOException { + super(path, primaryTerm, checksum); + this.generation = generation; + this.checkpointFilePath = checkpointPath; + } + + public String provideCheckpointDataAsString() throws IOException { + this.checkpointDataAsString = buildCheckpointDataAsBase64String(checkpointFilePath); + return checkpointDataAsString; + } + + static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException { + try (FileChannel fileChannel = FileChannel.open(checkpointFilePath, StandardOpenOption.READ)) { + assert fileChannel.size() < 1500 : "checkpoint file size of more than 1.5KB can't be stored as metadata"; + ByteBuffer buffer = ByteBuffer.allocate((int) fileChannel.size()); + fileChannel.read(buffer); + buffer.flip(); + return Base64.getEncoder().encodeToString(buffer.array()); + } + } + + public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) { + if (base64CheckpointString == null) { + return null; + } + return Base64.getDecoder().decode(base64CheckpointString); + } + public long getGeneration() { return generation; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index fb78731246a07..4ae00bea1b7eb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -158,7 +158,7 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration));
generations.add(readerGeneration); translogTransferSnapshot.add( - new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()), + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum(), checkpointPath), new CheckpointFileSnapshot( readerPrimaryTerm, checkpointGeneration, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..d28bf3dbf393a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -14,6 +14,7 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.SetOnce; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -47,6 +48,8 @@ import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_OBJECT_METADATA_KEY; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes; /** * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} @@ -111,7 +114,9 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans try { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - 
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + // Skip uploading checkpoint file snapshots: checkpoint data is carried as object metadata on the translog file upload instead. + //toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); return true; } @@ -237,15 +242,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca location ); // Download Checkpoint file from remote to local FS - String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); + //String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + //downloadToFS(ckpFileName, location, primaryTerm); // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + downloadToFS(translogFilename, location, primaryTerm, generation); return true; } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + private void downloadToFS(String fileName, Path location, String primaryTerm, String generation) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download @@ -255,8 +260,16 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th boolean downloadStatus = false; long bytesToRead = 0, downloadStartTime = System.nanoTime(); - try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { + BlobDownloadResponse downloaded = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName); + try { // Capture number of bytes for stats before reading + InputStream inputStream = downloaded.getInputStream(); + Map metadata = downloaded.getMetadata(); + + logger.info("downloaded translog for fileName = {}, metadata = {}", fileName, metadata); + + applyMetadataToCkpFile(metadata, location, generation, fileName); + bytesToRead = inputStream.available(); Files.copy(inputStream, filePath); downloadStatus = true; @@ -271,6 +284,33 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th fileTransferTracker.add(fileName, true); } + private void applyMetadataToCkpFile(Map metadata, Path location, String generation, String fileName) throws IOException { + String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + Path filePath = location.resolve(ckpFileName); + + byte[] ckpFileBytes = convertBase64StringToCheckpointFileDataBytes(metadata.get(CHECKPOINT_OBJECT_METADATA_KEY)); + if (ckpFileBytes == null) { + logger.info("If ckpFileBytes is null file should be ckp which is without metadata, for file = {}", fileName); + return; + } + + // Here, we always override the existing file if present. 
+ // We need to change this logic when we introduce incremental download + if (Files.exists(filePath)) { + Files.delete(filePath); + } + + try { + Files.write(filePath, ckpFileBytes); + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync + fileTransferTracker.add(ckpFileName, true); + logger.info("Applied metadata bytes to checkpoint file for fileName = {}", fileName); + } catch (IOException e) { + logger.debug("Error in processing metadata bytes"); + throw e; + } + } + public TranslogTransferMetadata readMetadata() throws IOException { SetOnce metadataSetOnce = new SetOnce<>(); SetOnce exceptionSetOnce = new SetOnce<>();