Fix for hasInitiatedFetching() in batch mode
Signed-off-by: Rahul Karajgikar <[email protected]>
Rahul Karajgikar committed Jul 25, 2024
1 parent 8ae728c commit f565b94
Showing 3 changed files with 230 additions and 1 deletion.
@@ -925,6 +925,202 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
ensureGreen("test");
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception {
// Non-batch mode - validates that the allocation explain API does not return AWAITING_INFO
// when the deciders return NO
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build()
);
internalCluster().startDataOnlyNodes(5);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build()
);
ensureGreen("test");
ensureStableCluster(6);

// Stop one of the nodes to make the cluster yellow
// We cannot directly create an index with replicas = data node count, because the whole flow
// would then be skipped due to INDEX_CREATED
List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));

ensureStableCluster(5);
ensureYellow("test");

logger.info("--> calling allocation explain API");
// shard should have decision NO because there is no valid node for the extra replica to go to
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

// Now creating a new index with too many replicas and trying again
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build()
);

ensureYellowAndNoInitializingShards("test2");

logger.info("--> calling allocation explain API again");
// shard should have decision NO because there are 6 shard copies (1 primary + 5 replicas) and only 4 data nodes
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

logger.info("--> restarting the stopped node");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build()
);

ensureStableCluster(6);
ensureGreen("test");

logger.info("--> calling allocation explain API 3rd time");
// shard should still have decision NO because there are 6 shard copies and only 5 data nodes
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

internalCluster().startDataOnlyNodes(1);

ensureStableCluster(7);
ensureGreen("test2");
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception {
// Batch mode - validates that the allocation explain API does not return AWAITING_INFO
// when the deciders return NO
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
internalCluster().startDataOnlyNodes(5);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build()
);
ensureGreen("test");
ensureStableCluster(6);

// Stop one of the nodes to make the cluster yellow
// We cannot directly create an index with replicas = data node count, because the whole flow
// would then be skipped due to INDEX_CREATED
List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));

ensureStableCluster(5);
ensureYellow("test");

logger.info("--> calling allocation explain API");
// shard should have decision NO because there is no valid node for the extra replica to go to
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

// Now creating a new index with too many replicas and trying again
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build()
);

ensureYellowAndNoInitializingShards("test2");

logger.info("--> calling allocation explain API again");
// shard should have decision NO because there are 6 shard copies (1 primary + 5 replicas) and only 4 data nodes
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

logger.info("--> restarting the stopped node");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build()
);

ensureStableCluster(6);
ensureGreen("test");

logger.info("--> calling allocation explain API 3rd time");
// shard should still have decision NO because there are 6 shard copies and only 5 data nodes
assertEquals(
AllocationDecision.NO,
client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision()
.getAllocationDecision()
);

internalCluster().startDataOnlyNodes(1);

ensureStableCluster(7);
ensureGreen("test2");
}
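
Both tests above rely on a findNodesWithShard(boolean primary) helper defined elsewhere in this test class and not shown in this diff. A minimal sketch of what such a helper can look like, assuming the client(), random(), and routing-table APIs available to an OpenSearch integration test (the exact upstream implementation may differ):

private List<String> findNodesWithShard(final boolean primary) {
    // collect the names of all nodes that currently host a started shard copy
    // of the requested kind (primary or replica), in randomized order
    ClusterState state = client().admin().cluster().prepareState().get().getState();
    List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
    List<ShardRouting> matchingShards = startedShards.stream()
        .filter(shard -> shard.primary() == primary)
        .collect(Collectors.toList());
    Collections.shuffle(matchingShards, random());
    return matchingShards.stream()
        .map(shard -> state.nodes().get(shard.currentNodeId()).getName())
        .collect(Collectors.toList());
}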

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
@@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) {
this.cache.deleteShard(shardId);
}

/** Returns true if the per-node cache has never been populated, i.e. fetchData has not been called for this batch. */
public boolean hasEmptyCache() {
    return this.cache.getCache().isEmpty();
}

/** Exposes the underlying cache so callers can inspect per-node fetch state (see findNodesToFetch()). */
public AsyncShardFetchCache<T> getCache() {
    return this.cache;
}

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
@@ -516,8 +516,33 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
/**
 * This function checks whether an asyncFetch has ever happened for this shard batch, or is currently ongoing.
 * It should return false if there has never been a fetch for this batch.
 * It is currently only used for replica shards, when all deciders returned NO/THROTTLE and explain mode is ON.
 * Allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the
 * allocation decision. However, these APIs must not trigger a new asyncFetch for these ineligible shards;
 * they may only use node data that is already there. This function is therefore used to decide whether
 * shard store info can be appended without starting a new async fetch.
 *
 * To check whether a fetch has ever happened, three cases are considered:
 * 1. If the shard batch cache is empty, a fetch has never happened, so return false.
 * 2. If the list of nodes to fetch from is empty, every node already has data or has a fetch ongoing, so return true.
 * 3. Otherwise, return false.
 *
 * see {@link AsyncShardFetchCache#findNodesToFetch()}
 */
String batchId = getBatchId(shard, shard.primary());
if (batchId == null) {
    // no batch exists for this shard, so a fetch cannot have been initiated
    return false;
}
logger.trace("Checking if fetching done for batch id {}", batchId);
ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId);

// if fetchData has never been called, the per node cache will be empty and have no nodes
// this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData
if (shardsBatch.getAsyncFetcher().hasEmptyCache()) {
logger.trace("Batch cache is empty for batch {} ", batchId);
return false;
}
// this check makes sure all the node data is already present and that we would not trigger a new async fetchData call
return shardsBatch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
}
}
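
For context, the consumer of hasInitiatedFetching() is the replica allocator's decision path: when every decider returns NO/THROTTLE, shard store information is appended to the explanation only if a fetch was already initiated. A condensed sketch of that pattern, simplified from ReplicaShardAllocator and not the exact upstream code:

// deciders said NO/THROTTLE for this replica; decide whether explain mode
// may still enrich the decision with shard store (matching bytes) data
Decision decision = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation).v1();
boolean explain = allocation.debugDecision(); // true when the explain API drives this allocation round
if (decision.type() != Decision.Type.YES && (explain == false || hasInitiatedFetching(unassignedShard) == false)) {
    // not explaining, or no fetch has ever been initiated for this shard's batch:
    // with the fix above this answers NO instead of AWAITING_INFO, and no new fetch is triggered
    return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(decision.type()), null);
}
// otherwise fetch results exist (or are in flight) and matching-bytes info can be appended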
