Skip to content

Commit

Permalink
[Remote Store] Waiting for remote store upload in snapshot/local reco…
Browse files Browse the repository at this point in the history
…very (#11720)

* Giving time for snapshot recovery/local time to upload all the data to remote

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Feb 5, 2024
1 parent a8dd6a0 commit 4d055b8
Showing 12 changed files with 404 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -50,6 +50,8 @@
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -130,4 +132,61 @@ public void testCreateCloneIndex() {

}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = 1;
prepareCreate("source").setSettings(
Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
).get();
final int docs = 2;
for (int i = 0; i < docs; i++) {
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
internalCluster().ensureAtLeastNumDataNodes(2);
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below.
ensureGreen();
// relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

// disable rebalancing to be able to capture the right stats. balancing can move the target primary
// making it hard to pin point the source shards.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
.get();
try {
setFailRate(REPOSITORY_NAME, 100);

client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.CLONE)
.setWaitForActiveShards(0)
.setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
.get();

Thread.sleep(2000);
ensureYellow("target");

} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
setFailRate(REPOSITORY_NAME, 0);
ensureGreen();
// clean up
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
)
.get();
}

}

}
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkItemResponse;
@@ -37,7 +39,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

@@ -60,6 +62,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
@@ -146,6 +149,18 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
assertAcked(
client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
);
}

public Settings indexSettings() {
return defaultIndexSettings();
}
@@ -224,10 +239,10 @@ public static Settings buildRemoteStoreNodeAttributes(
return buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
translogRepoName,
translogRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
withRateLimiterAttributes
);
}
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -479,7 +480,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
settings.put("location", segmentRepoPath).put("max_remote_download_bytes_per_sec", 4, ByteSizeUnit.KB);

assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);

for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
@@ -508,7 +516,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
// revert repo metadata to pass asserts on repo metadata vs. node attrs during teardown
// https://github.com/opensearch-project/OpenSearch/pull/9569#discussion_r1345668700
settings.remove("max_remote_download_bytes_per_sec");
assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
assertNull(segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec"));
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

@@ -53,7 +53,7 @@ public void testRestrictedSettingsCantBeUpdated() {
assertEquals(
e.getMessage(),
"[system-repo-name] trying to modify an unmodifiable attribute type of system "
+ "repository from current value [fs] to new value [mock]"
+ "repository from current value [reloadable-fs] to new value [mock]"
);
}

@@ -65,7 +65,12 @@ public void testSystemRepositoryNonRestrictedSettingsCanBeUpdated() {
final Settings.Builder repoSettings = Settings.builder().put("location", absolutePath).put("chunk_size", new ByteSizeValue(20));

assertAcked(
client.admin().cluster().preparePutRepository(systemRepoName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
client.admin()
.cluster()
.preparePutRepository(systemRepoName)
.setType(ReloadableFsRepository.TYPE)
.setSettings(repoSettings)
.get()
);
}
}
Original file line number Diff line number Diff line change
@@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
28 changes: 20 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
@@ -2025,23 +2025,35 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
}

/**
Returns true iff it is able to verify that remote segment store
is in sync with local
* Returns true iff it is able to verify that remote segment store
* is in sync with local
*/
boolean isRemoteSegmentStoreInSync() {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
Collection<String> localFiles = segmentInfos.files(true);
if (uploadFiles.containsAll(localFiles)) {
return true;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(
() -> new ParameterizedMessage(
"RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
localFiles.size(),
uploadFiles.size()
)
);
}
}
} catch (IOException e) {
} catch (AlreadyClosedException e) {
throw e;
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
}
return false;
Original file line number Diff line number Diff line change
@@ -172,13 +172,33 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/**
* Checks if all files present in local store are uploaded to remote store or part of excluded files.
*
* Different from IndexShard#isRemoteSegmentStoreInSync as
* it uses files uploaded cache in RemoteDirector and it doesn't make a remote store call.
* Doesn't throw an exception on store getting closed as store will be open
*
*
* @return true iff all the local files are uploaded to remote store.
*/
boolean isRemoteSegmentStoreInSync() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload);
} catch (Throwable throwable) {
logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable);
}
return false;
}

/*
@return false if retry is needed
*/
38 changes: 35 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
@@ -38,11 +38,13 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
@@ -191,7 +193,8 @@ void recoverFromLocalShards(
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
@@ -432,7 +435,8 @@ void recoverFromSnapshotAndRemoteStore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -717,7 +721,8 @@ private void restore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -791,4 +796,31 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
);
store.associateIndexWithNewTranslog(translogUUID);
}

/*
Blocks the calling thread, waiting for the remote store to get synced till internal Remote Upload Timeout
*/
private void waitForRemoteStoreSync(IndexShard indexShard) {
if (indexShard.shardRouting.primary() == false) {
return;
}
long startNanos = System.nanoTime();

while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
try {
if (indexShard.isRemoteSegmentStoreInSync()) {
break;
} else {
try {
Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}
} catch (AlreadyClosedException e) {
// There is no point in waiting as shard is now closed .
return;
}
}
}
}
Loading

0 comments on commit 4d055b8

Please sign in to comment.