Skip to content

Commit

Permalink
add an option to disable merge throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Jun 19, 2023
1 parent f289e21 commit ae900b6
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 8 deletions.
1 change: 1 addition & 0 deletions benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
false,
CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ indexerConfig:
commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10}
refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11}
enableFullTextSearch: ${INDEXER_ENABLE_FULL_TEXT_SEARCH:-false}
throttleMerges: ${INDEXER_THROTTLE_MERGES:-true}
staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200}
dataTransformer: ${INDEXER_DATA_TRANSFORMER:-trace_span}
dataDirectory: ${INDEXER_DATA_DIR:-/tmp}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class KalDBMergeScheduler extends ConcurrentMergeScheduler {

private static final Logger LOG = LoggerFactory.getLogger(KalDBMergeScheduler.class);
private final boolean throttleMerges;
private MeterRegistry metricsRegistry;

public static final String STALL_TIME = "kaldb_index_merge_stall_time_ms";
Expand All @@ -23,11 +24,12 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler {
public static final String MERGE_COUNTER = "kaldb_index_merge_count";
private final Counter mergeCounter;

public KalDBMergeScheduler(MeterRegistry metricsRegistry) {
public KalDBMergeScheduler(MeterRegistry metricsRegistry, boolean throttleMerges) {
this.metricsRegistry = metricsRegistry;
stallCounter = this.metricsRegistry.counter(STALL_TIME);
activeStallThreadsCount = this.metricsRegistry.gauge(STALL_THREADS, new AtomicInteger());
this.mergeCounter = this.metricsRegistry.counter(MERGE_COUNTER);
this.throttleMerges = throttleMerges;
}

protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
Expand All @@ -48,6 +50,7 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) thro
* has details on why Lucene added auto IO throttle
*/
protected synchronized boolean maybeStall(MergeSource mergeSource) {
if (!throttleMerges) return false;
long startTime = System.nanoTime();
activeStallThreadsCount.incrementAndGet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class LuceneIndexStoreConfig {
// A flag that turns on internal logging.
public final boolean enableTracing;

// setting for the merge scheduler
public final boolean throttleMerges;

// TODO: Tweak the default values once in prod.
static final Duration defaultCommitDuration = Duration.ofSeconds(15);
static final Duration defaultRefreshDuration = Duration.ofSeconds(15);
Expand All @@ -45,15 +48,16 @@ public static Duration getRefreshDuration(final long refreshDurationSecs) {

public LuceneIndexStoreConfig(
Duration commitDuration, Duration refreshDuration, String indexRoot, boolean enableTracing) {
this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing);
this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing, false);
}

public LuceneIndexStoreConfig(
Duration commitDuration,
Duration refreshDuration,
String indexRoot,
String logFileName,
boolean enableTracing) {
boolean enableTracing,
boolean throttleMerges) {
ensureTrue(
!(commitDuration.isZero() || commitDuration.isNegative()),
"Commit duration should be greater than zero");
Expand All @@ -65,6 +69,7 @@ public LuceneIndexStoreConfig(
this.indexRoot = indexRoot;
this.logFileName = logFileName;
this.enableTracing = enableTracing;
this.throttleMerges = throttleMerges;
}

public File indexFolder(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static LuceneIndexStoreImpl makeLogStore(
LuceneIndexStoreConfig.getCommitDuration(luceneConfig.getCommitDurationSecs()),
LuceneIndexStoreConfig.getRefreshDuration(luceneConfig.getRefreshDurationSecs()),
luceneConfig.getEnableFullTextSearch(),
luceneConfig.getThrottleMerges(),
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
metricsRegistry);
}
Expand All @@ -82,6 +83,7 @@ public static LuceneIndexStoreImpl makeLogStore(
Duration commitInterval,
Duration refreshInterval,
boolean enableFullTextSearch,
boolean throttleMerges,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy fieldConflictPolicy,
MeterRegistry metricsRegistry)
throws IOException {
Expand Down Expand Up @@ -155,7 +157,7 @@ private IndexWriterConfig buildIndexWriterConfig(
final IndexWriterConfig indexWriterCfg =
new IndexWriterConfig(analyzer)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setMergeScheduler(new KalDBMergeScheduler(metricsRegistry))
.setMergeScheduler(new KalDBMergeScheduler(metricsRegistry, config.throttleMerges))
.setRAMBufferSizeMB(ramBufferSizeMb)
.setUseCompoundFile(useCFSFiles)
// we sort by timestamp descending, as that is the order we expect to return results the
Expand Down
1 change: 1 addition & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message LuceneConfig {
int64 commit_duration_secs = 1;
int64 refresh_duration_secs = 2;
bool enable_full_text_search = 3;
bool throttle_merges = 4;
}

// ServerConfig contains the address and port info of a Kaldb service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down Expand Up @@ -467,6 +468,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR,
registry);
chunk =
Expand Down Expand Up @@ -551,6 +553,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception
Duration.ofSeconds(60),
Duration.ofSeconds(60),
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
meterRegistry);
addMessages(logStore, 1, 10, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down Expand Up @@ -455,6 +456,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR,
registry);
chunk =
Expand Down Expand Up @@ -541,6 +543,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public void testZeroCommitDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true));
Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, false));
}

@Test
Expand All @@ -21,7 +21,7 @@ public void testZeroRefreshDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true));
Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, false));
}

@Test
Expand All @@ -30,7 +30,12 @@ public void testNegativeCommitDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ofSeconds(-10), Duration.ofSeconds(10), "indexRoot", "logfile", true));
Duration.ofSeconds(-10),
Duration.ofSeconds(10),
"indexRoot",
"logfile",
true,
false));
}

@Test
Expand All @@ -43,6 +48,7 @@ public void testNegativeRefreshDuration() {
Duration.ofSeconds(-100),
"indexRoot",
"logfile",
true));
true,
false));
}
}

0 comments on commit ae900b6

Please sign in to comment.