Fix AwarenessAttributeDecommissionIT.testConcurrentDecommissionAction
The problem is that this test decommissions one of six nodes. The teardown
logic of the test asserts on the health of the cluster by randomly selecting
a node and requesting the cluster health from it. If that random check
happened to select the node that was decommissioned, the test would fail.
The fix is to recommission the node at the end of the test.

Also, the "recommission node and assert cluster health" logic was duplicated
in multiple places and has been refactored out into a helper method.
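
To make the failure mode concrete, here is a minimal, hypothetical sketch of
the teardown path described above. The node-selection code is illustrative
only and not lifted from the test framework; activeNode stands for any node
that remained in the cluster.

    // Hypothetical sketch (inside AwarenessAttributeDecommissionIT, after this change):
    // teardown asserts cluster health via a randomly chosen node.
    String randomNode = randomFrom(internalCluster().getNodeNames());
    // If randomNode is the node that was decommissioned, it is no longer part of the
    // cluster, so this health request cannot succeed and the test fails in teardown.
    client(randomNode).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
    // The fix recommissions the zone before the test ends, so all six nodes have
    // rejoined by the time any teardown check runs.
    deleteDecommissionStateAndWaitForStableCluster(activeNode, 6);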

Resolves opensearch-project#14290
Resolves opensearch-project#12197

Signed-off-by: Andrew Ross <[email protected]>
andrross committed Jun 14, 2024
1 parent 16c8806 commit 77a9432
Showing 1 changed file with 28 additions and 51 deletions.
@@ -539,18 +539,7 @@ private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterMana
assertEquals(originalClusterManager, currentClusterManager);
}

- // Will wait for all events to complete
- client(activeNode).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
-
- // Recommissioning the zone back to gracefully succeed the test once above tests succeeds
- DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(currentClusterManager).execute(
- DeleteDecommissionStateAction.INSTANCE,
- new DeleteDecommissionStateRequest()
- ).get();
- assertTrue(deleteDecommissionStateResponse.isAcknowledged());
-
- // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster
- ensureStableCluster(15, TimeValue.timeValueMinutes(2));
+ deleteDecommissionStateAndWaitForStableCluster(currentClusterManager, 15);
}

public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned() throws Exception {
@@ -617,18 +606,7 @@ public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned()
)
);

- // Will wait for all events to complete
- client(node_in_c).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
-
- // Recommissioning the zone back to gracefully succeed the test once above tests succeeds
- DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(node_in_c).execute(
- DeleteDecommissionStateAction.INSTANCE,
- new DeleteDecommissionStateRequest()
- ).get();
- assertTrue(deleteDecommissionStateResponse.isAcknowledged());
-
- // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster
- ensureStableCluster(6, TimeValue.timeValueMinutes(2));
+ deleteDecommissionStateAndWaitForStableCluster(node_in_c, 6);
}

public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException {
@@ -748,20 +726,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
);
logger.info("--> Verified the decommissioned node has in_progress state.");

- // Will wait for all events to complete
- client(activeNode).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
- logger.info("--> Got LANGUID event");
- // Recommissioning the zone back to gracefully succeed the test once above tests succeeds
- DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute(
- DeleteDecommissionStateAction.INSTANCE,
- new DeleteDecommissionStateRequest()
- ).get();
- assertTrue(deleteDecommissionStateResponse.isAcknowledged());
- logger.info("--> Deleting decommission done.");
-
- // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes)
- // as by then all nodes should have joined the cluster
- ensureStableCluster(6, TimeValue.timeValueSeconds(121));
+ deleteDecommissionStateAndWaitForStableCluster(activeNode, 6);
}

public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception {
@@ -983,15 +948,7 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
assertEquals(clusterState.nodes().getDataNodes().size(), 3);
assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2);

- // Recommissioning the zone back to gracefully succeed the test once above tests succeeds
- DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(dataNodes.get(0)).execute(
- DeleteDecommissionStateAction.INSTANCE,
- new DeleteDecommissionStateRequest()
- ).get();
- assertTrue(deleteDecommissionStateResponse.isAcknowledged());
-
- // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster
- ensureStableCluster(6, TimeValue.timeValueMinutes(2));
+ deleteDecommissionStateAndWaitForStableCluster(dataNodes.get(0), 6);
}

public void testConcurrentDecommissionAction() throws Exception {
@@ -1019,7 +976,7 @@ public void testConcurrentDecommissionAction() throws Exception {
.build()
);
logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
- internalCluster().startNodes(
+ final String bZoneDataNode = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
@@ -1035,7 +992,7 @@ public void testConcurrentDecommissionAction() throws Exception {
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
- );
+ ).get(1);

ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
@@ -1077,8 +1034,9 @@ public void testConcurrentDecommissionAction() throws Exception {
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
try {
- DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest)
- .get();
+ DecommissionResponse decommissionResponse =
+ client().execute(DecommissionAction.INSTANCE, decommissionRequest)
+ .get();
if (decommissionResponse.isAcknowledged()) {
numRequestAcknowledged.incrementAndGet();
} else {
@@ -1100,6 +1058,25 @@ public void testConcurrentDecommissionAction() throws Exception {
assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get());
assertEquals(concurrentRuns - 1, numRequestFailed.get());
assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get());

+ deleteDecommissionStateAndWaitForStableCluster(bZoneDataNode, 6);
}

+ private void deleteDecommissionStateAndWaitForStableCluster(String activeNodeName, int expectedClusterSize)
+ throws ExecutionException, InterruptedException {
+ client(activeNodeName).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
+
+ // Recommissioning the zone back to gracefully succeed the test once above tests succeeds
+ DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNodeName).execute(
+ DeleteDecommissionStateAction.INSTANCE,
+ new DeleteDecommissionStateRequest()
+ ).get();
+ assertTrue(deleteDecommissionStateResponse.isAcknowledged());
+ logger.info("--> Deleting decommission done.");
+
+ // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes)
+ // as by then all nodes should have joined the cluster
+ ensureStableCluster(expectedClusterSize, TimeValue.timeValueSeconds(121));
+ }

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {