Skip to content

Commit

Permalink
Simplify remote directory cleanup after snapshot delete to … (opensea…
Browse files Browse the repository at this point in the history
…rch-project#12672) (opensearch-project#12682)

* Simplify remote directory cleanup after snapshot delete to avoid concurrent cleanup task runs for same shard.



* Address PR Comments.



---------



(cherry picked from commit d0467b3)

Signed-off-by: Harish Bhakuni <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Harish Bhakuni <[email protected]>
  • Loading branch information
3 people authored Mar 15, 2024
1 parent dccb2ca commit 204f354
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1216,11 +1216,12 @@ protected void releaseRemoteStoreLockAndCleanup(
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
// index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
// threadpool. if it fails, it could leave some stale files in remote directory. this issue could
// even happen in cases of shard level remote store data cleanup which also happens asynchronously.
// in long term, we have plans to implement remote store GC poller mechanism which will take care of
// such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
// index is already deleted. this is the best effort at the moment since shard cleanup will still happen
// asynchronously using REMOTE_PURGE thread pool. if it fails, it could leave some stale files in remote
// directory. this issue could even happen in cases of shard level remote store data cleanup which also
// happens asynchronously. in long term, we have plans to implement remote store GC poller mechanism which
// will take care of such stale data.
// related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.core.index.shard.ShardId;

import java.util.Map;
import java.util.Set;

import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;

/**
Expand All @@ -25,7 +23,6 @@ public class RemoteStoreShardCleanupTask implements Runnable {
private final Runnable task;
private final String shardIdentifier;
final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
final static Map<String, Runnable> pendingRemoteDirectoryCleanups = newConcurrentMap();
private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class);

public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) {
Expand All @@ -39,25 +36,16 @@ private static String indexShardIdentifier(String indexUUID, ShardId shardId) {

@Override
public void run() {
// TODO: this is the best effort at the moment since there is still a known race condition scenario in this
// method which needs to be handled where one of the thread just came out of while loop and removed the
// entry from ongoingRemoteDirectoryCleanup, and another thread added new pending task in the map.
// we need to introduce semaphores/locks to avoid that situation which introduces the overhead of lock object
// cleanups. however, there will be no scenario where two threads run cleanup for same shard at same time.
// <issue-link>
if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) {
if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) {
Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier);
pendingRemoteDirectoryCleanups.remove(shardIdentifier);
newTask.run();
}
// If there is already a same task ongoing for a shard, we need to skip the new task to avoid multiple
// concurrent cleanup of same shard.
if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
try {
task.run();
} finally {
ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
} else {
staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier);
}
} else {
staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier);
staticLogger.warn("one cleanup task for shard {} is already ongoing, need to skip this task", shardIdentifier);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,38 +321,31 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
return repoData;
}

private String getShardIdentifier(String indexUUID, String shardId) {
return String.join("/", indexUUID, shardId);
}

public void testRemoteStoreShardCleanupTask() {
// todo: move it to separate class and add more scenarios.
AtomicBoolean executed1 = new AtomicBoolean(false);
Runnable task1 = () -> executed1.set(true);
String indexName = "test-idx";
String testIndexUUID = "test-idx-uuid";
ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0);

// Scenario 1: pending = empty, ongoing = false => executed
// just adding random shards in ongoing cleanups.
RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "1"));
RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "2"));

// Scenario 1: ongoing = false => executed
RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
remoteStoreShardCleanupTask.run();
assertTrue(executed1.get());

// Scenario 2: pending = empty, ongoing = true => pending = currentTask
// Scenario 2: ongoing = true => currentTask skipped.
executed1.set(false);
String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id()));
RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);

RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "0"));
remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
remoteStoreShardCleanupTask.run();
assertFalse(executed1.get());
assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1);

// Scenario3: pending = anotherTask, ongoing = true => pending = currentTask
AtomicBoolean executed2 = new AtomicBoolean(false);
Runnable task2 = () -> executed2.set(true);
RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1);
RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);

remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId);
remoteStoreShardCleanupTask.run();
assertFalse(executed1.get());
assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2);
}
}

0 comments on commit 204f354

Please sign in to comment.