Skip to content

Commit

Permalink
Fix unassigned ml system shard replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Sicheng Song <[email protected]>
  • Loading branch information
b4sjoo committed Sep 9, 2023
1 parent 104b1b7 commit e3e812f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
10 changes: 5 additions & 5 deletions common/src/main/java/org/opensearch/ml/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public class CommonValue {
public static final String ML_MODEL_GROUP_INDEX = ".plugins-ml-model-group";
public static final String ML_MODEL_INDEX = ".plugins-ml-model";
public static final String ML_TASK_INDEX = ".plugins-ml-task";
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 6;
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 7;
public static final String ML_CONNECTOR_INDEX = ".plugins-ml-connector";
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 1;
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 2;
public static final String ML_CONFIG_INDEX = ".plugins-ml-config";
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 2;
public static final String USER_FIELD_MAPPING = " \""
+ CommonValue.USER
+ "\": {\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.ml.indices;

import static org.opensearch.ml.common.CommonValue.META;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
import static org.opensearch.ml.common.CommonValue.SCHEMA_VERSION_FIELD;

import java.util.HashMap;
Expand All @@ -17,6 +15,7 @@
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -41,8 +40,9 @@ public class MLIndicesHandler {

private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
static {
indexMappingUpdated.put(ML_MODEL_INDEX, new AtomicBoolean(false));
indexMappingUpdated.put(ML_TASK_INDEX, new AtomicBoolean(false));
for (MLIndex mlIndex : MLIndex.values()) {
indexMappingUpdated.put(mlIndex.getIndexName(), new AtomicBoolean(false));
}
}

public void initModelGroupIndexIfAbsent(ActionListener<Boolean> listener) {
Expand All @@ -68,6 +68,7 @@ public void initMLConfigIndex(ActionListener<Boolean> listener) {
public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();
final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-6");

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
Expand All @@ -83,7 +84,7 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping);
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
Expand All @@ -97,12 +98,19 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
}, internalListener::onFailure));
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
Expand Down

0 comments on commit e3e812f

Please sign in to comment.