Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RemoteStoreReplicationSource#getSegmentFiles asynchronous #10725

Merged
merged 6 commits into from
Oct 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add ITs ensuring segRep targets are cleaned up on cancellation during metadata and segment fetch steps.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Oct 19, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 74d154803154a5a292a5903dfe17c3da706216df
Original file line number Diff line number Diff line change
@@ -194,7 +194,7 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
/**
* Fetch IndexShard, assumes only a single shard per node.
*/
protected IndexShard getIndexShard(String node, String indexName) {
public static IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
Original file line number Diff line number Diff line change
@@ -114,6 +114,10 @@ protected void cleanupRepo() {
}

/**
 * Convenience overload of {@code setup(Path, double, String, long, int)} that passes {@code 0} for the
 * trailing {@code replicaCount} argument and returns whatever that overload returns.
 */
protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0);
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store result in
// IOException. The skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensure that the
// repository creation can happen without failure.
@@ -128,6 +132,9 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
for (int i = 0; i < replicaCount; i++) {
internalCluster().startDataOnlyNode(settings.build());
}
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Set;

/**
 * This class runs tests with remote store + segRep while blocking file downloads on the replica, so that an
 * in-flight replication can be cancelled at a known stage and its target checked for proper cleanup.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

    /**
     * Installs a block on the mock repository of a single node.
     */
    @FunctionalInterface
    private interface RepositoryBlocker {
        void block(String repository, String nodeName);
    }

    @Override
    public Settings indexSettings() {
        return remoteStoreIndexSettings(1);
    }

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    /**
     * Cancels a replication that is blocked while downloading segment files ({@code GET_FILES} stage) and
     * verifies that the replication target is released.
     */
    public void testCancelReplicationWhileSyncingSegments() throws Exception {
        assertTargetClosedWhenPrimaryStops(
            (repository, nodeName) -> blockNodeOnAnySegmentFile(repository, nodeName),
            SegmentReplicationState.Stage.GET_FILES
        );
    }

    /**
     * Cancels a replication that is blocked while fetching checkpoint metadata ({@code GET_CHECKPOINT_INFO}
     * stage) and verifies that the replication target is released.
     */
    public void testCancelReplicationWhileFetchingMetadata() throws Exception {
        assertTargetClosedWhenPrimaryStops(
            (repository, nodeName) -> blockNodeOnAnyFiles(repository, nodeName),
            SegmentReplicationState.Stage.GET_CHECKPOINT_INFO
        );
    }

    /**
     * Shared scenario for both tests: start a primary plus one replica, block the replica's repository with
     * {@code blocker}, trigger a replication, assert that it is parked in {@code expectedStage}, then stop the
     * primary node and wait until the replica has been promoted and its replication target has no remaining
     * references.
     *
     * @param blocker       installs the repository block on the replica node
     * @param expectedStage the stage the ongoing replication must be in once the block has been hit
     * @throws Exception if cluster setup, the busy-wait, or any assertion fails
     */
    private void assertTargetClosedWhenPrimaryStops(RepositoryBlocker blocker, SegmentReplicationState.Stage expectedStage)
        throws Exception {
        final Path location = randomRepoPath().toAbsolutePath();
        setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

        final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
        final String replicaNode = getNode(dataNodeNames, false);
        final String primaryNode = getNode(dataNodeNames, true);

        final SegmentReplicationTargetService targetService = internalCluster().getInstance(
            SegmentReplicationTargetService.class,
            replicaNode
        );
        ensureGreen(INDEX_NAME);
        blocker.block(REPOSITORY_NAME, replicaNode);
        try {
            final IndexShard indexShard = SegmentReplicationBaseIT.getIndexShard(replicaNode, INDEX_NAME);
            indexSingleDoc();
            refresh(INDEX_NAME);
            waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));

            final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
            assertNotNull("Expected an ongoing replication event on the replica", state);
            assertEquals(expectedStage, state.getStage());

            final SegmentReplicationTarget segmentReplicationTarget;
            // Release the ref right away, otherwise it would hold a refcount on the target and the
            // "closed" assertion below could never succeed.
            try (
                ReplicationCollection.ReplicationRef<SegmentReplicationTarget> targetRef = targetService.get(
                    state.getReplicationId()
                )
            ) {
                segmentReplicationTarget = targetRef.get();
            }
            assertNotNull(segmentReplicationTarget);
            assertTrue(segmentReplicationTarget.refCount() > 0);

            internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
            assertBusy(() -> {
                assertTrue(indexShard.routingEntry().primary());
                assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
                assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
            });
        } finally {
            // Always lift the block, even when an assertion failed, so blocked repository threads do not
            // linger into test teardown.
            unblockNode(REPOSITORY_NAME, replicaNode);
        }
        cleanupRepo();
    }

    /**
     * Finds the data node hosting the primary or the replica shard of {@code INDEX_NAME}.
     *
     * @param dataNodeNames names of the data nodes in the cluster; exactly two are expected
     * @param primary       {@code true} to look up the primary's node, {@code false} for the replica's node
     * @return the name of the matching node
     * @throws AssertionError if there are not exactly two data nodes or none of them matches
     */
    private String getNode(Set<String> dataNodeNames, boolean primary) {
        assertEquals(2, dataNodeNames.size());
        for (String name : dataNodeNames) {
            final IndexShard indexShard = SegmentReplicationBaseIT.getIndexShard(name, INDEX_NAME);
            if (indexShard.routingEntry().primary() == primary) {
                return name;
            }
        }
        // Fail here with context instead of handing a null node name to later cluster lookups.
        throw new AssertionError(
            "No data node hosts the " + (primary ? "primary" : "replica") + " shard of index [" + INDEX_NAME + "]: " + dataNodeNames
        );
    }
}
Original file line number Diff line number Diff line change
@@ -331,6 +331,10 @@ public static void blockNodeOnAnyFiles(String repository, String nodeName) {
);
}

/**
 * Enables segment-file blocking on the {@link MockRepository} registered as {@code repository} on the
 * node named {@code nodeName}.
 */
public static void blockNodeOnAnySegmentFile(String repository, String nodeName) {
    final RepositoriesService repositoriesOnNode = internalCluster().getInstance(RepositoriesService.class, nodeName);
    final MockRepository mockRepository = (MockRepository) repositoriesOnNode.repository(repository);
    mockRepository.blockOnSegmentFiles(true);
}

/**
 * Enables data-file blocking on the {@link MockRepository} registered as {@code repository} on the
 * node named {@code nodeName}.
 */
public static void blockDataNode(String repository, String nodeName) {
    final RepositoriesService repositoriesOnNode = internalCluster().getInstance(RepositoriesService.class, nodeName);
    final MockRepository mockRepository = (MockRepository) repositoriesOnNode.repository(repository);
    mockRepository.blockOnDataFiles(true);
}
Original file line number Diff line number Diff line change
@@ -139,6 +139,8 @@ public long getFailureCount() {

private volatile boolean blockOnDataFiles;

// When true, maybeIOExceptionOrBlock blocks on blobs whose name contains ".si__"; cleared again by unblock().
private volatile boolean blockOnSegmentFiles;

private volatile boolean blockOnDeleteIndexN;

/**
@@ -190,6 +192,7 @@ public MockRepository(
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
blockOnSegmentFiles = metadata.settings().getAsBoolean("block_on_segment", false);
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
randomPrefix = metadata.settings().get("random", "default");
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
@@ -237,6 +240,7 @@ public synchronized void unblock() {
blocked = false;
// Clean blocking flags, so we wouldn't try to block again
blockOnDataFiles = false;
blockOnSegmentFiles = false;
blockOnAnyFiles = false;
blockAndFailOnWriteIndexFile = false;
blockOnWriteIndexFile = false;
@@ -259,6 +263,14 @@ public void setBlockOnAnyFiles(boolean blocked) {
blockOnAnyFiles = blocked;
}

/**
 * Sets the {@code blockOnSegmentFiles} flag. Performs the same assignment as
 * {@code setBlockOnSegmentFiles(boolean)}.
 *
 * @param blocked new value of the flag
 */
public void blockOnSegmentFiles(boolean blocked) {
blockOnSegmentFiles = blocked;
}

/**
 * Sets the {@code blockOnSegmentFiles} flag. Performs the same assignment as
 * {@code blockOnSegmentFiles(boolean)}.
 *
 * @param blocked new value of the flag
 */
public void setBlockOnSegmentFiles(boolean blocked) {
blockOnSegmentFiles = blocked;
}

/**
 * Sets the {@code blockAndFailOnWriteSnapFile} flag, which {@code maybeIOExceptionOrBlock} consults for
 * blobs whose name starts with {@code "snap-"}.
 *
 * @param blocked new value of the flag
 */
public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
blockAndFailOnWriteSnapFile = blocked;
}
@@ -306,6 +318,7 @@ private synchronized boolean blockExecution() {
boolean wasBlocked = false;
try {
while (blockOnDataFiles
|| blockOnSegmentFiles
|| blockOnAnyFiles
|| blockAndFailOnWriteIndexFile
|| blockOnWriteIndexFile
@@ -407,6 +420,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
blockExecutionAndMaybeWait(blobName);
} else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
blockExecutionAndFail(blobName);
} else if (blockOnSegmentFiles && blobName.contains(".si__")) {
blockExecutionAndMaybeWait(blobName);
}
}
}