From f289e212f1c02da5a7cdde3095c8851745ad9e34 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Fri, 16 Jun 2023 15:08:43 -0700 Subject: [PATCH 1/5] make larger segments --- .../slack/kaldb/logstore/LuceneIndexStoreImpl.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 06c08813e9..6169c06d26 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -62,6 +62,8 @@ public class LuceneIndexStoreImpl implements LogStore { private final io.micrometer.core.instrument.Timer commitsTimer; private final io.micrometer.core.instrument.Timer refreshesTimer; + private final String MAX_RAM_BUFFER_SIZE_MB = "maxRamBufferSizeMb"; + // TODO: Set the policy via a lucene config file. public static LuceneIndexStoreImpl makeLogStore( File dataDirectory, KaldbConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry) @@ -148,10 +150,14 @@ private IndexWriterConfig buildIndexWriterConfig( SnapshotDeletionPolicy snapshotDeletionPolicy, LuceneIndexStoreConfig config, MeterRegistry metricsRegistry) { + int ramBufferSizeMb = Integer.getInteger(MAX_RAM_BUFFER_SIZE_MB, 1024); + boolean useCFSFiles = ramBufferSizeMb <= 128; final IndexWriterConfig indexWriterCfg = new IndexWriterConfig(analyzer) .setOpenMode(IndexWriterConfig.OpenMode.CREATE) .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry)) + .setRAMBufferSizeMB(ramBufferSizeMb) + .setUseCompoundFile(useCFSFiles) // we sort by timestamp descending, as that is the order we expect to return results the // majority of the time .setIndexSort( @@ -162,6 +168,12 @@ private IndexWriterConfig buildIndexWriterConfig( true))) .setIndexDeletionPolicy(snapshotDeletionPolicy); + // See + // https://lucene.apache.org/core/9_5_0/core/org/apache/lucene/index/IndexWriterConfig.html#setUseCompoundFile(boolean) + if (ramBufferSizeMb >= 128) { + indexWriterCfg.getMergePolicy().setNoCFSRatio(0.0); + } + if (config.enableTracing) { indexWriterCfg.setInfoStream(System.out); } From 0c7193a08d3fc87dede8bd2be3d9e3291c7b7bd1 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Mon, 19 Jun 2023 15:05:28 -0700 Subject: [PATCH 2/5] add an option to disable merge throttle --- .../java/com/slack/kaldb/IndexAPILog.java | 1 + .../com/slack/kaldb/IndexingBenchmark.java | 1 + .../java/com/slack/kaldb/QueryBenchmark.java | 1 + config/config.yaml | 1 + .../kaldb/logstore/KalDBMergeScheduler.java | 5 ++++- .../logstore/LuceneIndexStoreConfig.java | 21 ++++++++++++++++--- .../kaldb/logstore/LuceneIndexStoreImpl.java | 6 ++++-- kaldb/src/main/proto/kaldb_configs.proto | 1 + .../kaldb/chunk/IndexingChunkImplTest.java | 3 +++ .../kaldb/chunk/ReadOnlyChunkImplTest.java | 1 + .../kaldb/chunk/RecoveryChunkImplTest.java | 3 +++ .../logstore/LuceneIndexStoreConfigTest.java | 14 +++++++++---- .../SearchResultAggregatorImplTest.java | 3 ++- ...TemporaryLogStoreAndSearcherExtension.java | 2 +- 14 files changed, 51 insertions(+), 12 deletions(-) diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java index 0ec8e8c0dd..d093883f19 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java @@ -56,6 +56,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java index 6b9bbd8fce..8849947916 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java @@ -55,6 +55,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + true, CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java index 380a9c4d1e..b901bff40d 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java @@ -70,6 +70,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/config/config.yaml b/config/config.yaml index 79785a14d0..6567b18c36 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,7 @@ indexerConfig: commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10} refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11} enableFullTextSearch: ${INDEXER_ENABLE_FULL_TEXT_SEARCH:-false} + throttleMerges: ${INDEXER_THROTTLE_MERGES:-true} staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200} dataTransformer: ${INDEXER_DATA_TRANSFORMER:-trace_span} dataDirectory: ${INDEXER_DATA_DIR:-/tmp} diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java b/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java index 736d1a27fd..5812a1f1fe 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java @@ -12,6 +12,7 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler { private static final Logger LOG = LoggerFactory.getLogger(KalDBMergeScheduler.class); + private final boolean throttleMerges; private MeterRegistry metricsRegistry; public static final String STALL_TIME = "kaldb_index_merge_stall_time_ms"; @@ -23,11 +24,12 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler { public static final String MERGE_COUNTER = "kaldb_index_merge_count"; private final Counter mergeCounter; - public KalDBMergeScheduler(MeterRegistry metricsRegistry) { + public KalDBMergeScheduler(MeterRegistry metricsRegistry, boolean throttleMerges) { this.metricsRegistry = metricsRegistry; stallCounter = this.metricsRegistry.counter(STALL_TIME); activeStallThreadsCount = this.metricsRegistry.gauge(STALL_THREADS, new AtomicInteger()); this.mergeCounter = this.metricsRegistry.counter(MERGE_COUNTER); + this.throttleMerges = throttleMerges; } protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException { @@ -48,6 +50,7 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) thro * has details on why Lucene added auto IO throttle */ protected synchronized boolean maybeStall(MergeSource mergeSource) { + if (!throttleMerges) return false; long startTime = System.nanoTime(); activeStallThreadsCount.incrementAndGet(); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java index 2cb6098acf..dfa65cba64 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java @@ -29,6 +29,9 @@ public class LuceneIndexStoreConfig { // A flag that turns on internal logging. public final boolean enableTracing; + // setting for the merge scheduler + public final boolean throttleMerges; + // TODO: Tweak the default values once in prod. static final Duration defaultCommitDuration = Duration.ofSeconds(15); static final Duration defaultRefreshDuration = Duration.ofSeconds(15); @@ -44,8 +47,18 @@ public static Duration getRefreshDuration(final long refreshDurationSecs) { } public LuceneIndexStoreConfig( - Duration commitDuration, Duration refreshDuration, String indexRoot, boolean enableTracing) { - this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing); + Duration commitDuration, + Duration refreshDuration, + String indexRoot, + boolean enableTracing, + boolean throttleMerges) { + this( + commitDuration, + refreshDuration, + indexRoot, + DEFAULT_LOG_FILE_NAME, + enableTracing, + throttleMerges); } public LuceneIndexStoreConfig( @@ -53,7 +66,8 @@ public LuceneIndexStoreConfig( Duration refreshDuration, String indexRoot, String logFileName, - boolean enableTracing) { + boolean enableTracing, + boolean throttleMerges) { ensureTrue( !(commitDuration.isZero() || commitDuration.isNegative()), "Commit duration should be greater than zero"); @@ -65,6 +79,7 @@ public LuceneIndexStoreConfig( this.indexRoot = indexRoot; this.logFileName = logFileName; this.enableTracing = enableTracing; + this.throttleMerges = throttleMerges; } public File indexFolder(String id) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 6169c06d26..36dbd2fabc 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -73,6 +73,7 @@ public static LuceneIndexStoreImpl makeLogStore( LuceneIndexStoreConfig.getCommitDuration(luceneConfig.getCommitDurationSecs()), LuceneIndexStoreConfig.getRefreshDuration(luceneConfig.getRefreshDurationSecs()), luceneConfig.getEnableFullTextSearch(), + luceneConfig.getThrottleMerges(), SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, metricsRegistry); } @@ -82,6 +83,7 @@ public static LuceneIndexStoreImpl makeLogStore( Duration commitInterval, Duration refreshInterval, boolean enableFullTextSearch, + boolean throttleMerges, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy fieldConflictPolicy, MeterRegistry metricsRegistry) throws IOException { @@ -89,7 +91,7 @@ public static LuceneIndexStoreImpl makeLogStore( // TODO: Chunk should create log store? LuceneIndexStoreConfig indexStoreCfg = new LuceneIndexStoreConfig( - commitInterval, refreshInterval, dataDirectory.getAbsolutePath(), false); + commitInterval, refreshInterval, dataDirectory.getAbsolutePath(), false, true); return new LuceneIndexStoreImpl( indexStoreCfg, @@ -155,7 +157,7 @@ private IndexWriterConfig buildIndexWriterConfig( final IndexWriterConfig indexWriterCfg = new IndexWriterConfig(analyzer) .setOpenMode(IndexWriterConfig.OpenMode.CREATE) - .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry)) + .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry, config.throttleMerges)) .setRAMBufferSizeMB(ramBufferSizeMb) .setUseCompoundFile(useCFSFiles) // we sort by timestamp descending, as that is the order we expect to return results the diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto index 3fd5adb099..6c94382584 100644 --- a/kaldb/src/main/proto/kaldb_configs.proto +++ b/kaldb/src/main/proto/kaldb_configs.proto @@ -115,6 +115,7 @@ message LuceneConfig { int64 commit_duration_secs = 1; int64 refresh_duration_secs = 2; bool enable_full_text_search = 3; + bool throttle_merges = 4; } // ServerConfig contains the address and port info of a Kaldb service. diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java index 085b7fd478..ebccd11696 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java @@ -121,6 +121,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); @@ -467,6 +468,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR, registry); chunk = @@ -551,6 +553,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index c79e18929c..1c174c9a11 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -508,6 +508,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception Duration.ofSeconds(60), Duration.ofSeconds(60), true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, meterRegistry); addMessages(logStore, 1, 10, true); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java index 7889dfe0cf..877183b517 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java @@ -100,6 +100,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); @@ -455,6 +456,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR, registry); chunk = @@ -541,6 +543,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java index 0c27d79940..821f132c4f 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java @@ -12,7 +12,7 @@ public void testZeroCommitDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true)); + Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, false)); } @Test @@ -21,7 +21,7 @@ public void testZeroRefreshDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true)); + Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, false)); } @Test @@ -30,7 +30,12 @@ public void testNegativeCommitDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ofSeconds(-10), Duration.ofSeconds(10), "indexRoot", "logfile", true)); + Duration.ofSeconds(-10), + Duration.ofSeconds(10), + "indexRoot", + "logfile", + true, + false)); } @Test @@ -43,6 +48,7 @@ public void testNegativeRefreshDuration() { Duration.ofSeconds(-100), "indexRoot", "logfile", - true)); + true, + false)); } } diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java index 6266bd8d79..727a14817b 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java @@ -469,7 +469,8 @@ private InternalAggregation makeHistogram( Duration.of(1, ChronoUnit.MINUTES), Duration.of(1, ChronoUnit.MINUTES), tempFolder.getCanonicalPath(), - false); + false, + true); MeterRegistry metricsRegistry = new SimpleMeterRegistry(); DocumentBuilder documentBuilder = SchemaAwareLogDocumentBuilderImpl.build( diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java index 1245a265ae..ced3f8b9d8 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java @@ -98,7 +98,7 @@ public TemporaryLogStoreAndSearcherExtension( public static LuceneIndexStoreConfig getIndexStoreConfig( Duration commitInterval, Duration refreshInterval, File tempFolder) throws IOException { return new LuceneIndexStoreConfig( - commitInterval, refreshInterval, tempFolder.getCanonicalPath(), false); + commitInterval, refreshInterval, tempFolder.getCanonicalPath(), false, true); } @Override From 034541a67eacb53a37907535aa45c947b56fb51c Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Mon, 19 Jun 2023 15:33:22 -0700 Subject: [PATCH 3/5] Revert "make larger segments" This reverts commit f289e212f1c02da5a7cdde3095c8851745ad9e34. --- .../slack/kaldb/logstore/LuceneIndexStoreImpl.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 36dbd2fabc..df16a9d3e4 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -62,8 +62,6 @@ public class LuceneIndexStoreImpl implements LogStore { private final io.micrometer.core.instrument.Timer commitsTimer; private final io.micrometer.core.instrument.Timer refreshesTimer; - private final String MAX_RAM_BUFFER_SIZE_MB = "maxRamBufferSizeMb"; - // TODO: Set the policy via a lucene config file. public static LuceneIndexStoreImpl makeLogStore( File dataDirectory, KaldbConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry) @@ -152,14 +150,10 @@ private IndexWriterConfig buildIndexWriterConfig( SnapshotDeletionPolicy snapshotDeletionPolicy, LuceneIndexStoreConfig config, MeterRegistry metricsRegistry) { - int ramBufferSizeMb = Integer.getInteger(MAX_RAM_BUFFER_SIZE_MB, 1024); - boolean useCFSFiles = ramBufferSizeMb <= 128; final IndexWriterConfig indexWriterCfg = new IndexWriterConfig(analyzer) .setOpenMode(IndexWriterConfig.OpenMode.CREATE) .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry, config.throttleMerges)) - .setRAMBufferSizeMB(ramBufferSizeMb) - .setUseCompoundFile(useCFSFiles) // we sort by timestamp descending, as that is the order we expect to return results the // majority of the time .setIndexSort( @@ -169,13 +163,6 @@ private IndexWriterConfig buildIndexWriterConfig( SortField.Type.LONG, true))) .setIndexDeletionPolicy(snapshotDeletionPolicy); - - // See - // https://lucene.apache.org/core/9_5_0/core/org/apache/lucene/index/IndexWriterConfig.html#setUseCompoundFile(boolean) - if (ramBufferSizeMb >= 128) { - indexWriterCfg.getMergePolicy().setNoCFSRatio(0.0); - } - if (config.enableTracing) { indexWriterCfg.setInfoStream(System.out); } From d62b1c9dbc2acb19419f186e4b8db7c2f85fb7ee Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Mon, 19 Jun 2023 15:35:42 -0700 Subject: [PATCH 4/5] LuceneIndexStoreConfigTest should use true for throttling --- .../slack/kaldb/logstore/LuceneIndexStoreConfigTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java index 821f132c4f..1d1a614f3c 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java @@ -12,7 +12,7 @@ public void testZeroCommitDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, false)); + Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, true)); } @Test @@ -21,7 +21,7 @@ public void testZeroRefreshDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, false)); + Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, true)); } @Test @@ -35,7 +35,7 @@ public void testNegativeCommitDuration() { "indexRoot", "logfile", true, - false)); + true)); } @Test @@ -49,6 +49,6 @@ public void testNegativeRefreshDuration() { "indexRoot", "logfile", true, - false)); + true)); } } From 8ec94b1bcb65d8c45b03caa9602246dfe1646336 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Mon, 19 Jun 2023 15:55:13 -0700 Subject: [PATCH 5/5] more changes --- .../java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java | 6 +++++- .../java/com/slack/kaldb/chunk/IndexingChunkImplTest.java | 6 +++--- .../java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index df16a9d3e4..9109ee0170 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -89,7 +89,11 @@ public static LuceneIndexStoreImpl makeLogStore( // TODO: Chunk should create log store? LuceneIndexStoreConfig indexStoreCfg = new LuceneIndexStoreConfig( - commitInterval, refreshInterval, dataDirectory.getAbsolutePath(), false, true); + commitInterval, + refreshInterval, + dataDirectory.getAbsolutePath(), + false, + throttleMerges); return new LuceneIndexStoreImpl( indexStoreCfg, diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java index ebccd11696..11938c8bea 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java @@ -121,7 +121,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, - false, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); @@ -468,7 +468,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, - false, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR, registry); chunk = @@ -553,7 +553,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, - false, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 1c174c9a11..8e0e06f157 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -508,7 +508,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception Duration.ofSeconds(60), Duration.ofSeconds(60), true, - false, + true, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, meterRegistry); addMessages(logStore, 1, 10, true);