diff --git a/docs/changelog/116687.yaml b/docs/changelog/116687.yaml new file mode 100644 index 0000000000000..f8c7f86eff04a --- /dev/null +++ b/docs/changelog/116687.yaml @@ -0,0 +1,5 @@ +pr: 116687 +summary: Add LogsDB option to route on sort fields +area: Logs +type: enhancement +issues: [] diff --git a/docs/changelog/118562.yaml b/docs/changelog/118562.yaml new file mode 100644 index 0000000000000..a6b00b326151f --- /dev/null +++ b/docs/changelog/118562.yaml @@ -0,0 +1,6 @@ +pr: 118562 +summary: Update data stream deprecations warnings to new format and filter searchable + snapshots from response +area: Data streams +type: enhancement +issues: [] diff --git a/muted-tests.yml b/muted-tests.yml index 8cfc7c082473f..f2294939b7aab 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -288,9 +288,6 @@ tests: - class: org.elasticsearch.cluster.service.MasterServiceTests method: testThreadContext issue: https://github.com/elastic/elasticsearch/issues/118914 -- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT - method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source} - issue: https://github.com/elastic/elasticsearch/issues/118955 - class: org.elasticsearch.repositories.blobstore.testkit.analyze.SecureHdfsRepositoryAnalysisRestIT issue: https://github.com/elastic/elasticsearch/issues/118970 - class: org.elasticsearch.xpack.security.authc.AuthenticationServiceTests @@ -301,9 +298,16 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/116777 - class: org.elasticsearch.xpack.security.authc.ldap.ActiveDirectoryRunAsIT issue: https://github.com/elastic/elasticsearch/issues/115727 -- class: org.elasticsearch.cluster.coordination.NodeJoinExecutorTests - method: testSuccess - issue: https://github.com/elastic/elasticsearch/issues/119052 +- class: org.elasticsearch.xpack.security.authc.kerberos.KerberosAuthenticationIT + issue: https://github.com/elastic/elasticsearch/issues/118414 +- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlClientYamlIT + issue: https://github.com/elastic/elasticsearch/issues/119086 +- class: org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilderOverShapeTests + method: testToQuery + issue: https://github.com/elastic/elasticsearch/issues/119090 +- class: org.elasticsearch.xpack.spatial.index.query.GeoShapeQueryBuilderGeoShapeTests + method: testToQuery + issue: https://github.com/elastic/elasticsearch/issues/119091 # Examples: # diff --git a/rest-api-spec/build.gradle b/rest-api-spec/build.gradle index f23b5460f7d53..fab47c5b05006 100644 --- a/rest-api-spec/build.gradle +++ b/rest-api-spec/build.gradle @@ -59,6 +59,15 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task -> task.replaceValueInMatch("profile.shards.0.dfs.knn.0.query.0.description", "DocAndScoreQuery[0,...][0.009673266,...],0.009673266", "dfs knn vector profiling with vector_operations_count") task.skipTest("cat.aliases/10_basic/Deprecated local parameter", "CAT APIs not covered by compatibility policy") task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0") + task.skipTest("tsdb/20_mapping/exact match object type", "skip until pr/116687 gets backported") + task.skipTest("tsdb/25_id_generation/delete over _bulk", "skip until pr/116687 gets backported") + task.skipTest("tsdb/80_index_resize/split", "skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/noop update", "skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/regular update", 
"skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/search with routing", "skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/index with routing over _bulk", "skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/update over _bulk", "skip until pr/116687 gets backported") + task.skipTest("tsdb/90_unsupported_operations/index with routing", "skip until pr/116687 gets backported") task.skipTest("search/500_date_range/from, to, include_lower, include_upper deprecated", "deprecated parameters are removed in 9.0") task.skipTest("tsdb/20_mapping/stored source is supported", "no longer serialize source_mode") task.skipTest("tsdb/20_mapping/Synthetic source", "no longer serialize source_mode") diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml index edb684168278b..5003f6df79a14 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.create/20_synthetic_source.yml @@ -2036,14 +2036,12 @@ create index with use_synthetic_source: - is_true: test.settings.index.recovery.use_synthetic_source - do: - bulk: + index: index: test + id: 1 refresh: true - body: - - '{ "create": { } }' - - '{ "field": "aaaa" }' - - '{ "create": { } }' - - '{ "field": "bbbb" }' + body: { foo: bar } + - match: { _version: 1 } - do: indices.disk_usage: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml new file mode 100644 index 0000000000000..493b834fc5a90 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.recovery/20_synthetic_source.yml @@ -0,0 +1,33 @@ +--- +test recovery empty index with use_synthetic_source: + - requires: + cluster_features: ["mapper.synthetic_recovery_source"] + reason: requires synthetic recovery source + + - do: + indices.create: + index: test + body: + settings: + index: + number_of_replicas: 0 + recovery: + use_synthetic_source: true + mapping: + source: + mode: synthetic + + - do: + indices.get_settings: {} + - match: { test.settings.index.mapping.source.mode: synthetic} + - is_true: test.settings.index.recovery.use_synthetic_source + + - do: + indices.put_settings: + index: test + body: + index.number_of_replicas: 1 + + - do: + cluster.health: + wait_for_events: languid diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml index 463df7d2ab1bb..5f4314f724c23 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml @@ -513,6 +513,48 @@ routing path not allowed in logs mode: - match: { error.type: "illegal_argument_exception" } - match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" } +--- +routing path allowed in logs mode with routing on sort fields: + - requires: + cluster_features: [ "routing.logsb_route_on_sort_fields" ] + reason: introduction of route on index sorting fields + + - do: + indices.create: + index: test + body: + 
settings: + index: + mode: logsdb + number_of_replicas: 0 + number_of_shards: 2 + routing_path: [ host.name, agent_id ] + logsdb: + route_on_sort_fields: true + mappings: + properties: + "@timestamp": + type: date + host.name: + type: keyword + agent_id: + type: keyword + process_id: + type: integer + http_method: + type: keyword + message: + type: text + + - do: + indices.get_settings: + index: test + + - is_true: test + - match: { test.settings.index.mode: logsdb } + - match: { test.settings.index.logsdb.route_on_sort_fields: "true" } + - match: { test.settings.index.routing_path: [ host.name, agent_id ] } + --- start time not allowed in logs mode: - requires: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml index 9fe3f5e0b7272..f25601fc2e228 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml @@ -127,7 +127,7 @@ exact match object type: reason: routing_path error message updated in 8.14.0 - do: - catch: '/All fields that match routing_path must be configured with \[time_series_dimension: true\] or flattened fields with a list of dimensions in \[time_series_dimensions\] and without the \[script\] parameter. \[dim\] was \[object\]./' + catch: '/All fields that match routing_path must be .*flattened fields.* \[dim\] was \[object\]./' indices.create: index: tsdb_index body: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml index 4faa0424adb43..beba6f2752a11 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml @@ -427,7 +427,7 @@ delete over _bulk: - match: {items.0.delete.result: deleted} - match: {items.1.delete.result: deleted} - match: {items.2.delete.status: 404} - - match: {items.2.delete.error.reason: "invalid id [not found ++ not found] for index [id_generation_test] in time series mode"} + - match: {items.2.delete.error.reason: '/invalid\ id\ \[not\ found\ \+\+\ not\ found\]\ for\ index\ \[id_generation_test\]\ in\ time.series\ mode/'} --- routing_path matches deep object: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml index c32d3c50b0784..c71555dd073d6 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml @@ -95,7 +95,7 @@ split: reason: tsdb indexing changed in 8.2.0 - do: - catch: /index-split is not supported because the destination index \[test\] is in time series mode/ + catch: /index-split is not supported because the destination index \[test\] is in time.series mode/ indices.split: index: test target: test_split diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml index 54b2bf59c8ddc..142d1281ad12b 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml +++ 
b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml @@ -75,7 +75,7 @@ index with routing: reason: tsdb indexing changed in 8.2.0 - do: - catch: /specifying routing is not supported because the destination index \[test\] is in time series mode/ + catch: /specifying routing is not supported because the destination index \[test\] is in time.series mode/ index: index: test routing: foo @@ -104,7 +104,7 @@ index with routing over _bulk: body: - '{"index": {"routing": "foo"}}' - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' - - match: {items.0.index.error.reason: "specifying routing is not supported because the destination index [test] is in time series mode"} + - match: {items.0.index.error.reason: '/specifying\ routing\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'} --- noop update: @@ -120,7 +120,7 @@ noop update: - length: {hits.hits: 1} - do: - catch: /update is not supported because the destination index \[test\] is in time series mode/ + catch: /update is not supported because the destination index \[test\] is in time.series mode/ update: index: test id: "1" @@ -136,7 +136,7 @@ regular update: # We fail even though the document isn't found. - do: - catch: /update is not supported because the destination index \[test\] is in time series mode/ + catch: /update is not supported because the destination index \[test\] is in time.series mode/ update: index: test id: "1" @@ -165,7 +165,7 @@ update over _bulk: body: - '{"update": {"_id": 1}}' - '{"doc":{"@timestamp": "2021-04-28T18:03:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}}' - - match: {items.0.update.error.reason: "update is not supported because the destination index [test] is in time series mode"} + - match: {items.0.update.error.reason: '/update\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'} --- search with routing: @@ -175,7 +175,7 @@ search with routing: # We fail even though the document isn't found. 
- do: - catch: /searching with a specified routing is not supported because the destination index \[test\] is in time series mode/ + catch: /searching with a specified routing is not supported because the destination index \[test\] is in time.series mode/ search: index: test routing: rrrr diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index d5b8b657bd14e..9f4231c25dfca 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -51,6 +51,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; import java.util.function.Supplier; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -78,7 +79,6 @@ public class IndexRequest extends ReplicatedWriteRequest implement private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X; private static final Supplier ID_GENERATOR = UUIDs::base64UUID; - private static final Supplier K_SORTED_TIME_BASED_ID_GENERATOR = UUIDs::base64TimeBasedKOrderedUUID; /** * Max length of the source document to include into string() @@ -705,9 +705,18 @@ public void autoGenerateId() { } public void autoGenerateTimeBasedId() { + autoGenerateTimeBasedId(OptionalInt.empty()); + } + + /** + * Set the {@code #id()} to an automatically generated one, optimized for storage (compression) efficiency. + * If a routing hash is passed, it is included in the generated id starting at 9 bytes before the end. + * @param hash optional routing hash value, used to route requests by id to the right shard. + */ + public void autoGenerateTimeBasedId(OptionalInt hash) { assertBeforeGeneratingId(); autoGenerateTimestamp(); - id(K_SORTED_TIME_BASED_ID_GENERATOR.get()); + id(UUIDs.base64TimeBasedKOrderedUUIDWithHash(hash)); } private void autoGenerateTimestamp() { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index f42252df4ab7b..ecd1b735cc396 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import java.util.Set; import java.util.function.IntConsumer; import java.util.function.IntSupplier; @@ -55,6 +56,7 @@ public abstract class IndexRouting { static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path"); static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path"); + static final NodeFeature LOGSB_ROUTE_ON_SORT_FIELDS = new NodeFeature("routing.logsb_route_on_sort_fields"); /** * Build the routing from {@link IndexMetadata}. @@ -165,7 +167,8 @@ private abstract static class IdAndRoutingOnly extends IndexRouting { @Override public void preProcess(IndexRequest indexRequest) { - // generate id if not already provided + // Generate id if not already provided. + // This is needed for routing, so it has to happen in pre-processing. 
final String id = indexRequest.id(); if (id == null) { if (shouldUseTimeBasedId(indexMode, creationVersion)) { @@ -272,7 +275,9 @@ public void collectSearchShards(String routing, IntConsumer consumer) { public static class ExtractFromSource extends IndexRouting { private final Predicate isRoutingPath; private final XContentParserConfiguration parserConfig; + private final IndexMode indexMode; private final boolean trackTimeSeriesRoutingHash; + private final boolean addIdWithRoutingHash; private int hash = Integer.MAX_VALUE; ExtractFromSource(IndexMetadata metadata) { @@ -280,7 +285,10 @@ public static class ExtractFromSource extends IndexRouting { if (metadata.isRoutingPartitionedIndex()) { throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path"); } - trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID); + indexMode = metadata.getIndexMode(); + trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES + && metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID); + addIdWithRoutingHash = indexMode == IndexMode.LOGSDB; List routingPaths = metadata.getRoutingPaths(); isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new)); this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(routingPaths), null, true); @@ -292,8 +300,13 @@ public boolean matchesField(String fieldName) { @Override public void postProcess(IndexRequest indexRequest) { + // Update the request with the routing hash, if needed. + // This needs to happen in post-processing, after the routing hash is calculated. if (trackTimeSeriesRoutingHash) { indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash)); + } else if (addIdWithRoutingHash) { + assert hash != Integer.MAX_VALUE; + indexRequest.autoGenerateTimeBasedId(OptionalInt.of(hash)); } } @@ -461,12 +474,15 @@ private int idToHash(String id) { try { idBytes = Base64.getUrlDecoder().decode(id); } catch (IllegalArgumentException e) { - throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName); + throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName); } if (idBytes.length < 4) { - throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName); + throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName); } - return hashToShardId(ByteUtils.readIntLE(idBytes, 0)); + // For TSDB, the hash is stored as the id prefix. + // For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id, + // see IndexRequest#autoGenerateTimeBasedId. + return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? 
idBytes.length - 9 : 0)); } @Override @@ -480,7 +496,7 @@ public void collectSearchShards(String routing, IntConsumer consumer) { } private String error(String operation) { - return operation + " is not supported because the destination index [" + indexName + "] is in time series mode"; + return operation + " is not supported because the destination index [" + indexName + "] is in " + indexMode.getName() + " mode"; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java index f8028ce7f9d68..1545fdf90d111 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java @@ -20,4 +20,9 @@ public class RoutingFeatures implements FeatureSpecification { public Set getFeatures() { return Set.of(IndexRouting.BOOLEAN_ROUTING_PATH, IndexRouting.MULTI_VALUE_ROUTING_PATH); } + + @Override + public Set getTestFeatures() { + return Set.of(IndexRouting.LOGSB_ROUTE_ON_SORT_FIELDS); + } } diff --git a/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java b/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java index 7ea58ee326a79..58ad3f5b47415 100644 --- a/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java +++ b/server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java @@ -9,7 +9,10 @@ package org.elasticsearch.common; +import org.elasticsearch.common.util.ByteUtils; + import java.nio.ByteBuffer; +import java.util.OptionalInt; import java.util.function.Supplier; /** @@ -28,6 +31,7 @@ * The result is a compact base64-encoded string, optimized for efficient compression of the _id field in an inverted index. */ public class TimeBasedKOrderedUUIDGenerator extends TimeBasedUUIDGenerator { + static final int SIZE_IN_BYTES = 15; public TimeBasedKOrderedUUIDGenerator( final Supplier timestampSupplier, @@ -39,6 +43,10 @@ public TimeBasedKOrderedUUIDGenerator( @Override public String getBase64UUID() { + return getBase64UUID(OptionalInt.empty()); + } + + public String getBase64UUID(OptionalInt hash) { final int sequenceId = sequenceNumber.incrementAndGet() & 0x00FF_FFFF; // Calculate timestamp to ensure ordering and avoid backward movement in case of time shifts. @@ -50,7 +58,7 @@ public String getBase64UUID() { sequenceId == 0 ? (lastTimestamp, currentTimeMillis) -> Math.max(lastTimestamp, currentTimeMillis) + 1 : Math::max ); - final byte[] uuidBytes = new byte[15]; + final byte[] uuidBytes = new byte[SIZE_IN_BYTES + (hash.isPresent() ? 
4 : 0)]; final ByteBuffer buffer = ByteBuffer.wrap(uuidBytes); buffer.put((byte) (timestamp >>> 40)); // changes every 35 years @@ -64,6 +72,13 @@ public String getBase64UUID() { assert macAddress.length == 6; buffer.put(macAddress, 0, macAddress.length); + // Copy the hash value if provided + if (hash.isPresent()) { + byte[] hashBytes = new byte[4]; + ByteUtils.writeIntLE(hash.getAsInt(), hashBytes, 0); + buffer.put(hashBytes, 0, hashBytes.length); + } + buffer.put((byte) (sequenceId >>> 16)); // From hereinafter everything is almost like random and does not compress well diff --git a/server/src/main/java/org/elasticsearch/common/UUIDs.java b/server/src/main/java/org/elasticsearch/common/UUIDs.java index ebcb375bc01bc..6b19fcddb87ca 100644 --- a/server/src/main/java/org/elasticsearch/common/UUIDs.java +++ b/server/src/main/java/org/elasticsearch/common/UUIDs.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.SecureString; +import java.util.OptionalInt; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -23,14 +24,14 @@ public class UUIDs { public static final Supplier DEFAULT_TIMESTAMP_SUPPLIER = System::currentTimeMillis; public static final Supplier DEFAULT_SEQUENCE_ID_SUPPLIER = sequenceNumber::incrementAndGet; public static final Supplier DEFAULT_MAC_ADDRESS_SUPPLIER = MacAddressProvider::getSecureMungedAddress; - private static final UUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator(); - private static final UUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator( + private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator(); + private static final TimeBasedKOrderedUUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator( DEFAULT_TIMESTAMP_SUPPLIER, DEFAULT_SEQUENCE_ID_SUPPLIER, DEFAULT_MAC_ADDRESS_SUPPLIER ); - private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator( + private static final TimeBasedUUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator( DEFAULT_TIMESTAMP_SUPPLIER, DEFAULT_SEQUENCE_ID_SUPPLIER, DEFAULT_MAC_ADDRESS_SUPPLIER @@ -51,12 +52,8 @@ public static String base64UUID() { return TIME_UUID_GENERATOR.getBase64UUID(); } - public static String base64TimeBasedKOrderedUUID() { - return TIME_BASED_K_ORDERED_GENERATOR.getBase64UUID(); - } - - public static String base64TimeBasedUUID() { - return TIME_UUID_GENERATOR.getBase64UUID(); + public static String base64TimeBasedKOrderedUUIDWithHash(OptionalInt hash) { + return TIME_BASED_K_ORDERED_GENERATOR.getBase64UUID(hash); } /** diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index fc8f128e92f32..09561661ccd52 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -182,6 +182,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING, IndexSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING, IndexSettings.TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING, + IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS, IndexSettings.PREFER_ILM_SETTING, DataStreamFailureStoreDefinition.FAILURE_STORE_DEFINITION_VERSION_SETTING, FieldMapper.SYNTHETIC_SOURCE_KEEP_INDEX_SETTING, diff --git 
a/server/src/main/java/org/elasticsearch/index/IndexMode.java b/server/src/main/java/org/elasticsearch/index/IndexMode.java index f5f923f3657f8..a138407991b68 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexMode.java +++ b/server/src/main/java/org/elasticsearch/index/IndexMode.java @@ -63,7 +63,8 @@ public enum IndexMode { STANDARD("standard") { @Override void validateWithOtherSettings(Map, Object> settings) { - IndexMode.validateTimeSeriesSettings(settings); + validateRoutingPathSettings(settings); + validateTimeSeriesSettings(settings); } @Override @@ -235,7 +236,11 @@ public SourceFieldMapper.Mode defaultSourceMode() { LOGSDB("logsdb") { @Override void validateWithOtherSettings(Map, Object> settings) { - IndexMode.validateTimeSeriesSettings(settings); + validateTimeSeriesSettings(settings); + var setting = settings.get(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS); + if (setting.equals(Boolean.FALSE)) { + validateRoutingPathSettings(settings); + } } @Override @@ -389,8 +394,11 @@ public SourceFieldMapper.Mode defaultSourceMode() { private static final String HOST_NAME = "host.name"; - private static void validateTimeSeriesSettings(Map, Object> settings) { + private static void validateRoutingPathSettings(Map, Object> settings) { settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH); + } + + private static void validateTimeSeriesSettings(Map, Object> settings) { settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME); settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME); } @@ -450,6 +458,7 @@ private static CompressedXContent createDefaultMapping(boolean includeHostName) IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING, IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING, IndexMetadata.INDEX_ROUTING_PATH, + IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS, IndexSettings.TIME_SERIES_START_TIME, IndexSettings.TIME_SERIES_END_TIME ), diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 9273888b9ec91..c8a983a48ff16 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -704,6 +704,13 @@ public boolean isES87TSDBCodecEnabled() { return es87TSDBCodecEnabled; } + public static final Setting LOGSDB_ROUTE_ON_SORT_FIELDS = Setting.boolSetting( + "index.logsdb.route_on_sort_fields", + false, + Property.IndexScope, + Property.Final + ); + /** * The {@link IndexMode "mode"} of the index. */ @@ -825,6 +832,7 @@ private static String getIgnoreAboveDefaultValue(final Settings settings) { private final boolean softDeleteEnabled; private volatile long softDeleteRetentionOperations; private final boolean es87TSDBCodecEnabled; + private final boolean logsdbRouteOnSortFields; private volatile long retentionLeaseMillis; @@ -935,6 +943,13 @@ public boolean isDefaultAllowUnmappedFields() { return defaultAllowUnmappedFields; } + /** + * Returns true if routing on sort fields is enabled for LogsDB. The default is false + */ + public boolean logsdbRouteOnSortFields() { + return logsdbRouteOnSortFields; + } + /** * Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata * while index level settings will overwrite node settings. 
@@ -1027,12 +1042,16 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti indexRouting = IndexRouting.fromIndexMetadata(indexMetadata); sourceKeepMode = scopedSettings.get(Mapper.SYNTHETIC_SOURCE_KEEP_INDEX_SETTING); es87TSDBCodecEnabled = scopedSettings.get(TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING); + logsdbRouteOnSortFields = scopedSettings.get(LOGSDB_ROUTE_ON_SORT_FIELDS); skipIgnoredSourceWrite = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_WRITE_SETTING); skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING); indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING); recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings); recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING); if (recoverySourceSyntheticEnabled) { + if (DiscoveryNode.isStateless(settings)) { + throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful"); + } // Verify that all nodes can handle this setting if (version.before(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY) && version.between( diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java index f21a3c06ab015..08508103181ed 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java @@ -77,12 +77,13 @@ public LuceneSyntheticSourceChangesSnapshot( IndexVersion indexVersionCreated ) throws IOException { super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated); - assert mappingLookup.isSourceSynthetic(); + // a MapperService#updateMapping(...) of empty index may not have been invoked and then mappingLookup is empty + assert engineSearcher.getDirectoryReader().maxDoc() == 0 || mappingLookup.isSourceSynthetic() + : "either an empty index or synthetic source must be enabled for proper functionality."; // ensure we can buffer at least one document this.maxMemorySizeInBytes = maxMemorySizeInBytes > 0 ? 
maxMemorySizeInBytes : 1; this.sourceLoader = mappingLookup.newSourceLoader(null, SourceFieldMetrics.NOOP); Set storedFields = sourceLoader.requiredStoredFields(); - assert mappingLookup.isSourceSynthetic() : "synthetic source must be enabled for proper functionality."; this.storedFieldLoader = StoredFieldLoader.create(false, storedFields); this.lastSeenSeqNo = fromSeqNo - 1; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index a99fa3f93679b..03e6c343c7ab9 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.IndexVersion; @@ -177,18 +178,16 @@ public void validate(IndexSettings settings, boolean checkLimits) { } List routingPaths = settings.getIndexMetadata().getRoutingPaths(); for (String path : routingPaths) { - for (String match : mappingLookup.getMatchingFieldNames(path)) { - mappingLookup.getFieldType(match).validateMatchedRoutingPath(path); + if (settings.getMode() == IndexMode.TIME_SERIES) { + for (String match : mappingLookup.getMatchingFieldNames(path)) { + mappingLookup.getFieldType(match).validateMatchedRoutingPath(path); + } } for (String objectName : mappingLookup.objectMappers().keySet()) { // object type is not allowed in the routing paths if (path.equals(objectName)) { throw new IllegalArgumentException( - "All fields that match routing_path must be configured with [time_series_dimension: true] " - + "or flattened fields with a list of dimensions in [time_series_dimensions] " - + "and without the [script] parameter. [" - + objectName - + "] was [object]." + "All fields that match routing_path must be flattened fields. [" + objectName + "] was [object]." 
); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java index 492a142492e18..34add82e66557 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java @@ -467,7 +467,7 @@ public void testSuccess() { .build(); metaBuilder.put(indexMetadata, false); Metadata metadata = metaBuilder.build(); - NodeJoinExecutor.ensureIndexCompatibility(IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current(), metadata); + NodeJoinExecutor.ensureIndexCompatibility(IndexVersions.MINIMUM_READONLY_COMPATIBLE, IndexVersion.current(), metadata); } public static Settings.Builder randomCompatibleVersionSettings() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java index 943fb6fd63b0b..1db6192eee80e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java @@ -16,6 +16,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.shard.ShardId; @@ -511,7 +513,7 @@ public void testRoutingPathUpdate() throws IOException { IllegalArgumentException.class, () -> routing.updateShard(randomAlphaOfLength(5), randomBoolean() ? 
null : randomAlphaOfLength(5)) ); - assertThat(e.getMessage(), equalTo("update is not supported because the destination index [test] is in time series mode")); + assertThat(e.getMessage(), equalTo("update is not supported because the destination index [test] is in time_series mode")); } public void testRoutingIndexWithRouting() throws IOException { @@ -525,7 +527,7 @@ public void testRoutingIndexWithRouting() throws IOException { ); assertThat( e.getMessage(), - equalTo("specifying routing is not supported because the destination index [test] is in time series mode") + equalTo("specifying routing is not supported because the destination index [test] is in time_series mode") ); } @@ -534,7 +536,7 @@ public void testRoutingPathCollectSearchWithRouting() throws IOException { Exception e = expectThrows(IllegalArgumentException.class, () -> routing.collectSearchShards(randomAlphaOfLength(5), null)); assertThat( e.getMessage(), - equalTo("searching with a specified routing is not supported because the destination index [test] is in time series mode") + equalTo("searching with a specified routing is not supported because the destination index [test] is in time_series mode") ); } @@ -647,14 +649,42 @@ public void testRoutingPathReadWithInvalidString() throws IOException { int shards = between(2, 1000); IndexRouting indexRouting = indexRoutingForPath(shards, "foo"); Exception e = expectThrows(ResourceNotFoundException.class, () -> shardIdForReadFromSourceExtracting(indexRouting, "!@#")); - assertThat(e.getMessage(), equalTo("invalid id [!@#] for index [test] in time series mode")); + assertThat(e.getMessage(), equalTo("invalid id [!@#] for index [test] in time_series mode")); } public void testRoutingPathReadWithShortString() throws IOException { int shards = between(2, 1000); IndexRouting indexRouting = indexRoutingForPath(shards, "foo"); Exception e = expectThrows(ResourceNotFoundException.class, () -> shardIdForReadFromSourceExtracting(indexRouting, "")); - assertThat(e.getMessage(), equalTo("invalid id [] for index [test] in time series mode")); + assertThat(e.getMessage(), equalTo("invalid id [] for index [test] in time_series mode")); + } + + public void testRoutingPathLogsdb() throws IOException { + int shards = between(2, 1000); + IndexRouting routing = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings( + settings(IndexVersion.current()).put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "foo") + .put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB) + .build() + ) + .numberOfShards(shards) + .numberOfReplicas(1) + .build() + ); + + IndexRequest req = new IndexRequest(); + routing.preProcess(req); + assertNull(req.id()); + + // Verify that routing uses the field name and value in the routing path. + int expectedShard = Math.floorMod(hash(List.of("foo", "A")), shards); + BytesReference sourceBytes = source(Map.of("foo", "A", "bar", "B")); + assertEquals(expectedShard, routing.indexShard(null, null, XContentType.JSON, sourceBytes)); + + // Verify that the request id gets updated to contain the routing hash. 
+ routing.postProcess(req); + assertEquals(expectedShard, routing.getShard(req.id(), null)); } /** @@ -673,7 +703,11 @@ private IndexRouting indexRoutingForPath(int shards, String path) { private IndexRouting indexRoutingForPath(IndexVersion createdVersion, int shards, String path) { return IndexRouting.fromIndexMetadata( IndexMetadata.builder("test") - .settings(settings(createdVersion).put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), path)) + .settings( + settings(createdVersion).put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), path) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) + .build() + ) .numberOfShards(shards) .numberOfReplicas(1) .build() diff --git a/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java b/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java index 964683a1972ba..fa51f89dea9c3 100644 --- a/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java +++ b/server/src/test/java/org/elasticsearch/common/TimeBasedUUIDGeneratorTests.java @@ -9,12 +9,14 @@ package org.elasticsearch.common; +import org.elasticsearch.common.util.ByteUtils; import org.elasticsearch.test.ESTestCase; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Base64; import java.util.HashSet; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -106,6 +108,12 @@ public void testUUIDEncodingDecodingWithRandomValues() { ); } + public void testUUIDEncodingDecodingWithHash() { + int hash = randomInt(); + byte[] decoded = Base64.getUrlDecoder().decode(UUIDs.base64TimeBasedKOrderedUUIDWithHash(OptionalInt.of(hash))); + assertEquals(hash, ByteUtils.readIntLE(decoded, decoded.length - 9)); + } + private void testUUIDEncodingDecodingHelper(final long timestamp, final int sequenceId, final byte[] macAddress) { final TestTimeBasedKOrderedUUIDDecoder decoder = new TestTimeBasedKOrderedUUIDDecoder( createKOrderedGenerator(() -> timestamp, () -> sequenceId, () -> macAddress).getBase64UUID() diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 4428a7e078510..7e761cb10d4d6 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.env.BuildVersion; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -1392,7 +1391,7 @@ public void testLimitsFileCount() throws IOException { } } - public void testOverrideLuceneVersion() throws IOException { + public void testOverrideNodeVersion() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); final String clusterUUID = UUIDs.randomBase64UUID(random()); @@ -1415,9 +1414,7 @@ public void testOverrideLuceneVersion() throws IOException { assertThat(clusterState.metadata().version(), equalTo(version)); } - @UpdateForV9(owner = UpdateForV9.Owner.SEARCH_FOUNDATIONS) BuildVersion overrideVersion = 
BuildVersion.fromVersionId(Version.V_8_0_0.id); - NodeMetadata prevMetadata = PersistedClusterStateService.nodeMetadata(persistedClusterStateService.getDataPaths()); assertEquals(BuildVersion.current(), prevMetadata.nodeVersion()); PersistedClusterStateService.overrideVersion(overrideVersion, persistedClusterStateService.getDataPaths()); diff --git a/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java b/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java index d0fa037079255..93dedbb355fbd 100644 --- a/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java +++ b/server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java @@ -173,14 +173,7 @@ public void testRoutingPathEqualsObjectNameError() { b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject(); b.endObject().endObject(); }))); - assertThat( - e.getMessage(), - equalTo( - "All fields that match routing_path must be configured with [time_series_dimension: true] " - + "or flattened fields with a list of dimensions in [time_series_dimensions] and " - + "without the [script] parameter. [dim.o] was [object]." - ) - ); + assertThat(e.getMessage(), equalTo("All fields that match routing_path must be flattened fields. [dim.o] was [object].")); } public void testRoutingPathMatchesNonDimensionKeyword() { diff --git a/server/src/test/java/org/elasticsearch/indices/analysis/AnalysisModuleTests.java b/server/src/test/java/org/elasticsearch/indices/analysis/AnalysisModuleTests.java index 1bcd84aadd6cd..abaab1ac8983b 100644 --- a/server/src/test/java/org/elasticsearch/indices/analysis/AnalysisModuleTests.java +++ b/server/src/test/java/org/elasticsearch/indices/analysis/AnalysisModuleTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.IndexService.IndexCreationContext; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.analysis.Analysis; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.CharFilterFactory; @@ -186,6 +187,34 @@ public void testUnderscoreInAnalyzerName() throws IOException { } } + public void testStandardFilterBWC() throws IOException { + // standard tokenfilter should have been removed entirely in the 7x line. 
However, a + // cacheing bug meant that it was still possible to create indexes using a standard + // filter until 7.6 + { + IndexVersion version = IndexVersionUtils.randomVersionBetween(random(), IndexVersions.V_7_6_0, IndexVersion.current()); + final Settings settings = Settings.builder() + .put("index.analysis.analyzer.my_standard.tokenizer", "standard") + .put("index.analysis.analyzer.my_standard.filter", "standard") + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .put(IndexMetadata.SETTING_VERSION_CREATED, version) + .build(); + IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> getIndexAnalyzers(settings)); + assertThat(exc.getMessage(), equalTo("The [standard] token filter has been removed.")); + } + { + IndexVersion version = IndexVersionUtils.randomVersionBetween(random(), IndexVersions.V_7_0_0, IndexVersions.V_7_5_2); + final Settings settings = Settings.builder() + .put("index.analysis.analyzer.my_standard.tokenizer", "standard") + .put("index.analysis.analyzer.my_standard.filter", "standard") + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .put(IndexMetadata.SETTING_VERSION_CREATED, version) + .build(); + getIndexAnalyzers(settings); + assertWarnings("The [standard] token filter is deprecated and will be removed in a future version."); + } + } + /** * Tests that plugins can register pre-configured char filters that vary in behavior based on Elasticsearch version, Lucene version, * and that do not vary based on version at all. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java new file mode 100644 index 0000000000000..024d24fdf5151 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.deprecation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; + +import java.util.function.Predicate; + +public class DeprecatedIndexPredicate { + + public static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.UPGRADE_TO_LUCENE_10_0_0; + + /* + * This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed + * in order to be writable in the _next_ lucene version. + * + * It ignores searchable snapshots as they are not writable. 
+ */ + public static Predicate getReindexRequiredPredicate(Metadata metadata) { + return index -> { + IndexMetadata indexMetadata = metadata.index(index); + return reindexRequired(indexMetadata); + }; + } + + public static boolean reindexRequired(IndexMetadata indexMetadata) { + return creationVersionBeforeMinimumWritableVersion(indexMetadata) && isNotSearchableSnapshot(indexMetadata); + } + + private static boolean isNotSearchableSnapshot(IndexMetadata indexMetadata) { + return indexMetadata.isSearchableSnapshot() == false; + } + + private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata metadata) { + return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE); + } + +} diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java index ee029d01427aa..65f2659fda04a 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java @@ -10,10 +10,12 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import static java.util.Map.entry; import static java.util.Map.ofEntries; @@ -21,54 +23,28 @@ public class DataStreamDeprecationChecks { static DeprecationIssue oldIndicesCheck(DataStream dataStream, ClusterState clusterState) { List backingIndices = dataStream.getIndices(); - boolean hasOldIndices = backingIndices.stream() - .anyMatch(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0)); - if (hasOldIndices) { - long totalIndices = backingIndices.size(); - List oldIndices = backingIndices.stream() - .filter(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0)) - .toList(); - long totalOldIndices = oldIndices.size(); - long totalOldSearchableSnapshots = oldIndices.stream() - .filter(index -> clusterState.metadata().index(index).isSearchableSnapshot()) - .count(); - long totalOldPartiallyMountedSearchableSnapshots = oldIndices.stream() - .filter(index -> clusterState.metadata().index(index).isPartialSearchableSnapshot()) - .count(); - long totalOldFullyMountedSearchableSnapshots = totalOldSearchableSnapshots - totalOldPartiallyMountedSearchableSnapshots; + + Set indicesNeedingUpgrade = backingIndices.stream() + .filter(DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterState.metadata())) + .map(Index::getName) + .collect(Collectors.toUnmodifiableSet()); + + if (indicesNeedingUpgrade.isEmpty() == false) { return new DeprecationIssue( DeprecationIssue.Level.CRITICAL, - "Old data stream with a compatibility version < 8.0", + "Old data stream with a compatibility version < 9.0", "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", - "This data stream has backing indices that were created before Elasticsearch 8.0.0", + "This data stream has backing indices that were created before Elasticsearch 9.0.0", false, ofEntries( 
- entry( - "backing_indices", - ofEntries( - entry("count", totalIndices), - entry( - "need_upgrading", - ofEntries( - entry("count", totalOldIndices), - entry( - "searchable_snapshots", - ofEntries( - entry("count", totalOldSearchableSnapshots), - entry("fully_mounted", ofEntries(entry("count", totalOldFullyMountedSearchableSnapshots))), - entry( - "partially_mounted", - ofEntries(entry("count", totalOldPartiallyMountedSearchableSnapshots)) - ) - ) - ) - ) - ) - ) - ) + entry("reindex_required", true), + entry("total_backing_indices", backingIndices.size()), + entry("indices_requiring_upgrade_count", indicesNeedingUpgrade.size()), + entry("indices_requiring_upgrade", indicesNeedingUpgrade) ) ); } + return null; } } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java index aaf58a44a6565..de06e270a867e 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java @@ -14,12 +14,13 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.frozen.FrozenEngine; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -36,14 +37,14 @@ static DeprecationIssue oldIndicesCheck(IndexMetadata indexMetadata, ClusterStat // TODO: this check needs to be revised. It's trivially true right now. 
IndexVersion currentCompatibilityVersion = indexMetadata.getCompatibilityVersion(); // We intentionally exclude indices that are in data streams because they will be picked up by DataStreamDeprecationChecks - if (currentCompatibilityVersion.before(IndexVersions.V_8_0_0) && isNotDataStreamIndex(indexMetadata, clusterState)) { + if (DeprecatedIndexPredicate.reindexRequired(indexMetadata) && isNotDataStreamIndex(indexMetadata, clusterState)) { return new DeprecationIssue( DeprecationIssue.Level.CRITICAL, - "Old index with a compatibility version < 8.0", + "Old index with a compatibility version < 9.0", "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", "This index has version: " + currentCompatibilityVersion.toReleaseVersion(), false, - null + Collections.singletonMap("reindex_required", true) ); } return null; diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java index d5325fb0ff3a4..b297cc1a5bdf8 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java @@ -17,41 +17,46 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static java.util.Collections.singletonList; +import static java.util.Map.entry; +import static java.util.Map.ofEntries; +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.xpack.deprecation.DeprecationChecks.DATA_STREAM_CHECKS; import static org.hamcrest.Matchers.equalTo; public class DataStreamDeprecationChecksTests extends ESTestCase { public void testOldIndicesCheck() { - long oldIndexCount = randomIntBetween(1, 100); - long newIndexCount = randomIntBetween(1, 100); - long oldSearchableSnapshotCount = 0; - long oldFullyManagedSearchableSnapshotCount = 0; - long oldPartiallyManagedSearchableSnapshotCount = 0; + int oldIndexCount = randomIntBetween(1, 100); + int newIndexCount = randomIntBetween(1, 100); + List allIndices = new ArrayList<>(); Map nameToIndexMetadata = new HashMap<>(); + Set expectedIndices = new HashSet<>(); + for (int i = 0; i < oldIndexCount; i++) { - Settings.Builder settingsBuilder = settings(IndexVersion.fromId(7170099)); - if (randomBoolean()) { - settingsBuilder.put("index.store.type", "snapshot"); - if (randomBoolean()) { - oldFullyManagedSearchableSnapshotCount++; - } else { - settingsBuilder.put("index.store.snapshot.partial", true); - oldPartiallyManagedSearchableSnapshotCount++; - } - oldSearchableSnapshotCount++; + Settings.Builder settings = settings(IndexVersion.fromId(7170099)); + + String indexName = "old-data-stream-index-" + i; + if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) { + settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE); + } else { + expectedIndices.add(indexName); } - IndexMetadata 
oldIndexMetadata = IndexMetadata.builder("old-data-stream-index-" + i) + + Settings.Builder settingsBuilder = settings; + IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName) .settings(settingsBuilder) .numberOfShards(1) .numberOfReplicas(0) @@ -59,11 +64,9 @@ public void testOldIndicesCheck() { allIndices.add(oldIndexMetadata.getIndex()); nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata); } + for (int i = 0; i < newIndexCount; i++) { Settings.Builder settingsBuilder = settings(IndexVersion.current()); - if (randomBoolean()) { - settingsBuilder.put("index.store.type", "snapshot"); - } IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i) .settings(settingsBuilder) .numberOfShards(1) @@ -72,6 +75,7 @@ public void testOldIndicesCheck() { allIndices.add(newIndexMetadata.getIndex()); nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata); } + DataStream dataStream = new DataStream( randomAlphaOfLength(10), allIndices, @@ -88,37 +92,27 @@ public void testOldIndicesCheck() { randomBoolean(), null ); + Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build(); + DeprecationIssue expected = new DeprecationIssue( DeprecationIssue.Level.CRITICAL, - "Old data stream with a compatibility version < 8.0", + "Old data stream with a compatibility version < 9.0", "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", - "This data stream has backing indices that were created before Elasticsearch 8.0.0", + "This data stream has backing indices that were created before Elasticsearch 9.0.0", false, - Map.of( - "backing_indices", - Map.of( - "count", - oldIndexCount + newIndexCount, - "need_upgrading", - Map.of( - "count", - oldIndexCount, - "searchable_snapshots", - Map.of( - "count", - oldSearchableSnapshotCount, - "fully_mounted", - Map.of("count", oldFullyManagedSearchableSnapshotCount), - "partially_mounted", - Map.of("count", oldPartiallyManagedSearchableSnapshotCount) - ) - ) - ) + ofEntries( + entry("reindex_required", true), + entry("total_backing_indices", oldIndexCount + newIndexCount), + entry("indices_requiring_upgrade_count", expectedIndices.size()), + entry("indices_requiring_upgrade", expectedIndices) ) ); + List issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState)); + assertThat(issues, equalTo(singletonList(expected))); } + } diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java index 48cbef6831a2b..c6f3208a1cfb0 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java @@ -19,8 +19,8 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.frozen.FrozenEngine; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; @@ -29,6 +29,8 @@ import java.util.Map; import static 
java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.xpack.deprecation.DeprecationChecks.INDEX_SETTINGS_CHECKS; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -48,11 +50,11 @@ public void testOldIndicesCheck() { .build(); DeprecationIssue expected = new DeprecationIssue( DeprecationIssue.Level.CRITICAL, - "Old index with a compatibility version < 8.0", + "Old index with a compatibility version < 9.0", "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html", "This index has version: " + createdWith.toReleaseVersion(), false, - null + singletonMap("reindex_required", true) ); List issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState)); assertEquals(singletonList(expected), issues); @@ -100,6 +102,20 @@ public void testOldIndicesCheckDataStreamIndex() { assertThat(issues.size(), equalTo(0)); } + public void testOldIndicesCheckSnapshotIgnored() { + IndexVersion createdWith = IndexVersion.fromId(7170099); + Settings.Builder settings = settings(createdWith); + settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE); + IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build(); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder().put(indexMetadata, true)) + .build(); + + List issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState)); + + assertThat(issues, empty()); + } + public void testTranslogRetentionSettings() { Settings.Builder settings = settings(IndexVersion.current()); settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue()); @@ -229,7 +245,7 @@ public void testCamelCaseDeprecation() throws IOException { + "} }"; IndexMetadata simpleIndex = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) - .settings(settings(IndexVersions.MINIMUM_COMPATIBLE)) + .settings(settings(IndexVersion.current())) .numberOfShards(1) .numberOfReplicas(1) .putMapping(simpleMapping) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index 1d2de407219ee..7aca63182e2b1 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -64,6 +64,11 @@ public class CsvTestsDataLoader { .withSetting("languages_lookup-settings.json"); private static final TestsDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key") .withData("languages_non_unique_key.csv"); + private static final TestsDataset LANGUAGES_NESTED_FIELDS = new TestsDataset( + "languages_nested_fields", + "mapping-languages_nested_fields.json", + "languages_nested_fields.csv" + ).withSetting("languages_lookup-settings.json"); private static final TestsDataset ALERTS = new TestsDataset("alerts"); private static final TestsDataset UL_LOGS = new TestsDataset("ul_logs"); private static final TestsDataset SAMPLE_DATA = new TestsDataset("sample_data"); @@ -116,6 
+121,7 @@ public class CsvTestsDataLoader { Map.entry(LANGUAGES.indexName, LANGUAGES), Map.entry(LANGUAGES_LOOKUP.indexName, LANGUAGES_LOOKUP), Map.entry(LANGUAGES_LOOKUP_NON_UNIQUE_KEY.indexName, LANGUAGES_LOOKUP_NON_UNIQUE_KEY), + Map.entry(LANGUAGES_NESTED_FIELDS.indexName, LANGUAGES_NESTED_FIELDS), Map.entry(UL_LOGS.indexName, UL_LOGS), Map.entry(SAMPLE_DATA.indexName, SAMPLE_DATA), Map.entry(MV_SAMPLE_DATA.indexName, MV_SAMPLE_DATA), diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_nested_fields.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_nested_fields.csv new file mode 100644 index 0000000000000..154125cf49304 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/languages_nested_fields.csv @@ -0,0 +1,5 @@ +_id:integer,language.id:integer,language.name:text,language.code:keyword +1,1,English,EN +2,2,French,FR +3,3,Spanish,ES +4,4,German,DE diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 7d4f89ed920a9..618149f2c3dde 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -416,6 +416,55 @@ language_code:integer | language_name:keyword | country:keyword 8 | Mv-Lang2 | Mv-Land2 ; +########################################################################### +# nested field join behavior with languages_nested_fields index +########################################################################### + +joinOnNestedField +required_capability: join_lookup_v8 + +FROM employees +| WHERE 10000 < emp_no AND emp_no < 10006 +| EVAL language.id = emp_no % 10 +| LOOKUP JOIN languages_nested_fields ON language.id +| SORT emp_no +| KEEP emp_no, language.id, language.name +; + +emp_no:integer | language.id:integer | language.name:text +10001 | 1 | English +10002 | 2 | French +10003 | 3 | Spanish +10004 | 4 | German +10005 | 5 | null +; + + +joinOnNestedFieldRow +required_capability: join_lookup_v8 + +ROW language.code = "EN" +| LOOKUP JOIN languages_nested_fields ON language.code +| KEEP language.id, language.code, language.name.keyword +; + +language.id:integer | language.code:keyword | language.name.keyword:keyword +1 | EN | English +; + + +joinOnNestedNestedFieldRow +required_capability: join_lookup_v8 + +ROW language.name.keyword = "English" +| LOOKUP JOIN languages_nested_fields ON language.name.keyword +| KEEP language.id, language.name, language.name.keyword +; + +language.id:integer | language.name:text | language.name.keyword:keyword +1 | English | English +; + ############################################### # Tests with clientips_lookup index ############################################### diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-languages_nested_fields.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-languages_nested_fields.json new file mode 100644 index 0000000000000..9b46a85ed8d11 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-languages_nested_fields.json @@ -0,0 +1,22 @@ +{ + "properties" : { + "language" : { + "properties" : { + "id": { + "type": "integer" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "code": { + "type": "keyword" + } + } + } + } +} diff --git a/x-pack/plugin/logsdb/build.gradle 
b/x-pack/plugin/logsdb/build.gradle index 1aef69e0e3fac..f66dc23ff41bb 100644 --- a/x-pack/plugin/logsdb/build.gradle +++ b/x-pack/plugin/logsdb/build.gradle @@ -8,6 +8,7 @@ evaluationDependsOn(xpackModule('core')) apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' apply plugin: 'elasticsearch.internal-java-rest-test' apply plugin: 'elasticsearch.internal-yaml-rest-test' @@ -29,7 +30,9 @@ restResources { dependencies { compileOnly project(path: xpackModule('core')) + testImplementation project(':modules:data-streams') testImplementation(testArtifact(project(xpackModule('core')))) + internalClusterTestImplementation(testArtifact(project(xpackModule('core')))) } tasks.named("javaRestTest").configure { diff --git a/x-pack/plugin/logsdb/qa/with-basic/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbWithBasicRestIT.java b/x-pack/plugin/logsdb/qa/with-basic/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbWithBasicRestIT.java index 381c83ceee289..4a9d13bc642d7 100644 --- a/x-pack/plugin/logsdb/qa/with-basic/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbWithBasicRestIT.java +++ b/x-pack/plugin/logsdb/qa/with-basic/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbWithBasicRestIT.java @@ -9,7 +9,9 @@ import org.elasticsearch.client.Request; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; @@ -203,4 +205,59 @@ public void testLogsdbOverrideDefaultModeForLogsIndex() throws IOException { assertEquals("logsdb", settings.get("index.mode")); assertEquals(SourceFieldMapper.Mode.STORED.toString(), settings.get("index.mapping.source.mode")); } + + public void testLogsdbRouteOnSortFields() throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity("{ \"transient\": { \"cluster.logsdb.enabled\": true } }"); + assertOK(client().performRequest(request)); + + request = new Request("POST", "/_index_template/1"); + request.setJsonEntity(""" + { + "index_patterns": ["my-log-*"], + "data_stream": { + }, + "template": { + "settings":{ + "index": { + "mode": "logsdb", + "sort.field": [ "host.name", "message", "@timestamp" ], + "logsdb.route_on_sort_fields": true + } + }, + "mappings": { + "properties": { + "@timestamp" : { + "type": "date" + }, + "host.name": { + "type": "keyword" + }, + "message": { + "type": "keyword" + } + } + } + } + } + """); + assertOK(client().performRequest(request)); + + request = new Request("POST", "/my-log-foo/_doc"); + request.setJsonEntity(""" + { + "@timestamp": "2020-01-01T00:00:00.000Z", + "host.name": "foo", + "message": "bar" + } + """); + assertOK(client().performRequest(request)); + + String index = DataStream.getDefaultBackingIndexName("my-log-foo", 1); + var settings = (Map) ((Map) getIndexSettings(index).get(index)).get("settings"); + assertEquals("logsdb", settings.get("index.mode")); + assertEquals(SourceFieldMapper.Mode.STORED.toString(), settings.get("index.mapping.source.mode")); + assertEquals("true", settings.get(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey())); + assertEquals(List.of("host.name", "message"), settings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey())); + } } diff 
--git a/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java new file mode 100644 index 0000000000000..1ab49f91376f1 --- /dev/null +++ b/x-pack/plugin/logsdb/src/internalClusterTest/java/org/elasticsearch/xpack/logsdb/LogsIndexingIT.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.license.LicenseSettings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class LogsIndexingIT extends ESSingleNodeTestCase { + + public static final String MAPPING_TEMPLATE = """ + { + "_doc":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "message": { + "type": "keyword" + }, + "k8s": { + "properties": { + "pod": { + "properties": { + "uid": { + "type": "keyword" + } + } + } + } + } + } + } + }"""; + + private static final String DOC = """ + { + "@timestamp": "$time", + "message": "$pod", + "k8s": { + "pod": { + "name": "dog", + "uid":"$uuid", + "ip": "10.10.55.3", + "network": { + "tx": 1434595272, + "rx": 530605511 + } + } + } + } + """; + + @Override + protected Collection> getPlugins() { + return List.of(InternalSettingsPlugin.class, XPackPlugin.class, LogsDBPlugin.class, DataStreamsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put("cluster.logsdb.enabled", "true") + .put(LicenseSettings.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial") + .build(); + } + + public void testStandard() throws Exception { + String dataStreamName = "k8s"; + var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id"); + putTemplateRequest.indexTemplate( + 
ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .template( + new Template( + indexSettings(4, 0).put("index.mode", "logsdb").put("index.sort.field", "message,k8s.pod.uid,@timestamp").build(), + new CompressedXContent(MAPPING_TEMPLATE), + null + ) + ) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest).actionGet(); + checkIndexSearchAndRetrieval(dataStreamName, false); + } + + public void testRouteOnSortFields() throws Exception { + String dataStreamName = "k8s"; + var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id"); + putTemplateRequest.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .template( + new Template( + indexSettings(4, 0).put("index.mode", "logsdb") + .put("index.sort.field", "message,k8s.pod.uid,@timestamp") + .put("index.logsdb.route_on_sort_fields", true) + .build(), + new CompressedXContent(MAPPING_TEMPLATE), + null + ) + ) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest).actionGet(); + checkIndexSearchAndRetrieval(dataStreamName, true); + } + + private void checkIndexSearchAndRetrieval(String dataStreamName, boolean routeOnSortFields) throws Exception { + String[] uuis = { + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString() }; + int numBulkRequests = randomIntBetween(128, 1024); + int numDocsPerBulk = randomIntBetween(16, 256); + String indexName = null; + { + Instant time = Instant.now(); + for (int i = 0; i < numBulkRequests; i++) { + BulkRequest bulkRequest = new BulkRequest(dataStreamName); + for (int j = 0; j < numDocsPerBulk; j++) { + var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + indexRequest.source( + DOC.replace("$time", formatInstant(time)) + .replace("$uuid", uuis[j % uuis.length]) + .replace("$pod", "pod-" + randomIntBetween(0, 10)), + XContentType.JSON + ); + bulkRequest.add(indexRequest); + time = time.plusMillis(1); + } + var bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.hasFailures(), is(false)); + indexName = bulkResponse.getItems()[0].getIndex(); + } + client().admin().indices().refresh(new RefreshRequest(dataStreamName)).actionGet(); + } + + // Verify settings. 
+ final GetSettingsResponse getSettingsResponse = indicesAdmin().getSettings( + new GetSettingsRequest().indices(indexName).includeDefaults(false) + ).actionGet(); + final Settings settings = getSettingsResponse.getIndexToSettings().get(indexName); + assertEquals("message,k8s.pod.uid,@timestamp", settings.get("index.sort.field")); + if (routeOnSortFields) { + assertEquals("[message, k8s.pod.uid]", settings.get("index.routing_path")); + assertEquals("true", settings.get("index.logsdb.route_on_sort_fields")); + } else { + assertNull(settings.get("index.routing_path")); + assertNull(settings.get("index.logsdb.route_on_sort_fields")); + } + + // Check the search api can synthesize _id + final String idxName = indexName; + var searchRequest = new SearchRequest(dataStreamName); + searchRequest.source().trackTotalHits(true); + assertResponse(client().search(searchRequest), searchResponse -> { + assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numBulkRequests * numDocsPerBulk)); + + for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { + String id = searchResponse.getHits().getHits()[i].getId(); + assertThat(id, notNullValue()); + + // Check that the _id is gettable: + var getResponse = client().get(new GetRequest(idxName).id(id)).actionGet(); + assertThat(getResponse.isExists(), is(true)); + assertThat(getResponse.getId(), equalTo(id)); + } + }); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + +} diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java index ef9480681f559..bd8093c0a01c1 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java @@ -9,9 +9,11 @@ import org.elasticsearch.client.Request; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; @@ -225,4 +227,58 @@ static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); } + public void testLogsdbRouteOnSortFields() throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity("{ \"transient\": { \"cluster.logsdb.enabled\": true } }"); + assertOK(client().performRequest(request)); + + request = new Request("POST", "/_index_template/1"); + request.setJsonEntity(""" + { + "index_patterns": ["my-log-*"], + "data_stream": { + }, + "template": { + "settings":{ + "index": { + "mode": "logsdb", + "sort.field": [ "host.name", "message", "@timestamp" ], + "logsdb.route_on_sort_fields": true + } + }, + "mappings": { + "properties": { + "@timestamp" : { + "type": "date" + }, + "host.name": { + "type": "keyword" + }, + "message": { + "type": "keyword" + } + } + } + } + } + """); + assertOK(client().performRequest(request)); + + request = new 
Request("POST", "/my-log-foo/_doc"); + request.setJsonEntity(""" + { + "@timestamp": "2020-01-01T00:00:00.000Z", + "host.name": "foo", + "message": "bar" + } + """); + assertOK(client().performRequest(request)); + + String index = DataStream.getDefaultBackingIndexName("my-log-foo", 1); + var settings = (Map) ((Map) getIndexSettings(index).get(index)).get("settings"); + assertEquals("logsdb", settings.get("index.mode")); + assertNull(settings.get("index.mapping.source.mode")); + assertEquals("true", settings.get(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey())); + assertEquals(List.of("host.name", "message"), settings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey())); + } } diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java index 977b0e1c57578..f95b64f8c0ec9 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java @@ -21,12 +21,15 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceFieldMapper; import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.function.Supplier; @@ -79,6 +82,9 @@ public Settings getAdditionalIndexSettings( final List combinedTemplateMappings ) { Settings.Builder settingsBuilder = null; + boolean isLogsDB = templateIndexMode == IndexMode.LOGSDB; + + // Inject logsdb index mode, based on the logs pattern. if (isLogsdbEnabled && dataStreamName != null && resolveIndexMode(settings.get(IndexSettings.MODE.getKey())) == null @@ -87,8 +93,10 @@ && matchesLogsPattern(dataStreamName)) { if (supportFallbackToStoredSource()) { settings = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.getName()).put(settings).build(); } + isLogsDB = true; } + // Inject stored source mode if synthetic source if not available per licence. if (supportFallbackToStoredSource()) { // This index name is used when validating component and index templates, we should skip this check in that case. // (See MetadataIndexTemplateService#validateIndexTemplateV2(...) method) @@ -110,14 +118,57 @@ && matchesLogsPattern(dataStreamName)) { settingsBuilder.put(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.STORED.toString()); } } + + if (isLogsDB) { + // Inject routing path matching sort fields. 
+ if (settings.getAsBoolean(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), false)) { + List sortFields = new ArrayList<>(settings.getAsList(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey())); + sortFields.removeIf(s -> s.equals(DataStreamTimestampFieldMapper.DEFAULT_PATH)); + if (sortFields.size() < 2) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "data stream [%s] in logsdb mode and with [%s] index setting has only %d sort fields " + + "(excluding timestamp), needs at least 2", + dataStreamName, + IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), + sortFields.size() + ) + ); + } + if (settings.hasValue(IndexMetadata.INDEX_ROUTING_PATH.getKey())) { + List routingPaths = settings.getAsList(IndexMetadata.INDEX_ROUTING_PATH.getKey()); + if (routingPaths.equals(sortFields) == false) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "data stream [%s] in logsdb mode and with [%s] index setting has mismatching sort " + + "and routing fields, [index.routing_path:%s], [index.sort.fields:%s]", + dataStreamName, + IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), + routingPaths, + sortFields + ) + ); + } + } else { + if (settingsBuilder == null) { + settingsBuilder = Settings.builder(); + } + settingsBuilder.putList(INDEX_ROUTING_PATH.getKey(), sortFields).build(); + } + } + } + return settingsBuilder == null ? Settings.EMPTY : settingsBuilder.build(); + } private static boolean matchesLogsPattern(final String name) { return Regex.simpleMatch(LOGS_PATTERN, name); } - private IndexMode resolveIndexMode(final String mode) { + private static IndexMode resolveIndexMode(final String mode) { return mode != null ? Enum.valueOf(IndexMode.class, mode.toUpperCase(Locale.ROOT)) : null; } diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProviderTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProviderTests.java index de4f0960f50e7..5d3cb7b2a9967 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProviderTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProviderTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplateMetadata; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; @@ -18,6 +19,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.mapper.SourceFieldMapper; @@ -36,6 +38,7 @@ import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseServiceTests.createEnterpriseLicense; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -43,6 +46,7 @@ public class LogsdbIndexModeSettingsProviderTests extends ESTestCase { + private static final String DATA_STREAM_NAME = "logs-app1"; public 
static final String DEFAULT_MAPPING = """ { "_doc": { @@ -385,7 +389,7 @@ private void assertIndexMode(final Settings settings, final String expectedIndex } public void testNewIndexHasSyntheticSourceUsage() throws IOException { - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 0); Settings settings = Settings.EMPTY; LogsdbIndexModeSettingsProvider provider = withSyntheticSourceDemotionSupport(false); @@ -472,7 +476,7 @@ public void testValidateIndexName() throws IOException { } public void testNewIndexHasSyntheticSourceUsageLogsdbIndex() throws IOException { - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 0); String mapping = """ { @@ -516,7 +520,7 @@ public void testNewIndexHasSyntheticSourceUsageLogsdbIndex() throws IOException } public void testNewIndexHasSyntheticSourceUsageTimeSeries() throws IOException { - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 0); String mapping = """ { @@ -557,7 +561,7 @@ public void testNewIndexHasSyntheticSourceUsageTimeSeries() throws IOException { } public void testNewIndexHasSyntheticSourceUsage_invalidSettings() throws IOException { - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 0); Settings settings = Settings.builder().put("index.soft_deletes.enabled", false).build(); LogsdbIndexModeSettingsProvider provider = withSyntheticSourceDemotionSupport(false); @@ -599,7 +603,7 @@ public void testNewIndexHasSyntheticSourceUsage_invalidSettings() throws IOExcep } public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSource() throws IOException { - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; Metadata.Builder mb = Metadata.builder( DataStreamTestHelper.getClusterStateWithDataStreams( List.of(Tuple.tuple(dataStreamName, 1)), @@ -672,7 +676,7 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSourceFileMatch( LogsdbIndexModeSettingsProvider provider = withSyntheticSourceDemotionSupport(true); final Settings settings = Settings.EMPTY; - String dataStreamName = "logs-app1"; + String dataStreamName = DATA_STREAM_NAME; Metadata.Builder mb = Metadata.builder( DataStreamTestHelper.getClusterStateWithDataStreams( List.of(Tuple.tuple(dataStreamName, 1)), @@ -731,4 +735,86 @@ public void testGetAdditionalIndexSettingsDowngradeFromSyntheticSourceFileMatch( assertThat(result.size(), equalTo(0)); } + public void testLogsdbRoutingPathOnSortFields() throws Exception { + var settings = Settings.builder() + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "host,message") + .put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), true) + .build(); + Settings result = generateLogsdbSettings(settings); + assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("host", "message")); + } + + public void testLogsdbRoutingPathOnSortFieldsFilterTimestamp() throws Exception { + var settings = Settings.builder() + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "host,message,@timestamp") + .put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), true) + .build(); + Settings result = generateLogsdbSettings(settings); + assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), 
contains("host", "message")); + } + + public void testLogsdbRoutingPathOnSortSingleField() throws Exception { + var settings = Settings.builder() + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "host") + .put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), true) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> generateLogsdbSettings(settings)); + assertThat( + e.getMessage(), + equalTo( + "data stream [" + + DATA_STREAM_NAME + + "] in logsdb mode and with [index.logsdb.route_on_sort_fields] index setting has only 1 sort fields " + + "(excluding timestamp), needs at least 2" + ) + ); + } + + public void testLogsdbExplicitRoutingPathMatchesSortFields() throws Exception { + var settings = Settings.builder() + .put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB) + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "host,message,@timestamp") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "host,message") + .put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), true) + .build(); + Settings result = generateLogsdbSettings(settings); + assertTrue(result.isEmpty()); + } + + public void testLogsdbExplicitRoutingPathDoesNotMatchSortFields() throws Exception { + var settings = Settings.builder() + .put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), "host,message,@timestamp") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "host,message,foo") + .put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), true) + .build(); + Exception e = expectThrows(IllegalStateException.class, () -> generateLogsdbSettings(settings)); + assertThat( + e.getMessage(), + equalTo( + "data stream [" + + DATA_STREAM_NAME + + "] in logsdb mode and with [index." + + "logsdb.route_on_sort_fields] index setting has mismatching sort " + + "and routing fields, [index.routing_path:[host, message, foo]], [index.sort.fields:[host, message]]" + ) + ); + } + + private Settings generateLogsdbSettings(Settings settings) throws IOException { + Metadata metadata = Metadata.EMPTY_METADATA; + var provider = new LogsdbIndexModeSettingsProvider( + syntheticSourceLicenseService, + Settings.builder().put("cluster.logsdb.enabled", true).build() + ); + var result = provider.getAdditionalIndexSettings( + DataStream.getDefaultBackingIndexName(DATA_STREAM_NAME, 0), + DATA_STREAM_NAME, + IndexMode.LOGSDB, + metadata, + Instant.now(), + settings, + List.of() + ); + return builder().put(result).build(); + } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index b10bea9e54230..9e4cbb1082215 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -13,14 +13,10 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.features.NodeFeature; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.xcontent.ConstructingObjectParser; import 
org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -43,24 +39,10 @@ public class ReindexDataStreamAction extends ActionType getOldIndexVersionPredicate(Metadata metadata) { - return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE); - } - public enum Mode { UPGRADE } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 66b13a9ce22b0..38b5da6527039 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import java.util.Locale; import java.util.Map; @@ -78,13 +79,13 @@ protected void doExecute( IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName); Settings settingsBefore = sourceIndex.getSettings(); - var hasOldVersion = ReindexDataStreamAction.getOldIndexVersionPredicate(clusterService.state().metadata()); + var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterService.state().metadata()); if (hasOldVersion.test(sourceIndex.getIndex()) == false) { logger.warn( "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]", sourceIndexName, sourceIndex.getCreationVersion(), - ReindexDataStreamAction.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE + DeprecatedIndexPredicate.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE ); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index f011c429ce79c..cc648c1984544 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -26,8 +26,8 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; +import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate; import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX; -import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; /* * This transport action creates a new persistent task for reindexing the source data stream given in the request. 
On successful creation @@ -68,7 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList return; } int totalIndices = dataStream.getIndices().size(); - int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count(); + int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getReindexRequiredPredicate(metadata)).count(); ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( sourceDataStreamName, transportService.getThreadPool().absoluteTimeInMillis(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index dc8e33bc091e6..30f64fdd1d6f6 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -35,7 +35,7 @@ import java.util.Map; import java.util.NoSuchElementException; -import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate; +import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate; public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor { private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1); @@ -84,7 +84,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask List dataStreamInfos = response.getDataStreams(); if (dataStreamInfos.size() == 1) { DataStream dataStream = dataStreamInfos.getFirst().getDataStream(); - if (getOldIndexVersionPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) { + if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) { reindexClient.execute( RolloverAction.INSTANCE, new RolloverRequest(sourceDataStream, null), @@ -109,7 +109,7 @@ private void reindexIndices( String sourceDataStream ) { List indices = dataStream.getIndices(); - List indicesToBeReindexed = indices.stream().filter(getOldIndexVersionPredicate(clusterService.state().metadata())).toList(); + List indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList(); reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); // The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index fdb6746bbeed8..5b39f74de1b9d 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -12,8 +12,6 @@ setup: indices.create: index: test body: - settings: - number_of_shards: 5 mappings: properties: key: @@ -27,6 +25,7 @@ setup: settings: index: mode: lookup + number_of_shards: 1 mappings: properties: key: