From 706710c00ca69afbbfdfccaf1ef33637e0f6b9cd Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 11 Nov 2024 22:18:53 +0530 Subject: [PATCH 1/3] Reproduce stale blob cleanup with index name having period Signed-off-by: Ashish Singh --- .../snapshots/DeleteSnapshotIT.java | 31 +++++++++++++++++++ .../test/OpenSearchIntegTestCase.java | 2 +- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java index 26b30af4c2c50..25f7ff08535ad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java @@ -45,6 +45,37 @@ public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; + public void testShardBlobDeletionForHashedPrefixPathType() throws Exception { + String indexName1 = ".testindex1"; + String repoName = "test-restore-snapshot-repo"; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath = randomRepoPath().toAbsolutePath(); + logger.info("Path [{}]", absolutePath); + + Client client = client(); + // Write a document + String docId = Integer.toString(randomInt()); + index(indexName1, "_doc", docId, "value", "expected"); + createRepository(repoName, "fs", absolutePath); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .get(); + assertTrue(createSnapshotResponse.getSnapshotInfo().successfulShards() > 0); + assertEquals(createSnapshotResponse.getSnapshotInfo().totalShards(), createSnapshotResponse.getSnapshotInfo().successfulShards()); + assertEquals(SnapshotState.SUCCESS, createSnapshotResponse.getSnapshotInfo().state()); + + assertAcked(startDeleteSnapshot(repoName, snapshotName1).get()); + assertBusy(() -> assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(absolutePath.resolve(BlobStoreRepository.INDICES_DIR)))); + assertBusy(() -> assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(absolutePath.resolve(SnapshotShardPaths.DIR)))); + // At the end there are 2 files that exists - index-N and index.latest + assertBusy(() -> assertEquals(2, RemoteStoreBaseIntegTestCase.getFileCount(absolutePath))); + } + public void testDeleteSnapshot() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index e27ff311c06f6..bcfd24cbe618e 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2658,7 +2658,7 @@ public static PutRepositoryRequestBuilder putRepositoryRequestBuilder( builder.setTimeout(timeout); } if (finalSettings == false) { - settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), randomFrom(PathType.values())); + settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), PathType.HASHED_PREFIX.name()); } builder.setSettings(settings); return builder; From c17268348702f23d6a60ab077246c84b86c3e262 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 11 Nov 2024 22:43:07 +0530 Subject: [PATCH 2/3] Fix stale index deletion in snapshots for hashed prefix path type Signed-off-by: Ashish Singh --- .../snapshots/DeleteSnapshotIT.java | 2 +- .../blobstore/BlobStoreRepository.java | 65 ++++++++++++------- .../snapshots/SnapshotShardPaths.java | 11 ++-- .../test/OpenSearchIntegTestCase.java | 2 +- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java index 25f7ff08535ad..85d81761ea4a0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java @@ -45,7 +45,7 @@ public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; - public void testShardBlobDeletionForHashedPrefixPathType() throws Exception { + public void testStaleIndexDeletion() throws Exception { String indexName1 = ".testindex1"; String repoName = "test-restore-snapshot-repo"; String snapshotName1 = "test-restore-snapshot1"; diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 243d0021fac2e..21802e26072d1 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2307,11 +2307,23 @@ private List findMatchingShardPaths(String indexId, Map findHighestGenerationShardPaths(List matchingShardPaths) { - return matchingShardPaths.stream() - .map(s -> s.split("\\" + SnapshotShardPaths.DELIMITER)) - .sorted((a, b) -> Integer.parseInt(b[2]) - Integer.parseInt(a[2])) - .map(parts -> String.join(SnapshotShardPaths.DELIMITER, parts)) - .findFirst(); + if (matchingShardPaths.isEmpty()) { + return Optional.empty(); + } + + int maxGen = Integer.MIN_VALUE; + String maxGenShardPath = null; + + for (String shardPath : matchingShardPaths) { + String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER); + int shardCount = Integer.parseInt(parts[parts.length - 3]); + if (shardCount > maxGen) { + maxGen = shardCount; + maxGenShardPath = shardPath; + } + } + assert maxGenShardPath != null : "Valid maxGenShardPath should be present"; + return Optional.of(maxGenShardPath); } /** @@ -2549,25 +2561,32 @@ public void finalizeSnapshot( * on account of new indexes by same index name being snapshotted that exists already in the repository's snapshots. */ private void cleanupRedundantSnapshotShardPaths(Set updatedShardPathsIndexIds) { - Set updatedIndexIds = updatedShardPathsIndexIds.stream() - .map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0])) - .collect(Collectors.toSet()); - Set indexIdShardPaths = getSnapshotShardPaths().keySet(); - List staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> { - String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]); - return updatedIndexIds.contains(indexId); - }).collect(Collectors.toList()); try { - deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); - } catch (IOException e) { - logger.warn( - new ParameterizedMessage( - "Repository [{}] Exception during snapshot stale index deletion {}", - metadata.name(), - staleShardPaths - ), - e - ); + Set updatedIndexIds = updatedShardPathsIndexIds.stream() + .map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0])) + .collect(Collectors.toSet()); + Set indexIdShardPaths = getSnapshotShardPaths().keySet(); + List staleShardPaths = indexIdShardPaths.stream() + .filter(s -> updatedShardPathsIndexIds.contains(s) == false) + .filter(s -> { + String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]); + return updatedIndexIds.contains(indexId); + }) + .collect(Collectors.toList()); + try { + deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); + } catch (IOException e) { + logger.warn( + new ParameterizedMessage( + "Repository [{}] Exception during snapshot stale index deletion {}", + metadata.name(), + staleShardPaths + ), + e + ); + } + } catch (Exception ex) { + logger.error("Exception during cleanup of redundant snapshot shard paths", ex); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java index 878c2baba4ce9..64a308c1c2d4e 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java @@ -92,18 +92,21 @@ public static SnapshotShardPaths fromXContent(XContentParser ignored) { * Parses a shard path string and extracts relevant shard information. * * @param shardPath The shard path string to parse. Expected format is: - * [index_id]#[index_name]#[shard_count]#[path_type_code]#[path_hash_algorithm_code] + * [index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code] * @return A {@link ShardInfo} object containing the parsed index ID and shard count. * @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed. */ public static ShardInfo parseShardPath(String shardPath) { String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER); - if (parts.length != 5) { + int len = parts.length; + if (len < 5) { throw new IllegalArgumentException("Invalid shard path format: " + shardPath); } try { - IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3])); - int shardCount = Integer.parseInt(parts[2]); + // indexName is never used from within the ShardInfo returned + String indexName = len == 5 ? parts[1] : ""; + IndexId indexId = new IndexId(indexName, getIndexId(parts[0]), Integer.parseInt(parts[len - 2])); + int shardCount = Integer.parseInt(parts[len - 3]); return new ShardInfo(indexId, shardCount); } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index bcfd24cbe618e..e27ff311c06f6 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2658,7 +2658,7 @@ public static PutRepositoryRequestBuilder putRepositoryRequestBuilder( builder.setTimeout(timeout); } if (finalSettings == false) { - settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), PathType.HASHED_PREFIX.name()); + settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), randomFrom(PathType.values())); } builder.setSettings(settings); return builder; From fb503986f37119a12437cff841a47d98ab504c71 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 13 Nov 2024 00:33:12 +0530 Subject: [PATCH 3/3] Address PR review comments Signed-off-by: Ashish Singh --- .../blobstore/BlobStoreRepository.java | 27 +++++++++---------- .../snapshots/SnapshotShardPaths.java | 10 ++++--- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 21802e26072d1..3343bb735ab6e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2565,7 +2565,9 @@ private void cleanupRedundantSnapshotShardPaths(Set updatedShardPathsInd Set updatedIndexIds = updatedShardPathsIndexIds.stream() .map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0])) .collect(Collectors.toSet()); + logger.debug(new ParameterizedMessage("updatedIndexIds={}", updatedIndexIds)); Set indexIdShardPaths = getSnapshotShardPaths().keySet(); + logger.debug(new ParameterizedMessage("indexIdShardPaths={}", indexIdShardPaths)); List staleShardPaths = indexIdShardPaths.stream() .filter(s -> updatedShardPathsIndexIds.contains(s) == false) .filter(s -> { @@ -2573,20 +2575,17 @@ private void cleanupRedundantSnapshotShardPaths(Set updatedShardPathsInd return updatedIndexIds.contains(indexId); }) .collect(Collectors.toList()); - try { - deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); - } catch (IOException e) { - logger.warn( - new ParameterizedMessage( - "Repository [{}] Exception during snapshot stale index deletion {}", - metadata.name(), - staleShardPaths - ), - e - ); - } - } catch (Exception ex) { - logger.error("Exception during cleanup of redundant snapshot shard paths", ex); + logger.debug(new ParameterizedMessage("staleShardPaths={}", staleShardPaths)); + deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); + } catch (Exception e) { + logger.warn( + new ParameterizedMessage( + "Repository [{}] Exception during snapshot stale index deletion for updatedIndexIds {}", + metadata.name(), + updatedShardPathsIndexIds + ), + e + ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java index 64a308c1c2d4e..dd0b67ca9bfaa 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java @@ -92,7 +92,7 @@ public static SnapshotShardPaths fromXContent(XContentParser ignored) { * Parses a shard path string and extracts relevant shard information. * * @param shardPath The shard path string to parse. Expected format is: - * [index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code] + * snapshot_path_[index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code] * @return A {@link ShardInfo} object containing the parsed index ID and shard count. * @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed. */ @@ -103,8 +103,12 @@ public static ShardInfo parseShardPath(String shardPath) { throw new IllegalArgumentException("Invalid shard path format: " + shardPath); } try { - // indexName is never used from within the ShardInfo returned - String indexName = len == 5 ? parts[1] : ""; + String indexName = shardPath.substring( + // First separator after index id + shardPath.indexOf(DELIMITER) + 1, + // Since we know there are exactly 3 fields at the end + shardPath.lastIndexOf(DELIMITER, shardPath.lastIndexOf(DELIMITER, shardPath.lastIndexOf(DELIMITER) - 1) - 1) + ); IndexId indexId = new IndexId(indexName, getIndexId(parts[0]), Integer.parseInt(parts[len - 2])); int shardCount = Integer.parseInt(parts[len - 3]); return new ShardInfo(indexId, shardCount);