diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index 9e4cbb1082215..39d4170f6e712 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -13,10 +13,14 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -39,10 +43,24 @@ public class ReindexDataStreamAction extends ActionType getOldIndexVersionPredicate(Metadata metadata) { + return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE); + } + public enum Mode { UPGRADE } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index 95a078690a055..f011c429ce79c 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; /* * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation @@ -67,10 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList return; } int totalIndices = dataStream.getIndices().size(); - int totalIndicesToBeUpgraded = (int) dataStream.getIndices() - .stream() - .filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion()) - .count(); + int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count(); ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( sourceDataStreamName, transportService.getThreadPool().absoluteTimeInMillis(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index fc471cfa89f26..7ec5014b9edff 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; + public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); private final Client client; @@ -72,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask if (dataStreamInfos.size() == 1) { List indices = dataStreamInfos.getFirst().getDataStream().getIndices(); List indicesToBeReindexed = indices.stream() - .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) + .filter(getOldIndexVersionPredicate(clusterService.state().metadata())) .toList(); reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); for (Index index : indicesToBeReindexed) {