From df8b36ff203db97ef1bb661ccd1a20b72d1be6ac Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:09:45 -0700 Subject: [PATCH 1/5] Fix rounding error in HPA autoscaler (#711) Co-authored-by: Bryan Burkholder --- .../kaldb/clusterManager/ClusterHpaMetricService.java | 8 +++++--- .../kaldb/clusterManager/ClusterHpaMetricServiceTest.java | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java index c3445a818b..deddcb1e24 100644 --- a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java +++ b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java @@ -1,5 +1,6 @@ package com.slack.kaldb.clusterManager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractScheduledService; import com.slack.kaldb.metadata.cache.CacheSlotMetadataStore; import com.slack.kaldb.metadata.hpa.HpaMetricMetadata; @@ -142,7 +143,8 @@ private void publishCacheHpaMetrics() { } } - private static double calculateDemandFactor( + @VisibleForTesting + protected static double calculateDemandFactor( long totalCacheSlotCapacity, long totalReplicaDemand) { if (totalCacheSlotCapacity == 0) { // we have no provisioned capacity, so cannot determine a value @@ -154,8 +156,8 @@ private static double calculateDemandFactor( } // demand factor will be < 1 indicating a scale-down demand, and > 1 indicating a scale-up double rawDemandFactor = (double) (totalReplicaDemand) / (totalCacheSlotCapacity); - // round to 2 decimals - return (double) Math.round(rawDemandFactor * 100) / 100; + // round up to 2 decimals + return Math.ceil(rawDemandFactor * 100) / 100; } /** Updates or inserts an (ephemeral) HPA metric for the cache nodes. This is NOT threadsafe. 
*/ diff --git a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java index da659296af..a55cf01294 100644 --- a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java @@ -321,4 +321,11 @@ void twoReplicasetScaleup() { // 2 replicas, 0 slots (will log an error and return a default no-op) assertThat(rep2Metadata.getValue()).isEqualTo(1); } + + @Test + void testDemandFactorRounding() { + assertThat(ClusterHpaMetricService.calculateDemandFactor(100, 98)).isEqualTo(0.98); + assertThat(ClusterHpaMetricService.calculateDemandFactor(98, 100)).isEqualTo(1.03); + assertThat(ClusterHpaMetricService.calculateDemandFactor(9999, 10000)).isEqualTo(1.01); + } } From 392bdcc7eb5af843a7f372bdfc6a1426df7003f7 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:20:16 -0700 Subject: [PATCH 2/5] Refactor Zipkin API, fix duration bug (#710) Co-authored-by: Bryan Burkholder --- .../java/com/slack/kaldb/server/Kaldb.java | 1 + .../zipkinApi/ZipkinEndpointResponse.java | 21 + .../{server => zipkinApi}/ZipkinService.java | 88 ++-- .../kaldb/zipkinApi/ZipkinSpanResponse.java | 110 +++++ .../ZipkinServiceSpanConversionTest.java | 69 --- .../slack/kaldb/server/ZipkinServiceTest.java | 441 ------------------ .../ZipkinServiceSpanConversionTest.java | 72 +++ 7 files changed, 238 insertions(+), 564 deletions(-) create mode 100644 kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java rename kaldb/src/main/java/com/slack/kaldb/{server => zipkinApi}/ZipkinService.java (78%) create mode 100644 kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java delete mode 100644 kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java delete mode 100644 kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java create mode 100644 kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java index 23a3d9b920..e867ea750e 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java @@ -36,6 +36,7 @@ import com.slack.kaldb.proto.metadata.Metadata; import com.slack.kaldb.recovery.RecoveryService; import com.slack.kaldb.util.RuntimeHalterImpl; +import com.slack.kaldb.zipkinApi.ZipkinService; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; diff --git a/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java new file mode 100644 index 0000000000..71a6f61080 --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java @@ -0,0 +1,21 @@ +package com.slack.kaldb.zipkinApi; + +/** + * Endpoint response object for Zipkin API (local, remote) + * + * @see Zipkin API Spec + */ +@SuppressWarnings("unused") +public class ZipkinEndpointResponse { + private String serviceName; + + public ZipkinEndpointResponse() {} + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = 
serviceName; + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java similarity index 78% rename from kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java rename to kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java index 9958fc4853..2c7ecbfa18 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java @@ -1,12 +1,15 @@ -package com.slack.kaldb.server; +package com.slack.kaldb.zipkinApi; import static com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata.MATCH_ALL_DATASET; import brave.Tracing; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; @@ -18,8 +21,10 @@ import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.proto.service.KaldbSearch; +import com.slack.kaldb.server.KaldbQueryServiceBase; import com.slack.kaldb.util.JsonUtil; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -27,11 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import zipkin2.proto3.Endpoint; -import zipkin2.proto3.Span; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") /** @@ -47,8 +50,8 @@ public class ZipkinService { protected static String convertLogWireMessageToZipkinSpan(List messages) - throws InvalidProtocolBufferException { - List traces = new ArrayList<>(messages.size()); + throws JsonProcessingException { + List traces = new ArrayList<>(messages.size()); for (LogWireMessage message : messages) { if (message.getId() == null) { LOG.warn("Document={} cannot have missing id ", message); @@ -60,7 +63,7 @@ protected static String convertLogWireMessageToZipkinSpan(List m String name = null; String serviceName = null; String timestamp = String.valueOf(message.getTimestamp().toEpochMilli()); - long duration = Long.MIN_VALUE; + long duration = Integer.MIN_VALUE; Map messageTags = new HashMap<>(); for (String k : message.getSource().keySet()) { @@ -74,7 +77,7 @@ protected static String convertLogWireMessageToZipkinSpan(List m } else if (LogMessage.ReservedField.SERVICE_NAME.fieldName.equals(k)) { serviceName = (String) value; } else if (LogMessage.ReservedField.DURATION_MS.fieldName.equals(k)) { - duration = ((Number) value).longValue(); + duration = TimeUnit.MICROSECONDS.convert(Duration.ofMillis(((Number) value).intValue())); } else { messageTags.put(k, String.valueOf(value)); } @@ -92,24 +95,20 @@ protected static String convertLogWireMessageToZipkinSpan(List m continue; } - final long messageConvertedTimestamp = convertToMicroSeconds(message.getTimestamp()); - - final Span span = - makeSpan( - messageTraceId, - 
Optional.ofNullable(parentId), - message.getId(), - Optional.ofNullable(name), - Optional.ofNullable(serviceName), - messageConvertedTimestamp, - duration, - messageTags); - String spanJson = printer.print(span); - traces.add(spanJson); + final ZipkinSpanResponse span = new ZipkinSpanResponse(message.getId(), messageTraceId); + span.setParentId(parentId); + span.setName(name); + if (serviceName != null) { + ZipkinEndpointResponse remoteEndpoint = new ZipkinEndpointResponse(); + remoteEndpoint.setServiceName(serviceName); + span.setRemoteEndpoint(remoteEndpoint); + } + span.setTimestamp(convertToMicroSeconds(message.getTimestamp())); + span.setDuration(Math.toIntExact(duration)); + span.setTags(messageTags); + traces.add(span); } - StringJoiner outputJsonArray = new StringJoiner(",", "[", "]"); - traces.forEach(outputJsonArray::add); - return outputJsonArray.toString(); + return objectMapper.writeValueAsString(traces); } // returning LogWireMessage instead of LogMessage @@ -127,31 +126,6 @@ private static List searchResultToLogWireMessage( return messages; } - private static Span makeSpan( - String traceId, - Optional parentId, - String id, - Optional name, - Optional serviceName, - long timestamp, - long duration, - Map tags) { - Span.Builder spanBuilder = Span.newBuilder(); - - spanBuilder.setTraceId(ByteString.copyFrom(traceId.getBytes()).toStringUtf8()); - spanBuilder.setId(ByteString.copyFrom(id.getBytes()).toStringUtf8()); - spanBuilder.setTimestamp(timestamp); - spanBuilder.setDuration(duration); - - parentId.ifPresent( - s -> spanBuilder.setParentId(ByteString.copyFrom(s.getBytes()).toStringUtf8())); - name.ifPresent(spanBuilder::setName); - serviceName.ifPresent( - s -> spanBuilder.setRemoteEndpoint(Endpoint.newBuilder().setServiceName(s))); - spanBuilder.putAllTags(tags); - return spanBuilder.build(); - } - @VisibleForTesting protected static long convertToMicroSeconds(Instant instant) { return ChronoUnit.MICROS.between(Instant.EPOCH, instant); @@ -163,8 +137,14 @@ protected static long convertToMicroSeconds(Instant instant) { private static final int MAX_SPANS = 20_000; private final KaldbQueryServiceBase searcher; - private static final JsonFormat.Printer printer = - JsonFormat.printer().includingDefaultValueFields(); + + private static final ObjectMapper objectMapper = + JsonMapper.builder() + // sort alphabetically for easier test asserts + .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true) + // don't serialize null values or empty maps + .serializationInclusion(JsonInclude.Include.NON_EMPTY) + .build(); public ZipkinService(KaldbQueryServiceBase searcher) { this.searcher = searcher; diff --git a/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java new file mode 100644 index 0000000000..173f756164 --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java @@ -0,0 +1,110 @@ +package com.slack.kaldb.zipkinApi; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + +/** + * Span response object for Zipkin API + * + * @see Zipkin API Spec + */ +@SuppressWarnings("unused") +public class ZipkinSpanResponse { + private final String id; + private final String traceId; + private String parentId = null; + private String name = null; + + @JsonProperty("timestamp") + private Long timestampMicros = null; + + private ZipkinEndpointResponse localEndpoint = null; + + private ZipkinEndpointResponse remoteEndpoint = null; + + 
@JsonProperty("duration") + // Zipkin spec defines this is integer, even though a long seems like it would be more appropriate + private int durationMicros; + + private String kind; + + private Map tags; + + public ZipkinSpanResponse(String id, String traceId) { + // id and traceId are only required fields + this.id = id; + this.traceId = traceId; + } + + public void setParentId(String parentId) { + this.parentId = parentId; + } + + public void setName(String name) { + this.name = name; + } + + public void setTimestamp(long timestampMicros) { + this.timestampMicros = timestampMicros; + } + + public void setDuration(int durationMicros) { + this.durationMicros = durationMicros; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public ZipkinEndpointResponse getLocalEndpoint() { + return localEndpoint; + } + + public void setLocalEndpoint(ZipkinEndpointResponse localEndpoint) { + this.localEndpoint = localEndpoint; + } + + public ZipkinEndpointResponse getRemoteEndpoint() { + return remoteEndpoint; + } + + public void setRemoteEndpoint(ZipkinEndpointResponse remoteEndpoint) { + this.remoteEndpoint = remoteEndpoint; + } + + public String getId() { + return id; + } + + public String getTraceId() { + return traceId; + } + + public String getParentId() { + return parentId; + } + + public String getName() { + return name; + } + + public long getTimestamp() { + return timestampMicros; + } + + public int getDuration() { + return durationMicros; + } + + public String getKind() { + return kind; + } + + public Map getTags() { + return tags; + } +} diff --git a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java b/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java deleted file mode 100644 index 464688ff59..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.slack.kaldb.server; - -import static com.slack.kaldb.server.ZipkinServiceTest.generateLogWireMessagesForOneTrace; -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.slack.kaldb.logstore.LogWireMessage; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class ZipkinServiceSpanConversionTest { - - @Test - public void testLogWireMessageToZipkinSpanConversion() throws InvalidProtocolBufferException { - Instant time = Instant.now(); - List messages = generateLogWireMessagesForOneTrace(time, 2, "1"); - - // follows output format from https://zipkin.io/zipkin-api/#/default/get_trace__traceId_ - String output = - String.format( - """ - [{ - "traceId": "1", - "parentId": "", - "id": "1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - }, - "debug": false, - "shared": false - },{ - "traceId": "1", - "parentId": "1", - "id": "2", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - }, - "debug": false, - "shared": false - }]""", - ZipkinService.convertToMicroSeconds(time.plusSeconds(1)), - 
ZipkinService.convertToMicroSeconds(time.plusSeconds(2))); - assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(messages)).isEqualTo(output); - - assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(new ArrayList<>())).isEqualTo("[]"); - } -} diff --git a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java deleted file mode 100644 index 27a2e240c2..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java +++ /dev/null @@ -1,441 +0,0 @@ -package com.slack.kaldb.server; - -import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER; -import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.testlib.ChunkManagerUtil.ZK_PATH_PREFIX; -import static com.slack.kaldb.testlib.KaldbSearchUtils.searchUsingGrpcApi; -import static com.slack.kaldb.testlib.MessageUtil.TEST_DATASET_NAME; -import static com.slack.kaldb.testlib.MessageUtil.TEST_MESSAGE_TYPE; -import static com.slack.kaldb.testlib.MetricsUtil.getCount; -import static com.slack.kaldb.testlib.TestKafkaServer.produceMessagesToKafka; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.linecorp.armeria.client.WebClient; -import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.slack.kaldb.blobfs.s3.S3TestUtils; -import com.slack.kaldb.chunkManager.RollOverChunkTask; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LogWireMessage; -import com.slack.kaldb.metadata.core.CuratorBuilder; -import com.slack.kaldb.metadata.core.KaldbMetadataTestUtils; -import com.slack.kaldb.metadata.dataset.DatasetMetadata; -import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; -import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; -import com.slack.kaldb.proto.config.KaldbConfigs; -import com.slack.kaldb.proto.service.KaldbSearch; -import com.slack.kaldb.testlib.KaldbConfigUtil; -import com.slack.kaldb.testlib.TestKafkaServer; -import io.micrometer.prometheus.PrometheusConfig; -import io.micrometer.prometheus.PrometheusMeterRegistry; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.curator.test.TestingServer; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.s3.S3AsyncClient; - -public class ZipkinServiceTest { - - private static final Logger LOG = LoggerFactory.getLogger(ZipkinServiceTest.class); - - private static final String TEST_S3_BUCKET = "test-s3-bucket"; - private static final String TEST_KAFKA_TOPIC_1 = "test-topic-1"; - private static final String KALDB_TEST_CLIENT_1 = "kaldb-test-client1"; - - private DatasetMetadataStore datasetMetadataStore; - private AsyncCuratorFramework curatorFramework; - private PrometheusMeterRegistry meterRegistry; - - @RegisterExtension - public static final 
S3MockExtension S3_MOCK_EXTENSION = - S3MockExtension.builder() - .withInitialBuckets(TEST_S3_BUCKET) - .silent() - .withSecureConnection(false) - .build(); - - private TestKafkaServer kafkaServer; - private TestingServer zkServer; - private S3AsyncClient s3Client; - - @BeforeEach - public void setUp() throws Exception { - zkServer = new TestingServer(); - kafkaServer = new TestKafkaServer(); - s3Client = S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); - - // We side load a service metadata entry telling it to create an entry with the partitions that - // we use in test - meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); - KaldbConfigs.ZookeeperConfig zkConfig = - KaldbConfigs.ZookeeperConfig.newBuilder() - .setZkConnectString(zkServer.getConnectString()) - .setZkPathPrefix(ZK_PATH_PREFIX) - .setZkSessionTimeoutMs(1000) - .setZkConnectionTimeoutMs(1000) - .setSleepBetweenRetriesMs(1000) - .build(); - curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); - final DatasetPartitionMetadata partition = - new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("0", "1")); - final List partitionConfigs = Collections.singletonList(partition); - DatasetMetadata datasetMetadata = - new DatasetMetadata( - TEST_DATASET_NAME, "serviceOwner", 1000, partitionConfigs, TEST_DATASET_NAME); - datasetMetadataStore.createSync(datasetMetadata); - await().until(() -> KaldbMetadataTestUtils.listSyncUncached(datasetMetadataStore).size() == 1); - } - - @AfterEach - public void teardown() throws Exception { - if (kafkaServer != null) { - kafkaServer.close(); - } - if (meterRegistry != null) { - meterRegistry.close(); - } - if (datasetMetadataStore != null) { - datasetMetadataStore.close(); - } - if (curatorFramework != null) { - curatorFramework.unwrap().close(); - } - if (zkServer != null) { - zkServer.close(); - } - } - - public static LogWireMessage makeWireMessageForSpans( - String id, - Instant ts, - String traceId, - Optional parentId, - long durationMs, - String serviceName, - String name) { - Map fieldMap = new HashMap<>(); - fieldMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, traceId); - fieldMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, serviceName); - fieldMap.put(LogMessage.ReservedField.NAME.fieldName, name); - parentId.ifPresent(s -> fieldMap.put(LogMessage.ReservedField.PARENT_ID.fieldName, s)); - fieldMap.put(LogMessage.ReservedField.DURATION_MS.fieldName, durationMs); - return new LogWireMessage(TEST_DATASET_NAME, TEST_MESSAGE_TYPE, id, ts, fieldMap); - } - - public static List generateLogWireMessagesForOneTrace( - Instant time, int count, String traceId) { - List messages = new ArrayList<>(); - for (int i = 1; i <= count; i++) { - String parentId = null; - if (i > 1) { - parentId = String.valueOf(i - 1); - } - messages.add( - makeWireMessageForSpans( - String.valueOf(i), - time.plusSeconds(i), - traceId, - Optional.ofNullable(parentId), - i, - "service1", - ("Trace" + i))); - } - return messages; - } - - @Test - @Disabled // Flakey test, occasionally returns an empty result - public void testDistributedQueryOneIndexerOneQueryNode() throws Exception { - assertThat(kafkaServer.getBroker().isRunning()).isTrue(); - - LOG.info("Starting query service"); - int queryServicePort = 8887; - KaldbConfigs.KaldbConfig queryServiceConfig = - KaldbConfigUtil.makeKaldbConfig( - "localhost:" + kafkaServer.getBroker().getKafkaPort().get(), - -1, - TEST_KAFKA_TOPIC_1, - 
0, - KALDB_TEST_CLIENT_1, - TEST_S3_BUCKET, - queryServicePort, - zkServer.getConnectString(), - ZK_PATH_PREFIX, - KaldbConfigs.NodeRole.QUERY, - 1000, - "api_Log", - -1, - 100); - Kaldb queryService = new Kaldb(queryServiceConfig, meterRegistry); - queryService.start(); - queryService.serviceManager.awaitHealthy(DEFAULT_START_STOP_DURATION); - - int indexerPort = 10000; - int totalMessagesToIndex = 8; - LOG.info( - "Creating indexer service at port {}, topic: {} and partition {}", - indexerPort, - TEST_KAFKA_TOPIC_1, - 0); - // create a kaldb indexer - KaldbConfigs.KaldbConfig indexerConfig = - KaldbConfigUtil.makeKaldbConfig( - "localhost:" + kafkaServer.getBroker().getKafkaPort().get(), - indexerPort, - TEST_KAFKA_TOPIC_1, - 0, - KALDB_TEST_CLIENT_1, - TEST_S3_BUCKET, - -1, - zkServer.getConnectString(), - ZK_PATH_PREFIX, - KaldbConfigs.NodeRole.INDEX, - 1000, - "api_log", - 9003, - totalMessagesToIndex); - - PrometheusMeterRegistry indexerMeterRegistry = - new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); - Kaldb indexer = new Kaldb(indexerConfig, s3Client, indexerMeterRegistry); - indexer.start(); - indexer.serviceManager.awaitHealthy(DEFAULT_START_STOP_DURATION); - await().until(() -> kafkaServer.getConnectedConsumerGroups() == 1); - - // Produce messages to kafka, so the indexer can consume them. - final Instant trace1StartTime = Instant.now().minus(20, ChronoUnit.MINUTES); - List messages = - new ArrayList<>(generateLogWireMessagesForOneTrace(trace1StartTime, 2, "1")); - - final Instant trace2StartTime = Instant.now().minus(10, ChronoUnit.MINUTES); - messages.addAll(generateLogWireMessagesForOneTrace(trace2StartTime, 5, "2")); - - final Instant trace3StartTime = Instant.now().minus(5, ChronoUnit.MINUTES); - messages.addAll(generateLogWireMessagesForOneTrace(trace3StartTime, 1, "3")); - - List logMessages = - messages.stream().map(LogMessage::fromWireMessage).collect(Collectors.toList()); - - final int indexedMessagesCount = - produceMessagesToKafka(kafkaServer.getBroker(), TEST_KAFKA_TOPIC_1, 0, logMessages); - assertThat(totalMessagesToIndex).isEqualTo(indexedMessagesCount); - - await() - .until( - () -> - getCount(MESSAGES_RECEIVED_COUNTER, indexerMeterRegistry) == indexedMessagesCount); - - await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, indexerMeterRegistry) == 1); - assertThat(getCount(RollOverChunkTask.ROLLOVERS_FAILED, indexerMeterRegistry)).isZero(); - - // Query from the grpc search service - KaldbSearch.SearchResult queryServiceSearchResponse = - searchUsingGrpcApi("*:*", queryServicePort, 0, Instant.now().toEpochMilli(), "365d"); - - assertThat(queryServiceSearchResponse.getTotalNodes()).isEqualTo(1); - assertThat(queryServiceSearchResponse.getFailedNodes()).isEqualTo(0); - assertThat(queryServiceSearchResponse.getHitsCount()).isEqualTo(indexedMessagesCount); - - // Query from the zipkin search service - String endpoint = "http://127.0.0.1:" + queryServicePort; - WebClient webClient = WebClient.of(endpoint); - AggregatedHttpResponse response = webClient.get("/api/v2/trace/1").aggregate().join(); - String body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - String expectedTrace = - String.format( - """ - [ - { - "traceId": "1", - "parentId": "1", - "id": "localhost:100:1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "testDataSet", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { 
- "hostname": "localhost" - }, - "debug": false, - "shared": false - }, - { - "traceId": "1", - "parentId": "", - "id": "localhost:100:0", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(2)), - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(1))); - assertThat(body).isEqualTo(expectedTrace); - - String params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.minus(10, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(5, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(0, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(1, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = String.format("?maxSpans=%d", 1); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = - String.format( - """ - [ - { - "traceId": "1", - "parentId": "1", - "id": "localhost:100:1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(2))); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(3, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.minus(10, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.minus(1, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = "[]"; - assertThat(body).isEqualTo(expectedTrace); - 
- response = webClient.get("/api/v2/trace/3").aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = - String.format( - """ - [ - { - "traceId": "3", - "parentId": "", - "id": "localhost:100:7", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace3StartTime.plusSeconds(1))); - assertThat(body).isEqualTo(expectedTrace); - - response = webClient.get("/api/v2/trace/4").aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = "[]"; - assertThat(body).isEqualTo(expectedTrace); - - // Shutdown - LOG.info("Shutting down query service."); - queryService.shutdown(); - LOG.info("Shutting down indexer."); - indexer.shutdown(); - } -} diff --git a/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java b/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java new file mode 100644 index 0000000000..52d13ac4e7 --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java @@ -0,0 +1,72 @@ +package com.slack.kaldb.zipkinApi; + +import static com.slack.kaldb.testlib.MessageUtil.TEST_DATASET_NAME; +import static com.slack.kaldb.testlib.MessageUtil.TEST_MESSAGE_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.logstore.LogWireMessage; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +public class ZipkinServiceSpanConversionTest { + private static LogWireMessage makeWireMessageForSpans( + String id, + Instant ts, + String traceId, + Optional parentId, + long durationMs, + String serviceName, + String name) { + Map fieldMap = new HashMap<>(); + fieldMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, traceId); + fieldMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, serviceName); + fieldMap.put(LogMessage.ReservedField.NAME.fieldName, name); + parentId.ifPresent(s -> fieldMap.put(LogMessage.ReservedField.PARENT_ID.fieldName, s)); + fieldMap.put(LogMessage.ReservedField.DURATION_MS.fieldName, durationMs); + return new LogWireMessage(TEST_DATASET_NAME, TEST_MESSAGE_TYPE, id, ts, fieldMap); + } + + private static List generateLogWireMessagesForOneTrace( + Instant time, int count, String traceId) { + List messages = new ArrayList<>(); + for (int i = 1; i <= count; i++) { + String parentId = null; + if (i > 1) { + parentId = String.valueOf(i - 1); + } + messages.add( + makeWireMessageForSpans( + String.valueOf(i), + time.plusSeconds(i), + traceId, + Optional.ofNullable(parentId), + i, + "service1", + ("Trace" + i))); + } + return messages; + } + + @Test + public void testLogWireMessageToZipkinSpanConversion() throws JsonProcessingException { + Instant time = Instant.now(); + List messages = generateLogWireMessagesForOneTrace(time, 2, "1"); + + // follows output format from https://zipkin.io/zipkin-api/#/default/get_trace__traceId_ + String output = + String.format( 
+ "[{\"duration\":1000,\"id\":\"1\",\"name\":\"Trace1\",\"remoteEndpoint\":{\"serviceName\":\"service1\"},\"timestamp\":%d,\"traceId\":\"1\"},{\"duration\":2000,\"id\":\"2\",\"name\":\"Trace2\",\"parentId\":\"1\",\"remoteEndpoint\":{\"serviceName\":\"service1\"},\"timestamp\":%d,\"traceId\":\"1\"}]", + ZipkinService.convertToMicroSeconds(time.plusSeconds(1)), + ZipkinService.convertToMicroSeconds(time.plusSeconds(2))); + assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(messages)).isEqualTo(output); + + assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(new ArrayList<>())).isEqualTo("[]"); + } +} From ee74bc30d800c3452b7f47486b981410320c6798 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:35:43 -0700 Subject: [PATCH 3/5] Rewrite Lucene queries for better caching compatibility (#708) Co-authored-by: Bryan Burkholder Co-authored-by: Varun Thacker --- .../java/com/slack/kaldb/QueryBenchmark.java | 2 +- .../slack/kaldb/chunk/ReadOnlyChunkImpl.java | 35 +++++++- .../opensearch/OpenSearchAdapter.java | 25 ++++-- .../SchemaAwareLogDocumentBuilderImpl.java | 2 +- .../logstore/search/LogIndexSearcher.java | 2 +- .../logstore/search/LogIndexSearcherImpl.java | 12 ++- .../kaldb/chunk/ReadOnlyChunkImplTest.java | 21 +++++ .../logstore/LuceneIndexStoreImplTest.java | 6 +- .../opensearch/OpenSearchAdapterTest.java | 62 ++++++++++++- .../AlreadyClosedLogIndexSearcherImpl.java | 4 +- .../IllegalArgumentLogIndexSearcherImpl.java | 4 +- .../search/LogIndexSearcherImplTest.java | 88 +++++++++---------- ...TemporaryLogStoreAndSearcherExtension.java | 2 +- 13 files changed, 195 insertions(+), 70 deletions(-) diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java index 380a9c4d1e..19c28c65de 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java @@ -126,7 +126,7 @@ public void measureLogSearcherSearch() { logIndexSearcher.search( "*", "", - 0, + 0L, Long.MAX_VALUE, 500, new DateHistogramAggBuilder( diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java index c255fcd3e0..e59d0645cf 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java @@ -381,15 +381,46 @@ public String id() { @Override public SearchResult query(SearchQuery query) { if (logSearcher != null) { + Long searchStartTime = + determineStartTime(query.startTimeEpochMs, chunkInfo.getDataStartTimeEpochMs()); + Long searchEndTime = + determineEndTime(query.endTimeEpochMs, chunkInfo.getDataEndTimeEpochMs()); + return logSearcher.search( query.dataset, query.queryStr, - query.startTimeEpochMs, - query.endTimeEpochMs, + searchStartTime, + searchEndTime, query.howMany, query.aggBuilder); } else { return (SearchResult) SearchResult.empty(); } } + + /** + * Determines the start time to use for the query, given the original query start time and the + * start time of data in the chunk + */ + protected static Long determineStartTime(long queryStartTimeEpochMs, long chunkStartTimeEpochMs) { + Long searchStartTime = null; + if (queryStartTimeEpochMs > chunkStartTimeEpochMs) { + // if the query start time falls after the beginning of the chunk + searchStartTime = queryStartTimeEpochMs; + } + return searchStartTime; + } + + /** + * 
Determines the end time to use for the query, given the original query end time and the end + * time of data in the chunk + */ + protected static Long determineEndTime(long queryEndTimeEpochMs, long chunkEndTimeEpochMs) { + Long searchEndTime = null; + if (queryEndTimeEpochMs < chunkEndTimeEpochMs) { + // if the query end time falls before the end of the chunk + searchEndTime = queryEndTimeEpochMs; + } + return searchEndTime; + } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 5a3bdc47c0..36dbff037c 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -143,8 +143,8 @@ public OpenSearchAdapter(Map chunkSchema) { public Query buildQuery( String dataset, String queryStr, - long startTimeMsEpoch, - long endTimeMsEpoch, + Long startTimeMsEpoch, + Long endTimeMsEpoch, IndexSearcher indexSearcher) throws IOException { LOG.trace("Query raw input string: '{}'", queryStr); @@ -158,11 +158,22 @@ public Query buildQuery( try { BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); - RangeQueryBuilder rangeQueryBuilder = - new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName) - .gte(startTimeMsEpoch) - .lte(endTimeMsEpoch); - boolQueryBuilder.filter(rangeQueryBuilder); + // only add a range filter if either start or end time is provided + if (startTimeMsEpoch != null || endTimeMsEpoch != null) { + RangeQueryBuilder rangeQueryBuilder = + new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName); + + // todo - consider supporting something other than GTE/LTE (ie GT/LT?) + if (startTimeMsEpoch != null) { + rangeQueryBuilder.gte(startTimeMsEpoch); + } + + if (endTimeMsEpoch != null) { + rangeQueryBuilder.lte(endTimeMsEpoch); + } + + boolQueryBuilder.filter(rangeQueryBuilder); + } // todo - dataset? diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index d8ad4185cf..16edce5ea5 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -48,7 +48,7 @@ private static void addTextField( } // TODO: Move this definition to the config file. 
- private static ImmutableMap getDefaultLuceneFieldDefinitions( + public static ImmutableMap getDefaultLuceneFieldDefinitions( boolean enableFullTextSearch) { ImmutableMap.Builder fieldDefBuilder = ImmutableMap.builder(); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java index 23c98c20fe..3092962f1f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java @@ -5,5 +5,5 @@ public interface LogIndexSearcher extends Closeable { SearchResult search( - String dataset, String query, long minTime, long maxTime, int howMany, AggBuilder aggBuilder); + String dataset, String query, Long minTime, Long maxTime, int howMany, AggBuilder aggBuilder); } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java index f7e7e1d06e..4188b12a50 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java @@ -84,15 +84,19 @@ public void afterRefresh(boolean didRefresh) { public SearchResult search( String dataset, String queryStr, - long startTimeMsEpoch, - long endTimeMsEpoch, + Long startTimeMsEpoch, + Long endTimeMsEpoch, int howMany, AggBuilder aggBuilder) { ensureNonEmptyString(dataset, "dataset should be a non-empty string"); ensureNonNullString(queryStr, "query should be a non-empty string"); - ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value"); - ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time"); + if (startTimeMsEpoch != null) { + ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value"); + } + if (startTimeMsEpoch != null && endTimeMsEpoch != null) { + ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time"); + } ensureTrue(howMany >= 0, "hits requested should not be negative."); ensureTrue(howMany > 0 || aggBuilder != null, "Hits or aggregation should be requested."); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 2a8dd4a516..da7d7c14ca 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -458,6 +458,27 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { curatorFramework.unwrap().close(); } + @Test + public void shouldUseOptimizedQueryStartEndTime() { + // Query is before the chunk data, so do not return a start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 12)).isNull(); + + // Query matches chunk start time, do not return a start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 10)).isNull(); + + // Query only matches part of the chunk, return the query start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 9)).isEqualTo(10); + + // Query only matches part of the chunk, return the query end time + assertThat(ReadOnlyChunkImpl.determineEndTime(10, 12)).isEqualTo(10); + + // Query matches chunk end time, do not return an end time + assertThat(ReadOnlyChunkImpl.determineEndTime(10, 10)).isNull(); + + // Query is after the chunk data, so do not return an end time + 
assertThat(ReadOnlyChunkImpl.determineEndTime(12, 10)).isNull(); + } + private void assignReplicaToChunk( CacheSlotMetadataStore cacheSlotMetadataStore, String replicaId, diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 50221d563d..464a6c4ea3 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -100,7 +100,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "nested.key1:value1", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( @@ -111,7 +111,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "duplicateproperty:duplicate1", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( @@ -122,7 +122,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "nested.duplicateproperty:2", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java index 463f94a3d0..38a5502316 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; import com.slack.kaldb.logstore.search.aggregations.AggBuilder; import com.slack.kaldb.logstore.search.aggregations.AggBuilderBase; import com.slack.kaldb.logstore.search.aggregations.AvgAggBuilder; @@ -22,7 +23,13 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.opensearch.search.aggregations.AbstractAggregationBuilder; @@ -48,9 +55,14 @@ public class OpenSearchAdapterTest { public TemporaryLogStoreAndSearcherExtension logStoreAndSearcherRule = new TemporaryLogStoreAndSearcherExtension(false); - private final OpenSearchAdapter openSearchAdapter = new OpenSearchAdapter(Map.of()); + private final OpenSearchAdapter openSearchAdapter = + new OpenSearchAdapter( + SchemaAwareLogDocumentBuilderImpl.getDefaultLuceneFieldDefinitions(false)); - public OpenSearchAdapterTest() throws IOException {} + public OpenSearchAdapterTest() throws IOException { + // We need to reload the schema so that query optimizations take into account the schema + openSearchAdapter.reloadSchema(); + } @Test public void safelyHandlesUnknownAggregations() throws IOException { @@ -391,4 +403,50 @@ public void handlesDateHistogramExtendedBoundsMinDocEdgeCases() throws IOExcepti 
assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> collectorManager.newCollector()); } + + @Test + public void shouldExcludeDateFilterWhenNullTimestamps() throws Exception { + IndexSearcher indexSearcher = logStoreAndSearcherRule.logStore.getSearcherManager().acquire(); + Query nullBothTimestamps = openSearchAdapter.buildQuery("foo", "", null, null, indexSearcher); + // null for both timestamps with no query string should be optimized into a matchall + assertThat(nullBothTimestamps).isInstanceOf(MatchAllDocsQuery.class); + + Query nullStartTimestamp = openSearchAdapter.buildQuery("foo", "a", null, 100L, indexSearcher); + assertThat(nullStartTimestamp).isInstanceOf(BooleanQuery.class); + + Optional filterNullStartQuery = + ((BooleanQuery) nullStartTimestamp) + .clauses().stream() + .filter( + booleanClause -> + booleanClause.getQuery() + instanceof IndexSortSortedNumericDocValuesRangeQuery) + .map( + booleanClause -> + (IndexSortSortedNumericDocValuesRangeQuery) booleanClause.getQuery()) + .findFirst(); + assertThat(filterNullStartQuery).isPresent(); + // a null start and provided end should result in an optimized range query of min long to the + // end value + assertThat(filterNullStartQuery.get().toString()).contains(String.valueOf(Long.MIN_VALUE)); + assertThat(filterNullStartQuery.get().toString()).contains(String.valueOf(100L)); + + Query nullEndTimestamp = openSearchAdapter.buildQuery("foo", "", 100L, null, indexSearcher); + Optional filterNullEndQuery = + ((BooleanQuery) nullEndTimestamp) + .clauses().stream() + .filter( + booleanClause -> + booleanClause.getQuery() + instanceof IndexSortSortedNumericDocValuesRangeQuery) + .map( + booleanClause -> + (IndexSortSortedNumericDocValuesRangeQuery) booleanClause.getQuery()) + .findFirst(); + assertThat(filterNullEndQuery).isPresent(); + // a null end and provided start should result in an optimized range query of start value to max + // long + assertThat(filterNullEndQuery.get().toString()).contains(String.valueOf(100L)); + assertThat(filterNullEndQuery.get().toString()).contains(String.valueOf(Long.MAX_VALUE)); + } } diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java index b67643353e..80ffd84fb7 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java @@ -9,8 +9,8 @@ public class AlreadyClosedLogIndexSearcherImpl implements LogIndexSearcher search( String dataset, String query, - long minTime, - long maxTime, + Long minTime, + Long maxTime, int howMany, AggBuilder aggBuilder) { throw new AlreadyClosedException("Failed to acquire an index searcher"); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java index b6df6cdcc8..a8df466587 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java @@ -8,8 +8,8 @@ public class IllegalArgumentLogIndexSearcherImpl implements LogIndexSearcher search( String dataset, String query, - long minTime, - long maxTime, + Long minTime, + Long maxTime, int howMany, AggBuilder aggBuilder) { throw new 
IllegalArgumentException("Failed to acquire an index searcher"); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index dc0c3ac625..be9a33dc87 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -649,7 +649,7 @@ public void testFullIndexSearch() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -677,7 +677,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, null)); @@ -687,7 +687,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, "")); @@ -697,7 +697,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, "return 9;")); @@ -713,7 +713,7 @@ public void testFilterAggregations() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new FiltersAggBuilder( @@ -758,7 +758,7 @@ public void testFullIndexSearchForMinAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new MinAggBuilder( @@ -783,7 +783,7 @@ public void testFullIndexSearchForMaxAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new MaxAggBuilder( @@ -809,7 +809,7 @@ public void testFullIndexSearchForSumAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new SumAggBuilder("test", TEST_SOURCE_LONG_PROPERTY, "0", null)); @@ -832,7 +832,7 @@ public void testFullIndexSearchForExtendedStatsAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new ExtendedStatsAggBuilder("test", TEST_SOURCE_LONG_PROPERTY, "0", null, null)); @@ -862,7 +862,7 @@ public void testTermsAggregation() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new TermsAggBuilder( @@ -969,7 +969,7 @@ public void testTermsAggregationMissingValues() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new TermsAggBuilder( @@ -998,7 +998,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1012,7 +1012,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1027,7 +1027,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1041,7 +1041,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1063,7 +1063,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1077,7 +1077,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1092,7 +1092,7 @@ public void testFullTextSearch() { .search( 
TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1106,7 +1106,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1128,7 +1128,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1142,7 +1142,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1157,7 +1157,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1171,7 +1171,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1187,7 +1187,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1202,7 +1202,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "app*", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1218,7 +1218,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby car", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1234,7 +1234,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "apple 1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1249,7 +1249,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "123", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1285,7 +1285,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1300,7 +1300,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1316,7 +1316,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1331,7 +1331,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1347,7 +1347,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1362,7 +1362,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "app*", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1378,7 +1378,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "baby car", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1394,7 +1394,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "apple 1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1409,7 +1409,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "123", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1430,7 +1430,7 @@ public void testNullSearchString() { strictLogStore.logSearcher.search( TEST_DATASET_NAME + "miss", null, - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1447,7 +1447,7 @@ public void testMissingIndexSearch() { strictLogStore.logSearcher.search( TEST_DATASET_NAME + "miss", "apple", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1475,7 +1475,7 @@ public void testNoResultQuery() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "elephant", - 0, + 0L, MAX_TIME, 1000, new 
DateHistogramAggBuilder( @@ -1544,7 +1544,7 @@ public void testEmptyIndexName() { strictLogStore.logSearcher.search( "", "test", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1561,7 +1561,7 @@ public void testNullIndexName() { strictLogStore.logSearcher.search( null, "test", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1595,7 +1595,7 @@ public void testInvalidEndTime() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "test", - 0, + 0L, -1L, 1000, new DateHistogramAggBuilder( @@ -1704,7 +1704,7 @@ public void testConcurrentSearches() throws InterruptedException { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java index 1245a265ae..ff24b10841 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java @@ -47,7 +47,7 @@ public static List findAllMessages( searcher.search( dataset, query, - 0, + 0L, MAX_TIME, howMany, new DateHistogramAggBuilder( From c44937dcfc12346301c1aadb39dff381f37e0db7 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Thu, 26 Oct 2023 11:07:22 -0700 Subject: [PATCH 4/5] Fix replica assignment over-assigning (#707) Co-authored-by: Bryan Burkholder --- .../ReplicaAssignmentService.java | 11 ++- .../ReplicaAssignmentServiceTest.java | 81 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java index 4361de9acc..88f947d64f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java +++ b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java @@ -187,20 +187,25 @@ protected Map assignReplicasToCacheSlots() { .stream() .flatMap( (cacheSlotsPerHost) -> { - int currentlyAssigned = + int currentlyAssignedOrLoading = cacheSlotsPerHost.stream() .filter( cacheSlotMetadata -> cacheSlotMetadata.cacheSlotState.equals( - Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED)) + Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED) + || cacheSlotMetadata.cacheSlotState.equals( + Metadata.CacheSlotMetadata.CacheSlotState.LOADING)) .toList() .size(); + return cacheSlotsPerHost.stream() .filter( cacheSlotMetadata -> cacheSlotMetadata.cacheSlotState.equals( Metadata.CacheSlotMetadata.CacheSlotState.FREE)) - .limit(Math.max(0, maxConcurrentAssignmentsPerNode - currentlyAssigned)); + .limit( + Math.max( + 0, maxConcurrentAssignmentsPerNode - currentlyAssignedOrLoading)); }) .collect(Collectors.toList()); diff --git a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java index 76c562f023..d61dfe86d4 100644 --- a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java @@ -1371,6 +1371,87 @@ public void assignmentPreservesSupportedIndexTypes() throws Exception { assertThat(assignedCacheSlot.size()).isEqualTo(1); } + @Test + public void 
shouldNotAssignIfAlreadyLoading() throws Exception { + MeterRegistry concurrentAssignmentsRegistry = new SimpleMeterRegistry(); + + KaldbConfigs.ManagerConfig.ReplicaAssignmentServiceConfig replicaAssignmentServiceConfig = + KaldbConfigs.ManagerConfig.ReplicaAssignmentServiceConfig.newBuilder() + .setSchedulePeriodMins(1) + .addAllReplicaSets(List.of(REPLICA_SET)) + .setMaxConcurrentPerNode(1) + .build(); + KaldbConfigs.ManagerConfig managerConfig = + KaldbConfigs.ManagerConfig.newBuilder() + .setEventAggregationSecs(2) + .setScheduleInitialDelayMins(1) + .setReplicaAssignmentServiceConfig(replicaAssignmentServiceConfig) + .build(); + + ReplicaAssignmentService replicaAssignmentService = + new ReplicaAssignmentService( + cacheSlotMetadataStore, + replicaMetadataStore, + managerConfig, + concurrentAssignmentsRegistry); + + Instant now = Instant.now(); + ReplicaMetadata expectedUnassignedMetadata = + new ReplicaMetadata( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + REPLICA_SET, + now.minus(1, ChronoUnit.HOURS).toEpochMilli(), + now.plusSeconds(60).toEpochMilli(), + false, + LOGS_LUCENE9); + replicaMetadataStore.createAsync(expectedUnassignedMetadata); + + ReplicaMetadata loadingMetadata = + new ReplicaMetadata( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + REPLICA_SET, + now.minus(1, ChronoUnit.HOURS).toEpochMilli(), + now.plusSeconds(60).toEpochMilli(), + false, + LOGS_LUCENE9); + replicaMetadataStore.createAsync(loadingMetadata); + + await().until(() -> replicaMetadataStore.listSync().size() == 2); + + CacheSlotMetadata cacheSlotMetadata = + new CacheSlotMetadata( + UUID.randomUUID().toString(), + Metadata.CacheSlotMetadata.CacheSlotState.LOADING, + loadingMetadata.snapshotId, + Instant.now().toEpochMilli(), + List.of(LOGS_LUCENE9, LOGS_LUCENE9), + HOSTNAME, + REPLICA_SET); + cacheSlotMetadataStore.createAsync(cacheSlotMetadata); + + CacheSlotMetadata freeCacheSlot = + new CacheSlotMetadata( + UUID.randomUUID().toString(), + Metadata.CacheSlotMetadata.CacheSlotState.FREE, + "", + Instant.now().toEpochMilli(), + List.of(LOGS_LUCENE9, LOGS_LUCENE9), + HOSTNAME, + REPLICA_SET); + cacheSlotMetadataStore.createAsync(freeCacheSlot); + + await().until(() -> cacheSlotMetadataStore.listSync().size() == 2); + + replicaAssignmentService.startAsync(); + replicaAssignmentService.awaitRunning(DEFAULT_START_STOP_DURATION); + + // immediately force a run + Map assignments = replicaAssignmentService.assignReplicasToCacheSlots(); + assertThat(assignments.get(REPLICA_SET)).isEqualTo(0); + } + @Test public void shouldPreventConcurrentAssignmentsExceedingLimit() throws Exception { MeterRegistry concurrentAssignmentsRegistry = new SimpleMeterRegistry(); From 014da40f334feadcb8cf2d5ec9144bcb465d389a Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Thu, 26 Oct 2023 14:30:12 -0700 Subject: [PATCH 5/5] Switch executors to use virtual threads where possible (#704) Co-authored-by: Bryan Burkholder --- .gitignore | 1 + kaldb/pom.xml | 4 +- .../slack/kaldb/chunk/ReadOnlyChunkImpl.java | 18 +- .../kaldb/chunkManager/ChunkManagerBase.java | 189 +++++++++--------- .../ElasticsearchApiService.java | 50 ++--- .../kaldb/logstore/LuceneIndexStoreImpl.java | 23 ++- .../search/KaldbDistributedQueryService.java | 109 +++++----- .../kaldb/logstore/search/SearchResult.java | 7 + .../search/SearchResultAggregatorImpl.java | 8 + .../metadata/core/KaldbMetadataStore.java | 2 +- .../preprocessor/PreprocessorService.java | 10 +- 
.../ElasticsearchApiServiceTest.java | 14 -- 12 files changed, 226 insertions(+), 209 deletions(-) diff --git a/.gitignore b/.gitignore index bd83586f6b..275d58d139 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ benchmarks/jmh-output/ # Test files indices/ +**kaldb-slot-log** # go vendor files. vendor/ diff --git a/kaldb/pom.xml b/kaldb/pom.xml index e75bb79032..798f81ba59 100644 --- a/kaldb/pom.xml +++ b/kaldb/pom.xml @@ -562,7 +562,8 @@ true -XDcompilePolicy=simple - -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR + --enable-preview + -Xplugin:ErrorProne -XepDisableWarningsInGeneratedCode -XepExcludedPaths:.*/protobuf/.* -Xep:WildcardImport:ERROR -Xep:AssertEqualsArgumentOrderChecker:ERROR -J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED @@ -730,6 +731,7 @@ maven-surefire-plugin 3.1.0 + --enable-preview false src/test/resources/log4j2.xml diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java index e59d0645cf..b690fff740 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -79,6 +80,8 @@ public class ReadOnlyChunkImpl implements Chunk { private final KaldbMetadataStoreChangeListener cacheSlotListener = this::cacheNodeListener; + private final ReentrantLock chunkAssignmentLock = new ReentrantLock(); + public ReadOnlyChunkImpl( AsyncCuratorFramework curatorFramework, MeterRegistry meterRegistry, @@ -187,12 +190,12 @@ private void unregisterSearchMetadata() } } - // We synchronize access when manipulating the chunk, as the close() can + // We lock access when manipulating the chunk, as the close() can // run concurrently with an assignment - private synchronized void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) { + private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) { Timer.Sample assignmentTimer = Timer.start(meterRegistry); + chunkAssignmentLock.lock(); try { - /// if (!setChunkMetadataState( cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.LOADING)) { throw new InterruptedException("Failed to set chunk metadata state to loading"); @@ -249,6 +252,8 @@ private synchronized void handleChunkAssignment(CacheSlotMetadata cacheSlotMetad setChunkMetadataState(cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.FREE); LOG.error("Error handling chunk assignment", e); assignmentTimer.stop(chunkAssignmentTimerFailure); + } finally { + chunkAssignmentLock.unlock(); } } @@ -265,10 +270,11 @@ private SnapshotMetadata getSnapshotMetadata(String replicaId) .get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } - // We synchronize access when manipulating the chunk, as the close() + // We lock access when manipulating the chunk, as the close() // can run concurrently with an eviction - private synchronized void handleChunkEviction(CacheSlotMetadata cacheSlotMetadata) { + private void 
handleChunkEviction(CacheSlotMetadata cacheSlotMetadata) { Timer.Sample evictionTimer = Timer.start(meterRegistry); + chunkAssignmentLock.lock(); try { if (!setChunkMetadataState( cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.EVICTING)) { @@ -297,6 +303,8 @@ private synchronized void handleChunkEviction(CacheSlotMetadata cacheSlotMetadat // re-assignment or queries hitting this slot LOG.error("Error handling chunk eviction", e); evictionTimer.stop(chunkEvictionTimerFailure); + } finally { + chunkAssignmentLock.unlock(); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java index c4cc793480..1a3e9906a4 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManagerBase.java @@ -1,14 +1,10 @@ package com.slack.kaldb.chunkManager; +import brave.ScopedSpan; import brave.Tracing; import brave.propagation.CurrentTraceContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.chunk.Chunk; import com.slack.kaldb.logstore.search.SearchQuery; import com.slack.kaldb.logstore.search.SearchResult; @@ -16,6 +12,7 @@ import com.slack.kaldb.logstore.search.SearchResultAggregatorImpl; import com.slack.kaldb.metadata.schema.FieldType; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -23,10 +20,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Semaphore; +import java.util.concurrent.StructuredTaskScope; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,36 +39,17 @@ public abstract class ChunkManagerBase extends AbstractIdleService implements // we use a CopyOnWriteArrayList as we expect to have very few edits to this list compared // to the amount of reads, and it must be a threadsafe implementation protected final List> chunkList = new CopyOnWriteArrayList<>(); - - private static final ListeningExecutorService queryExecutorService = queryThreadPool(); - - private static final ScheduledExecutorService queryCancellationService = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("chunk-manager-query-cancellation-%d") - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .build()); - - /* - * We want to provision the chunk query capacity such that we can almost saturate the CPU. In the event we allow - * these to saturate the CPU it can result in the container being killed due to failed healthchecks. - * - * Revisit the thread pool settings if this becomes a perf issue. Also, we may need - * different thread pools for indexer and cache nodes in the future. 
- */ - private static ListeningExecutorService queryThreadPool() { - // todo - consider making the thread count a config option; this would allow for more - // fine-grained tuning, but we might not need to expose this to the user if we can set sensible - // defaults - return MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - Math.max(1, Runtime.getRuntime().availableProcessors() - 2), - new ThreadFactoryBuilder() - .setNameFormat("chunk-manager-query-%d") - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .build())); + private final Semaphore concurrentQueries; + + public ChunkManagerBase() { + // todo - move this to a config value if we end up needing this param + int semaphoreCount = + Integer.parseInt( + System.getProperty( + "kaldb.concurrent.query", + String.valueOf(Runtime.getRuntime().availableProcessors() - 1))); + LOG.info("Using kaldb.concurrent.query - {}", semaphoreCount); + concurrentQueries = new Semaphore(semaphoreCount, true); } /* @@ -107,72 +84,88 @@ public SearchResult query(SearchQuery query, Duration queryTimeout) { // a single IndexSearcher. Collections.shuffle(chunksMatchingQuery); - List>> queries = - chunksMatchingQuery.stream() - .map( - (chunk) -> - queryExecutorService.submit( - currentTraceContext.wrap( - () -> { - try { - if (Thread.interrupted()) { - LOG.warn( - "Chunk query thread timed out without starting work, returning error result."); - return errorResult; - } - return chunk.query(query); - } catch (Exception err) { - // Only log the exception message as warn, and not the entire trace - // as this can cause performance issues if significant amounts of - // invalid queries are received - LOG.warn("Chunk Query Exception: {}", err.getMessage()); - LOG.debug("Chunk Query Exception", err); - // We catch IllegalArgumentException ( and any other exception that - // represents a parse failure ) and instead of returning an empty - // result we throw back an error to the user - if (err instanceof IllegalArgumentException) { - throw err; - } - return errorResult; - } - }))) - .peek( - (future) -> - queryCancellationService.schedule( - () -> future.cancel(true), queryTimeout.toMillis(), TimeUnit.MILLISECONDS)) - .collect(Collectors.toList()); - - Future>> searchResultFuture = Futures.successfulAsList(queries); try { - List> searchResults = - searchResultFuture.get(queryTimeout.toMillis(), TimeUnit.MILLISECONDS); - - // check if all results are null, and if so return an error to the user - if (searchResults.size() > 0 && searchResults.stream().allMatch(Objects::isNull)) { + try (var scope = new StructuredTaskScope>()) { + List>> chunkSubtasks = + chunksMatchingQuery.stream() + .map( + (chunk) -> + scope.fork( + currentTraceContext.wrap( + () -> { + ScopedSpan span = + Tracing.currentTracer() + .startScopedSpan("ChunkManagerBase.chunkQuery"); + span.tag("chunkId", chunk.id()); + span.tag("chunkSnapshotPath", chunk.info().getSnapshotPath()); + concurrentQueries.acquire(); + try { + return chunk.query(query); + } finally { + concurrentQueries.release(); + span.finish(); + } + }))) + .toList(); try { - Futures.allAsList(queries).get(0, TimeUnit.SECONDS); - } catch (Exception e) { - throw new IllegalArgumentException(e); + scope.joinUntil(Instant.now().plusSeconds(queryTimeout.toSeconds())); + } catch (TimeoutException timeoutException) { + scope.shutdown(); + scope.join(); } - // not expected to happen - we should be guaranteed that the list has at least one failed - // future, which should throw when 
we try to get on allAsList - throw new IllegalArgumentException( - "Chunk query error - all results returned null values with no exceptions thrown"); - } - //noinspection unchecked - SearchResult aggregatedResults = - ((SearchResultAggregator) new SearchResultAggregatorImpl<>(query)) - .aggregate(searchResults, false); - return incrementNodeCount(aggregatedResults); + List> searchResults = + chunkSubtasks.stream() + .map( + searchResultSubtask -> { + try { + if (searchResultSubtask + .state() + .equals(StructuredTaskScope.Subtask.State.SUCCESS)) { + return searchResultSubtask.get(); + } else if (searchResultSubtask + .state() + .equals(StructuredTaskScope.Subtask.State.FAILED)) { + Throwable throwable = searchResultSubtask.exception(); + if (throwable instanceof IllegalArgumentException) { + // We catch IllegalArgumentException ( and any other exception that + // represents a parse failure ) and instead of returning an empty + // result we throw back an error to the user + throw new IllegalArgumentException(throwable); + } + LOG.warn("Chunk Query Exception: {}", throwable.getMessage()); + } + // else UNAVAILABLE (ie, timedout) + return errorResult; + } catch (Exception err) { + if (err instanceof IllegalArgumentException) { + throw err; + } + + // Only log the exception message as warn, and not the entire trace + // as this can cause performance issues if significant amounts of + // invalid queries are received + LOG.warn("Chunk Query Exception: {}", err.getMessage()); + return errorResult; + } + }) + .toList(); + + // check if all results are null, and if so return an error to the user + if (!searchResults.isEmpty() && searchResults.stream().allMatch(Objects::isNull)) { + throw new IllegalArgumentException( + "Chunk query error - all results returned null values"); + } + + //noinspection unchecked + SearchResult aggregatedResults = + ((SearchResultAggregator) new SearchResultAggregatorImpl<>(query)) + .aggregate(searchResults, false); + return incrementNodeCount(aggregatedResults); + } } catch (Exception e) { LOG.error("Error searching across chunks ", e); throw new RuntimeException(e); - } finally { - // always request future cancellation. This won't interrupt I/O or downstream futures, - // but is good practice. 
Since this is backed by a CompletableFuture - // mayInterruptIfRunning has no effect - searchResultFuture.cancel(true); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiService.java b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiService.java index 747084a5a7..514481dcbd 100644 --- a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiService.java +++ b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiService.java @@ -2,13 +2,11 @@ import brave.ScopedSpan; import brave.Tracing; +import brave.propagation.CurrentTraceContext; import brave.propagation.TraceContext; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; @@ -38,9 +36,7 @@ import java.util.Map; import java.util.Optional; import java.util.TreeMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; +import java.util.concurrent.StructuredTaskScope; import org.opensearch.search.aggregations.InternalAggregation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,17 +54,6 @@ public class ElasticsearchApiService { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchApiService.class); private final KaldbQueryServiceBase searcher; - // This uses a separate cached threadpool for multisearch queries so that we can run these in - // parallel. A cached threadpool was chosen over something like forkjoin, as it's easier to - // propagate the trace instrumentation, and has better visibility using a custom threadfactory. 
- private final ExecutorService multisearchExecutor = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setUncaughtExceptionHandler( - (t, e) -> LOG.error("Exception on thread {}: {}", t.getName(), e)) - .setNameFormat("elasticsearch-multisearch-api-%d") - .build()); - private final OpenSearchRequest openSearchRequest = new OpenSearchRequest(); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -89,21 +74,22 @@ public ElasticsearchApiService(KaldbQueryServiceBase searcher) { public HttpResponse multiSearch(String postBody) throws Exception { LOG.debug("Search request: {}", postBody); - List requests = openSearchRequest.parseHttpPostBody(postBody); - List> responseFutures = - requests.stream() - .map( - (request) -> - Futures.submit( - () -> this.doSearch(request), - Tracing.current().currentTraceContext().executor(multisearchExecutor))) - .collect(Collectors.toList()); - - SearchResponseMetadata responseMetadata = - new SearchResponseMetadata( - 0, Futures.allAsList(responseFutures).get(), Map.of("traceId", getTraceId())); - return HttpResponse.of( - HttpStatus.OK, MediaType.JSON_UTF_8, JsonUtil.writeAsString(responseMetadata)); + CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext(); + try (var scope = new StructuredTaskScope()) { + List> requestSubtasks = + openSearchRequest.parseHttpPostBody(postBody).stream() + .map((request) -> scope.fork(currentTraceContext.wrap(() -> doSearch(request)))) + .toList(); + + scope.join(); + SearchResponseMetadata responseMetadata = + new SearchResponseMetadata( + 0, + requestSubtasks.stream().map(StructuredTaskScope.Subtask::get).toList(), + Map.of("traceId", getTraceId())); + return HttpResponse.of( + HttpStatus.OK, MediaType.JSON_UTF_8, JsonUtil.writeAsString(responseMetadata)); + } } private EsSearchResponse doSearch(KaldbSearch.SearchRequest searchRequest) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index b1cefa26de..a5d4cea29a 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -15,6 +15,7 @@ import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.FileUtils; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -70,6 +71,8 @@ public class LuceneIndexStoreImpl implements LogStore { // If we ever revisit this - the value was picked thinking it's a good "default" private final Integer CFS_FILES_SIZE_MB_CUTOFF = 128; + private final ReentrantLock indexWriterLock = new ReentrantLock(); + // TODO: Set the policy via a lucene config file. public static LuceneIndexStoreImpl makeLogStore( File dataDirectory, KaldbConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry) @@ -208,26 +211,35 @@ private IndexWriterConfig buildIndexWriterConfig( // TODO: IOException can be logged and recovered from?. 
private void syncCommit() throws IOException { - synchronized (this) { + indexWriterLock.lock(); + try { if (indexWriter.isPresent()) { indexWriter.get().commit(); } + } finally { + indexWriterLock.unlock(); } } private void syncRefresh() throws IOException { - synchronized (this) { + indexWriterLock.lock(); + try { if (indexWriter.isPresent()) { searcherManager.maybeRefresh(); } + } finally { + indexWriterLock.unlock(); } } private void syncFinalMerge() throws IOException { - synchronized (this) { + indexWriterLock.lock(); + try { if (indexWriter.isPresent()) { indexWriter.get().forceMerge(1); } + } finally { + indexWriterLock.unlock(); } } @@ -353,7 +365,8 @@ public void releaseIndexCommit(IndexCommit indexCommit) { */ @Override public void close() { - synchronized (this) { + indexWriterLock.lock(); + try { if (indexWriter.isEmpty()) { // Closable.close() requires this be idempotent, so silently exit instead of throwing an // exception @@ -367,6 +380,8 @@ public void close() { LOG.error("Error closing index " + id, e); } indexWriter = Optional.empty(); + } finally { + indexWriterLock.unlock(); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java index 27ac96c860..7b12cbd06a 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java @@ -5,10 +5,10 @@ import brave.ScopedSpan; import brave.Tracing; import brave.grpc.GrpcTracing; +import brave.propagation.CurrentTraceContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.linecorp.armeria.client.grpc.GrpcClients; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; @@ -25,6 +25,7 @@ import io.micrometer.core.instrument.MeterRegistry; import java.io.Closeable; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -33,14 +34,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -369,64 +369,69 @@ private List> distributedSearch( getNodesAndSnapshotsToQuery(searchMetadataNodesMatchingQuery); span.tag("queryServerCount", String.valueOf(nodesAndSnapshotsToQuery.size())); - List>> queryServers = new ArrayList<>(stubs.size()); - for (Map.Entry> searchNode : nodesAndSnapshotsToQuery.entrySet()) { - KaldbServiceGrpc.KaldbServiceFutureStub stub = getStub(searchNode.getKey()); - if (stub == null) { - // TODO: insert a failed result in the results object that we return from this method - // mimicing - continue; - } - - KaldbSearch.SearchRequest localSearchReq = - distribSearchReq.toBuilder().addAllChunkIds(searchNode.getValue()).build(); - - // make 
sure all underlying futures finish executing (successful/cancelled/failed/other) - // and cannot be pending when the successfulAsList.get(SAME_TIMEOUT_MS) runs - ListenableFuture searchRequest = - stub.withDeadlineAfter(defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS) - .withInterceptors( - GrpcTracing.newBuilder(Tracing.current()).build().newClientInterceptor()) - .search(localSearchReq); - Function> searchRequestTransform = - SearchResultUtils::fromSearchResultProtoOrEmpty; - - queryServers.add( - Futures.transform( - searchRequest, searchRequestTransform::apply, MoreExecutors.directExecutor())); - } - Future>> searchFuture = Futures.successfulAsList(queryServers); + CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext(); try { - List> searchResults = - searchFuture.get(requestTimeout.toMillis(), TimeUnit.MILLISECONDS); - LOG.debug("searchResults.size={} searchResults={}", searchResults.size(), searchResults); + try (var scope = new StructuredTaskScope>()) { + List>> searchSubtasks = + nodesAndSnapshotsToQuery.entrySet().stream() + .map( + (searchNode) -> + scope.fork( + currentTraceContext.wrap( + () -> { + KaldbServiceGrpc.KaldbServiceFutureStub stub = + getStub(searchNode.getKey()); + + if (stub == null) { + // TODO: insert a failed result in the results object that we + // return from this method + return null; + } + + KaldbSearch.SearchRequest localSearchReq = + distribSearchReq.toBuilder() + .addAllChunkIds(searchNode.getValue()) + .build(); + return SearchResultUtils.fromSearchResultProtoOrEmpty( + stub.withDeadlineAfter( + defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS) + .withInterceptors( + GrpcTracing.newBuilder(Tracing.current()) + .build() + .newClientInterceptor()) + .search(localSearchReq) + .get()); + }))) + .toList(); + + try { + scope.joinUntil(Instant.now().plusSeconds(defaultQueryTimeout.toSeconds())); + } catch (TimeoutException timeoutException) { + scope.shutdown(); + scope.join(); + } - List> response = new ArrayList(searchResults.size()); - for (SearchResult searchResult : searchResults) { - response.add(searchResult == null ? SearchResult.empty() : searchResult); + List> response = new ArrayList(searchSubtasks.size()); + for (StructuredTaskScope.Subtask> searchResult : searchSubtasks) { + try { + if (searchResult.state().equals(StructuredTaskScope.Subtask.State.SUCCESS)) { + response.add(searchResult.get() == null ? SearchResult.error() : searchResult.get()); + } else { + response.add(SearchResult.error()); + } + } catch (Exception e) { + LOG.error("Error fetching search result", e); + response.add(SearchResult.error()); + } + } + return response; } - return response; - } catch (TimeoutException e) { - // We provide a deadline to the stub of "defaultQueryTimeout" - if this is sufficiently lower - // than the request timeout, we would expect searchFuture.get(requestTimeout) to never throw - // an exception. This however doesn't necessarily hold true if the query node is CPU - // saturated, and there is not enough cpu time to fail the pending stub queries that have - // exceeded their deadline - causing the searchFuture get to fail with a timeout. - LOG.error( - "Search failed with timeout exception. 
This is potentially due to CPU saturation of the query node.", - e); - span.error(e); - return List.of(SearchResult.empty()); } catch (Exception e) { LOG.error("Search failed with ", e); span.error(e); return List.of(SearchResult.empty()); } finally { - // always request future cancellation, so that any exceptions or incomplete futures don't - // continue to consume CPU on work that will not be used - searchFuture.cancel(false); - LOG.debug("Finished distributed search for request: {}", distribSearchReq); span.finish(); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResult.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResult.java index 3e4824bf16..e7b5d6021c 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResult.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResult.java @@ -11,6 +11,9 @@ public class SearchResult { private static final SearchResult EMPTY = new SearchResult<>(Collections.emptyList(), 0, 0, 1, 0, 0, null); + private static final SearchResult ERROR = + new SearchResult<>(Collections.emptyList(), 0, 1, 1, 0, 0, null); + // TODO: Make hits an iterator. // An iterator helps with the early termination of a search and may be efficient in some cases. public final List hits; @@ -108,4 +111,8 @@ public int hashCode() { public static SearchResult empty() { return EMPTY; } + + public static SearchResult error() { + return ERROR; + } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImpl.java index 244d161068..20aa3c9225 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImpl.java @@ -1,5 +1,7 @@ package com.slack.kaldb.logstore.search; +import brave.ScopedSpan; +import brave.Tracing; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.opensearch.KaldbBigArrays; import com.slack.kaldb.logstore.opensearch.OpenSearchAdapter; @@ -26,6 +28,8 @@ public SearchResultAggregatorImpl(SearchQuery searchQuery) { @Override public SearchResult aggregate(List> searchResults, boolean finalAggregation) { + ScopedSpan span = + Tracing.currentTracer().startScopedSpan("SearchResultAggregatorImpl.aggregate"); long tookMicros = 0; int failedNodes = 0; int totalNodes = 0; @@ -93,6 +97,10 @@ public SearchResult aggregate(List> searchResults, boolean fi .limit(searchQuery.howMany) .collect(Collectors.toList()); + span.tag("resultHits", String.valueOf(resultHits.size())); + span.tag("finalAggregation", String.valueOf(finalAggregation)); + span.finish(); + return new SearchResult<>( resultHits, tookMicros, diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java index dfd48fe019..d7c0abd3a7 100644 --- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java @@ -226,7 +226,7 @@ public void initialized() { } @Override - public synchronized void close() { + public void close() { if (cachedModeledFramework != null) { listenerMap.forEach( ((kaldbMetadataStoreChangeListener, tModeledCacheListener) -> diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java 
b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java index 9690a19278..3241428789 100644 --- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java +++ b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -72,6 +73,8 @@ public class PreprocessorService extends AbstractService { private final KaldbMetadataStoreChangeListener datasetListener = (datasetMetadata) -> load(); + private final ReentrantLock schemaLoadingLock = new ReentrantLock(); + public PreprocessorService( DatasetMetadataStore datasetMetadataStore, KaldbConfigs.PreprocessorConfig preprocessorConfig, @@ -135,9 +138,10 @@ private void shutDown() { /** * Configures and starts a KafkaStream processor, based off of the cached DatasetMetadataStore. * This method is reentrant, and will restart any existing KafkaStream processors. Access to this - * must be synchronized if using this method as part of a listener. + * must be guarded with ReentrantLock if using this method as part of a listener. */ - public synchronized void load() { + public void load() { + schemaLoadingLock.lock(); try { Timer.Sample loadTimer = Timer.start(meterRegistry); LOG.info("Loading new Kafka stream processor config"); @@ -183,6 +187,8 @@ public synchronized void load() { loadTimer.stop(configReloadTimer); } catch (Exception e) { notifyFailed(e); + } finally { + schemaLoadingLock.unlock(); } } diff --git a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java index c9e393fb15..fb9a0f099e 100644 --- a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java @@ -4,7 +4,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -34,8 +33,6 @@ import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import org.assertj.core.data.Offset; import org.junit.jupiter.api.AfterEach; @@ -230,14 +227,6 @@ public void testLargeSetOfQueries() throws Exception { ElasticsearchApiService slowElasticsearchApiService = new ElasticsearchApiService(slowSearcher); HttpResponse response = slowElasticsearchApiService.multiSearch(postBody); - Set threadNames = ConcurrentHashMap.newKeySet(); - doAnswer( - invocationOnMock -> { - threadNames.add(Thread.currentThread().getName()); - return invocationOnMock.callRealMethod(); - }) - .when(slowSearcher) - .doSearch(any()); slowElasticsearchApiService = new ElasticsearchApiService(slowSearcher); response = slowElasticsearchApiService.multiSearch(postBody.repeat(100)); @@ -248,9 +237,6 @@ public void testLargeSetOfQueries() throws Exception { assertThat(aggregatedRes.status().code()).isEqualTo(200); - // assert that more than one thread 
executed our code - assertThat(threadNames.size()).isGreaterThan(1); - // ensure we have all 100 results assertThat(jsonNode.get("responses").size()).isEqualTo(100); }
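Note on the 0 -> 0L changes in the LogIndexSearcherImplTest hunks: only the call sites are shown, so the motivating signature change is not visible in this diff. A plausible reading (an assumption, not confirmed here) is that the search start-time parameter became a boxed Long, and Java will not implicitly convert an int literal to Long, while a long literal boxes cleanly. A minimal sketch with a hypothetical search(Long) method:

class LongLiteralSketch {
  // Hypothetical signature; stands in for whatever overload the tests now target.
  static void search(Long startTimeEpochMs) {}

  public static void main(String[] args) {
    // search(0);  // would not compile: an int literal neither boxes nor widens-then-boxes to Long
    search(0L);    // a long literal boxes to Long, so the explicit L suffix is required
  }
}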
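The ReplicaAssignmentService fix above counts LOADING slots against the per-node assignment limit; previously only ASSIGNED slots were counted, so a node could be handed additional replicas while it was still loading earlier ones. A standalone sketch of the corrected budget calculation, using a simplified slot-state enum in place of CacheSlotMetadata:

import java.util.List;

class AssignmentBudgetSketch {
  enum SlotState { FREE, LOADING, ASSIGNED, EVICTING }

  // How many FREE slots on this host may still receive an assignment this round.
  static long freeSlotsToAssign(List<SlotState> slotsOnHost, int maxConcurrentAssignmentsPerNode) {
    long assignedOrLoading =
        slotsOnHost.stream()
            .filter(s -> s == SlotState.ASSIGNED || s == SlotState.LOADING)
            .count();
    // Never negative: if the node is already at or over the limit, assign nothing.
    return Math.max(0, maxConcurrentAssignmentsPerNode - assignedOrLoading);
  }

  public static void main(String[] args) {
    // One slot already LOADING with a limit of 1 leaves no budget for the FREE slot.
    System.out.println(freeSlotsToAssign(List.of(SlotState.LOADING, SlotState.FREE), 1)); // 0
  }
}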
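The ChunkManagerBase rewrite forks one subtask per chunk inside a StructuredTaskScope, caps in-flight work with a fair Semaphore, joins against a deadline, and maps failed or unavailable subtasks to an error result; KaldbDistributedQueryService follows the same fork/joinUntil shape per remote node, without the semaphore. A minimal sketch of that pattern, assuming a JDK where StructuredTaskScope is available as a preview API (the pom.xml change above adds --enable-preview); queryChunk and the String results are stand-ins for the real chunk query:

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;

public class ChunkFanOutSketch {

  // Cap concurrency below the core count so chunk queries cannot saturate the host.
  private static final Semaphore CONCURRENT_QUERIES =
      new Semaphore(Math.max(1, Runtime.getRuntime().availableProcessors() - 1), true);

  static String queryChunk(String chunkId) throws InterruptedException {
    CONCURRENT_QUERIES.acquire();
    try {
      return "result-for-" + chunkId; // placeholder for the actual per-chunk search
    } finally {
      CONCURRENT_QUERIES.release();
    }
  }

  static List<String> queryAll(List<String> chunkIds, Duration timeout) throws InterruptedException {
    try (var scope = new StructuredTaskScope<String>()) {
      List<StructuredTaskScope.Subtask<String>> subtasks =
          chunkIds.stream().map(id -> scope.fork(() -> queryChunk(id))).toList();

      try {
        scope.joinUntil(Instant.now().plus(timeout));
      } catch (TimeoutException e) {
        // Cancel whatever is still running, then wait for the scope to settle.
        scope.shutdown();
        scope.join();
      }

      // SUCCESS subtasks contribute their value; FAILED or UNAVAILABLE (timed-out)
      // subtasks become an explicit error marker instead of being silently dropped.
      return subtasks.stream()
          .map(st -> st.state() == StructuredTaskScope.Subtask.State.SUCCESS ? st.get() : "error")
          .toList();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(queryAll(List.of("chunk-1", "chunk-2", "chunk-3"), Duration.ofSeconds(2)));
  }
}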
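The remaining hunks replace synchronized methods and blocks with an explicit ReentrantLock (ReadOnlyChunkImpl, LuceneIndexStoreImpl, PreprocessorService). The diff does not state the motivation, but a common one when moving to virtual threads (an assumption here) is that blocking inside a synchronized block pins the carrier thread on JDK 21-era runtimes, while ReentrantLock does not. The shape of the change, with doCommit standing in for the guarded work:

import java.util.concurrent.locks.ReentrantLock;

class LockedResource {
  private final ReentrantLock lock = new ReentrantLock();

  // Before: private synchronized void commit() { doCommit(); }
  void commit() {
    lock.lock();
    try {
      doCommit();
    } finally {
      lock.unlock(); // always release, even if doCommit() throws
    }
  }

  private void doCommit() {
    // placeholder for the real critical section, e.g. an index commit or stream reload
  }
}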