Skip to content

Commit

Permalink
Throw exception for single shard calls
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Dec 4, 2023
1 parent 6c38303 commit 1a68940
Showing 1 changed file with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -56,7 +58,8 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS
ShardRouting shard,
RoutingAllocation allocation
) {
return null;
logger.error("fetchData for single shard called via batch allocator");
throw new IllegalStateException("PrimaryShardBatchAllocator should only be used for a batch of shards");
}

@Override
Expand All @@ -66,7 +69,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
}

/**
* Build allocation decisions for all the shards given to this allocator..
* Build allocation decisions for all the shards given to this allocator.
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
Expand All @@ -81,31 +84,35 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
Set<ShardRouting> shardsEligibleForFetch = new HashSet<>();
Set<ShardRouting> shardsNotEligibleForFetch = new HashSet<>();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> inEligibleShards = new HashSet<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
shardsNotEligibleForFetch.add(shard);
inEligibleShards.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
shardsEligibleForFetch.add(shard);
eligibleShards.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (shardsEligibleForFetch.size() == 0) {
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
shardsEligibleForFetch,
shardsNotEligibleForFetch,
eligibleShards,
inEligibleShards,
allocation
);
// Note : shardsState contain the Data, there key is DiscoveryNode but value is Map<ShardId,
// NodeGatewayStartedShardsBatch> so to get one shard level data (from all the nodes), we'll traverse the map
// and construct the nodeShardState along the way before making any allocation decision. As metadata for a
// particular shard is needed from all the discovery nodes.

// process the received data
for (ShardRouting unassignedShard : shardsEligibleForFetch) {
for (ShardRouting unassignedShard : eligibleShards) {
if (shardsState.hasData() == false) {
// if fetching is not done, add that no decision in the resultant map
allocation.setHasPendingAsyncFetch();
Expand All @@ -119,7 +126,7 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
);
} else {

NodeShardStates nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
List<NodeShardState> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
}
Expand All @@ -137,27 +144,26 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
* @param shardsState fetch data result for the whole batch
* @return shard state returned from each node
*/
private static NodeShardStates adaptToNodeShardStates(
private static List<NodeShardState> adaptToNodeShardStates(
ShardRouting unassignedShard,
FetchResult<NodeGatewayStartedShardsBatch> shardsState
) {
NodeShardStates nodeShardStates = new NodeShardStates();
List<NodeShardState> nodeShardStates = new ArrayList<>();
Map<DiscoveryNode, NodeGatewayStartedShardsBatch> nodeResponses = shardsState.getData();

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
nodeShardStates.getNodeShardStates()
.add(
new NodeShardState(
node,
shardData.allocationId(),
shardData.primary(),
shardData.replicationCheckpoint(),
shardData.storeException()
)
);
nodeShardStates.add(
new NodeShardState(
node,
shardData.allocationId(),
shardData.primary(),
shardData.replicationCheckpoint(),
shardData.storeException()
)
);
});
return nodeShardStates;
}
Expand Down

0 comments on commit 1a68940

Please sign in to comment.