From c297d4107e4c6fcd4059e082d8f57c9f44eb59cd Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Wed, 25 Oct 2023 15:26:25 -0700 Subject: [PATCH] Bump Lucene from 9.5 to 9.7, OpenSearch from 2.7 to 2.11 --- kaldb/pom.xml | 9 ++++-- .../logstore/opensearch/KaldbBigArrays.java | 2 +- .../opensearch/KaldbSearchContext.java | 32 ++++++++++++++++++- .../opensearch/OpenSearchAdapter.java | 6 ++-- .../OpenSearchInternalAggregation.java | 12 +++---- .../kaldb/server/OpenSearchBulkIngestApi.java | 3 +- 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/kaldb/pom.xml b/kaldb/pom.xml index dfe34ec022..568d156c24 100644 --- a/kaldb/pom.xml +++ b/kaldb/pom.xml @@ -25,8 +25,8 @@ 3.4.0 2.15.2 2.15.1 - 9.5.0 - 2.7.0 + 9.7.0 + 2.11.0 5.5.0 2.20.0 2.21.2 @@ -187,6 +187,11 @@ lang-painless ${opensearch.version} + + org.opensearch.plugin + opensearch-scripting-painless-spi + ${opensearch.version} + diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java index 0ba69514e2..1af5669d70 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java @@ -5,7 +5,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.PageCacheRecycler; -import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; /** * Kaldb singleton wrapper for an OpenSearch BigArrays implementation. Only one BigArrays should be diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java index 9cc2bde01a..8bcc84efa0 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java @@ -23,7 +23,10 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; +import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -58,6 +61,7 @@ public class KaldbSearchContext extends SearchContext { private final BigArrays bigArrays; private final ContextIndexSearcher contextIndexSearcher; + private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; private final QueryShardContext queryShardContext; private final Query query; @@ -77,7 +81,8 @@ public KaldbSearchContext( IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, - indexSearcher.getExecutor()); + indexSearcher.getExecutor(), + this); } @Override @@ -561,4 +566,29 @@ public QueryShardContext getQueryShardContext() { public ReaderContext readerContext() { throw new NotImplementedException(); } + + @Override + public InternalAggregation.ReduceContext partialOnShard() { + return InternalAggregation.ReduceContext.forPartialReduction( + KaldbBigArrays.getInstance(), + ScriptServiceProvider.getInstance(), + () -> PipelineAggregator.PipelineTree.EMPTY); + } + + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + this.bucketCollectorProcessor = bucketCollectorProcessor; + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return bucketCollectorProcessor; + } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + // this is true, since we index with the timestamp in reverse order + // see LuceneIndexStoreImpl.buildIndexWriterConfig() + return true; + } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 5a3bdc47c0..0665096c23 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -40,11 +40,12 @@ import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.CheckedConsumer; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexSettings; @@ -61,7 +62,6 @@ import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.indices.IndicesModule; -import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.script.Script; import org.opensearch.search.aggregations.AbstractAggregationBuilder; @@ -327,7 +327,7 @@ protected static IndexSettings buildIndexSettings() { Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_7_0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0) .put( MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java index 263a05a832..2f21923a60 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java @@ -6,12 +6,12 @@ import java.io.InputStream; import java.util.Arrays; import java.util.List; -import org.opensearch.common.io.stream.InputStreamStreamInput; -import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.io.stream.OutputStreamStreamOutput; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.InternalAggregation; diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java index 7d81997f1c..c885f1d0bc 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.opensearch.action.index.IndexRequest; -import org.opensearch.common.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +133,7 @@ public OpenSearchBulkIngestApi( "Kafka bootstrapServers must be provided"); checkArgument( - !Strings.isEmpty(preprocessorConfig.getDownstreamTopic()), + !preprocessorConfig.getDownstreamTopic().isEmpty(), "Kafka downstreamTopic must be provided"); this.datasetMetadataStore = datasetMetadataStore;