Skip to content

Commit

Permalink
Modify according to transport PRs.
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Dec 13, 2023
1 parent 1a68940 commit dc7eb43
Showing 1 changed file with 23 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,12 +62,11 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {

return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), allocation, logger).get(unassignedShard);
}

/**
* Build allocation decisions for all the shards given to this allocator.
* Build allocation decisions for all the shards present in the batch identified by batchId.
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
Expand All @@ -83,7 +80,6 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> inEligibleShards = new HashSet<>();
// identify ineligible shards
Expand All @@ -101,62 +97,50 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
eligibleShards,
inEligibleShards,
allocation
);
// Note : shardsState contain the Data, there key is DiscoveryNode but value is Map<ShardId,
// NodeGatewayStartedShardsBatch> so to get one shard level data (from all the nodes), we'll traverse the map
// and construct the nodeShardState along the way before making any allocation decision. As metadata for a
// particular shard is needed from all the discovery nodes.
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
if (shardsState.hasData() == false) {
// if fetching is not done, add that no decision in the resultant map
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
shardAllocationDecisions.put(
unassignedShard,
AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions)
);
} else {

List<NodeShardState> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
}
List<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> nodeShardStates = adaptToNodeShardStates(
unassignedShard,
shardsState
);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
}
return shardAllocationDecisions;
}

/**
* shardsState contain the Data, there key is DiscoveryNode but value is Map<ShardId,
* NodeGatewayStartedShardsBatch> so to get one shard level data (from all the nodes), we'll traverse the map
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}.
* <p>
* Returns null if {@link FetchResult} does not have any data.
* <p>
* shardsState contain the Data, there key is DiscoveryNode but value is Map of ShardId
* and NodeGatewayStartedShardsBatch so to get one shard level data (from all the nodes), we'll traverse the map
* and construct the nodeShardState along the way before making any allocation decision. As metadata for a
* particular shard is needed from all the discovery nodes.
*
* @param unassignedShard unassigned shard
* @param shardsState fetch data result for the whole batch
* @return shard state returned from each node
*/
private static List<NodeShardState> adaptToNodeShardStates(
private static List<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> adaptToNodeShardStates(
ShardRouting unassignedShard,
FetchResult<NodeGatewayStartedShardsBatch> shardsState
) {
List<NodeShardState> nodeShardStates = new ArrayList<>();
if (!shardsState.hasData()) {
return null;
}
List<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
Map<DiscoveryNode, NodeGatewayStartedShardsBatch> nodeResponses = shardsState.getData();

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch()
NodeGatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
nodeShardStates.add(
new NodeShardState(
new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(
node,
shardData.allocationId(),
shardData.primary(),
Expand Down

0 comments on commit dc7eb43

Please sign in to comment.