Skip to content

Commit

Permalink
Initial commit for scale to zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Nov 19, 2024
1 parent 9f790ee commit ee34050
Show file tree
Hide file tree
Showing 15 changed files with 612 additions and 55 deletions.
11 changes: 11 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.AckedClusterStateUpdateTask;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -170,6 +173,42 @@ protected void clusterManagerOperation(
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
if (response.isAcknowledged()
&& request.settings().hasValue(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS)
&& request.settings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest().indices(
concreteIndices
).ackTimeout(request.timeout()).masterNodeTimeout(request.clusterManagerNodeTimeout());

clusterService.submitStateUpdateTask(
"update-routing-table-after-settings",
new AckedClusterStateUpdateTask<>(Priority.URGENT, updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}) {

@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}

@Override
public ClusterState execute(ClusterState currentState) {
return updateSettingsService.updateRoutingTableForRemoveIndexShards(
Arrays.stream(concreteIndices).map(Index::getName).toArray(String[]::new),
currentState
);
}
}
);
}
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String SHARDS = "shards";
private static final String INDEXING_SHARDS_REMOVED = "indexing_shards_removed";

private static final ConstructingObjectParser<ClusterIndexHealth, String> PARSER = new ConstructingObjectParser<>(
"cluster_index_health",
Expand All @@ -91,6 +92,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
int unassignedShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
boolean indexingShardsRemoved = (boolean) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked")
List<ClusterShardHealth> shardList = (List<ClusterShardHealth>) parsedObjects[i];
Expand All @@ -113,6 +115,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
unassignedShards,
activePrimaryShards,
status,
indexingShardsRemoved,
shards
);
}
Expand All @@ -132,6 +135,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
PARSER.declareBoolean(constructorArg(), new ParseField(INDEXING_SHARDS_REMOVED));
// Can be absent if LEVEL == 'indices' or 'cluster'
PARSER.declareNamedObjects(optionalConstructorArg(), SHARD_PARSER, new ParseField(SHARDS));
}
Expand All @@ -147,17 +151,14 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
private final int activePrimaryShards;
private final ClusterHealthStatus status;
private final Map<Integer, ClusterShardHealth> shards;
private final boolean indexingShardsRemoved;

public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingTable indexRoutingTable) {
this.index = indexMetadata.getIndex().getName();
this.numberOfShards = indexMetadata.getNumberOfShards();
this.numberOfReplicas = indexMetadata.getNumberOfReplicas();

shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
}

// update the index status
ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
Expand All @@ -167,7 +168,11 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
for (ClusterShardHealth shardHealth : shards.values()) {
boolean computeIndexingShardsRemoved = false;

for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, shardRoutingTable);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
Expand All @@ -176,10 +181,18 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();

computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);

// Check if any shard has indexing removed
if (shardHealth.isIndexingShardsRemoved()) {
computeIndexingShardsRemoved = true;
}

shards.put(shardId, shardHealth);
}
if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)

if (indexRoutingTable.shards() == null || indexRoutingTable.shards().isEmpty()) {
// might be since none has been created yet (two phase index creation)
computeStatus = ClusterHealthStatus.RED;
}

Expand All @@ -190,6 +203,7 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
this.indexingShardsRemoved = computeIndexingShardsRemoved;
}

public ClusterIndexHealth(
Expand All @@ -211,6 +225,7 @@ public ClusterIndexHealth(
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
boolean computeIndexingShardsRemoved = false;

boolean isShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.SHARDS;
if (isShardLevelHealthRequired) {
Expand All @@ -226,6 +241,9 @@ public ClusterIndexHealth(
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
if (shardHealth.isIndexingShardsRemoved()) {
computeIndexingShardsRemoved = true;
}
shards.put(shardId, shardHealth);
}
} else {
Expand All @@ -252,9 +270,15 @@ public ClusterIndexHealth(
}
}
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard.active()) {
if (primaryShard != null && primaryShard.active()) {
computeActivePrimaryShards++;
}

// Check if primary is unassigned but has active replicas
if (primaryShard == null || (primaryShard.unassigned() && hasActiveReplicas(indexShardRoutingTable))) {
computeIndexingShardsRemoved = true;
}

ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
Expand All @@ -276,20 +300,27 @@ public ClusterIndexHealth(
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
this.indexingShardsRemoved = computeIndexingShardsRemoved;
}

private boolean hasActiveReplicas(IndexShardRoutingTable shardRoutingTable) {
return shardRoutingTable.shards().stream().anyMatch(shard -> !shard.primary() && shard.active());
}

public static ClusterHealthStatus getIndexHealthStatus(ClusterHealthStatus shardHealth, ClusterHealthStatus computeStatus) {
switch (shardHealth) {
case RED:
// Only go RED if we're not already GREEN or YELLOW
if (computeStatus == ClusterHealthStatus.GREEN || computeStatus == ClusterHealthStatus.YELLOW) {
return computeStatus;
}
return ClusterHealthStatus.RED;
case YELLOW:
// do not override an existing red
if (computeStatus != ClusterHealthStatus.RED) {
return ClusterHealthStatus.YELLOW;
} else {
return ClusterHealthStatus.RED;
// Don't override GREEN or RED
if (computeStatus == ClusterHealthStatus.GREEN || computeStatus == ClusterHealthStatus.RED) {
return computeStatus;
}
return ClusterHealthStatus.YELLOW;
default:
return computeStatus;
}
Expand All @@ -305,6 +336,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
initializingShards = in.readVInt();
unassignedShards = in.readVInt();
status = ClusterHealthStatus.fromValue(in.readByte());
indexingShardsRemoved = in.readBoolean();

int size = in.readVInt();
shards = new HashMap<>(size);
Expand All @@ -327,6 +359,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
int unassignedShards,
int activePrimaryShards,
ClusterHealthStatus status,
boolean indexingShardsRemoved,
Map<Integer, ClusterShardHealth> shards
) {
this.index = index;
Expand All @@ -337,6 +370,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.activePrimaryShards = activePrimaryShards;
this.indexingShardsRemoved = indexingShardsRemoved;
this.status = status;
this.shards = shards;
}
Expand Down Expand Up @@ -401,6 +435,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(initializingShards);
out.writeVInt(unassignedShards);
out.writeByte(status.value());
out.writeBoolean(indexingShardsRemoved);
out.writeCollection(shards.values());
}

Expand All @@ -416,6 +451,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(INITIALIZING_SHARDS, getInitializingShards());
builder.field(UNASSIGNED_SHARDS, getUnassignedShards());

if (indexingShardsRemoved) {
builder.field(INDEXING_SHARDS_REMOVED, true);
}

if ("shards".equals(params.param("level", "indices"))) {
builder.startObject(SHARDS);
for (ClusterShardHealth shardHealth : shards.values()) {
Expand Down Expand Up @@ -481,6 +520,7 @@ public boolean equals(Object o) {
&& initializingShards == that.initializingShards
&& unassignedShards == that.unassignedShards
&& activePrimaryShards == that.activePrimaryShards
&& indexingShardsRemoved == that.indexingShardsRemoved
&& status == that.status
&& Objects.equals(shards, that.shards);
}
Expand All @@ -497,7 +537,8 @@ public int hashCode() {
unassignedShards,
activePrimaryShards,
status,
shards
shards,
indexingShardsRemoved
);
}
}
Loading

0 comments on commit ee34050

Please sign in to comment.