Skip to content

Commit

Permalink
[Snapshot V2] Remove orphan timestamps post create snapshot completion (
Browse files Browse the repository at this point in the history
opensearch-project#16148)

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored and dk2k committed Oct 16, 2024
1 parent ea7570c commit b99d02c
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryData;
Expand All @@ -62,8 +65,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -73,6 +78,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.snapshots.SnapshotsService.getPinningEntity;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -753,17 +759,15 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCreateSnapshotV2() throws Exception {
public void testCreateSnapshotV2_Orphan_Timestamp_Cleanup() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
Expand All @@ -787,27 +791,37 @@ public void testCreateSnapshotV2() throws Exception {
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

// create an orphan timestamp related to this repo
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
internalCluster().getClusterManagerName()
);
forceSyncPinnedTimestamps();

long pinnedTimestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<Void> latchedActionListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {}
}, latch);

remoteStorePinnedTimestampService.pinTimestamp(
pinnedTimestamp,
getPinningEntity(snapshotRepoName, "some_uuid"),
latchedActionListener
);
latch.await();

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

indexDocuments(client, indexName1, 10);
indexDocuments(client, indexName2, 20);

createIndex(indexName3, indexSettings);
indexDocuments(client, indexName3, 10);

String snapshotName2 = "test-create-snapshot2";

// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);
waitUntil(() -> 1 == RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception {
Expand Down Expand Up @@ -879,7 +893,8 @@ public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), 1);
}

public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException {
Expand Down Expand Up @@ -955,6 +970,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
Expand Down Expand Up @@ -1017,13 +1034,92 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Except
logger.info("Exception while creating new-snapshot", e);
}

AtomicLong totalSnaps = new AtomicLong();

// Validate that snapshot is present in repository data
assertBusy(() -> {
GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName);
GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet();
assertThat(response2.getSnapshots().size(), greaterThanOrEqualTo(1));
totalSnaps.set(response2.getSnapshots().size());

}, 30, TimeUnit.SECONDS);
thread.join();
forceSyncPinnedTimestamps();
waitUntil(() -> {
this.forceSyncPinnedTimestamps();
return RemoteStorePinnedTimestampService.getPinnedEntities().size() == totalSnaps.intValue();
});
}

public void testCreateSnapshotV2() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);

createRepository(snapshotRepoName, FsRepository.TYPE, settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

indexDocuments(client, indexName1, 10);
indexDocuments(client, indexName2, 20);

createIndex(indexName3, indexSettings);
indexDocuments(client, indexName3, 10);

String snapshotName2 = "test-create-snapshot2";

// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);
forceSyncPinnedTimestamps();
assertEquals(2, RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

public void forceSyncPinnedTimestamps() {
// for all nodes , run forceSyncPinnedTimestamps()
for (String node : internalCluster().getNodeNames()) {
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
node
);
remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();
}
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -76,6 +79,11 @@ public void testTimestampPinUnpin() throws Exception {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1();
assertTrue(lastFetchTimestamp_2 != -1);
Map<String, List<Long>> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities();
assertEquals(3, pinnedEntities.size());
assertEquals(Set.of("ss2", "ss3", "ss4"), pinnedEntities.keySet());
assertEquals(pinnedEntities.get("ss2").size(), 1);
assertEquals(Optional.of(timestamp1).get(), pinnedEntities.get("ss2").get(0));
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2());
});

Expand Down Expand Up @@ -103,10 +111,14 @@ public void onFailure(Exception e) {
// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Map<String, List<Long>> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities();
assertEquals(3, pinnedEntities.size());
assertEquals(pinnedEntities.get("ss5").size(), 1);
assertEquals(Optional.of(timestamp3).get(), pinnedEntities.get("ss5").get(0));
long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1();
assertTrue(lastFetchTimestamp_3 != -1);
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -49,6 +52,7 @@
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
private static Map<String, List<Long>> pinnedEntityToTimestampsMap = new HashMap<>();
public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__";

Expand Down Expand Up @@ -216,6 +220,16 @@ private long getTimestampFromBlobName(String blobName) {
return -1;
}

private String getEntityFromBlobName(String blobName) {
String[] blobNameTokens = blobName.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR);
if (blobNameTokens.length < 2) {
String errorMessage = "Pinned timestamps blob name contains invalid format: " + blobName;
logger.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
return String.join(PINNED_TIMESTAMPS_FILENAME_SEPARATOR, Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1));
}

/**
* Unpins a timestamp from the remote store.
*
Expand Down Expand Up @@ -262,6 +276,10 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

public static Map<String, List<Long>> getPinnedEntities() {
return pinnedEntityToTimestampsMap;
}

/**
* Inner class for asynchronously updating the pinned timestamp set.
*/
Expand All @@ -283,15 +301,27 @@ protected void runInternal() {
Map<String, BlobMetadata> pinnedTimestampList = blobContainer.listBlobs();
if (pinnedTimestampList.isEmpty()) {
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of());
pinnedEntityToTimestampsMap = new HashMap<>();
return;
}
Set<Long> pinnedTimestamps = pinnedTimestampList.keySet()
.stream()
.map(RemoteStorePinnedTimestampService.this::getTimestampFromBlobName)
.filter(timestamp -> timestamp != -1)
.collect(Collectors.toSet());

logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps);
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps);
pinnedEntityToTimestampsMap = pinnedTimestampList.keySet()
.stream()
.collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> {
long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName);
return Collections.singletonList(timestamp);
}, (existingList, newList) -> {
List<Long> mergedList = new ArrayList<>(existingList);
mergedList.addAll(newList);
return mergedList;
}));
} catch (Throwable t) {
logger.error("Exception while fetching pinned timestamp details", t);
}
Expand Down
Loading

0 comments on commit b99d02c

Please sign in to comment.