Commit a148ec7

Merge branch 'master' into bburkholder/bump-lucene

bryanlb authored Oct 26, 2023
2 parents f73932d + 014da40
Showing 35 changed files with 760 additions and 849 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -20,6 +20,7 @@ benchmarks/jmh-output/

# Test files
indices/
**kaldb-slot-log**

# go vendor files.
vendor/
@@ -126,7 +126,7 @@ public void measureLogSearcherSearch() {
logIndexSearcher.search(
"*",
"",
0,
0L,
Long.MAX_VALUE,
500,
new DateHistogramAggBuilder(
4 changes: 3 additions & 1 deletion kaldb/pom.xml
@@ -572,7 +572,8 @@
<fork>true</fork>
<compilerArgs>
<arg>-XDcompilePolicy=simple</arg>
<arg> -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR</arg>
<arg>--enable-preview</arg>
<arg>-Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg>
@@ -740,6 +741,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<argLine>--enable-preview</argLine>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<log4j.configurationFile>src/test/resources/log4j2.xml</log4j.configurationFile>
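A note on the --enable-preview flags added above: the flag has to reach both the compiler arguments and the Surefire argLine, because structured concurrency (StructuredTaskScope, used in ChunkManagerBase below) is a preview API, and the JVM rejects preview classfiles unless the flag is present at compile time and again at run time. A minimal sketch of code that needs the flag, assuming JDK 21 (the class name is hypothetical):

// Compiles only when javac gets --enable-preview (with a matching --release),
// and runs only when java gets --enable-preview.
import java.util.concurrent.StructuredTaskScope;

public class PreviewCheck {
  public static void main(String[] args) throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      var subtask = scope.fork(() -> "hello from a structured subtask");
      scope.join(); // wait for every forked subtask to complete
      System.out.println(subtask.get());
    }
  }
}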
53 changes: 46 additions & 7 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
@@ -31,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -79,6 +80,8 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final KaldbMetadataStoreChangeListener<CacheSlotMetadata> cacheSlotListener =
this::cacheNodeListener;

private final ReentrantLock chunkAssignmentLock = new ReentrantLock();

public ReadOnlyChunkImpl(
AsyncCuratorFramework curatorFramework,
MeterRegistry meterRegistry,
@@ -187,12 +190,12 @@ private void unregisterSearchMetadata()
}
}

// We synchronize access when manipulating the chunk, as the close() can
// We lock access when manipulating the chunk, as the close() can
// run concurrently with an assignment
private synchronized void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
Timer.Sample assignmentTimer = Timer.start(meterRegistry);
chunkAssignmentLock.lock();
try {
///
if (!setChunkMetadataState(
cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.LOADING)) {
throw new InterruptedException("Failed to set chunk metadata state to loading");
@@ -249,6 +252,8 @@ private synchronized void handleChunkAssignment(CacheSlotMetadata
setChunkMetadataState(cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.FREE);
LOG.error("Error handling chunk assignment", e);
assignmentTimer.stop(chunkAssignmentTimerFailure);
} finally {
chunkAssignmentLock.unlock();
}
}

@@ -265,10 +270,11 @@ private SnapshotMetadata getSnapshotMetadata(String replicaId)
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
}

// We synchronize access when manipulating the chunk, as the close()
// We lock access when manipulating the chunk, as the close()
// can run concurrently with an eviction
private synchronized void handleChunkEviction(CacheSlotMetadata cacheSlotMetadata) {
private void handleChunkEviction(CacheSlotMetadata cacheSlotMetadata) {
Timer.Sample evictionTimer = Timer.start(meterRegistry);
chunkAssignmentLock.lock();
try {
if (!setChunkMetadataState(
cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.EVICTING)) {
@@ -297,6 +303,8 @@ private synchronized void handleChunkEviction(CacheSlotMetadata
// re-assignment or queries hitting this slot
LOG.error("Error handling chunk eviction", e);
evictionTimer.stop(chunkEvictionTimerFailure);
} finally {
chunkAssignmentLock.unlock();
}
}

@@ -381,15 +389,46 @@ public String id() {
@Override
public SearchResult<T> query(SearchQuery query) {
if (logSearcher != null) {
Long searchStartTime =
determineStartTime(query.startTimeEpochMs, chunkInfo.getDataStartTimeEpochMs());
Long searchEndTime =
determineEndTime(query.endTimeEpochMs, chunkInfo.getDataEndTimeEpochMs());

return logSearcher.search(
query.dataset,
query.queryStr,
query.startTimeEpochMs,
query.endTimeEpochMs,
searchStartTime,
searchEndTime,
query.howMany,
query.aggBuilder);
} else {
return (SearchResult<T>) SearchResult.empty();
}
}

/**
* Determines the start time to use for the query, given the original query start time and the
* start time of data in the chunk
*/
protected static Long determineStartTime(long queryStartTimeEpochMs, long chunkStartTimeEpochMs) {
Long searchStartTime = null;
if (queryStartTimeEpochMs > chunkStartTimeEpochMs) {
// if the query start time falls after the beginning of the chunk
searchStartTime = queryStartTimeEpochMs;
}
return searchStartTime;
}

/**
* Determines the end time to use for the query, given the original query end time and the end
* time of data in the chunk
*/
protected static Long determineEndTime(long queryEndTimeEpochMs, long chunkEndTimeEpochMs) {
Long searchEndTime = null;
if (queryEndTimeEpochMs < chunkEndTimeEpochMs) {
// if the query end time falls before the end of the chunk
searchEndTime = queryEndTimeEpochMs;
}
return searchEndTime;
}
}
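Two things are worth noting in this file. First, the commit replaces the method-level synchronized keyword with one explicit ReentrantLock shared by assignment and eviction, which lets the Timer.Sample start before the lock is taken and makes the release explicit in a finally block. A minimal sketch of the pattern, with hypothetical names:

import java.util.concurrent.locks.ReentrantLock;

class SlotLifecycle {
  // One lock guards both assignment and eviction, mirroring chunkAssignmentLock.
  private final ReentrantLock lock = new ReentrantLock();

  void assign() {
    lock.lock();
    try {
      // ... download and open the chunk ...
    } finally {
      lock.unlock(); // released even if loading throws
    }
  }

  void evict() {
    lock.lock();
    try {
      // ... close and delete the chunk ...
    } finally {
      lock.unlock();
    }
  }
}

Second, the new determineStartTime/determineEndTime helpers return null whenever the query bound does not actually constrain the chunk on that side, presumably so the searcher can skip a redundant timestamp filter when the query range fully covers the chunk.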
189 changes: 91 additions & 98 deletions kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java
@@ -1,32 +1,28 @@
package com.slack.kaldb.chunkManager;

import brave.ScopedSpan;
import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.chunk.Chunk;
import com.slack.kaldb.logstore.search.SearchQuery;
import com.slack.kaldb.logstore.search.SearchResult;
import com.slack.kaldb.logstore.search.SearchResultAggregator;
import com.slack.kaldb.logstore.search.SearchResultAggregatorImpl;
import com.slack.kaldb.metadata.schema.FieldType;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Semaphore;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,36 +39,17 @@ public abstract class ChunkManagerBase<T> extends AbstractIdleService implements
// we use a CopyOnWriteArrayList as we expect to have very few edits to this list compared
// to the amount of reads, and it must be a threadsafe implementation
protected final List<Chunk<T>> chunkList = new CopyOnWriteArrayList<>();

private static final ListeningExecutorService queryExecutorService = queryThreadPool();

private static final ScheduledExecutorService queryCancellationService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("chunk-manager-query-cancellation-%d")
.setUncaughtExceptionHandler(
(t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
.build());

/*
* We want to provision the chunk query capacity such that we can almost saturate the CPU. In the event we allow
* these to saturate the CPU it can result in the container being killed due to failed healthchecks.
*
* Revisit the thread pool settings if this becomes a perf issue. Also, we may need
* different thread pools for indexer and cache nodes in the future.
*/
private static ListeningExecutorService queryThreadPool() {
// todo - consider making the thread count a config option; this would allow for more
// fine-grained tuning, but we might not need to expose this to the user if we can set sensible
// defaults
return MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
Math.max(1, Runtime.getRuntime().availableProcessors() - 2),
new ThreadFactoryBuilder()
.setNameFormat("chunk-manager-query-%d")
.setUncaughtExceptionHandler(
(t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e))
.build()));
private final Semaphore concurrentQueries;

public ChunkManagerBase() {
// todo - move this to a config value if we end up needing this param
int semaphoreCount =
Integer.parseInt(
System.getProperty(
"kaldb.concurrent.query",
String.valueOf(Runtime.getRuntime().availableProcessors() - 1)));
LOG.info("Using kaldb.concurrent.query - {}", semaphoreCount);
concurrentQueries = new Semaphore(semaphoreCount, true);
}
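The constructor above sizes a fair Semaphore from the kaldb.concurrent.query system property, defaulting to available processors minus one, replacing the fixed query thread pool that the diff removes. A minimal sketch of that throttle in isolation (the class and its withPermit helper are hypothetical):

import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

class QueryThrottle {
  // Fair semaphore: waiting queries are admitted in arrival order.
  private final Semaphore permits =
      new Semaphore(
          Integer.parseInt(
              System.getProperty(
                  "kaldb.concurrent.query",
                  String.valueOf(Runtime.getRuntime().availableProcessors() - 1))),
          true);

  <T> T withPermit(Callable<T> work) throws Exception {
    permits.acquire(); // blocks until a permit frees up
    try {
      return work.call();
    } finally {
      permits.release();
    }
  }
}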

/*
@@ -107,72 +84,88 @@ public SearchResult<T> query(SearchQuery query, Duration queryTimeout) {
// a single IndexSearcher.
Collections.shuffle(chunksMatchingQuery);

List<ListenableFuture<SearchResult<T>>> queries =
chunksMatchingQuery.stream()
.map(
(chunk) ->
queryExecutorService.submit(
currentTraceContext.wrap(
() -> {
try {
if (Thread.interrupted()) {
LOG.warn(
"Chunk query thread timed out without starting work, returning error result.");
return errorResult;
}
return chunk.query(query);
} catch (Exception err) {
// Only log the exception message as warn, and not the entire trace
// as this can cause performance issues if significant amounts of
// invalid queries are received
LOG.warn("Chunk Query Exception: {}", err.getMessage());
LOG.debug("Chunk Query Exception", err);
// We catch IllegalArgumentException ( and any other exception that
// represents a parse failure ) and instead of returning an empty
// result we throw back an error to the user
if (err instanceof IllegalArgumentException) {
throw err;
}
return errorResult;
}
})))
.peek(
(future) ->
queryCancellationService.schedule(
() -> future.cancel(true), queryTimeout.toMillis(), TimeUnit.MILLISECONDS))
.collect(Collectors.toList());

Future<List<SearchResult<T>>> searchResultFuture = Futures.successfulAsList(queries);
try {
List<SearchResult<T>> searchResults =
searchResultFuture.get(queryTimeout.toMillis(), TimeUnit.MILLISECONDS);

// check if all results are null, and if so return an error to the user
if (searchResults.size() > 0 && searchResults.stream().allMatch(Objects::isNull)) {
try (var scope = new StructuredTaskScope<SearchResult<T>>()) {
List<StructuredTaskScope.Subtask<SearchResult<T>>> chunkSubtasks =
chunksMatchingQuery.stream()
.map(
(chunk) ->
scope.fork(
currentTraceContext.wrap(
() -> {
ScopedSpan span =
Tracing.currentTracer()
.startScopedSpan("ChunkManagerBase.chunkQuery");
span.tag("chunkId", chunk.id());
span.tag("chunkSnapshotPath", chunk.info().getSnapshotPath());
concurrentQueries.acquire();
try {
return chunk.query(query);
} finally {
concurrentQueries.release();
span.finish();
}
})))
.toList();
try {
Futures.allAsList(queries).get(0, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IllegalArgumentException(e);
scope.joinUntil(Instant.now().plusSeconds(queryTimeout.toSeconds()));
} catch (TimeoutException timeoutException) {
scope.shutdown();
scope.join();
}
// not expected to happen - we should be guaranteed that the list has at least one failed
// future, which should throw when we try to get on allAsList
throw new IllegalArgumentException(
"Chunk query error - all results returned null values with no exceptions thrown");
}

//noinspection unchecked
SearchResult<T> aggregatedResults =
((SearchResultAggregator<T>) new SearchResultAggregatorImpl<>(query))
.aggregate(searchResults, false);
return incrementNodeCount(aggregatedResults);
List<SearchResult<T>> searchResults =
chunkSubtasks.stream()
.map(
searchResultSubtask -> {
try {
if (searchResultSubtask
.state()
.equals(StructuredTaskScope.Subtask.State.SUCCESS)) {
return searchResultSubtask.get();
} else if (searchResultSubtask
.state()
.equals(StructuredTaskScope.Subtask.State.FAILED)) {
Throwable throwable = searchResultSubtask.exception();
if (throwable instanceof IllegalArgumentException) {
// We catch IllegalArgumentException ( and any other exception that
// represents a parse failure ) and instead of returning an empty
// result we throw back an error to the user
throw new IllegalArgumentException(throwable);
}
LOG.warn("Chunk Query Exception: {}", throwable.getMessage());
}
// else UNAVAILABLE (i.e., timed out)
return errorResult;
} catch (Exception err) {
if (err instanceof IllegalArgumentException) {
throw err;
}

// Only log the exception message as warn, and not the entire trace
// as this can cause performance issues if significant amounts of
// invalid queries are received
LOG.warn("Chunk Query Exception: {}", err.getMessage());
return errorResult;
}
})
.toList();

// check if all results are null, and if so return an error to the user
if (!searchResults.isEmpty() && searchResults.stream().allMatch(Objects::isNull)) {
throw new IllegalArgumentException(
"Chunk query error - all results returned null values");
}

//noinspection unchecked
SearchResult<T> aggregatedResults =
((SearchResultAggregator<T>) new SearchResultAggregatorImpl<>(query))
.aggregate(searchResults, false);
return incrementNodeCount(aggregatedResults);
}
} catch (Exception e) {
LOG.error("Error searching across chunks ", e);
throw new RuntimeException(e);
} finally {
// always request future cancellation. This won't interrupt I/O or downstream futures,
// but is good practice. Since this is backed by a CompletableFuture
// mayInterruptIfRunning has no effect
searchResultFuture.cancel(true);
}
}

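Summing up the new query path: each matching chunk is forked as a subtask inside a StructuredTaskScope, the scope is joined against a deadline derived from queryTimeout, and on timeout the scope is shut down so unfinished subtasks are cancelled; each subtask's state then decides whether its result is used, a parse error is rethrown, or the error result is substituted. A condensed sketch of that control flow, assuming JDK 21's preview API (the FanOut class and string payloads are hypothetical):

import java.time.Instant;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;

class FanOut {
  static List<String> run(List<Callable<String>> tasks, long timeoutSeconds)
      throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      var subtasks = tasks.stream().map(scope::fork).toList();
      try {
        // Wait for all subtasks, but no longer than the deadline.
        scope.joinUntil(Instant.now().plusSeconds(timeoutSeconds));
      } catch (TimeoutException e) {
        scope.shutdown(); // cancels subtasks that are still running
        scope.join(); // wait for the cancellations to settle
      }
      return subtasks.stream()
          .map(
              s ->
                  s.state() == StructuredTaskScope.Subtask.State.SUCCESS
                      ? s.get()
                      : "error") // FAILED or UNAVAILABLE (timed out)
          .toList();
    }
  }
}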