Commit
Giving time for snapshot recovery/local time to upload all the data to remote

Signed-off-by: Gaurav Bafna <[email protected]>
gbbafna committed Jan 3, 2024
1 parent 7b1c2c7 commit c3c5a54
Showing 7 changed files with 126 additions and 19 deletions.
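In outline, the commit makes primary-shard recovery paths (local shards, snapshot, remote-store restore) wait for the remote segment store to catch up with the local store before recovery completes, bounded by the new dynamic setting indices.recovery.internal_remote_upload_timeout (default 1 hour). The sketch below is a minimal, illustrative rendering of that wait-and-verify pattern, not the committed code; the class name, poll interval, and BooleanSupplier parameter are assumptions, while the real code polls IndexShard#isRemoteSegmentStoreInSync() from StoreRecovery.

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Sketch (not the committed code): block until the remote store reports itself in sync
// with the local store, or until the configured upload timeout elapses.
final class RemoteSyncWait {
    static boolean waitForRemoteSync(BooleanSupplier isRemoteInSync, long timeoutNanos, long pollMillis)
        throws InterruptedException {
        final long start = System.nanoTime();
        while (System.nanoTime() - start < timeoutNanos) {
            if (isRemoteInSync.getAsBoolean()) {
                return true;                          // all local segment files are visible remotely
            }
            TimeUnit.MILLISECONDS.sleep(pollMillis);  // back off before checking again
        }
        return false;                                 // caller treats this as a failed recovery
    }
}

In the diff, a false outcome surfaces as an IndexShardRecoveryException or IndexShardRestoreFailedException, depending on the recovery path.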
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
@@ -951,8 +951,8 @@ public IndexResult index(Index index) throws IOException {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()))
;
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
20 changes: 12 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2024,23 +2024,27 @@ private RemoteSegmentStoreDirectory getRemoteDirectory() {
}

/**
Returns true iff it is able to verify that remote segment store
is in sync with local
* Returns true iff it is able to verify that remote segment store
* is in sync with local
*/
boolean isRemoteSegmentStoreInSync() {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
Collection<String> localFiles = segmentInfos.files(true);
if (uploadFiles.containsAll(localFiles)) {
return true;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(() -> new ParameterizedMessage("RemoteSegmentStoreSyncStatus localSize={} remoteSize={}", localFiles.size(), uploadFiles.size()));
}
}
} catch (IOException e) {
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
}
return false;
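The verification in IndexShard#isRemoteSegmentStoreInSync boils down to a set-containment check: every file referenced by the local SegmentInfos snapshot, minus the excluded bookkeeping files, must already appear in the set of files uploaded to the remote store. A standalone sketch of that check follows; the class, helper, and file names are illustrative, not the committed API.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Sketch: localFiles \ excludedFiles must be a subset of uploadedFiles for the remote
// store to be considered in sync with the local store.
final class SegmentSyncCheck {
    static boolean isInSync(Collection<String> localFiles, Set<String> excludedFiles, Set<String> uploadedFiles) {
        Set<String> pending = new HashSet<>(localFiles);
        pending.removeAll(excludedFiles);              // mirrors removeAll(EXCLUDE_FILES) in the diff
        return uploadedFiles.containsAll(pending);
    }

    public static void main(String[] args) {
        Set<String> uploaded = new HashSet<>(Arrays.asList("_0.cfe", "_0.cfs", "_0.si"));
        Set<String> excluded = new HashSet<>(Arrays.asList("segments_2"));
        Collection<String> local = Arrays.asList("_0.cfe", "_0.cfs", "_0.si", "segments_2");
        System.out.println(isInSync(local, excluded, uploaded));  // prints true
    }
}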
@@ -171,13 +171,31 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync();
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/**
* Checks if all files present in local store are uploaded to remote store or part of excluded files.
*
* Different from IndexShard#isRemoteSegmentStoreInSync as it uses files uploaded cache in RemoteDirectory
* And it doesn't make a remote store call.
*
* @return true iff all the local files are uploaded to remote store.
*/
boolean isRemoteSegmentStoreInSync() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload);
} catch (Throwable throwable) {
logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable);
}
return false;
}

/*
@return false if retry is needed
*/
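By contrast, the listener-side isRemoteSegmentStoreInSync above stays node-local: it walks the files of the current SegmentInfos snapshot and asks skipUpload whether each one is already uploaded (per the directory's uploaded-files cache) or excluded, so no remote metadata read is needed. Below is a rough, assumed equivalent of that predicate-based check, with the cache and exclusion sets standing in for the real skipUpload logic.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Sketch: the refresh-listener check passes when every local segment file is either
// already in the uploaded-files cache or belongs to the excluded set.
final class ListenerSyncCheck {
    static boolean isInSync(Collection<String> localFiles, Set<String> uploadedCache, Set<String> excluded) {
        return localFiles.stream().allMatch(f -> uploadedCache.contains(f) || excluded.contains(f));
    }

    public static void main(String[] args) {
        Set<String> cache = new HashSet<>(Arrays.asList("_1.cfe", "_1.cfs", "_1.si"));
        Set<String> excluded = new HashSet<>(Arrays.asList("segments_3"));
        System.out.println(isInSync(Arrays.asList("_1.cfe", "_1.cfs", "_1.si", "segments_3"), cache, excluded)); // true
        System.out.println(isInSync(Arrays.asList("_1.cfe", "_2.cfs"), cache, excluded));                        // false
    }
}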
32 changes: 29 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -43,6 +43,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
@@ -191,7 +192,8 @@ void recoverFromLocalShards(
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
@@ -432,7 +434,8 @@ void recoverFromSnapshotAndRemoteStore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -717,7 +720,8 @@ private void restore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -791,4 +795,26 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
);
store.associateIndexWithNewTranslog(translogUUID);
}

/*
Blocks the calling thread, waiting for the remote store to get synced till internal Remote Upload Timeout
*/
private void waitForRemoteStoreSync(IndexShard indexShard) {
if (indexShard.shardRouting.primary() == false) {
return;
}
long startNanos = System.nanoTime();

while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
if (indexShard.isRemoteSegmentStoreInSync()) {
break;
} else {
try {
Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}
}
}
}
@@ -46,6 +46,8 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.util.concurrent.TimeUnit;

/**
* Settings for the recovery mechanism
*
@@ -176,6 +178,13 @@ public class RecoverySettings {
Property.Dynamic
);

public static final Setting<TimeValue> INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT = Setting.timeSetting(
"indices.recovery.internal_remote_upload_timeout",
new TimeValue(1, TimeUnit.HOURS),
Property.Dynamic,
Property.NodeScope
);

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

@@ -193,6 +202,7 @@ public class RecoverySettings {
private volatile int minRemoteSegmentMetadataFiles;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
@@ -216,6 +226,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
}

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
Expand All @@ -237,6 +248,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);

}

public RateLimiter rateLimiter() {
@@ -267,6 +280,10 @@ public TimeValue internalActionLongTimeout() {
return internalActionLongTimeout;
}

public TimeValue internalRemoteUploadTimeout() {
return internalRemoteUploadTimeout;
}

public ByteSizeValue getChunkSize() {
return chunkSize;
}
@@ -298,6 +315,10 @@ public void setInternalActionLongTimeout(TimeValue internalActionLongTimeout) {
this.internalActionLongTimeout = internalActionLongTimeout;
}

public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout) {
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
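The new timeout is registered as a dynamic, node-scoped setting, so it can be set in opensearch.yml or updated at runtime. The snippet below is a small sketch of how the setting resolves from node settings; the class name is illustrative and the package paths for Settings and TimeValue are assumed from the surrounding imports and may differ by version.

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.indices.recovery.RecoverySettings;

// Sketch: the setting defaults to 1 hour and can be overridden, e.g. to 30 minutes.
public class InternalRemoteUploadTimeoutExample {
    public static void main(String[] args) {
        TimeValue byDefault = RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(Settings.EMPTY);
        TimeValue overridden = RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(
            Settings.builder().put("indices.recovery.internal_remote_upload_timeout", "30m").build()
        );
        System.out.println(byDefault);   // 1h
        System.out.println(overridden);  // 30m
    }
}

Because the setting carries Property.Dynamic, the addSettingsUpdateConsumer hook added in the diff lets a cluster-settings update change it without a node restart.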
@@ -13,6 +13,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.After;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
@@ -37,7 +38,6 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;

import java.io.IOException;
import java.util.Collections;
@@ -46,17 +46,17 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
@@ -227,6 +227,7 @@ public void testRefreshAfterCommit() throws IOException {

verifyUploadedSegments(remoteSegmentStoreDirectory);


// This is to check if reading data from remote segment store works as well.
remoteSegmentStoreDirectory.init();

@@ -335,6 +336,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2();
RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
assertNoLagAndTotalUploadsFailed(segmentTracker, 0);
assertTrue("remote store in sync", tuple.v1().isRemoteSegmentStoreInSync());
}

public void testRefreshSuccessOnSecondAttempt() throws Exception {
@@ -404,6 +406,40 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception {
assertNoLagAndTotalUploadsFailed(segmentTracker, 2);
}

public void testRefreshPersistentFailure() throws Exception {
// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
// int succeedOnAttempt = 5;
// // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
// CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// // Value has been set as INT_MAX as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
// CountDownLatch successLatch = new CountDownLatch(Integer.MAX_VALUE);
// Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
// succeedOnAttempt,
// refreshCountLatch,
// successLatch
// );
// assertBusy(() -> assertEquals(Integer.MAX_VALUE - 2, successLatch.getCount()));
// assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());

// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
int succeedOnAttempt = 10;
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(1);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(10);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch
);
assertBusy(() -> assertTrue(10 > successLatch.getCount()));
assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
}

private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
assertBusy(() -> {
assertEquals(0, segmentTracker.getBytesLag());
@@ -527,6 +563,7 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
successLatch.countDown();
logger.info("Value fo latch {}", successLatch.getCount());
}
return indexShard.getEngine();
}).when(shard).getEngine();
