Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose merge throttling parameter #581

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
true,
CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void createIndexer() throws Exception {
commitInterval,
refreshInterval,
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);

Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ indexerConfig:
commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10}
refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11}
enableFullTextSearch: ${INDEXER_ENABLE_FULL_TEXT_SEARCH:-false}
throttleMerges: ${INDEXER_THROTTLE_MERGES:-true}
staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200}
dataTransformer: ${INDEXER_DATA_TRANSFORMER:-trace_span}
dataDirectory: ${INDEXER_DATA_DIR:-/tmp}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class KalDBMergeScheduler extends ConcurrentMergeScheduler {

private static final Logger LOG = LoggerFactory.getLogger(KalDBMergeScheduler.class);
private final boolean throttleMerges;
private MeterRegistry metricsRegistry;

public static final String STALL_TIME = "kaldb_index_merge_stall_time_ms";
Expand All @@ -23,11 +24,12 @@ public class KalDBMergeScheduler extends ConcurrentMergeScheduler {
public static final String MERGE_COUNTER = "kaldb_index_merge_count";
private final Counter mergeCounter;

public KalDBMergeScheduler(MeterRegistry metricsRegistry) {
public KalDBMergeScheduler(MeterRegistry metricsRegistry, boolean throttleMerges) {
this.metricsRegistry = metricsRegistry;
stallCounter = this.metricsRegistry.counter(STALL_TIME);
activeStallThreadsCount = this.metricsRegistry.gauge(STALL_THREADS, new AtomicInteger());
this.mergeCounter = this.metricsRegistry.counter(MERGE_COUNTER);
this.throttleMerges = throttleMerges;
}

protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
Expand All @@ -48,6 +50,7 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) thro
* has details on why Lucene added auto IO throttle
*/
protected synchronized boolean maybeStall(MergeSource mergeSource) {
if (!throttleMerges) return false;
long startTime = System.nanoTime();
activeStallThreadsCount.incrementAndGet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class LuceneIndexStoreConfig {
// A flag that turns on internal logging.
public final boolean enableTracing;

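// Whether the merge scheduler throttles merges: when false, KalDBMergeScheduler.maybeStall
// returns false immediately and skips its stall logic.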
public final boolean throttleMerges;

// TODO: Tweak the default values once in prod.
static final Duration defaultCommitDuration = Duration.ofSeconds(15);
static final Duration defaultRefreshDuration = Duration.ofSeconds(15);
Expand All @@ -44,16 +47,27 @@ public static Duration getRefreshDuration(final long refreshDurationSecs) {
}

public LuceneIndexStoreConfig(
Duration commitDuration, Duration refreshDuration, String indexRoot, boolean enableTracing) {
this(commitDuration, refreshDuration, indexRoot, DEFAULT_LOG_FILE_NAME, enableTracing);
Duration commitDuration,
Duration refreshDuration,
String indexRoot,
boolean enableTracing,
boolean throttleMerges) {
this(
commitDuration,
refreshDuration,
indexRoot,
DEFAULT_LOG_FILE_NAME,
enableTracing,
throttleMerges);
}

public LuceneIndexStoreConfig(
Duration commitDuration,
Duration refreshDuration,
String indexRoot,
String logFileName,
boolean enableTracing) {
boolean enableTracing,
boolean throttleMerges) {
ensureTrue(
!(commitDuration.isZero() || commitDuration.isNegative()),
"Commit duration should be greater than zero");
Expand All @@ -65,6 +79,7 @@ public LuceneIndexStoreConfig(
this.indexRoot = indexRoot;
this.logFileName = logFileName;
this.enableTracing = enableTracing;
this.throttleMerges = throttleMerges;
}

public File indexFolder(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static LuceneIndexStoreImpl makeLogStore(
LuceneIndexStoreConfig.getCommitDuration(luceneConfig.getCommitDurationSecs()),
LuceneIndexStoreConfig.getRefreshDuration(luceneConfig.getRefreshDurationSecs()),
luceneConfig.getEnableFullTextSearch(),
luceneConfig.getThrottleMerges(),
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
metricsRegistry);
}
Expand All @@ -80,14 +81,19 @@ public static LuceneIndexStoreImpl makeLogStore(
Duration commitInterval,
Duration refreshInterval,
boolean enableFullTextSearch,
boolean throttleMerges,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy fieldConflictPolicy,
MeterRegistry metricsRegistry)
throws IOException {
// TODO: Move all these config values into chunk?
// TODO: Chunk should create log store?
LuceneIndexStoreConfig indexStoreCfg =
new LuceneIndexStoreConfig(
commitInterval, refreshInterval, dataDirectory.getAbsolutePath(), false);
commitInterval,
refreshInterval,
dataDirectory.getAbsolutePath(),
false,
throttleMerges);

return new LuceneIndexStoreImpl(
indexStoreCfg,
Expand Down Expand Up @@ -151,7 +157,7 @@ private IndexWriterConfig buildIndexWriterConfig(
final IndexWriterConfig indexWriterCfg =
new IndexWriterConfig(analyzer)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setMergeScheduler(new KalDBMergeScheduler(metricsRegistry))
.setMergeScheduler(new KalDBMergeScheduler(metricsRegistry, config.throttleMerges))
// we sort by timestamp descending, as that is the order we expect to return results the
// majority of the time
.setIndexSort(
Expand All @@ -161,7 +167,6 @@ private IndexWriterConfig buildIndexWriterConfig(
SortField.Type.LONG,
true)))
.setIndexDeletionPolicy(snapshotDeletionPolicy);

if (config.enableTracing) {
indexWriterCfg.setInfoStream(System.out);
}
Expand Down
1 change: 1 addition & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message LuceneConfig {
int64 commit_duration_secs = 1;
int64 refresh_duration_secs = 2;
bool enable_full_text_search = 3;
bool throttle_merges = 4;
}

// ServerConfig contains the address and port info of a Kaldb service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down Expand Up @@ -467,6 +468,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR,
registry);
chunk =
Expand Down Expand Up @@ -551,6 +553,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception
Duration.ofSeconds(60),
Duration.ofSeconds(60),
true,
true,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
meterRegistry);
addMessages(logStore, 1, 10, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down Expand Up @@ -455,6 +456,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.RAISE_ERROR,
registry);
chunk =
Expand Down Expand Up @@ -541,6 +543,7 @@ public void setUp() throws Exception {
COMMIT_INTERVAL,
REFRESH_INTERVAL,
true,
false,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy
.CONVERT_VALUE_AND_DUPLICATE_FIELD,
registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public void testZeroCommitDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true));
Duration.ZERO, Duration.ofSeconds(10), "indexRoot", "logfile", true, true));
}

@Test
Expand All @@ -21,7 +21,7 @@ public void testZeroRefreshDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true));
Duration.ofSeconds(10), Duration.ZERO, "indexRoot", "logfile", true, true));
}

@Test
Expand All @@ -30,7 +30,12 @@ public void testNegativeCommitDuration() {
.isThrownBy(
() ->
new LuceneIndexStoreConfig(
Duration.ofSeconds(-10), Duration.ofSeconds(10), "indexRoot", "logfile", true));
Duration.ofSeconds(-10),
Duration.ofSeconds(10),
"indexRoot",
"logfile",
true,
true));
}

@Test
Expand All @@ -43,6 +48,7 @@ public void testNegativeRefreshDuration() {
Duration.ofSeconds(-100),
"indexRoot",
"logfile",
true,
true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ private InternalAggregation makeHistogram(
Duration.of(1, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES),
tempFolder.getCanonicalPath(),
false);
false,
true);
MeterRegistry metricsRegistry = new SimpleMeterRegistry();
DocumentBuilder<LogMessage> documentBuilder =
SchemaAwareLogDocumentBuilderImpl.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public TemporaryLogStoreAndSearcherExtension(
public static LuceneIndexStoreConfig getIndexStoreConfig(
Duration commitInterval, Duration refreshInterval, File tempFolder) throws IOException {
return new LuceneIndexStoreConfig(
commitInterval, refreshInterval, tempFolder.getCanonicalPath(), false);
commitInterval, refreshInterval, tempFolder.getCanonicalPath(), false, true);
}

@Override
Expand Down