diff --git a/kaldb/pom.xml b/kaldb/pom.xml
index dfe34ec022..568d156c24 100644
--- a/kaldb/pom.xml
+++ b/kaldb/pom.xml
@@ -25,8 +25,8 @@
3.4.0
2.15.2
2.15.1
- 9.5.0
- 2.7.0
+ 9.7.0
+ 2.11.0
5.5.0
2.20.0
2.21.2
@@ -187,6 +187,11 @@
lang-painless
${opensearch.version}
+
+ org.opensearch.plugin
+ opensearch-scripting-painless-spi
+ ${opensearch.version}
+
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java
index 0ba69514e2..1af5669d70 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbBigArrays.java
@@ -5,7 +5,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.PageCacheRecycler;
-import org.opensearch.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
/**
* Kaldb singleton wrapper for an OpenSearch BigArrays implementation. Only one BigArrays should be
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java
index 9cc2bde01a..8bcc84efa0 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/KaldbSearchContext.java
@@ -23,7 +23,10 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
+import org.opensearch.search.aggregations.BucketCollectorProcessor;
+import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
+import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
@@ -58,6 +61,7 @@
public class KaldbSearchContext extends SearchContext {
private final BigArrays bigArrays;
private final ContextIndexSearcher contextIndexSearcher;
+ private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR;
private final QueryShardContext queryShardContext;
private final Query query;
@@ -77,7 +81,8 @@ public KaldbSearchContext(
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
false,
- indexSearcher.getExecutor());
+ indexSearcher.getExecutor(),
+ this);
}
@Override
@@ -561,4 +566,29 @@ public QueryShardContext getQueryShardContext() {
public ReaderContext readerContext() {
throw new NotImplementedException();
}
+
+ @Override
+ public InternalAggregation.ReduceContext partialOnShard() {
+ return InternalAggregation.ReduceContext.forPartialReduction(
+ KaldbBigArrays.getInstance(),
+ ScriptServiceProvider.getInstance(),
+ () -> PipelineAggregator.PipelineTree.EMPTY);
+ }
+
+ @Override
+ public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
+ this.bucketCollectorProcessor = bucketCollectorProcessor;
+ }
+
+ @Override
+ public BucketCollectorProcessor bucketCollectorProcessor() {
+ return bucketCollectorProcessor;
+ }
+
+ @Override
+ public boolean shouldUseTimeSeriesDescSortOptimization() {
+ // this is true, since we index with the timestamp in reverse order
+ // see LuceneIndexStoreImpl.buildIndexWriterConfig()
+ return true;
+ }
}
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java
index 5a3bdc47c0..0665096c23 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java
@@ -40,11 +40,12 @@
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
-import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
@@ -61,7 +62,6 @@
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.indices.IndicesModule;
-import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.script.Script;
import org.opensearch.search.aggregations.AbstractAggregationBuilder;
@@ -327,7 +327,7 @@ protected static IndexSettings buildIndexSettings() {
Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_7_0)
+ .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0)
.put(
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT)
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java
index 263a05a832..2f21923a60 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchInternalAggregation.java
@@ -6,12 +6,12 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
-import org.opensearch.common.io.stream.InputStreamStreamInput;
-import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
-import org.opensearch.common.io.stream.NamedWriteableRegistry;
-import org.opensearch.common.io.stream.OutputStreamStreamOutput;
-import org.opensearch.common.io.stream.StreamInput;
-import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.InputStreamStreamInput;
+import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
index 7d81997f1c..c885f1d0bc 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
@@ -40,7 +40,6 @@
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.opensearch.action.index.IndexRequest;
-import org.opensearch.common.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +133,7 @@ public OpenSearchBulkIngestApi(
"Kafka bootstrapServers must be provided");
checkArgument(
- !Strings.isEmpty(preprocessorConfig.getDownstreamTopic()),
+ !preprocessorConfig.getDownstreamTopic().isEmpty(),
"Kafka downstreamTopic must be provided");
this.datasetMetadataStore = datasetMetadataStore;