Skip to content

Commit

Permalink
Optmising AwarenessAllocationDecider for hashmap.get call
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY committed Jul 16, 2024
1 parent 29a3e2c commit 941b1b7
Show file tree
Hide file tree
Showing 5 changed files with 1,368 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,74 @@ public void testSimpleAwareness() throws Exception {
}, 10, TimeUnit.SECONDS);
}

public void testSimpleAwarenessWithZoneOptimised() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.zone_optimised", true)
.build();

logger.info("--> starting 2 nodes on the same rack");
internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());

Settings settings = Settings.builder()
.put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), false)
.build();
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(settings);

createIndex("test1");
createIndex("test2");

NumShards test1 = getNumShards("test1");
NumShards test2 = getNumShards("test2");
// no replicas will be allocated as both indices end up on a single node
final int totalPrimaries = test1.numPrimaries + test2.numPrimaries;

ensureGreen();

final List<String> indicesToClose = randomSubsetOf(Arrays.asList("test1", "test2"));
indicesToClose.forEach(indexToClose -> assertAcked(client().admin().indices().prepareClose(indexToClose).get()));

logger.info("--> starting 1 node on a different rack");
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());

// On slow machines the initial relocation might be delayed
assertBusy(() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();

assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false));

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

// check that closed indices are effectively closed
final List<String> notClosedIndices = indicesToClose.stream()
.filter(index -> clusterState.metadata().index(index).getState() != State.CLOSE)
.collect(Collectors.toList());
assertThat("Some indices not closed", notClosedIndices, empty());

// verify that we have all the primaries on node3
final Map<String, Integer> counts = new HashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
}
}
}
assertThat(counts.get(node3), equalTo(totalPrimaries));
}, 10, TimeUnit.SECONDS);
}

public void testAwarenessZones() {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public static boolean isDedicatedSearchNode(Settings settings) {
private final Map<String, String> attributes;
private final Version version;
private final SortedSet<DiscoveryNodeRole> roles;
private final String zoneValue;
private final boolean hasZoneAttribute;

/**
* Creates a new {@link DiscoveryNode}
Expand Down Expand Up @@ -268,6 +270,8 @@ public DiscoveryNode(
this.version = version;
}
this.attributes = Collections.unmodifiableMap(attributes);
this.zoneValue = this.attributes.get("zone");
this.hasZoneAttribute = attributes.containsKey("zone");
// verify that no node roles are being provided as attributes
Predicate<Map<String, String>> predicate = (attrs) -> {
boolean success = true;
Expand Down Expand Up @@ -329,6 +333,9 @@ public DiscoveryNode(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
this.attributes.put(in.readString(), in.readString());
}

this.zoneValue = this.attributes.get("zone");
this.hasZoneAttribute = attributes.containsKey("zone");
int rolesSize = in.readVInt();
final Set<DiscoveryNodeRole> roles = new HashSet<>(rolesSize);
for (int i = 0; i < rolesSize; i++) {
Expand Down Expand Up @@ -458,6 +465,14 @@ public boolean isRemoteClusterClient() {
return roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

public String getZoneValue() {
return zoneValue;
}

public boolean hasZoneAttribute() {
return hasZoneAttribute;
}

/**
* Returns whether the node is dedicated to provide search capability.
*
Expand Down
Loading

0 comments on commit 941b1b7

Please sign in to comment.