Skip to content

Commit

Permalink
Replace Set with List
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 9, 2024
1 parent 60630d8 commit 4cc0676
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(Set<ShardRouting> shardRoutings, RoutingAllocation allocation) {
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
* @param allocation the overall routing allocation
* @param shardBatches a list of shard batches to check for existing recoveries
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<Set<ShardRouting>> shardBatches) {
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
List<Runnable> shardCancellationActions = new ArrayList<>();
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
for (Set<ShardRouting> shardBatch : shardBatches) {
for (List<ShardRouting> shardBatch : shardBatches) {
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
// iterate over shards to check for match for each of those
Expand Down Expand Up @@ -112,7 +112,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(Set<ShardRouting> shardRoutings, RoutingAllocation allocation) {
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
Map<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void beforeAllocation(final RoutingAllocation allocation) {
@Override
public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {
assert replicaShardBatchAllocator != null;
List<Set<ShardRouting>> storedShardBatches = batchIdToStoreShardBatch.values()
List<List<ShardRouting>> storedShardBatches = batchIdToStoreShardBatch.values()
.stream()
.map(ShardsBatch::getBatchedShardRoutings)
.collect(Collectors.toList());
Expand Down Expand Up @@ -684,8 +684,8 @@ private void clearShardFromCache(ShardId shardId) {
asyncBatch.clearShard(shardId);
}

public Set<ShardRouting> getBatchedShardRoutings() {
return batchInfo.values().stream().map(ShardEntry::getShardRouting).collect(Collectors.toSet());
public List<ShardRouting> getBatchedShardRoutings() {
return batchInfo.values().stream().map(ShardEntry::getShardRouting).collect(Collectors.toList());
}

public Set<ShardId> getBatchedShards() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ public void testCorrectnessOfBatch() {
.values()
.stream()
.map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShardRoutings)
.flatMap(Set::stream)
.flatMap(List::stream)
.collect(Collectors.toSet());
primariesInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary() == true));

Set<ShardRouting> replicasInAllBatches = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch()
.values()
.stream()
.map(ShardsBatchGatewayAllocator.ShardsBatch::getBatchedShardRoutings)
.flatMap(Set::stream)
.flatMap(List::stream)
.collect(Collectors.toSet());

replicasInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary() == false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void allocateAllUnassigned(final RoutingAllocation allocation) {

private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
Set<ShardRouting> shardsToBatch = new HashSet<>();
List<ShardRouting> shardsToBatch = new ArrayList<>();
while (iterator.hasNext()) {
shardsToBatch.add(iterator.next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.junit.Before;

import java.sql.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -97,7 +98,7 @@ public static void setUpShards(int numberOfShards) {

private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
Set<ShardRouting> shardToBatch = new HashSet<>();
List<ShardRouting> shardToBatch = new ArrayList<>();
while (iterator.hasNext()) {
shardToBatch.add(iterator.next());
}
Expand Down Expand Up @@ -275,7 +276,7 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() {
new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
);
Collection<ShardRouting> replicaShards = allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED);
Set<ShardRouting> shardRoutingBatch = new HashSet<>(replicaShards);
List<ShardRouting> shardRoutingBatch = new ArrayList<>(replicaShards);
List<Set<ShardRouting>> shardBatchList = Collections.singletonList(
new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING))
);
Expand Down Expand Up @@ -319,7 +320,7 @@ public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() {
);
testBatchAllocator.processExistingRecoveries(
allocation,
Collections.singletonList(new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
Collections.singletonList(new ArrayList<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
Expand Down Expand Up @@ -348,7 +349,7 @@ public void testNotCancelIfPrimaryDoesNotHaveValidRetentionLease() {
);
testBatchAllocator.processExistingRecoveries(
allocation,
Collections.singletonList(new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
Collections.singletonList(new ArrayList<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
Expand Down Expand Up @@ -586,7 +587,7 @@ public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() {
);
testBatchAllocator.processExistingRecoveries(
allocation,
Collections.singletonList(new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
Collections.singletonList(new ArrayList<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
Expand All @@ -598,7 +599,7 @@ public void testNotCancellingRecovery() {
.addData(node2, "MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testBatchAllocator.processExistingRecoveries(
allocation,
Collections.singletonList(new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
Collections.singletonList(new ArrayList<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
Expand Down Expand Up @@ -638,7 +639,7 @@ public void testDoNotCancelForBrokenNode() {
.addData(node3, randomSyncId(), null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testBatchAllocator.processExistingRecoveries(
allocation,
Collections.singletonList(new HashSet<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
Collections.singletonList(new ArrayList<>(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)))
);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty());
Expand Down

0 comments on commit 4cc0676

Please sign in to comment.