Rework schema queries to be quicker (#763)
* Fix schema query

* Add aggressive schema timeout

* PR cleanup

---------

Co-authored-by: Bryan Burkholder <[email protected]>
bryanlb authored Feb 6, 2024
1 parent 452c461 commit d55d87c
Showing 1 changed file with 60 additions and 51 deletions.
@@ -7,8 +7,6 @@
 import brave.grpc.GrpcTracing;
 import brave.propagation.CurrentTraceContext;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.linecorp.armeria.client.grpc.GrpcClients;
 import com.slack.kaldb.logstore.LogMessage;
 import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
@@ -93,14 +91,16 @@ public class KaldbDistributedQueryService extends KaldbQueryServiceBase implemen
   // include metadata that should always be present. The Armeria timeout is used at the top request,
   // distributed query is used as a deadline for all nodes to return, and the local query timeout
   // is used for controlling lucene future timeouts.
-  private final Duration requestTimeout;
   private final Duration defaultQueryTimeout;
   private final ScheduledExecutorService executorService =
       Executors.newSingleThreadScheduledExecutor();
   private ScheduledFuture<?> pendingStubUpdate;
   private final KaldbMetadataStoreChangeListener<SearchMetadata> searchMetadataListener =
       (searchMetadata) -> triggerStubUpdate();
 
+  private final int SCHEMA_TIMEOUT_MS =
+      Integer.parseInt(System.getProperty("kalDb.query.schemaTimeoutMs", "500"));
+
   // For now we will use SearchMetadataStore to populate servers
   // But this is wasteful since we add snapshots more often than we add/remove nodes ( hopefully )
   // So this should be replaced cache/index metadata store when that info is present in ZK
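A note on the new timeout knob added in the hunk above: SCHEMA_TIMEOUT_MS is resolved once, when the query service is constructed, from a JVM system property with a 500 ms default. A minimal sketch of that lookup behavior, using the exact property name from the diff (kalDb.query.schemaTimeoutMs, including its odd casing); the demo class itself is hypothetical, not part of Kaldb:

// Hypothetical demo: in production the value would come from a
// -DkalDb.query.schemaTimeoutMs=<millis> launch flag, which must be set before
// KaldbDistributedQueryService is constructed, since the field is final.
public class SchemaTimeoutDemo {
  public static void main(String[] args) {
    // Simulate passing -DkalDb.query.schemaTimeoutMs=250 on the command line.
    System.setProperty("kalDb.query.schemaTimeoutMs", "250");
    int timeoutMs = Integer.parseInt(System.getProperty("kalDb.query.schemaTimeoutMs", "500"));
    System.out.println("schema timeout = " + timeoutMs + " ms"); // prints 250
  }
}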
@@ -116,7 +116,6 @@ public KaldbDistributedQueryService(
     this.searchMetadataStore = searchMetadataStore;
     this.snapshotMetadataStore = snapshotMetadataStore;
     this.datasetMetadataStore = datasetMetadataStore;
-    this.requestTimeout = requestTimeout;
     this.defaultQueryTimeout = defaultQueryTimeout;
     searchMetadataTotalChangeCounter = meterRegistry.counter(SEARCH_METADATA_TOTAL_CHANGE_COUNTER);
     this.distributedQueryApdexSatisfied = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_SATISFIED);
@@ -494,60 +493,70 @@ public KaldbSearch.SchemaResult getSchema(KaldbSearch.SchemaRequest distribSchem
     Map<String, List<String>> nodesAndSnapshotsToQuery =
         getNodesAndSnapshotsToQuery(searchMetadataNodesMatchingQuery);
 
-    List<ListenableFuture<KaldbSearch.SchemaResult>> queryServers = new ArrayList<>(stubs.size());
-
-    List<Map.Entry<String, List<String>>> limitedNodesToQuery =
-        nodesAndSnapshotsToQuery.entrySet().stream().limit(LIMIT_SCHEMA_NODES_TO_QUERY).toList();
-    for (Map.Entry<String, List<String>> searchNode : limitedNodesToQuery) {
-      KaldbServiceGrpc.KaldbServiceFutureStub stub = getStub(searchNode.getKey());
-      if (stub == null) {
-        // TODO: insert a failed result in the results object that we return from this method
-        // mimicing
-        continue;
-      }
-
-      KaldbSearch.SchemaRequest localSearchReq =
-          distribSchemaReq.toBuilder().addAllChunkIds(searchNode.getValue()).build();
-
-      // make sure all underlying futures finish executing (successful/cancelled/failed/other)
-      // and cannot be pending when the successfulAsList.get(SAME_TIMEOUT_MS) runs
-      ListenableFuture<KaldbSearch.SchemaResult> schemaRequest =
-          stub.withDeadlineAfter(defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS)
-              .withInterceptors(
-                  GrpcTracing.newBuilder(Tracing.current()).build().newClientInterceptor())
-              .schema(localSearchReq);
-      queryServers.add(schemaRequest);
-    }
-    ListenableFuture<List<KaldbSearch.SchemaResult>> searchFuture =
-        Futures.successfulAsList(queryServers);
-    try {
-      List<KaldbSearch.SchemaResult> searchResults =
-          searchFuture.get(requestTimeout.toMillis(), TimeUnit.MILLISECONDS);
-      KaldbSearch.SchemaResult.Builder schemaBuilder = KaldbSearch.SchemaResult.newBuilder();
-      searchResults.forEach(
-          schemaResult ->
-              schemaBuilder.putAllFieldDefinition(schemaResult.getFieldDefinitionMap()));
-      return schemaBuilder.build();
-    } catch (TimeoutException e) {
-      // We provide a deadline to the stub of "defaultQueryTimeout" - if this is sufficiently lower
-      // than the request timeout, we would expect searchFuture.get(requestTimeout) to never throw
-      // an exception. This however doesn't necessarily hold true if the query node is CPU
-      // saturated, and there is not enough cpu time to fail the pending stub queries that have
-      // exceeded their deadline - causing the searchFuture get to fail with a timeout.
-      LOG.error(
-          "Schema failed with timeout exception. This is potentially due to CPU saturation of the query node.",
-          e);
-      span.error(e);
-      return KaldbSearch.SchemaResult.newBuilder().build();
+    CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext();
+    try {
+      try (var scope = new StructuredTaskScope<KaldbSearch.SchemaResult>()) {
+        List<StructuredTaskScope.Subtask<KaldbSearch.SchemaResult>> searchSubtasks =
+            nodesAndSnapshotsToQuery.entrySet().stream()
+                .limit(LIMIT_SCHEMA_NODES_TO_QUERY)
+                .map(
+                    (searchNode) ->
+                        scope.fork(
+                            currentTraceContext.wrap(
+                                () -> {
+                                  KaldbServiceGrpc.KaldbServiceFutureStub stub =
+                                      getStub(searchNode.getKey());
+
+                                  if (stub == null) {
+                                    return null;
+                                  }
+
+                                  KaldbSearch.SchemaRequest localSearchReq =
+                                      distribSchemaReq.toBuilder()
+                                          .addAllChunkIds(searchNode.getValue())
+                                          .build();
+
+                                  return stub.withDeadlineAfter(
+                                          SCHEMA_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                      .withInterceptors(
+                                          GrpcTracing.newBuilder(Tracing.current())
+                                              .build()
+                                              .newClientInterceptor())
+                                      .schema(localSearchReq)
+                                      .get();
+                                })))
+                .toList();
+        try {
+          scope.joinUntil(Instant.now().plusMillis(SCHEMA_TIMEOUT_MS));
+        } catch (TimeoutException timeoutException) {
+          scope.shutdown();
+          scope.join();
+        }
+
+        KaldbSearch.SchemaResult.Builder schemaBuilder = KaldbSearch.SchemaResult.newBuilder();
+        for (StructuredTaskScope.Subtask<KaldbSearch.SchemaResult> schemaResult : searchSubtasks) {
+          try {
+            if (schemaResult.state().equals(StructuredTaskScope.Subtask.State.SUCCESS)) {
+              if (schemaResult.get() != null) {
+                schemaBuilder.putAllFieldDefinition(schemaResult.get().getFieldDefinitionMap());
+              } else {
+                LOG.error("Schema result was unexpectedly null {}", schemaResult);
+              }
+            } else {
+              LOG.error("Schema query result state was not success {}", schemaResult);
+            }
+          } catch (Exception e) {
+            LOG.error("Error fetching search result", e);
+          }
+        }
+        return schemaBuilder.build();
+      }
     } catch (Exception e) {
       LOG.error("Schema failed with ", e);
       span.error(e);
       return KaldbSearch.SchemaResult.newBuilder().build();
     } finally {
-      // always request future cancellation, so that any exceptions or incomplete futures don't
-      // continue to consume CPU on work that will not be used
-      searchFuture.cancel(false);
       LOG.debug("Finished distributed search for request: {}", distribSchemaReq);
       span.finish();
     }
   }
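Stepping back from the diff: the rewrite swaps Guava's Futures.successfulAsList plus a blocking get(requestTimeout) for a JDK StructuredTaskScope, so a slow or CPU-saturated node can no longer hold the schema response past the ~500 ms budget; stragglers are cancelled and their results simply dropped. Below is a minimal, self-contained sketch of that deadline-bounded fan-out pattern, assuming JDK 21 with --enable-preview (StructuredTaskScope is a preview API there); fetchSchemaFrom and the node names are illustrative stand-ins, not Kaldb APIs:

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;

// Sketch of the deadline-bounded fan-out used by the new getSchema.
public class SchemaFanOutSketch {
  static final long SCHEMA_TIMEOUT_MS = 500;

  // Stand-in for the per-node gRPC schema call; "slow" nodes blow the budget.
  static Map<String, String> fetchSchemaFrom(String node) throws InterruptedException {
    if (node.endsWith("slow")) Thread.sleep(5_000);
    return Map.of(node + ":some_field", "text");
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> nodes = List.of("node-1", "node-2", "node-3-slow");

    try (var scope = new StructuredTaskScope<Map<String, String>>()) {
      // Fork one subtask per node, like the scope.fork(...) calls in the diff.
      List<StructuredTaskScope.Subtask<Map<String, String>>> subtasks =
          nodes.stream().map(node -> scope.fork(() -> fetchSchemaFrom(node))).toList();

      try {
        // Wait at most SCHEMA_TIMEOUT_MS for every subtask to finish...
        scope.joinUntil(Instant.now().plusMillis(SCHEMA_TIMEOUT_MS));
      } catch (TimeoutException e) {
        // ...then cancel the stragglers and wait for the cancellations to land,
        // the same shutdown()/join() sequence the new code uses.
        scope.shutdown();
        scope.join();
      }

      // Merge only the subtasks that completed in time; late nodes simply
      // contribute nothing, mirroring the SUCCESS-state check in the diff.
      for (var subtask : subtasks) {
        if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
          System.out.println("merged: " + subtask.get());
        } else {
          System.out.println("dropped: " + subtask.state());
        }
      }
    }
  }
}

The design choice mirrored from the commit: on timeout the scope is shut down and re-joined, which cancels outstanding subtasks, and only subtasks in the SUCCESS state contribute to the merged schema, so a missing node degrades the result instead of failing the whole request.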