Skip to content

Commit

Permalink
Merge branch 'main' into refactoring/cached_blob_version
Browse files Browse the repository at this point in the history
  • Loading branch information
javanna authored Dec 19, 2024
2 parents 2224503 + b2879c3 commit 8c9d23a
Show file tree
Hide file tree
Showing 47 changed files with 987 additions and 194 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116687.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116687
summary: Add LogsDB option to route on sort fields
area: Logs
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/118562.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 118562
summary: Update data stream deprecation warnings to new format and filter searchable
snapshots from response
area: Data streams
type: enhancement
issues: []
16 changes: 10 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ tests:
- class: org.elasticsearch.cluster.service.MasterServiceTests
method: testThreadContext
issue: https://github.com/elastic/elasticsearch/issues/118914
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
issue: https://github.com/elastic/elasticsearch/issues/118955
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.SecureHdfsRepositoryAnalysisRestIT
issue: https://github.com/elastic/elasticsearch/issues/118970
- class: org.elasticsearch.xpack.security.authc.AuthenticationServiceTests
Expand All @@ -301,9 +298,16 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/116777
- class: org.elasticsearch.xpack.security.authc.ldap.ActiveDirectoryRunAsIT
issue: https://github.com/elastic/elasticsearch/issues/115727
- class: org.elasticsearch.cluster.coordination.NodeJoinExecutorTests
method: testSuccess
issue: https://github.com/elastic/elasticsearch/issues/119052
- class: org.elasticsearch.xpack.security.authc.kerberos.KerberosAuthenticationIT
issue: https://github.com/elastic/elasticsearch/issues/118414
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlClientYamlIT
issue: https://github.com/elastic/elasticsearch/issues/119086
- class: org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilderOverShapeTests
method: testToQuery
issue: https://github.com/elastic/elasticsearch/issues/119090
- class: org.elasticsearch.xpack.spatial.index.query.GeoShapeQueryBuilderGeoShapeTests
method: testToQuery
issue: https://github.com/elastic/elasticsearch/issues/119091

# Examples:
#
Expand Down
9 changes: 9 additions & 0 deletions rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task ->
task.replaceValueInMatch("profile.shards.0.dfs.knn.0.query.0.description", "DocAndScoreQuery[0,...][0.009673266,...],0.009673266", "dfs knn vector profiling with vector_operations_count")
task.skipTest("cat.aliases/10_basic/Deprecated local parameter", "CAT APIs not covered by compatibility policy")
task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0")
task.skipTest("tsdb/20_mapping/exact match object type", "skip until pr/116687 gets backported")
task.skipTest("tsdb/25_id_generation/delete over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/80_index_resize/split", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/noop update", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/regular update", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/search with routing", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/index with routing over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/update over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/index with routing", "skip until pr/116687 gets backported")
task.skipTest("search/500_date_range/from, to, include_lower, include_upper deprecated", "deprecated parameters are removed in 9.0")
task.skipTest("tsdb/20_mapping/stored source is supported", "no longer serialize source_mode")
task.skipTest("tsdb/20_mapping/Synthetic source", "no longer serialize source_mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,14 +2036,12 @@ create index with use_synthetic_source:
- is_true: test.settings.index.recovery.use_synthetic_source

- do:
bulk:
index:
index: test
id: 1
refresh: true
body:
- '{ "create": { } }'
- '{ "field": "aaaa" }'
- '{ "create": { } }'
- '{ "field": "bbbb" }'
body: { foo: bar }
- match: { _version: 1 }

- do:
indices.disk_usage:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
test recovery empty index with use_synthetic_source:
- requires:
cluster_features: ["mapper.synthetic_recovery_source"]
reason: requires synthetic recovery source

- do:
indices.create:
index: test
body:
settings:
index:
number_of_replicas: 0
recovery:
use_synthetic_source: true
mapping:
source:
mode: synthetic

- do:
indices.get_settings: {}
- match: { test.settings.index.mapping.source.mode: synthetic}
- is_true: test.settings.index.recovery.use_synthetic_source

- do:
indices.put_settings:
index: test
body:
index.number_of_replicas: 1

- do:
cluster.health:
wait_for_events: languid
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,48 @@ routing path not allowed in logs mode:
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" }

---
routing path allowed in logs mode with routing on sort fields:
- requires:
cluster_features: [ "routing.logsb_route_on_sort_fields" ]
reason: introduction of route on index sorting fields

- do:
indices.create:
index: test
body:
settings:
index:
mode: logsdb
number_of_replicas: 0
number_of_shards: 2
routing_path: [ host.name, agent_id ]
logsdb:
route_on_sort_fields: true
mappings:
properties:
"@timestamp":
type: date
host.name:
type: keyword
agent_id:
type: keyword
process_id:
type: integer
http_method:
type: keyword
message:
type: text

- do:
indices.get_settings:
index: test

- is_true: test
- match: { test.settings.index.mode: logsdb }
- match: { test.settings.index.logsdb.route_on_sort_fields: "true" }
- match: { test.settings.index.routing_path: [ host.name, agent_id ] }

---
start time not allowed in logs mode:
- requires:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ exact match object type:
reason: routing_path error message updated in 8.14.0

- do:
catch: '/All fields that match routing_path must be configured with \[time_series_dimension: true\] or flattened fields with a list of dimensions in \[time_series_dimensions\] and without the \[script\] parameter. \[dim\] was \[object\]./'
catch: '/All fields that match routing_path must be .*flattened fields.* \[dim\] was \[object\]./'
indices.create:
index: tsdb_index
body:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ delete over _bulk:
- match: {items.0.delete.result: deleted}
- match: {items.1.delete.result: deleted}
- match: {items.2.delete.status: 404}
- match: {items.2.delete.error.reason: "invalid id [not found ++ not found] for index [id_generation_test] in time series mode"}
- match: {items.2.delete.error.reason: '/invalid\ id\ \[not\ found\ \+\+\ not\ found\]\ for\ index\ \[id_generation_test\]\ in\ time.series\ mode/'}

---
routing_path matches deep object:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ split:
reason: tsdb indexing changed in 8.2.0

- do:
catch: /index-split is not supported because the destination index \[test\] is in time series mode/
catch: /index-split is not supported because the destination index \[test\] is in time.series mode/
indices.split:
index: test
target: test_split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ index with routing:
reason: tsdb indexing changed in 8.2.0

- do:
catch: /specifying routing is not supported because the destination index \[test\] is in time series mode/
catch: /specifying routing is not supported because the destination index \[test\] is in time.series mode/
index:
index: test
routing: foo
Expand Down Expand Up @@ -104,7 +104,7 @@ index with routing over _bulk:
body:
- '{"index": {"routing": "foo"}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- match: {items.0.index.error.reason: "specifying routing is not supported because the destination index [test] is in time series mode"}
- match: {items.0.index.error.reason: '/specifying\ routing\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}

---
noop update:
Expand All @@ -120,7 +120,7 @@ noop update:
- length: {hits.hits: 1}

- do:
catch: /update is not supported because the destination index \[test\] is in time series mode/
catch: /update is not supported because the destination index \[test\] is in time.series mode/
update:
index: test
id: "1"
Expand All @@ -136,7 +136,7 @@ regular update:

# We fail even though the document isn't found.
- do:
catch: /update is not supported because the destination index \[test\] is in time series mode/
catch: /update is not supported because the destination index \[test\] is in time.series mode/
update:
index: test
id: "1"
Expand Down Expand Up @@ -165,7 +165,7 @@ update over _bulk:
body:
- '{"update": {"_id": 1}}'
- '{"doc":{"@timestamp": "2021-04-28T18:03:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}}'
- match: {items.0.update.error.reason: "update is not supported because the destination index [test] is in time series mode"}
- match: {items.0.update.error.reason: '/update\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}

---
search with routing:
Expand All @@ -175,7 +175,7 @@ search with routing:

# We fail even though the document isn't found.
- do:
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time series mode/
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time.series mode/
search:
index: test
routing: rrrr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.function.Supplier;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -78,7 +79,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;

private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;
private static final Supplier<String> K_SORTED_TIME_BASED_ID_GENERATOR = UUIDs::base64TimeBasedKOrderedUUID;

/**
* Max length of the source document to include into string()
Expand Down Expand Up @@ -705,9 +705,18 @@ public void autoGenerateId() {
}

/**
 * Generates a time-based id for this request, passing no routing hash.
 * Delegates to {@link #autoGenerateTimeBasedId(OptionalInt)} with an empty value.
 */
public void autoGenerateTimeBasedId() {
    final OptionalInt noRoutingHash = OptionalInt.empty();
    autoGenerateTimeBasedId(noRoutingHash);
}

/**
 * Set the {@link #id()} to an automatically generated one, optimized for storage (compression) efficiency.
 * If a routing hash is passed, it is included in the generated id starting at 9 bytes before the end.
 * @param hash optional routing hash value, used to route requests by id to the right shard.
 */
public void autoGenerateTimeBasedId(OptionalInt hash) {
assertBeforeGeneratingId();
autoGenerateTimestamp();
// NOTE(review): this view is a rendered diff without +/- markers; the first id(...) call below appears to be
// the removed pre-change line and the second one its replacement - confirm against the actual file.
id(K_SORTED_TIME_BASED_ID_GENERATOR.get());
id(UUIDs.base64TimeBasedKOrderedUUIDWithHash(hash));
}

private void autoGenerateTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
Expand All @@ -55,6 +56,7 @@ public abstract class IndexRouting {

static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path");
static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path");
static final NodeFeature LOGSB_ROUTE_ON_SORT_FIELDS = new NodeFeature("routing.logsb_route_on_sort_fields");

/**
* Build the routing from {@link IndexMetadata}.
Expand Down Expand Up @@ -165,7 +167,8 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {

@Override
public void preProcess(IndexRequest indexRequest) {
// generate id if not already provided
// Generate id if not already provided.
// This is needed for routing, so it has to happen in pre-processing.
final String id = indexRequest.id();
if (id == null) {
if (shouldUseTimeBasedId(indexMode, creationVersion)) {
Expand Down Expand Up @@ -272,15 +275,20 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
public static class ExtractFromSource extends IndexRouting {
private final Predicate<String> isRoutingPath;
private final XContentParserConfiguration parserConfig;
private final IndexMode indexMode;
private final boolean trackTimeSeriesRoutingHash;
private final boolean addIdWithRoutingHash;
private int hash = Integer.MAX_VALUE;

ExtractFromSource(IndexMetadata metadata) {
super(metadata);
if (metadata.isRoutingPartitionedIndex()) {
throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");
}
trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
indexMode = metadata.getIndexMode();
trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB;
List<String> routingPaths = metadata.getRoutingPaths();
isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new));
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(routingPaths), null, true);
Expand All @@ -292,8 +300,13 @@ public boolean matchesField(String fieldName) {

@Override
public void postProcess(IndexRequest indexRequest) {
    // The routing hash is only known once it has been calculated, which is why the request
    // is updated here in post-processing rather than earlier.
    if (trackTimeSeriesRoutingHash) {
        // Carry the hash on the request as its encoded routing value.
        indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
        return;
    }
    if (addIdWithRoutingHash == false) {
        // Neither mechanism applies: leave the request untouched.
        return;
    }
    // Integer.MAX_VALUE is the field's initial value, i.e. no hash has been stored yet.
    assert hash != Integer.MAX_VALUE;
    indexRequest.autoGenerateTimeBasedId(OptionalInt.of(hash));
}

Expand Down Expand Up @@ -461,12 +474,15 @@ private int idToHash(String id) {
try {
idBytes = Base64.getUrlDecoder().decode(id);
} catch (IllegalArgumentException e) {
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
}
if (idBytes.length < 4) {
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
}
return hashToShardId(ByteUtils.readIntLE(idBytes, 0));
// For TSDB, the hash is stored as the id prefix.
// For LogsDB with routing on sort fields, the routing hash is stored in the 4 bytes of the decoded id that start at
// idBytes.length - 9, i.e. the byte range [idBytes.length - 9, idBytes.length - 5), see IndexRequest#autoGenerateTimeBasedId.
return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0));
}

@Override
Expand All @@ -480,7 +496,7 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
}

/**
 * Formats the "{@code <operation> is not supported because the destination index [...] is in ... mode}" message
 * for this index.
 *
 * @param operation text placed at the start of the message
 * @return the formatted message
 */
// NOTE(review): this view is a rendered diff without +/- markers; the first return below appears to be the removed
// pre-change line and the second one its replacement - confirm against the actual file.
private String error(String operation) {
return operation + " is not supported because the destination index [" + indexName + "] is in time series mode";
return operation + " is not supported because the destination index [" + indexName + "] is in " + indexMode.getName() + " mode";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ public class RoutingFeatures implements FeatureSpecification {
public Set<NodeFeature> getFeatures() {
    // The two routing-path features declared on IndexRouting.
    final Set<NodeFeature> routingFeatures = Set.of(
        IndexRouting.BOOLEAN_ROUTING_PATH,
        IndexRouting.MULTI_VALUE_ROUTING_PATH
    );
    return routingFeatures;
}

@Override
public Set<NodeFeature> getTestFeatures() {
    // Only the route-on-sort-fields feature is reported through this method.
    final NodeFeature routeOnSortFields = IndexRouting.LOGSB_ROUTE_ON_SORT_FIELDS;
    return Set.of(routeOnSortFields);
}
}
Loading

0 comments on commit 8c9d23a

Please sign in to comment.