-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix bug where ReplicationListeners would not complete on cancellation. (
#8478) * [Segment Replication] Fix bug where ReplicationListeners would not complete on target cancellation. This change updates cancellation with Segment Replication to ensure all listeners are resolved. It does this by requesting cancellation before shard closure instead of using ReplicationCollection's cancelForShard which immediately removes it from the replicationCollection. This would cause the underlying ReplicationListener to never get invoked on close. This change includes new tests using suite scope to catch for any open tasks. This caught other locations where this was possible: 1. On a replica during force sync if the shard was closed while resolving its listeners, it would never call back to the primary if an exception was caught in the onDone method. - Fixed by refactoring those paths to use a ChannelActionListener and always reply to primary. 2. On the primary during forceSync, the primary would not successfully cancel before shard close during a forceSync, Fixed by wrapping the synchronous recoveryTarget::forceSync call in cancellableThreads. Signed-off-by: Marc Handalian <[email protected]> PR cleanup. Signed-off-by: Marc Handalian <[email protected]> Update log message Signed-off-by: Marc Handalian <[email protected]> * PR feedback. Signed-off-by: Marc Handalian <[email protected]> * Update server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java Co-authored-by: Suraj Singh <[email protected]> Signed-off-by: Marc Handalian <[email protected]> * Add more tests. Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]> Co-authored-by: Suraj Singh <[email protected]>
- Loading branch information
1 parent
542041f
commit 4ccbf9d
Showing
12 changed files
with
623 additions
and
255 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
88 changes: 88 additions & 0 deletions
88
...nternalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.indices.replication; | ||
|
||
import org.junit.Before; | ||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) | ||
public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { | ||
|
||
@Before | ||
public void setup() { | ||
internalCluster().startClusterManagerOnlyNode(); | ||
createIndex(INDEX_NAME); | ||
} | ||
|
||
@Override | ||
public Settings indexSettings() { | ||
final Settings.Builder builder = Settings.builder() | ||
.put(super.indexSettings()) | ||
// reset shard & replica count to random values set by OpenSearchIntegTestCase. | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards()) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) | ||
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); | ||
|
||
// TODO: Randomly enable remote store on these tests. | ||
return builder.build(); | ||
} | ||
|
||
public void testBasicReplication() throws Exception { | ||
final int docCount = scaledRandomIntBetween(10, 200); | ||
for (int i = 0; i < docCount; i++) { | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); | ||
} | ||
refresh(); | ||
ensureGreen(INDEX_NAME); | ||
verifyStoreContent(); | ||
} | ||
|
||
public void testDropRandomNodeDuringReplication() throws Exception { | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
internalCluster().startClusterManagerOnlyNodes(1); | ||
|
||
final int docCount = scaledRandomIntBetween(10, 200); | ||
for (int i = 0; i < docCount; i++) { | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); | ||
} | ||
refresh(); | ||
|
||
internalCluster().restartRandomDataNode(); | ||
|
||
ensureYellow(INDEX_NAME); | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); | ||
internalCluster().startDataOnlyNode(); | ||
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); | ||
} | ||
|
||
public void testDeleteIndexWhileReplicating() throws Exception { | ||
internalCluster().startClusterManagerOnlyNode(); | ||
final int docCount = scaledRandomIntBetween(10, 200); | ||
for (int i = 0; i < docCount; i++) { | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); | ||
} | ||
refresh(INDEX_NAME); | ||
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); | ||
} | ||
|
||
public void testFullRestartDuringReplication() throws Exception { | ||
internalCluster().startNode(); | ||
final int docCount = scaledRandomIntBetween(10, 200); | ||
for (int i = 0; i < docCount; i++) { | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); | ||
} | ||
refresh(INDEX_NAME); | ||
internalCluster().fullRestart(); | ||
ensureGreen(INDEX_NAME); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.