Skip to content

Commit

Permalink
Initial commit for scale to zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Nov 20, 2024
1 parent 9f790ee commit 9f095fc
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 89 deletions.
11 changes: 11 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.AckedClusterStateUpdateTask;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -170,6 +173,42 @@ protected void clusterManagerOperation(
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {
/**
 * Completes the caller's listener once the settings update has been processed.
 *
 * When the update was acknowledged and the request sets
 * {@link IndexMetadata#SETTING_REMOVE_INDEXING_SHARDS} to {@code true}, a follow-up
 * cluster state task is submitted to rewrite the routing table for the affected
 * indices, and the caller is answered only when that task completes or fails.
 * In every other case the caller is answered immediately with the acknowledgement
 * flag of the settings update.
 *
 * The two paths are mutually exclusive so that {@code listener} is completed exactly
 * once; previously the follow-up path also fell through to the immediate response.
 *
 * @param response result of the settings update
 */
@Override
public void onResponse(ClusterStateUpdateResponse response) {
    final boolean removeIndexingShards = response.isAcknowledged()
        && request.settings().hasValue(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS)
        && request.settings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false);

    if (removeIndexingShards == false) {
        // No routing-table follow-up required: answer the caller right away.
        listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
        return;
    }

    UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest().indices(concreteIndices)
        .ackTimeout(request.timeout())
        .masterNodeTimeout(request.clusterManagerNodeTimeout());

    // The caller is answered by the follow-up task's listener, never by this method directly.
    clusterService.submitStateUpdateTask(
        "update-routing-table-after-settings",
        new AckedClusterStateUpdateTask<>(Priority.URGENT, updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse routingResponse) {
                listener.onResponse(new AcknowledgedResponse(routingResponse.isAcknowledged()));
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }
        }) {

            @Override
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }

            @Override
            public ClusterState execute(ClusterState currentState) {
                return updateSettingsService.updateRoutingTableForRemoveIndexShards(
                    Arrays.stream(concreteIndices).map(Index::getName).toArray(String[]::new),
                    currentState
                );
            }
        }
    );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -148,39 +149,96 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
private final ClusterHealthStatus status;
private final Map<Integer, ClusterShardHealth> shards;


/**
* A constructor for Index-level only
*/
public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingTable indexRoutingTable) {
this.index = indexMetadata.getIndex().getName();
this.numberOfShards = indexMetadata.getNumberOfShards();
this.numberOfReplicas = indexMetadata.getNumberOfReplicas();

shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
}
boolean removeIndexingShards = indexMetadata.getSettings().getAsBoolean("index.remove_indexing_shards", false);

// update the index status
ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
int computeActivePrimaryShards = 0;
int computeActiveShards = 0;
int computeRelocatingShards = 0;
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
for (ClusterShardHealth shardHealth : shards.values()) {
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;

if (removeIndexingShards) {
this.shards = Collections.emptyMap();
// For scaled down indices with remove_indexing_shards=true, track both search replicas and unassigned shards
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable == null) {
continue;
}

List<ShardRouting> shards = indexShardRoutingTable.shards();

// Count active search-only shards
int activeSearchShards = (int) shards.stream()
.filter(shard -> shard != null && shard.isSearchOnly() && shard.active())
.count();
computeActiveShards += activeSearchShards;

// Count other shard states directly without creating ClusterShardHealth
for (ShardRouting shard : shards) {
if (shard != null) {
if (shard.relocating()) {
computeRelocatingShards++;
} else if (shard.initializing()) {
computeInitializingShards++;
} else if (shard.unassigned()) {
computeUnassignedShards++;
if (shard.unassignedInfo().isDelayed()) {
computeDelayedUnassignedShards++;
}
}
}
}
}
computeActiveShards += shardHealth.getActiveShards();
computeRelocatingShards += shardHealth.getRelocatingShards();
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
// Health is GREEN if any search replicas are active
computeStatus = computeActiveShards > 0 ? ClusterHealthStatus.GREEN : ClusterHealthStatus.RED;
} else {
// Regular health calculation for normal indices
this.shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
if (shardRoutingTable == null) {
computeStatus = ClusterHealthStatus.RED;
continue;
}

computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
}
if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)
computeStatus = ClusterHealthStatus.RED;
int shardId = shardRoutingTable.shardId().id();
ShardRouting primaryShard = shardRoutingTable.primaryShard();

// Handle case where primary shard is null
if (primaryShard == null) {
computeUnassignedShards++;
computeStatus = ClusterHealthStatus.RED;
continue;
}

ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, shardRoutingTable);
shards.put(shardId, shardHealth);

if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
computeActiveShards += shardHealth.getActiveShards();
computeRelocatingShards += shardHealth.getRelocatingShards();
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();

computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
}

if (this.shards.isEmpty()) {
computeStatus = ClusterHealthStatus.RED;
}
}

this.status = computeStatus;
Expand All @@ -192,6 +250,9 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
this.delayedUnassignedShards = computeDelayedUnassignedShards;
}

/**
* Supports both index-level and shard-level depending on ClusterHealthRequest.Level
*/
public ClusterIndexHealth(
final IndexMetadata indexMetadata,
final IndexRoutingTable indexRoutingTable,
Expand All @@ -201,9 +262,7 @@ public ClusterIndexHealth(
this.numberOfShards = indexMetadata.getNumberOfShards();
this.numberOfReplicas = indexMetadata.getNumberOfReplicas();

shards = new HashMap<>();

// update the index status
boolean removeIndexingShards = indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false);
ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
int computeActivePrimaryShards = 0;
int computeActiveShards = 0;
Expand All @@ -212,61 +271,116 @@ public ClusterIndexHealth(
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;

boolean isShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.SHARDS;
if (isShardLevelHealthRequired) {
if (removeIndexingShards) {
this.shards = Collections.emptyMap();
// For scaled down indices with remove_indexing_shards=true, track both search replicas and unassigned shards
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
int shardId = indexShardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
if (indexShardRoutingTable == null) {
continue;
}
computeActiveShards += shardHealth.getActiveShards();
computeRelocatingShards += shardHealth.getRelocatingShards();
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
shards.put(shardId, shardHealth);
}
} else {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
int activeShardsPerShardId = 0;

List<ShardRouting> shardRoutings = indexShardRoutingTable.shards();
int shardRoutingCountPerShardId = shardRoutings.size();
for (int index = 0; index < shardRoutingCountPerShardId; index++) {
ShardRouting shardRouting = shardRoutings.get(index);
if (shardRouting.active()) {
computeActiveShards++;
activeShardsPerShardId++;
if (shardRouting.relocating()) {

List<ShardRouting> shards = indexShardRoutingTable.shards();

// Count active search-only shards
int activeSearchShards = (int) shards.stream()
.filter(shard -> shard != null && shard.isSearchOnly() && shard.active())
.count();
computeActiveShards += activeSearchShards;

// Count other shard states directly without creating ClusterShardHealth
for (ShardRouting shard : shards) {
if (shard != null) {
if (shard.relocating()) {
computeRelocatingShards++;
}
} else if (shardRouting.initializing()) {
computeInitializingShards++;
} else if (shardRouting.unassigned()) {
computeUnassignedShards++;
if (shardRouting.unassignedInfo().isDelayed()) {
computeDelayedUnassignedShards++;
} else if (shard.initializing()) {
computeInitializingShards++;
} else if (shard.unassigned()) {
computeUnassignedShards++;
if (shard.unassignedInfo().isDelayed()) {
computeDelayedUnassignedShards++;
}
}
}
}
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard.active()) {
computeActivePrimaryShards++;
}
// Health is GREEN if any search replicas are active
computeStatus = computeActiveShards > 0 ? ClusterHealthStatus.GREEN : ClusterHealthStatus.RED;
} else {
this.shards = new HashMap<>();
boolean isShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.SHARDS;

if (isShardLevelHealthRequired) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable == null) {
computeStatus = ClusterHealthStatus.RED;
continue;
}

int shardId = indexShardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
computeActiveShards += shardHealth.getActiveShards();
computeRelocatingShards += shardHealth.getRelocatingShards();
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
shards.put(shardId, shardHealth);
}
} else {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable == null) {
computeStatus = ClusterHealthStatus.RED;
continue;
}

int activeShardsPerShardId = 0;
List<ShardRouting> shardRoutings = indexShardRoutingTable.shards();
int shardRoutingCountPerShardId = shardRoutings.size();

for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.active()) {
computeActiveShards++;
activeShardsPerShardId++;
if (shardRouting.relocating()) {
computeRelocatingShards++;
}
} else if (shardRouting.initializing()) {
computeInitializingShards++;
} else if (shardRouting.unassigned()) {
computeUnassignedShards++;
if (shardRouting.unassignedInfo().isDelayed()) {
computeDelayedUnassignedShards++;
}
}
}

ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard == null) {
computeStatus = ClusterHealthStatus.RED;
continue;
}

if (primaryShard.active()) {
computeActivePrimaryShards++;
}

ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
shardRoutingCountPerShardId
);
computeStatus = getIndexHealthStatus(shardHealth, computeStatus);
}
ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
shardRoutingCountPerShardId
);
computeStatus = getIndexHealthStatus(shardHealth, computeStatus);
}
}

if (indexRoutingTable.shards() != null && indexRoutingTable.shards().isEmpty()) {
// might be since none has been created yet (two phase index creation)
computeStatus = ClusterHealthStatus.RED;
// Handle empty shard case
if (indexRoutingTable.shards() == null || indexRoutingTable.shards().isEmpty()) {
computeStatus = removeIndexingShards && computeActiveShards > 0 ?
ClusterHealthStatus.GREEN : ClusterHealthStatus.RED;
}

this.status = computeStatus;
Expand All @@ -276,9 +390,9 @@ public ClusterIndexHealth(
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;

}


public static ClusterHealthStatus getIndexHealthStatus(ClusterHealthStatus shardHealth, ClusterHealthStatus computeStatus) {
switch (shardHealth) {
case RED:
Expand Down
Loading

0 comments on commit 9f095fc

Please sign in to comment.