diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java index 0ec8e8c0dd..1de2f67341 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java @@ -56,6 +56,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java index 6b9bbd8fce..80538d223f 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java @@ -55,6 +55,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + false, CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java index 380a9c4d1e..a4f9a6c6f7 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java @@ -70,6 +70,7 @@ public void createIndexer() throws Exception { commitInterval, refreshInterval, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/config/config.yaml b/config/config.yaml index 79785a14d0..6567b18c36 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,7 @@ indexerConfig: commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10} refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11} enableFullTextSearch: ${INDEXER_ENABLE_FULL_TEXT_SEARCH:-false} + throttleMerges: ${INDEXER_THROTTLE_MERGES:-true} staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200} dataTransformer: ${INDEXER_DATA_TRANSFORMER:-trace_span} dataDirectory: ${INDEXER_DATA_DIR:-/tmp} diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java b/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java index 736d1a27fd..5812a1f1fe 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/KalDBMergeScheduler.java @@ -12,6 +12,7 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler { private static final Logger LOG = LoggerFactory.getLogger(KalDBMergeScheduler.class); + private final boolean throttleMerges; private MeterRegistry metricsRegistry; public static final String STALL_TIME = "kaldb_index_merge_stall_time_ms"; @@ -23,11 +24,12 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler { public static final String MERGE_COUNTER = "kaldb_index_merge_count"; private final Counter mergeCounter; - public KalDBMergeScheduler(MeterRegistry metricsRegistry) { + public KalDBMergeScheduler(MeterRegistry metricsRegistry, boolean throttleMerges) { this.metricsRegistry = metricsRegistry; stallCounter = this.metricsRegistry.counter(STALL_TIME); activeStallThreadsCount = this.metricsRegistry.gauge(STALL_THREADS, new AtomicInteger()); this.mergeCounter = this.metricsRegistry.counter(MERGE_COUNTER); + this.throttleMerges = throttleMerges; } protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException { @@ -48,6 +50,7 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) thro * has details on why Lucene added auto IO throttle */ protected synchronized boolean maybeStall(MergeSource mergeSource) { + if (!throttleMerges) return false; long startTime = System.nanoTime(); activeStallThreadsCount.incrementAndGet(); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java index 2cb6098acf..daca62300b 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreConfig.java @@ -29,6 +29,9 @@ public class LuceneIndexStoreConfig { // A flag that turns on internal logging. public final boolean enableTracing; + // setting for the merge scheduler + public final boolean throttleMerges; + // TODO: Tweak the default values once in prod. static final Duration defaultCommitDuration = Duration.ofSeconds(15); static final Duration defaultRefreshDuration = Duration.ofSeconds(15); @@ -45,7 +48,7 @@ public static Duration getRefreshDuration(final long refreshDurationSecs) { public LuceneIndexStoreConfig( Duration commitDuration, Duration refreshDuration, String indexRoot, boolean enableTracing) { - this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing); + this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing, false); } public LuceneIndexStoreConfig( @@ -53,7 +56,8 @@ public LuceneIndexStoreConfig( Duration refreshDuration, String indexRoot, String logFileName, - boolean enableTracing) { + boolean enableTracing, + boolean throttleMerges) { ensureTrue( !(commitDuration.isZero() || commitDuration.isNegative()), "Commit duration should be greater than zero"); @@ -65,6 +69,7 @@ public LuceneIndexStoreConfig( this.indexRoot = indexRoot; this.logFileName = logFileName; this.enableTracing = enableTracing; + this.throttleMerges = throttleMerges; } public File indexFolder(String id) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 6169c06d26..a782b4f532 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -73,6 +73,7 @@ public static LuceneIndexStoreImpl makeLogStore( LuceneIndexStoreConfig.getCommitDuration(luceneConfig.getCommitDurationSecs()), LuceneIndexStoreConfig.getRefreshDuration(luceneConfig.getRefreshDurationSecs()), luceneConfig.getEnableFullTextSearch(), + luceneConfig.getThrottleMerges(), SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, metricsRegistry); } @@ -82,6 +83,7 @@ public static LuceneIndexStoreImpl makeLogStore( Duration commitInterval, Duration refreshInterval, boolean enableFullTextSearch, + boolean throttleMerges, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy fieldConflictPolicy, MeterRegistry metricsRegistry) throws IOException { @@ -155,7 +157,7 @@ private IndexWriterConfig buildIndexWriterConfig( final IndexWriterConfig indexWriterCfg = new IndexWriterConfig(analyzer) .setOpenMode(IndexWriterConfig.OpenMode.CREATE) - .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry)) + .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry, config.throttleMerges)) .setRAMBufferSizeMB(ramBufferSizeMb) .setUseCompoundFile(useCFSFiles) // we sort by timestamp descending, as that is the order we expect to return results the diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto index 3fd5adb099..6c94382584 100644 --- a/kaldb/src/main/proto/kaldb_configs.proto +++ b/kaldb/src/main/proto/kaldb_configs.proto @@ -115,6 +115,7 @@ message LuceneConfig { int64 commit_duration_secs = 1; int64 refresh_duration_secs = 2; bool enable_full_text_search = 3; + bool throttle_merges = 4; } // ServerConfig contains the address and port info of a Kaldb service. diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java index 085b7fd478..ebccd11696 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java @@ -121,6 +121,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); @@ -467,6 +468,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR, registry); chunk = @@ -551,6 +553,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index c79e18929c..1c174c9a11 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -508,6 +508,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception Duration.ofSeconds(60), Duration.ofSeconds(60), true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, meterRegistry); addMessages(logStore, 1, 10, true); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java index 7889dfe0cf..877183b517 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java @@ -100,6 +100,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); @@ -455,6 +456,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR, registry); chunk = @@ -541,6 +543,7 @@ public void setUp() throws Exception { COMMIT_INTERVAL, REFRESH_INTERVAL, true, + false, SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy .CONVERT_VALUE_AND_DUPLICATE_FIELD, registry); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java index 0c27d79940..821f132c4f 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreConfigTest.java @@ -12,7 +12,7 @@ public void testZeroCommitDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true)); + Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, false)); } @Test @@ -21,7 +21,7 @@ public void testZeroRefreshDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true)); + Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, false)); } @Test @@ -30,7 +30,12 @@ public void testNegativeCommitDuration() { .isThrownBy( () -> new LuceneIndexStoreConfig( - Duration.ofSeconds(-10), Duration.ofSeconds(10), "indexRoot", "logfile", true)); + Duration.ofSeconds(-10), + Duration.ofSeconds(10), + "indexRoot", + "logfile", + true, + false)); } @Test @@ -43,6 +48,7 @@ public void testNegativeRefreshDuration() { Duration.ofSeconds(-100), "indexRoot", "logfile", - true)); + true, + false)); } }