From 23d1c7a55a63250b962c1fad4e6fb962fdd156cc Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 15 Oct 2024 21:13:33 +0530 Subject: [PATCH] Fix deletion permits flow in RemoteFsTimestampAwareTranslog (#16282) --------- Signed-off-by: Sachin Kale --- .../RemoteFsTimestampAwareTranslog.java | 52 ++++- .../index/translog/RemoteFsTranslog.java | 9 +- .../transfer/TranslogTransferManager.java | 84 +++---- .../RemoteFsTimestampAwareTranslogTests.java | 213 +++++++++++++++++- .../index/translog/RemoteFsTranslogTests.java | 24 ++ 5 files changed, 317 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 3ccacde22bbfc..1f54c09a04cc7 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -215,21 +215,42 @@ public void onResponse(List blobMetadata) { logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); if (generationsToBeDeleted.isEmpty() == false) { // Delete stale generations - translogTransferManager.deleteGenerationAsync( - primaryTermSupplier.getAsLong(), - generationsToBeDeleted, - remoteGenerationDeletionPermits::release - ); + try { + translogTransferManager.deleteGenerationAsync( + primaryTermSupplier.getAsLong(), + generationsToBeDeleted, + remoteGenerationDeletionPermits::release + ); + } catch (Exception e) { + logger.error("Exception in delete generations flow", e); + // Release permit that is meant for metadata files and return + remoteGenerationDeletionPermits.release(); + assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits " + + remoteGenerationDeletionPermits.availablePermits() + + " is not equal to " + + REMOTE_DELETION_PERMITS; + return; + } } else { remoteGenerationDeletionPermits.release(); } if (metadataFilesToBeDeleted.isEmpty() == false) { // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync( - metadataFilesToBeDeleted, - remoteGenerationDeletionPermits::release - ); + try { + translogTransferManager.deleteMetadataFilesAsync( + metadataFilesToBeDeleted, + remoteGenerationDeletionPermits::release + ); + } catch (Exception e) { + logger.error("Exception in delete metadata files flow", e); + // Permits is already released by deleteMetadataFilesAsync + assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits " + + remoteGenerationDeletionPermits.availablePermits() + + " is not equal to " + + REMOTE_DELETION_PERMITS; + return; + } // Update cache to keep only those metadata files that are not getting deleted oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); @@ -240,7 +261,12 @@ public void onResponse(List blobMetadata) { remoteGenerationDeletionPermits.release(); } } catch (Exception e) { + logger.error("Exception in trimUnreferencedReaders", e); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits " + + remoteGenerationDeletionPermits.availablePermits() + + " is not equal to " + + REMOTE_DELETION_PERMITS; } } @@ -441,7 +467,8 @@ protected static void deleteStaleRemotePrimaryTerms( } Optional minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> { try { - return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1(); + return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger) + .v1(); } catch (IOException e) { return Long.MIN_VALUE; } @@ -482,7 +509,8 @@ protected static Long getMinPrimaryTermInRemote( protected static Tuple getMinMaxPrimaryTermFromMetadataFile( String metadataFile, TranslogTransferManager translogTransferManager, - Map> oldFormatMetadataFilePrimaryTermMap + Map> oldFormatMetadataFilePrimaryTermMap, + Logger logger ) throws IOException { Tuple minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile); if (minMaxPrimaryTermFromFileName != null) { @@ -504,6 +532,8 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( if (primaryTerm.isPresent()) { minPrimaryTem = primaryTerm.get(); } + } else { + logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile); } Tuple minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem); oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f5a9ed8ed9362..e697e16d5e8a0 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -590,7 +590,14 @@ protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException generationsToDelete.add(generation); } if (generationsToDelete.isEmpty() == false) { - deleteRemoteGeneration(generationsToDelete); + try { + deleteRemoteGeneration(generationsToDelete); + } catch (Exception e) { + logger.error("Exception in delete generations flow", e); + // Release permit that is meant for metadata files and return + remoteGenerationDeletionPermits.release(); + return; + } translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release); deleteStaleRemotePrimaryTerms(); } else { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 291218ea47499..924669d0e46a9 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -496,19 +496,24 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep * @param onCompletion runnable to run on completion of deletion regardless of success/failure. */ public void deleteGenerationAsync(long primaryTerm, Set generations, Runnable onCompletion) { - List translogFiles = new ArrayList<>(); - generations.forEach(generation -> { - // Add .ckp and .tlog file to translog file list which is located in basePath/ - String ckpFileName = Translog.getCommitCheckpointFileName(generation); - String translogFileName = Translog.getFilename(generation); - if (isTranslogMetadataEnabled == false) { - translogFiles.addAll(List.of(ckpFileName, translogFileName)); - } else { - translogFiles.add(translogFileName); - } - }); - // Delete the translog and checkpoint files asynchronously - deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion); + try { + List translogFiles = new ArrayList<>(); + generations.forEach(generation -> { + // Add .ckp and .tlog file to translog file list which is located in basePath/ + String ckpFileName = Translog.getCommitCheckpointFileName(generation); + String translogFileName = Translog.getFilename(generation); + if (isTranslogMetadataEnabled == false) { + translogFiles.addAll(List.of(ckpFileName, translogFileName)); + } else { + translogFiles.add(translogFileName); + } + }); + // Delete the translog and checkpoint files asynchronously + deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion); + } catch (Exception e) { + onCompletion.run(); + throw e; + } } /** @@ -658,37 +663,32 @@ public void deleteTranslogFiles() throws IOException { * @param onCompletion runnable to run on completion of deletion regardless of success/failure. */ private void deleteTranslogFilesAsync(long primaryTerm, List files, Runnable onCompletion) { - try { - transferService.deleteBlobsAsync( - ThreadPool.Names.REMOTE_PURGE, - remoteDataTransferPath.add(String.valueOf(primaryTerm)), - files, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - fileTransferTracker.delete(files); - logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files); - onCompletion.run(); - } + transferService.deleteBlobsAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteDataTransferPath.add(String.valueOf(primaryTerm)), + files, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fileTransferTracker.delete(files); + logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files); + onCompletion.run(); + } - @Override - public void onFailure(Exception e) { - onCompletion.run(); - logger.error( - () -> new ParameterizedMessage( - "Exception occurred while deleting translog for primaryTerm={} files={}", - primaryTerm, - files - ), - e - ); - } + @Override + public void onFailure(Exception e) { + onCompletion.run(); + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting translog for primaryTerm={} files={}", + primaryTerm, + files + ), + e + ); } - ); - } catch (Exception e) { - onCompletion.run(); - throw e; - } + } + ); } /** diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 73db3314f4d1e..0995f2e75a17a 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -63,6 +63,7 @@ import org.mockito.Mockito; +import static org.opensearch.index.translog.RemoteFsTranslog.REMOTE_DELETION_PERMITS; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; @@ -480,10 +481,7 @@ public void onResponse(List blobMetadataList) { // we will not delete them if (dataFilesAfterTrim.equals(dataFilesBeforeTrim) == false) { // We check for number of pinned timestamp or +1 due to latest metadata. - assertTrue( - metadataFilesAfterTrim.size() == pinnedTimestamps.size() - || metadataFilesAfterTrim.size() == pinnedTimestamps.size() + 1 - ); + assertTrue(metadataFilesAfterTrim.size() >= pinnedTimestamps.size()); } for (String md : pinnedTimestampMatchingMetadataFiles) { @@ -1061,15 +1059,14 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); - RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog; - // Fetch generations directly from the filename assertEquals( new Tuple<>(1L, 1008L), RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854774799__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1077,7 +1074,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775800__9223372036854775800__9223370311919910398__31__9223372036854775803__4__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1085,7 +1083,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775797__9223372036854775800__9223370311919910398__31__9223372036854775803__10__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); @@ -1099,7 +1098,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1107,7 +1107,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", translogTransferManager, - Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)) + Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)), + logger ) ); @@ -1115,6 +1116,36 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { verify(translogTransferManager, times(0)).readMetadata( "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1" ); + + // Older md files with empty GenerationToPrimaryTermMap + md1 = mock(TranslogTransferMetadata.class); + when(md1.getGenerationToPrimaryTermMapper()).thenReturn(Map.of()); + when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1")) + .thenReturn(md1); + assertEquals( + new Tuple<>(-1L, 2L), + RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( + "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", + translogTransferManager, + new HashMap<>(), + logger + ) + ); + + // Older md files with empty GenerationToPrimaryTermMap + md1 = mock(TranslogTransferMetadata.class); + when(md1.getGenerationToPrimaryTermMapper()).thenReturn(null); + when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1")) + .thenReturn(md1); + assertEquals( + new Tuple<>(-1L, 2L), + RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( + "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", + translogTransferManager, + new HashMap<>(), + logger + ) + ); } public void testDeleteStaleRemotePrimaryTerms() throws IOException { @@ -1332,4 +1363,164 @@ public void testGetMinPrimaryTermInRemoteNotFetched() throws IOException { ); verify(translogTransferManager).listPrimaryTermsInRemote(); } + + public void testTrimUnreferencedReadersStalePinnedTimestamps() throws Exception { + ArrayList ops = new ArrayList<>(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + + // First reader is created at the init of translog + assertEquals(3, translog.readers.size()); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> { + assertEquals( + 10, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + translog.setMinSeqNoToKeep(6); + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(3, translog.readers.size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 20, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + } + + public void testTrimUnreferencedReadersNoPermits() throws Exception { + // Acquire the permits so that remote translog deletion will not happen + translog.remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS); + + ArrayList ops = new ArrayList<>(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + + // First reader is created at the init of translog + assertEquals(3, translog.readers.size()); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> { + assertEquals( + 10, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(6); + translog.trimUnreferencedReaders(); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); + + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.trimUnreferencedReaders(); + + assertEquals(3, translog.readers.size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 20, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + } + + public void testTrimUnreferencedReadersFailAlwaysRepo() throws Exception { + ArrayList ops = new ArrayList<>(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); + + translog.setMinSeqNoToKeep(2); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + updatePinnedTimstampTask.run(); + + fail.failAlways(); + translog.trimUnreferencedReaders(); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 03c77a9a83f57..190af714d5764 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -20,6 +20,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; @@ -32,6 +33,7 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; @@ -1965,6 +1967,28 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } + + @Override + public void listBlobsByPrefixInSortedOrder( + String blobNamePrefix, + int limit, + BlobNameSortOrder blobNameSortOrder, + ActionListener> listener + ) { + if (fail.fail()) { + listener.onFailure(new RuntimeException("blob container throwing error")); + return; + } + if (slowDown.getSleepSeconds() > 0) { + try { + Thread.sleep(slowDown.getSleepSeconds() * 1000L); + } catch (InterruptedException e) { + listener.onFailure(new RuntimeException(e)); + return; + } + } + super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener); + } } class TranslogThread extends Thread {