Skip to content

Commit

Permalink
Bump Lucene from 9.5 to 9.7, OpenSearch from 2.7 to 2.11
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb authored and vthacker committed Oct 25, 2023
1 parent 2985153 commit f73932d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
14 changes: 12 additions & 2 deletions kaldb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
<kafka.version>3.5.0</kafka.version>
<jackson.version>2.15.2</jackson.version>
<jackson.databind.version>2.15.1</jackson.databind.version>
<lucene.version>9.5.0</lucene.version>
<opensearch.version>2.7.0</opensearch.version>
<lucene.version>9.7.0</lucene.version>
<opensearch.version>2.11.0</opensearch.version>
<curator.version>5.5.0</curator.version>
<log4j.version>2.20.0</log4j.version>
<aws.sdk.version>2.21.2</aws.sdk.version>
Expand Down Expand Up @@ -187,6 +187,16 @@
<artifactId>lang-painless</artifactId>
<version>${opensearch.version}</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.13.1</version>
</dependency>
<dependency>
<groupId>org.opensearch.plugin</groupId>
<artifactId>opensearch-scripting-painless-spi</artifactId>
<version>${opensearch.version}</version>
</dependency>

<!-- Kafka writer dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;

/**
* Kaldb singleton wrapper for an OpenSearch BigArrays implementation. Only one BigArrays should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -58,6 +61,7 @@
public class KaldbSearchContext extends SearchContext {
private final BigArrays bigArrays;
private final ContextIndexSearcher contextIndexSearcher;
private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR;
private final QueryShardContext queryShardContext;
private final Query query;

Expand All @@ -77,7 +81,8 @@ public KaldbSearchContext(
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
false,
indexSearcher.getExecutor());
indexSearcher.getExecutor(),
this);
}

@Override
Expand Down Expand Up @@ -561,4 +566,29 @@ public QueryShardContext getQueryShardContext() {
public ReaderContext readerContext() {
throw new NotImplementedException();
}

/**
 * Builds the shard-local (partial) reduce context used to combine aggregation
 * results on this shard before the coordinating node performs the final reduce.
 *
 * <p>Uses the process-wide {@code KaldbBigArrays} and {@code ScriptServiceProvider}
 * singletons, and supplies an empty pipeline-aggregator tree since pipeline
 * aggregations are resolved at final-reduce time, not per shard.
 *
 * @return a partial-reduction {@link InternalAggregation.ReduceContext}
 */
@Override
public InternalAggregation.ReduceContext partialOnShard() {
return InternalAggregation.ReduceContext.forPartialReduction(
KaldbBigArrays.getInstance(),
ScriptServiceProvider.getInstance(),
() -> PipelineAggregator.PipelineTree.EMPTY);
}

/**
 * Installs the {@link BucketCollectorProcessor} the aggregation framework will use
 * to post-process bucket collectors for this search context (required by the
 * OpenSearch 2.11 {@code SearchContext} contract; defaults to the no-op processor).
 *
 * @param bucketCollectorProcessor processor supplied by the aggregation phase
 */
@Override
public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
this.bucketCollectorProcessor = bucketCollectorProcessor;
}

/**
 * Returns the currently installed {@link BucketCollectorProcessor} — either the one
 * set via {@code setBucketCollectorProcessor} or the no-op default.
 *
 * @return the active bucket collector processor; never {@code null}
 */
@Override
public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}

/**
 * Tells OpenSearch it may apply the descending-timestamp sort optimization
 * (early termination when reading docs in reverse time order).
 *
 * @return always {@code true}, because Kaldb indexes documents with the timestamp
 *     in reverse order — see {@code LuceneIndexStoreImpl.buildIndexWriterConfig()}
 */
@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
// this is true, since we index with the timestamp in reverse order
// see LuceneIndexStoreImpl.buildIndexWriterConfig()
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
Expand All @@ -61,7 +62,6 @@
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.script.Script;
import org.opensearch.search.aggregations.AbstractAggregationBuilder;
Expand Down Expand Up @@ -327,7 +327,7 @@ protected static IndexSettings buildIndexSettings() {
Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_7_0)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0)
.put(
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,7 +133,7 @@ public OpenSearchBulkIngestApi(
"Kafka bootstrapServers must be provided");

checkArgument(
!Strings.isEmpty(preprocessorConfig.getDownstreamTopic()),
!preprocessorConfig.getDownstreamTopic().isEmpty(),
"Kafka downstreamTopic must be provided");

this.datasetMetadataStore = datasetMetadataStore;
Expand Down

0 comments on commit f73932d

Please sign in to comment.