diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index ef6ea562a8543..4de25f30ea0d6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; @@ -32,11 +33,15 @@ import java.io.InputStream; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; @@ -49,12 +54,35 @@ public class BlobStoreTransferService implements TransferService { private final BlobStore blobStore; private final ThreadPool threadPool; + private ConcurrentHashMap fileTransferTrackerCache; private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { this.blobStore = blobStore; this.threadPool = threadPool; + fileTransferTrackerCache = new ConcurrentHashMap<>(); + } + + @Override + public ConcurrentHashMap getFileTransferTrackerCache() { + return fileTransferTrackerCache; + } + + void add(String file, boolean success) { + FileTransferTracker.TransferState targetState = success + ? FileTransferTracker.TransferState.SUCCESS + : FileTransferTracker.TransferState.FAILED; + add(file, targetState); + } + + private void add(String file, FileTransferTracker.TransferState targetState) { + fileTransferTrackerCache.compute(file, (k, v) -> { + if (v == null || v.validateNextState(targetState)) { + return targetState; + } + throw new IllegalStateException("Unexpected transfer state " + v + "while setting target to" + targetState); + }); } @Override @@ -99,33 +127,105 @@ public void uploadBlobs( ActionListener listener, WritePriority writePriority ) { + boolean isObjectMetadataUploadSupported = isObjectMetadataUploadSupported(); + fileSnapshots.forEach(fileSnapshot -> { BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); - if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { - uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); - } else { - logger.info("uploading file = {}", fileSnapshot.getName()); - uploadBlob(fileSnapshot, listener, blobPath, writePriority); - } - }); - } + if (!isObjectMetadataUploadSupported) { + List exceptionList = new ArrayList<>(2); - private Map prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException { - if (!(fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot)) { - return null; - } + Set filesToUpload = new HashSet<>(2); + try { + if (!fileTransferTrackerCache.get(fileSnapshot.getName()).equals(FileTransferTracker.TransferState.SUCCESS)) { + filesToUpload.add( + new TransferFileSnapshot(fileSnapshot.getPath(), fileSnapshot.getPrimaryTerm(), fileSnapshot.getChecksum()) + ); + } + if (!fileTransferTrackerCache.get(fileSnapshot.getMetadataFileName()) + .equals(FileTransferTracker.TransferState.FAILED)) { + filesToUpload.add( + new TransferFileSnapshot( + fileSnapshot.getMetadataFilePath(), + fileSnapshot.getPrimaryTerm(), + fileSnapshot.getMetadataFileChecksum() + ) + ); + } + } catch (IOException e) { + throw new FileTransferException(fileSnapshot, e); + } + + if (filesToUpload.isEmpty()) { + listener.onResponse(fileSnapshot); + return; + } + + final CountDownLatch latch = new CountDownLatch(filesToUpload.size()); + LatchedActionListener latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(fileSnapshot1 -> add(fileSnapshot1.getName(), true), ex -> { + assert ex instanceof FileTransferException; + logger.error( + () -> new ParameterizedMessage( + "Exception during transfer for file {}", + ((FileTransferException) ex).getFileSnapshot().getName() + ), + ex + ); + FileTransferException e = (FileTransferException) ex; + TransferFileSnapshot file = e.getFileSnapshot(); + add(file.getName(), false); + exceptionList.add(ex); + }), + latch + ); + + filesToUpload.forEach(separateFileSnapshot -> { + if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { + uploadBlob( + ThreadPool.Names.TRANSLOG_TRANSFER, + separateFileSnapshot, + blobPath, + latchedActionListener, + writePriority + ); + } else { + logger.info("uploading file = {}", fileSnapshot.getName()); + uploadBlob(separateFileSnapshot, latchedActionListener, blobPath, writePriority); + } + }); + + try { + if (latch.await(300, TimeUnit.SECONDS) == false) { + Exception ex = new TranslogUploadFailedException( + "Timed out waiting for transfer of snapshot " + fileSnapshot + " to complete" + ); + throw new FileTransferException(fileSnapshot, ex); + } + } catch (InterruptedException ex) { + Exception exception = new TranslogUploadFailedException("Failed to upload " + fileSnapshot, ex); + Thread.currentThread().interrupt(); + throw new FileTransferException(fileSnapshot, exception); + } - FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot; - String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString(); - Long checkpointChecksum = tlogFileSnapshot.getCheckpointChecksum(); + if (fileTransferTrackerCache.get(fileSnapshot.getName()).equals(FileTransferTracker.TransferState.SUCCESS) + && fileTransferTrackerCache.get(fileSnapshot.getMetadataFileName()).equals(FileTransferTracker.TransferState.SUCCESS)) { + listener.onResponse(fileSnapshot); + } else { + assert exceptionList.isEmpty() == false; + listener.onFailure(exceptionList.get(0)); + } - assert checkpointChecksum != null : "checksum can not be null"; + } else { + if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) { + uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); + } else { + logger.info("uploading file = {}", fileSnapshot.getName()); + uploadBlob(fileSnapshot, listener, blobPath, writePriority); + } + } + }); - Map metadata = new HashMap<>(); - metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString); - metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString()); - return metadata; } private void uploadBlob( @@ -138,7 +238,7 @@ private void uploadBlob( try { Map metadata = null; if (isObjectMetadataUploadSupported()) { - metadata = prepareFileMetadata(fileSnapshot); + metadata = fileSnapshot.prepareFileMetadata(); } ChannelFactory channelFactory = FileChannel::open; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 9daa0f9b154d4..0f0aa640aadeb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -24,6 +24,8 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; /** @@ -110,11 +112,23 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; - - public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { + @Nullable + private Path metadataFilePath; + @Nullable + private Long metadataFileChecksum; + @Nullable + private String metadataFileName; + + public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Path metadataFilePath, Long metadataFileChecksum) + throws IOException { super(path); this.primaryTerm = primaryTerm; this.checksum = checksum; + Objects.requireNonNull(metadataFilePath); + Objects.requireNonNull(metadataFileChecksum); + this.metadataFilePath = metadataFilePath; + this.metadataFileChecksum = metadataFileChecksum; + this.metadataFileName = metadataFilePath.getFileName().toString(); } public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { @@ -122,6 +136,12 @@ public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throw this.primaryTerm = primaryTerm; } + public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { + super(path); + this.primaryTerm = primaryTerm; + this.checksum = checksum; + } + public Long getChecksum() { return checksum; } @@ -130,6 +150,56 @@ public long getPrimaryTerm() { return primaryTerm; } + public Path getMetadataFilePath() { + return metadataFilePath; + } + + public Long getMetadataFileChecksum() { + return metadataFileChecksum; + } + + public long getMinTranslogGeneration() { + return getMinTranslogGeneration(); + } + + public String getMetadataFileName() { + return metadataFileName; + } + + public Map prepareFileMetadata() throws IOException { + if (!(this instanceof TranslogCheckpointFileSnapshot)) { + return null; + } + + String ckpAsString = provideCheckpointDataAsString(); + Long checkpointChecksum = metadataFileChecksum; + + assert checkpointChecksum != null : "checksum can not be null"; + + Map metadata = new HashMap<>(); + metadata.put(TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString); + metadata.put(TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString()); + return metadata; + } + + public String provideCheckpointDataAsString() throws IOException { + return buildCheckpointDataAsBase64String(metadataFilePath); + } + + static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException { + long fileSize = Files.size(checkpointFilePath); + assert fileSize < 1500 : "checkpoint file size is more than 1.5KB size, can't be stored as metadata"; + byte[] fileBytes = Files.readAllBytes(checkpointFilePath); + return Base64.getEncoder().encodeToString(fileBytes); + } + + public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) { + if (base64CheckpointString == null) { + return null; + } + return Base64.getDecoder().decode(base64CheckpointString); + } + @Override public int hashCode() { return Objects.hash(primaryTerm, super.hashCode()); @@ -259,4 +329,34 @@ public boolean equals(Object o) { return false; } } + + /** + * Single snapshot of combined translog.tlog and translog.ckp files that gets transferred + * + * @opensearch.internal + */ + public static final class TranslogCheckpointFileSnapshot extends TransferFileSnapshot { + + private final long generation; + private final long minTranslogGeneration; + + public TranslogCheckpointFileSnapshot( + Path tlogFilepath, + long primaryTerm, + Long tlogFilechecksum, + long generation, + long minTranslogGeneration, + Path ckpFilePath, + Long ckpFileChecksum + ) throws IOException { + super(tlogFilepath, primaryTerm, tlogFilechecksum, ckpFilePath, ckpFileChecksum); + this.minTranslogGeneration = minTranslogGeneration; + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index f3c17cdaa0054..e083430aaddcc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -77,6 +77,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { } add(fileSnapshot.getName(), TransferState.SUCCESS); + add(fileSnapshot.getMetadataFileName(), TransferState.SUCCESS); } void add(String file, boolean success) { @@ -130,7 +131,7 @@ public Set allUploaded() { /** * Represents the state of the upload operation */ - private enum TransferState { + enum TransferState { SUCCESS, FAILED; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 3ab31b3b497ae..3fd3bb3d89d7e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * Interface for the translog transfer service responsible for interacting with a remote store @@ -147,6 +148,8 @@ void listAllInSortedOrderAsync( ActionListener> listener ); + ConcurrentHashMap getFileTransferTrackerCache(); + boolean isObjectMetadataUploadSupported(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index ef34fd31a296b..6d427853d8f37 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -10,6 +10,7 @@ import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogCheckpointFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import java.util.Set; @@ -39,4 +40,10 @@ public interface TransferSnapshot { * @return the translog transfer metadata */ TranslogTransferMetadata getTranslogTransferMetadata(); + + /** + * The single snapshot of the translog and checkpoint generational files + * @return the set of {@link TranslogCheckpointFileSnapshot} + */ + Set getTranslogCheckpointFileSnapshots(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index 5ed72b64f1c23..8acee8795344c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -25,6 +25,7 @@ import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogCheckpointFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; /** @@ -35,6 +36,7 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Closeable { private final Set> translogCheckpointFileInfoTupleSet; + private final Set translogCheckpointFileSnapshotSet; private final int size; private final long generation; private final long primaryTerm; @@ -44,17 +46,30 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Clo TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size, String nodeId) { translogCheckpointFileInfoTupleSet = new HashSet<>(size); + translogCheckpointFileSnapshotSet = new HashSet<>(size); this.size = size; this.generation = generation; this.primaryTerm = primaryTerm; this.nodeId = nodeId; } - private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { + private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) throws IOException { // set checkpoint file path and checkpoint file checksum for a translog file translogFileSnapshot.setCheckpointFilePath(checkPointFileSnapshot.getPath()); translogFileSnapshot.setCheckpointChecksum(checkPointFileSnapshot.getChecksum()); + translogCheckpointFileSnapshotSet.add( + new TranslogCheckpointFileSnapshot( + translogFileSnapshot.getPath(), + translogFileSnapshot.getPrimaryTerm(), + translogFileSnapshot.getChecksum(), + translogFileSnapshot.getGeneration(), + checkPointFileSnapshot.getMinTranslogGeneration(), + checkPointFileSnapshot.getPath(), + checkPointFileSnapshot.getChecksum() + ) + ); + translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration(); } @@ -68,6 +83,11 @@ public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } + @Override + public Set getTranslogCheckpointFileSnapshots() { + return new HashSet<>(translogCheckpointFileSnapshotSet); + } + @Override public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 6c6753eab5c5e..5c98b186671aa 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -106,8 +107,8 @@ public ShardId getShardId() { public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { - List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); - Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount() / 2); + Set toUploadCombined = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount() / 2); long metadataBytesToUpload; long metadataUploadStartTime; long uploadStartTime; @@ -115,24 +116,28 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); try { - boolean isObjectMetadataUploadSupported = transferService.isObjectMetadataUploadSupported(); - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - // if the transferService support uploading object metadata, we don't need to transfer checkpoint file snapshots separately. - if (!isObjectMetadataUploadSupported) { - toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); - } + toUploadCombined.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogCheckpointFileSnapshots())); - if (toUpload.isEmpty()) { + if (toUploadCombined.isEmpty()) { logger.trace("Nothing to upload for transfer"); return true; } - fileTransferTracker.recordBytesForFiles(toUpload); + fileTransferTracker.recordBytesForFiles(toUploadCombined); + captureStatsBeforeUpload(); - final CountDownLatch latch = new CountDownLatch(toUpload.size()); + + ConcurrentHashMap transferServiceFileTrackerCache = transferService + .getFileTransferTrackerCache(); + + final CountDownLatch latch = new CountDownLatch(toUploadCombined.size()); LatchedActionListener latchedActionListener = new LatchedActionListener<>( - ActionListener.wrap(fileTransferTracker::onSuccess, ex -> { + ActionListener.wrap(fileSnapshot -> { + fileTransferTracker.onSuccess(fileSnapshot); + transferServiceFileTrackerCache.remove(fileSnapshot.getName()); + transferServiceFileTrackerCache.remove(fileSnapshot.getMetadataFileName()); + }, ex -> { assert ex instanceof FileTransferException; logger.error( () -> new ParameterizedMessage( @@ -149,7 +154,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans latch ); Map blobPathMap = new HashMap<>(); - toUpload.forEach( + toUploadCombined.forEach( fileSnapshot -> blobPathMap.put( fileSnapshot.getPrimaryTerm(), remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())) @@ -160,7 +165,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans // TODO: Ideally each file's upload start time should be when it is actually picked for upload // https://github.com/opensearch-project/OpenSearch/issues/9729 fileTransferTracker.recordFileTransferStartTime(uploadStartTime); - transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); + transferService.uploadBlobs(toUploadCombined, blobPathMap, latchedActionListener, WritePriority.HIGH); try { if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java index 338be8b61a501..3dabc056f212d 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java @@ -81,9 +81,6 @@ public void testBuildCheckpointDataAsBase64String() throws IOException { Files.writeString(file, "hello_world_with_checkpoint_file_data-4"); Files.writeString(file, "213123123"); - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); - - assertFileSnapshotProperties(file); String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file); // Assert @@ -96,10 +93,6 @@ public void testBuildCheckpointDataAsBase64StringWhenPathIsNull() throws IOExcep Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX); Files.writeString(file, "hello_world_with_checkpoint_file_data"); - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); - - assertFileSnapshotProperties(file); - assertThrows(NullPointerException.class, () -> FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(null)); } @@ -107,9 +100,6 @@ public void testConvertCheckpointBase64StringToBytes() throws IOException { Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX); Files.writeString(file, "test-hello_world_with_checkpoint_file_data"); - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); - - assertFileSnapshotProperties(file); String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file); byte[] decodedBytes = FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes(encodedString); @@ -121,10 +111,6 @@ public void testBuildCheckpointDataAsBase64String_whenFileSizeGreaterThan2KB_sho Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX); byte[] data = new byte[2048]; // 2KB - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); - - assertFileSnapshotProperties(file); - ByteBuffer buffer = ByteBuffer.wrap(data); Files.write(file, buffer.array(), StandardOpenOption.WRITE); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index ea4963dffc9f9..dcbdbc6acef5f 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -361,6 +362,11 @@ public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); } + @Override + public Set getTranslogCheckpointFileSnapshots() { + return new HashSet<>(); + } + @Override public String toString() { return "test-to-string";