From 298515397d0461fd41ef7b643522b2f37310d20b Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:45:47 -0700 Subject: [PATCH 1/5] Bump Kafka version (#712) Co-authored-by: Bryan Burkholder --- kaldb/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kaldb/pom.xml b/kaldb/pom.xml index dfe34ec022..e75bb79032 100644 --- a/kaldb/pom.xml +++ b/kaldb/pom.xml @@ -22,7 +22,7 @@ 1.57.2 1.11.0 1.25.2 - 3.4.0 + 3.5.0 2.15.2 2.15.1 9.5.0 @@ -487,7 +487,7 @@ com.github.charithe kafka-junit - 4.2.4 + 4.2.7 test From df8b36ff203db97ef1bb661ccd1a20b72d1be6ac Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:09:45 -0700 Subject: [PATCH 2/5] Fix rounding error in HPA autoscaler (#711) Co-authored-by: Bryan Burkholder --- .../kaldb/clusterManager/ClusterHpaMetricService.java | 8 +++++--- .../kaldb/clusterManager/ClusterHpaMetricServiceTest.java | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java index c3445a818b..deddcb1e24 100644 --- a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java +++ b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ClusterHpaMetricService.java @@ -1,5 +1,6 @@ package com.slack.kaldb.clusterManager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractScheduledService; import com.slack.kaldb.metadata.cache.CacheSlotMetadataStore; import com.slack.kaldb.metadata.hpa.HpaMetricMetadata; @@ -142,7 +143,8 @@ private void publishCacheHpaMetrics() { } } - private static double calculateDemandFactor( + @VisibleForTesting + protected static double calculateDemandFactor( long totalCacheSlotCapacity, long totalReplicaDemand) { if (totalCacheSlotCapacity == 0) { // we have no provisioned capacity, so cannot determine a value @@ -154,8 +156,8 @@ private static double calculateDemandFactor( } // demand factor will be < 1 indicating a scale-down demand, and > 1 indicating a scale-up double rawDemandFactor = (double) (totalReplicaDemand) / (totalCacheSlotCapacity); - // round to 2 decimals - return (double) Math.round(rawDemandFactor * 100) / 100; + // round up to 2 decimals + return Math.ceil(rawDemandFactor * 100) / 100; } /** Updates or inserts an (ephemeral) HPA metric for the cache nodes. This is NOT threadsafe. */ diff --git a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java index da659296af..a55cf01294 100644 --- a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ClusterHpaMetricServiceTest.java @@ -321,4 +321,11 @@ void twoReplicasetScaleup() { // 2 replicas, 0 slots (will log an error and return a default no-op) assertThat(rep2Metadata.getValue()).isEqualTo(1); } + + @Test + void testDemandFactorRounding() { + assertThat(ClusterHpaMetricService.calculateDemandFactor(100, 98)).isEqualTo(0.98); + assertThat(ClusterHpaMetricService.calculateDemandFactor(98, 100)).isEqualTo(1.03); + assertThat(ClusterHpaMetricService.calculateDemandFactor(9999, 10000)).isEqualTo(1.01); + } } From 392bdcc7eb5af843a7f372bdfc6a1426df7003f7 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:20:16 -0700 Subject: [PATCH 3/5] Refactor Zipkin API, fix duration bug (#710) Co-authored-by: Bryan Burkholder --- .../java/com/slack/kaldb/server/Kaldb.java | 1 + .../zipkinApi/ZipkinEndpointResponse.java | 21 + .../{server => zipkinApi}/ZipkinService.java | 88 ++-- .../kaldb/zipkinApi/ZipkinSpanResponse.java | 110 +++++ .../ZipkinServiceSpanConversionTest.java | 69 --- .../slack/kaldb/server/ZipkinServiceTest.java | 441 ------------------ .../ZipkinServiceSpanConversionTest.java | 72 +++ 7 files changed, 238 insertions(+), 564 deletions(-) create mode 100644 kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java rename kaldb/src/main/java/com/slack/kaldb/{server => zipkinApi}/ZipkinService.java (78%) create mode 100644 kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java delete mode 100644 kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java delete mode 100644 kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java create mode 100644 kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java index 23a3d9b920..e867ea750e 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java @@ -36,6 +36,7 @@ import com.slack.kaldb.proto.metadata.Metadata; import com.slack.kaldb.recovery.RecoveryService; import com.slack.kaldb.util.RuntimeHalterImpl; +import com.slack.kaldb.zipkinApi.ZipkinService; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; diff --git a/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java new file mode 100644 index 0000000000..71a6f61080 --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinEndpointResponse.java @@ -0,0 +1,21 @@ +package com.slack.kaldb.zipkinApi; + +/** + * Endpoint response object for Zipkin API (local, remote) + * + * @see Zipkin API Spec + */ +@SuppressWarnings("unused") +public class ZipkinEndpointResponse { + private String serviceName; + + public ZipkinEndpointResponse() {} + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java similarity index 78% rename from kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java rename to kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java index 9958fc4853..2c7ecbfa18 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/ZipkinService.java +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinService.java @@ -1,12 +1,15 @@ -package com.slack.kaldb.server; +package com.slack.kaldb.zipkinApi; import static com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata.MATCH_ALL_DATASET; import brave.Tracing; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; @@ -18,8 +21,10 @@ import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.proto.service.KaldbSearch; +import com.slack.kaldb.server.KaldbQueryServiceBase; import com.slack.kaldb.util.JsonUtil; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -27,11 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import zipkin2.proto3.Endpoint; -import zipkin2.proto3.Span; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") /** @@ -47,8 +50,8 @@ public class ZipkinService { protected static String convertLogWireMessageToZipkinSpan(List messages) - throws InvalidProtocolBufferException { - List traces = new ArrayList<>(messages.size()); + throws JsonProcessingException { + List traces = new ArrayList<>(messages.size()); for (LogWireMessage message : messages) { if (message.getId() == null) { LOG.warn("Document={} cannot have missing id ", message); @@ -60,7 +63,7 @@ protected static String convertLogWireMessageToZipkinSpan(List m String name = null; String serviceName = null; String timestamp = String.valueOf(message.getTimestamp().toEpochMilli()); - long duration = Long.MIN_VALUE; + long duration = Integer.MIN_VALUE; Map messageTags = new HashMap<>(); for (String k : message.getSource().keySet()) { @@ -74,7 +77,7 @@ protected static String convertLogWireMessageToZipkinSpan(List m } else if (LogMessage.ReservedField.SERVICE_NAME.fieldName.equals(k)) { serviceName = (String) value; } else if (LogMessage.ReservedField.DURATION_MS.fieldName.equals(k)) { - duration = ((Number) value).longValue(); + duration = TimeUnit.MICROSECONDS.convert(Duration.ofMillis(((Number) value).intValue())); } else { messageTags.put(k, String.valueOf(value)); } @@ -92,24 +95,20 @@ protected static String convertLogWireMessageToZipkinSpan(List m continue; } - final long messageConvertedTimestamp = convertToMicroSeconds(message.getTimestamp()); - - final Span span = - makeSpan( - messageTraceId, - Optional.ofNullable(parentId), - message.getId(), - Optional.ofNullable(name), - Optional.ofNullable(serviceName), - messageConvertedTimestamp, - duration, - messageTags); - String spanJson = printer.print(span); - traces.add(spanJson); + final ZipkinSpanResponse span = new ZipkinSpanResponse(message.getId(), messageTraceId); + span.setParentId(parentId); + span.setName(name); + if (serviceName != null) { + ZipkinEndpointResponse remoteEndpoint = new ZipkinEndpointResponse(); + remoteEndpoint.setServiceName(serviceName); + span.setRemoteEndpoint(remoteEndpoint); + } + span.setTimestamp(convertToMicroSeconds(message.getTimestamp())); + span.setDuration(Math.toIntExact(duration)); + span.setTags(messageTags); + traces.add(span); } - StringJoiner outputJsonArray = new StringJoiner(",", "[", "]"); - traces.forEach(outputJsonArray::add); - return outputJsonArray.toString(); + return objectMapper.writeValueAsString(traces); } // returning LogWireMessage instead of LogMessage @@ -127,31 +126,6 @@ private static List searchResultToLogWireMessage( return messages; } - private static Span makeSpan( - String traceId, - Optional parentId, - String id, - Optional name, - Optional serviceName, - long timestamp, - long duration, - Map tags) { - Span.Builder spanBuilder = Span.newBuilder(); - - spanBuilder.setTraceId(ByteString.copyFrom(traceId.getBytes()).toStringUtf8()); - spanBuilder.setId(ByteString.copyFrom(id.getBytes()).toStringUtf8()); - spanBuilder.setTimestamp(timestamp); - spanBuilder.setDuration(duration); - - parentId.ifPresent( - s -> spanBuilder.setParentId(ByteString.copyFrom(s.getBytes()).toStringUtf8())); - name.ifPresent(spanBuilder::setName); - serviceName.ifPresent( - s -> spanBuilder.setRemoteEndpoint(Endpoint.newBuilder().setServiceName(s))); - spanBuilder.putAllTags(tags); - return spanBuilder.build(); - } - @VisibleForTesting protected static long convertToMicroSeconds(Instant instant) { return ChronoUnit.MICROS.between(Instant.EPOCH, instant); @@ -163,8 +137,14 @@ protected static long convertToMicroSeconds(Instant instant) { private static final int MAX_SPANS = 20_000; private final KaldbQueryServiceBase searcher; - private static final JsonFormat.Printer printer = - JsonFormat.printer().includingDefaultValueFields(); + + private static final ObjectMapper objectMapper = + JsonMapper.builder() + // sort alphabetically for easier test asserts + .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true) + // don't serialize null values or empty maps + .serializationInclusion(JsonInclude.Include.NON_EMPTY) + .build(); public ZipkinService(KaldbQueryServiceBase searcher) { this.searcher = searcher; diff --git a/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java new file mode 100644 index 0000000000..173f756164 --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/zipkinApi/ZipkinSpanResponse.java @@ -0,0 +1,110 @@ +package com.slack.kaldb.zipkinApi; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + +/** + * Span response object for Zipkin API + * + * @see Zipkin API Spec + */ +@SuppressWarnings("unused") +public class ZipkinSpanResponse { + private final String id; + private final String traceId; + private String parentId = null; + private String name = null; + + @JsonProperty("timestamp") + private Long timestampMicros = null; + + private ZipkinEndpointResponse localEndpoint = null; + + private ZipkinEndpointResponse remoteEndpoint = null; + + @JsonProperty("duration") + // Zipkin spec defines this is integer, even though a long seems like it would be more appropriate + private int durationMicros; + + private String kind; + + private Map tags; + + public ZipkinSpanResponse(String id, String traceId) { + // id and traceId are only required fields + this.id = id; + this.traceId = traceId; + } + + public void setParentId(String parentId) { + this.parentId = parentId; + } + + public void setName(String name) { + this.name = name; + } + + public void setTimestamp(long timestampMicros) { + this.timestampMicros = timestampMicros; + } + + public void setDuration(int durationMicros) { + this.durationMicros = durationMicros; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public ZipkinEndpointResponse getLocalEndpoint() { + return localEndpoint; + } + + public void setLocalEndpoint(ZipkinEndpointResponse localEndpoint) { + this.localEndpoint = localEndpoint; + } + + public ZipkinEndpointResponse getRemoteEndpoint() { + return remoteEndpoint; + } + + public void setRemoteEndpoint(ZipkinEndpointResponse remoteEndpoint) { + this.remoteEndpoint = remoteEndpoint; + } + + public String getId() { + return id; + } + + public String getTraceId() { + return traceId; + } + + public String getParentId() { + return parentId; + } + + public String getName() { + return name; + } + + public long getTimestamp() { + return timestampMicros; + } + + public int getDuration() { + return durationMicros; + } + + public String getKind() { + return kind; + } + + public Map getTags() { + return tags; + } +} diff --git a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java b/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java deleted file mode 100644 index 464688ff59..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceSpanConversionTest.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.slack.kaldb.server; - -import static com.slack.kaldb.server.ZipkinServiceTest.generateLogWireMessagesForOneTrace; -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.slack.kaldb.logstore.LogWireMessage; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class ZipkinServiceSpanConversionTest { - - @Test - public void testLogWireMessageToZipkinSpanConversion() throws InvalidProtocolBufferException { - Instant time = Instant.now(); - List messages = generateLogWireMessagesForOneTrace(time, 2, "1"); - - // follows output format from https://zipkin.io/zipkin-api/#/default/get_trace__traceId_ - String output = - String.format( - """ - [{ - "traceId": "1", - "parentId": "", - "id": "1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - }, - "debug": false, - "shared": false - },{ - "traceId": "1", - "parentId": "1", - "id": "2", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - }, - "debug": false, - "shared": false - }]""", - ZipkinService.convertToMicroSeconds(time.plusSeconds(1)), - ZipkinService.convertToMicroSeconds(time.plusSeconds(2))); - assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(messages)).isEqualTo(output); - - assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(new ArrayList<>())).isEqualTo("[]"); - } -} diff --git a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java deleted file mode 100644 index 27a2e240c2..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/server/ZipkinServiceTest.java +++ /dev/null @@ -1,441 +0,0 @@ -package com.slack.kaldb.server; - -import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER; -import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.testlib.ChunkManagerUtil.ZK_PATH_PREFIX; -import static com.slack.kaldb.testlib.KaldbSearchUtils.searchUsingGrpcApi; -import static com.slack.kaldb.testlib.MessageUtil.TEST_DATASET_NAME; -import static com.slack.kaldb.testlib.MessageUtil.TEST_MESSAGE_TYPE; -import static com.slack.kaldb.testlib.MetricsUtil.getCount; -import static com.slack.kaldb.testlib.TestKafkaServer.produceMessagesToKafka; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.linecorp.armeria.client.WebClient; -import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.slack.kaldb.blobfs.s3.S3TestUtils; -import com.slack.kaldb.chunkManager.RollOverChunkTask; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LogWireMessage; -import com.slack.kaldb.metadata.core.CuratorBuilder; -import com.slack.kaldb.metadata.core.KaldbMetadataTestUtils; -import com.slack.kaldb.metadata.dataset.DatasetMetadata; -import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; -import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; -import com.slack.kaldb.proto.config.KaldbConfigs; -import com.slack.kaldb.proto.service.KaldbSearch; -import com.slack.kaldb.testlib.KaldbConfigUtil; -import com.slack.kaldb.testlib.TestKafkaServer; -import io.micrometer.prometheus.PrometheusConfig; -import io.micrometer.prometheus.PrometheusMeterRegistry; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.curator.test.TestingServer; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.s3.S3AsyncClient; - -public class ZipkinServiceTest { - - private static final Logger LOG = LoggerFactory.getLogger(ZipkinServiceTest.class); - - private static final String TEST_S3_BUCKET = "test-s3-bucket"; - private static final String TEST_KAFKA_TOPIC_1 = "test-topic-1"; - private static final String KALDB_TEST_CLIENT_1 = "kaldb-test-client1"; - - private DatasetMetadataStore datasetMetadataStore; - private AsyncCuratorFramework curatorFramework; - private PrometheusMeterRegistry meterRegistry; - - @RegisterExtension - public static final S3MockExtension S3_MOCK_EXTENSION = - S3MockExtension.builder() - .withInitialBuckets(TEST_S3_BUCKET) - .silent() - .withSecureConnection(false) - .build(); - - private TestKafkaServer kafkaServer; - private TestingServer zkServer; - private S3AsyncClient s3Client; - - @BeforeEach - public void setUp() throws Exception { - zkServer = new TestingServer(); - kafkaServer = new TestKafkaServer(); - s3Client = S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); - - // We side load a service metadata entry telling it to create an entry with the partitions that - // we use in test - meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); - KaldbConfigs.ZookeeperConfig zkConfig = - KaldbConfigs.ZookeeperConfig.newBuilder() - .setZkConnectString(zkServer.getConnectString()) - .setZkPathPrefix(ZK_PATH_PREFIX) - .setZkSessionTimeoutMs(1000) - .setZkConnectionTimeoutMs(1000) - .setSleepBetweenRetriesMs(1000) - .build(); - curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); - final DatasetPartitionMetadata partition = - new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("0", "1")); - final List partitionConfigs = Collections.singletonList(partition); - DatasetMetadata datasetMetadata = - new DatasetMetadata( - TEST_DATASET_NAME, "serviceOwner", 1000, partitionConfigs, TEST_DATASET_NAME); - datasetMetadataStore.createSync(datasetMetadata); - await().until(() -> KaldbMetadataTestUtils.listSyncUncached(datasetMetadataStore).size() == 1); - } - - @AfterEach - public void teardown() throws Exception { - if (kafkaServer != null) { - kafkaServer.close(); - } - if (meterRegistry != null) { - meterRegistry.close(); - } - if (datasetMetadataStore != null) { - datasetMetadataStore.close(); - } - if (curatorFramework != null) { - curatorFramework.unwrap().close(); - } - if (zkServer != null) { - zkServer.close(); - } - } - - public static LogWireMessage makeWireMessageForSpans( - String id, - Instant ts, - String traceId, - Optional parentId, - long durationMs, - String serviceName, - String name) { - Map fieldMap = new HashMap<>(); - fieldMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, traceId); - fieldMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, serviceName); - fieldMap.put(LogMessage.ReservedField.NAME.fieldName, name); - parentId.ifPresent(s -> fieldMap.put(LogMessage.ReservedField.PARENT_ID.fieldName, s)); - fieldMap.put(LogMessage.ReservedField.DURATION_MS.fieldName, durationMs); - return new LogWireMessage(TEST_DATASET_NAME, TEST_MESSAGE_TYPE, id, ts, fieldMap); - } - - public static List generateLogWireMessagesForOneTrace( - Instant time, int count, String traceId) { - List messages = new ArrayList<>(); - for (int i = 1; i <= count; i++) { - String parentId = null; - if (i > 1) { - parentId = String.valueOf(i - 1); - } - messages.add( - makeWireMessageForSpans( - String.valueOf(i), - time.plusSeconds(i), - traceId, - Optional.ofNullable(parentId), - i, - "service1", - ("Trace" + i))); - } - return messages; - } - - @Test - @Disabled // Flakey test, occasionally returns an empty result - public void testDistributedQueryOneIndexerOneQueryNode() throws Exception { - assertThat(kafkaServer.getBroker().isRunning()).isTrue(); - - LOG.info("Starting query service"); - int queryServicePort = 8887; - KaldbConfigs.KaldbConfig queryServiceConfig = - KaldbConfigUtil.makeKaldbConfig( - "localhost:" + kafkaServer.getBroker().getKafkaPort().get(), - -1, - TEST_KAFKA_TOPIC_1, - 0, - KALDB_TEST_CLIENT_1, - TEST_S3_BUCKET, - queryServicePort, - zkServer.getConnectString(), - ZK_PATH_PREFIX, - KaldbConfigs.NodeRole.QUERY, - 1000, - "api_Log", - -1, - 100); - Kaldb queryService = new Kaldb(queryServiceConfig, meterRegistry); - queryService.start(); - queryService.serviceManager.awaitHealthy(DEFAULT_START_STOP_DURATION); - - int indexerPort = 10000; - int totalMessagesToIndex = 8; - LOG.info( - "Creating indexer service at port {}, topic: {} and partition {}", - indexerPort, - TEST_KAFKA_TOPIC_1, - 0); - // create a kaldb indexer - KaldbConfigs.KaldbConfig indexerConfig = - KaldbConfigUtil.makeKaldbConfig( - "localhost:" + kafkaServer.getBroker().getKafkaPort().get(), - indexerPort, - TEST_KAFKA_TOPIC_1, - 0, - KALDB_TEST_CLIENT_1, - TEST_S3_BUCKET, - -1, - zkServer.getConnectString(), - ZK_PATH_PREFIX, - KaldbConfigs.NodeRole.INDEX, - 1000, - "api_log", - 9003, - totalMessagesToIndex); - - PrometheusMeterRegistry indexerMeterRegistry = - new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); - Kaldb indexer = new Kaldb(indexerConfig, s3Client, indexerMeterRegistry); - indexer.start(); - indexer.serviceManager.awaitHealthy(DEFAULT_START_STOP_DURATION); - await().until(() -> kafkaServer.getConnectedConsumerGroups() == 1); - - // Produce messages to kafka, so the indexer can consume them. - final Instant trace1StartTime = Instant.now().minus(20, ChronoUnit.MINUTES); - List messages = - new ArrayList<>(generateLogWireMessagesForOneTrace(trace1StartTime, 2, "1")); - - final Instant trace2StartTime = Instant.now().minus(10, ChronoUnit.MINUTES); - messages.addAll(generateLogWireMessagesForOneTrace(trace2StartTime, 5, "2")); - - final Instant trace3StartTime = Instant.now().minus(5, ChronoUnit.MINUTES); - messages.addAll(generateLogWireMessagesForOneTrace(trace3StartTime, 1, "3")); - - List logMessages = - messages.stream().map(LogMessage::fromWireMessage).collect(Collectors.toList()); - - final int indexedMessagesCount = - produceMessagesToKafka(kafkaServer.getBroker(), TEST_KAFKA_TOPIC_1, 0, logMessages); - assertThat(totalMessagesToIndex).isEqualTo(indexedMessagesCount); - - await() - .until( - () -> - getCount(MESSAGES_RECEIVED_COUNTER, indexerMeterRegistry) == indexedMessagesCount); - - await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, indexerMeterRegistry) == 1); - assertThat(getCount(RollOverChunkTask.ROLLOVERS_FAILED, indexerMeterRegistry)).isZero(); - - // Query from the grpc search service - KaldbSearch.SearchResult queryServiceSearchResponse = - searchUsingGrpcApi("*:*", queryServicePort, 0, Instant.now().toEpochMilli(), "365d"); - - assertThat(queryServiceSearchResponse.getTotalNodes()).isEqualTo(1); - assertThat(queryServiceSearchResponse.getFailedNodes()).isEqualTo(0); - assertThat(queryServiceSearchResponse.getHitsCount()).isEqualTo(indexedMessagesCount); - - // Query from the zipkin search service - String endpoint = "http://127.0.0.1:" + queryServicePort; - WebClient webClient = WebClient.of(endpoint); - AggregatedHttpResponse response = webClient.get("/api/v2/trace/1").aggregate().join(); - String body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - String expectedTrace = - String.format( - """ - [ - { - "traceId": "1", - "parentId": "1", - "id": "localhost:100:1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "testDataSet", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - }, - { - "traceId": "1", - "parentId": "", - "id": "localhost:100:0", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(2)), - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(1))); - assertThat(body).isEqualTo(expectedTrace); - - String params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.minus(10, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(5, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(0, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(1, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = String.format("?maxSpans=%d", 1); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = - String.format( - """ - [ - { - "traceId": "1", - "parentId": "1", - "id": "localhost:100:1", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace2", - "timestamp": "%d", - "duration": "2", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace1StartTime.plusSeconds(2))); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.plus(2, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.plus(3, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - assertThat(body).isEqualTo(expectedTrace); - - params = - String.format( - "?startTimeEpochMs=%d&endTimeEpochMs=%d", - trace1StartTime.minus(10, ChronoUnit.SECONDS).toEpochMilli(), - trace1StartTime.minus(1, ChronoUnit.SECONDS).toEpochMilli()); - response = webClient.get("/api/v2/trace/1" + params).aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = "[]"; - assertThat(body).isEqualTo(expectedTrace); - - response = webClient.get("/api/v2/trace/3").aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = - String.format( - """ - [ - { - "traceId": "3", - "parentId": "", - "id": "localhost:100:7", - "kind": "SPAN_KIND_UNSPECIFIED", - "name": "Trace1", - "timestamp": "%d", - "duration": "1", - "remoteEndpoint": { - "serviceName": "service1", - "ipv4": "", - "ipv6": "", - "port": 0 - }, - "annotations": [], - "tags": { - "hostname": "localhost" - }, - "debug": false, - "shared": false - } - ]""", - ZipkinService.convertToMicroSeconds(trace3StartTime.plusSeconds(1))); - assertThat(body).isEqualTo(expectedTrace); - - response = webClient.get("/api/v2/trace/4").aggregate().join(); - body = response.content(StandardCharsets.UTF_8); - assertThat(response.status().code()).isEqualTo(200); - expectedTrace = "[]"; - assertThat(body).isEqualTo(expectedTrace); - - // Shutdown - LOG.info("Shutting down query service."); - queryService.shutdown(); - LOG.info("Shutting down indexer."); - indexer.shutdown(); - } -} diff --git a/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java b/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java new file mode 100644 index 0000000000..52d13ac4e7 --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/zipkinApi/ZipkinServiceSpanConversionTest.java @@ -0,0 +1,72 @@ +package com.slack.kaldb.zipkinApi; + +import static com.slack.kaldb.testlib.MessageUtil.TEST_DATASET_NAME; +import static com.slack.kaldb.testlib.MessageUtil.TEST_MESSAGE_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.logstore.LogWireMessage; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +public class ZipkinServiceSpanConversionTest { + private static LogWireMessage makeWireMessageForSpans( + String id, + Instant ts, + String traceId, + Optional parentId, + long durationMs, + String serviceName, + String name) { + Map fieldMap = new HashMap<>(); + fieldMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, traceId); + fieldMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, serviceName); + fieldMap.put(LogMessage.ReservedField.NAME.fieldName, name); + parentId.ifPresent(s -> fieldMap.put(LogMessage.ReservedField.PARENT_ID.fieldName, s)); + fieldMap.put(LogMessage.ReservedField.DURATION_MS.fieldName, durationMs); + return new LogWireMessage(TEST_DATASET_NAME, TEST_MESSAGE_TYPE, id, ts, fieldMap); + } + + private static List generateLogWireMessagesForOneTrace( + Instant time, int count, String traceId) { + List messages = new ArrayList<>(); + for (int i = 1; i <= count; i++) { + String parentId = null; + if (i > 1) { + parentId = String.valueOf(i - 1); + } + messages.add( + makeWireMessageForSpans( + String.valueOf(i), + time.plusSeconds(i), + traceId, + Optional.ofNullable(parentId), + i, + "service1", + ("Trace" + i))); + } + return messages; + } + + @Test + public void testLogWireMessageToZipkinSpanConversion() throws JsonProcessingException { + Instant time = Instant.now(); + List messages = generateLogWireMessagesForOneTrace(time, 2, "1"); + + // follows output format from https://zipkin.io/zipkin-api/#/default/get_trace__traceId_ + String output = + String.format( + "[{\"duration\":1000,\"id\":\"1\",\"name\":\"Trace1\",\"remoteEndpoint\":{\"serviceName\":\"service1\"},\"timestamp\":%d,\"traceId\":\"1\"},{\"duration\":2000,\"id\":\"2\",\"name\":\"Trace2\",\"parentId\":\"1\",\"remoteEndpoint\":{\"serviceName\":\"service1\"},\"timestamp\":%d,\"traceId\":\"1\"}]", + ZipkinService.convertToMicroSeconds(time.plusSeconds(1)), + ZipkinService.convertToMicroSeconds(time.plusSeconds(2))); + assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(messages)).isEqualTo(output); + + assertThat(ZipkinService.convertLogWireMessageToZipkinSpan(new ArrayList<>())).isEqualTo("[]"); + } +} From ee74bc30d800c3452b7f47486b981410320c6798 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:35:43 -0700 Subject: [PATCH 4/5] Rewrite Lucene queries for better caching compatibility (#708) Co-authored-by: Bryan Burkholder Co-authored-by: Varun Thacker --- .../java/com/slack/kaldb/QueryBenchmark.java | 2 +- .../slack/kaldb/chunk/ReadOnlyChunkImpl.java | 35 +++++++- .../opensearch/OpenSearchAdapter.java | 25 ++++-- .../SchemaAwareLogDocumentBuilderImpl.java | 2 +- .../logstore/search/LogIndexSearcher.java | 2 +- .../logstore/search/LogIndexSearcherImpl.java | 12 ++- .../kaldb/chunk/ReadOnlyChunkImplTest.java | 21 +++++ .../logstore/LuceneIndexStoreImplTest.java | 6 +- .../opensearch/OpenSearchAdapterTest.java | 62 ++++++++++++- .../AlreadyClosedLogIndexSearcherImpl.java | 4 +- .../IllegalArgumentLogIndexSearcherImpl.java | 4 +- .../search/LogIndexSearcherImplTest.java | 88 +++++++++---------- ...TemporaryLogStoreAndSearcherExtension.java | 2 +- 13 files changed, 195 insertions(+), 70 deletions(-) diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java index 380a9c4d1e..19c28c65de 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java @@ -126,7 +126,7 @@ public void measureLogSearcherSearch() { logIndexSearcher.search( "*", "", - 0, + 0L, Long.MAX_VALUE, 500, new DateHistogramAggBuilder( diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java index c255fcd3e0..e59d0645cf 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java @@ -381,15 +381,46 @@ public String id() { @Override public SearchResult query(SearchQuery query) { if (logSearcher != null) { + Long searchStartTime = + determineStartTime(query.startTimeEpochMs, chunkInfo.getDataStartTimeEpochMs()); + Long searchEndTime = + determineEndTime(query.endTimeEpochMs, chunkInfo.getDataEndTimeEpochMs()); + return logSearcher.search( query.dataset, query.queryStr, - query.startTimeEpochMs, - query.endTimeEpochMs, + searchStartTime, + searchEndTime, query.howMany, query.aggBuilder); } else { return (SearchResult) SearchResult.empty(); } } + + /** + * Determines the start time to use for the query, given the original query start time and the + * start time of data in the chunk + */ + protected static Long determineStartTime(long queryStartTimeEpochMs, long chunkStartTimeEpochMs) { + Long searchStartTime = null; + if (queryStartTimeEpochMs > chunkStartTimeEpochMs) { + // if the query start time falls after the beginning of the chunk + searchStartTime = queryStartTimeEpochMs; + } + return searchStartTime; + } + + /** + * Determines the end time to use for the query, given the original query end time and the end + * time of data in the chunk + */ + protected static Long determineEndTime(long queryEndTimeEpochMs, long chunkEndTimeEpochMs) { + Long searchEndTime = null; + if (queryEndTimeEpochMs < chunkEndTimeEpochMs) { + // if the query end time falls before the end of the chunk + searchEndTime = queryEndTimeEpochMs; + } + return searchEndTime; + } } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 5a3bdc47c0..36dbff037c 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -143,8 +143,8 @@ public OpenSearchAdapter(Map chunkSchema) { public Query buildQuery( String dataset, String queryStr, - long startTimeMsEpoch, - long endTimeMsEpoch, + Long startTimeMsEpoch, + Long endTimeMsEpoch, IndexSearcher indexSearcher) throws IOException { LOG.trace("Query raw input string: '{}'", queryStr); @@ -158,11 +158,22 @@ public Query buildQuery( try { BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); - RangeQueryBuilder rangeQueryBuilder = - new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName) - .gte(startTimeMsEpoch) - .lte(endTimeMsEpoch); - boolQueryBuilder.filter(rangeQueryBuilder); + // only add a range filter if either start or end time is provided + if (startTimeMsEpoch != null || endTimeMsEpoch != null) { + RangeQueryBuilder rangeQueryBuilder = + new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName); + + // todo - consider supporting something other than GTE/LTE (ie GT/LT?) + if (startTimeMsEpoch != null) { + rangeQueryBuilder.gte(startTimeMsEpoch); + } + + if (endTimeMsEpoch != null) { + rangeQueryBuilder.lte(endTimeMsEpoch); + } + + boolQueryBuilder.filter(rangeQueryBuilder); + } // todo - dataset? diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index d8ad4185cf..16edce5ea5 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -48,7 +48,7 @@ private static void addTextField( } // TODO: Move this definition to the config file. - private static ImmutableMap getDefaultLuceneFieldDefinitions( + public static ImmutableMap getDefaultLuceneFieldDefinitions( boolean enableFullTextSearch) { ImmutableMap.Builder fieldDefBuilder = ImmutableMap.builder(); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java index 23c98c20fe..3092962f1f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcher.java @@ -5,5 +5,5 @@ public interface LogIndexSearcher extends Closeable { SearchResult search( - String dataset, String query, long minTime, long maxTime, int howMany, AggBuilder aggBuilder); + String dataset, String query, Long minTime, Long maxTime, int howMany, AggBuilder aggBuilder); } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java index f7e7e1d06e..4188b12a50 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java @@ -84,15 +84,19 @@ public void afterRefresh(boolean didRefresh) { public SearchResult search( String dataset, String queryStr, - long startTimeMsEpoch, - long endTimeMsEpoch, + Long startTimeMsEpoch, + Long endTimeMsEpoch, int howMany, AggBuilder aggBuilder) { ensureNonEmptyString(dataset, "dataset should be a non-empty string"); ensureNonNullString(queryStr, "query should be a non-empty string"); - ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value"); - ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time"); + if (startTimeMsEpoch != null) { + ensureTrue(startTimeMsEpoch >= 0, "start time should be non-negative value"); + } + if (startTimeMsEpoch != null && endTimeMsEpoch != null) { + ensureTrue(startTimeMsEpoch < endTimeMsEpoch, "end time should be greater than start time"); + } ensureTrue(howMany >= 0, "hits requested should not be negative."); ensureTrue(howMany > 0 || aggBuilder != null, "Hits or aggregation should be requested."); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 2a8dd4a516..da7d7c14ca 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -458,6 +458,27 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { curatorFramework.unwrap().close(); } + @Test + public void shouldUseOptimizedQueryStartEndTime() { + // Query is before the chunk data, so do not return a start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 12)).isNull(); + + // Query matches chunk start time, do not return a start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 10)).isNull(); + + // Query only matches part of the chunk, return the query start time + assertThat(ReadOnlyChunkImpl.determineStartTime(10, 9)).isEqualTo(10); + + // Query only matches part of the chunk, return the query end time + assertThat(ReadOnlyChunkImpl.determineEndTime(10, 12)).isEqualTo(10); + + // Query matches chunk end time, do not return an end time + assertThat(ReadOnlyChunkImpl.determineEndTime(10, 10)).isNull(); + + // Query is after the chunk data, so do not return an end time + assertThat(ReadOnlyChunkImpl.determineEndTime(12, 10)).isNull(); + } + private void assignReplicaToChunk( CacheSlotMetadataStore cacheSlotMetadataStore, String replicaId, diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 50221d563d..464a6c4ea3 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -100,7 +100,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "nested.key1:value1", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( @@ -111,7 +111,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "duplicateproperty:duplicate1", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( @@ -122,7 +122,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { logStore.logSearcher.search( MessageUtil.TEST_DATASET_NAME, "nested.duplicateproperty:2", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java index 463f94a3d0..38a5502316 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapterTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; import com.slack.kaldb.logstore.search.aggregations.AggBuilder; import com.slack.kaldb.logstore.search.aggregations.AggBuilderBase; import com.slack.kaldb.logstore.search.aggregations.AvgAggBuilder; @@ -22,7 +23,13 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.opensearch.search.aggregations.AbstractAggregationBuilder; @@ -48,9 +55,14 @@ public class OpenSearchAdapterTest { public TemporaryLogStoreAndSearcherExtension logStoreAndSearcherRule = new TemporaryLogStoreAndSearcherExtension(false); - private final OpenSearchAdapter openSearchAdapter = new OpenSearchAdapter(Map.of()); + private final OpenSearchAdapter openSearchAdapter = + new OpenSearchAdapter( + SchemaAwareLogDocumentBuilderImpl.getDefaultLuceneFieldDefinitions(false)); - public OpenSearchAdapterTest() throws IOException {} + public OpenSearchAdapterTest() throws IOException { + // We need to reload the schema so that query optimizations take into account the schema + openSearchAdapter.reloadSchema(); + } @Test public void safelyHandlesUnknownAggregations() throws IOException { @@ -391,4 +403,50 @@ public void handlesDateHistogramExtendedBoundsMinDocEdgeCases() throws IOExcepti assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> collectorManager.newCollector()); } + + @Test + public void shouldExcludeDateFilterWhenNullTimestamps() throws Exception { + IndexSearcher indexSearcher = logStoreAndSearcherRule.logStore.getSearcherManager().acquire(); + Query nullBothTimestamps = openSearchAdapter.buildQuery("foo", "", null, null, indexSearcher); + // null for both timestamps with no query string should be optimized into a matchall + assertThat(nullBothTimestamps).isInstanceOf(MatchAllDocsQuery.class); + + Query nullStartTimestamp = openSearchAdapter.buildQuery("foo", "a", null, 100L, indexSearcher); + assertThat(nullStartTimestamp).isInstanceOf(BooleanQuery.class); + + Optional filterNullStartQuery = + ((BooleanQuery) nullStartTimestamp) + .clauses().stream() + .filter( + booleanClause -> + booleanClause.getQuery() + instanceof IndexSortSortedNumericDocValuesRangeQuery) + .map( + booleanClause -> + (IndexSortSortedNumericDocValuesRangeQuery) booleanClause.getQuery()) + .findFirst(); + assertThat(filterNullStartQuery).isPresent(); + // a null start and provided end should result in an optimized range query of min long to the + // end value + assertThat(filterNullStartQuery.get().toString()).contains(String.valueOf(Long.MIN_VALUE)); + assertThat(filterNullStartQuery.get().toString()).contains(String.valueOf(100L)); + + Query nullEndTimestamp = openSearchAdapter.buildQuery("foo", "", 100L, null, indexSearcher); + Optional filterNullEndQuery = + ((BooleanQuery) nullEndTimestamp) + .clauses().stream() + .filter( + booleanClause -> + booleanClause.getQuery() + instanceof IndexSortSortedNumericDocValuesRangeQuery) + .map( + booleanClause -> + (IndexSortSortedNumericDocValuesRangeQuery) booleanClause.getQuery()) + .findFirst(); + assertThat(filterNullEndQuery).isPresent(); + // a null end and provided start should result in an optimized range query of start value to max + // long + assertThat(filterNullEndQuery.get().toString()).contains(String.valueOf(100L)); + assertThat(filterNullEndQuery.get().toString()).contains(String.valueOf(Long.MAX_VALUE)); + } } diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java index b67643353e..80ffd84fb7 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/AlreadyClosedLogIndexSearcherImpl.java @@ -9,8 +9,8 @@ public class AlreadyClosedLogIndexSearcherImpl implements LogIndexSearcher search( String dataset, String query, - long minTime, - long maxTime, + Long minTime, + Long maxTime, int howMany, AggBuilder aggBuilder) { throw new AlreadyClosedException("Failed to acquire an index searcher"); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java index b6df6cdcc8..a8df466587 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/IllegalArgumentLogIndexSearcherImpl.java @@ -8,8 +8,8 @@ public class IllegalArgumentLogIndexSearcherImpl implements LogIndexSearcher search( String dataset, String query, - long minTime, - long maxTime, + Long minTime, + Long maxTime, int howMany, AggBuilder aggBuilder) { throw new IllegalArgumentException("Failed to acquire an index searcher"); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index dc0c3ac625..be9a33dc87 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -649,7 +649,7 @@ public void testFullIndexSearch() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -677,7 +677,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, null)); @@ -687,7 +687,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, "")); @@ -697,7 +697,7 @@ public void testAggregationWithScripting() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new AvgAggBuilder("1", TEST_SOURCE_LONG_PROPERTY, 0, "return 9;")); @@ -713,7 +713,7 @@ public void testFilterAggregations() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new FiltersAggBuilder( @@ -758,7 +758,7 @@ public void testFullIndexSearchForMinAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new MinAggBuilder( @@ -783,7 +783,7 @@ public void testFullIndexSearchForMaxAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new MaxAggBuilder( @@ -809,7 +809,7 @@ public void testFullIndexSearchForSumAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new SumAggBuilder("test", TEST_SOURCE_LONG_PROPERTY, "0", null)); @@ -832,7 +832,7 @@ public void testFullIndexSearchForExtendedStatsAgg() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new ExtendedStatsAggBuilder("test", TEST_SOURCE_LONG_PROPERTY, "0", null, null)); @@ -862,7 +862,7 @@ public void testTermsAggregation() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new TermsAggBuilder( @@ -969,7 +969,7 @@ public void testTermsAggregationMissingValues() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new TermsAggBuilder( @@ -998,7 +998,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1012,7 +1012,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1027,7 +1027,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1041,7 +1041,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1063,7 +1063,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1077,7 +1077,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1092,7 +1092,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1106,7 +1106,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1128,7 +1128,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1142,7 +1142,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1157,7 +1157,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1171,7 +1171,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1187,7 +1187,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1202,7 +1202,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "app*", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1218,7 +1218,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "baby car", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1234,7 +1234,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "apple 1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1249,7 +1249,7 @@ public void testFullTextSearch() { .search( TEST_DATASET_NAME, "123", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1285,7 +1285,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "_all:baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1300,7 +1300,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "_all:1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1316,7 +1316,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1331,7 +1331,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1347,7 +1347,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1362,7 +1362,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "app*", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1378,7 +1378,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "baby car", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1394,7 +1394,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "apple 1234", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1409,7 +1409,7 @@ public void testDisabledFullTextSearch() { .search( TEST_DATASET_NAME, "123", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1430,7 +1430,7 @@ public void testNullSearchString() { strictLogStore.logSearcher.search( TEST_DATASET_NAME + "miss", null, - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1447,7 +1447,7 @@ public void testMissingIndexSearch() { strictLogStore.logSearcher.search( TEST_DATASET_NAME + "miss", "apple", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1475,7 +1475,7 @@ public void testNoResultQuery() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "elephant", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1544,7 +1544,7 @@ public void testEmptyIndexName() { strictLogStore.logSearcher.search( "", "test", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1561,7 +1561,7 @@ public void testNullIndexName() { strictLogStore.logSearcher.search( null, "test", - 0, + 0L, MAX_TIME, 1000, new DateHistogramAggBuilder( @@ -1595,7 +1595,7 @@ public void testInvalidEndTime() { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "test", - 0, + 0L, -1L, 1000, new DateHistogramAggBuilder( @@ -1704,7 +1704,7 @@ public void testConcurrentSearches() throws InterruptedException { strictLogStore.logSearcher.search( TEST_DATASET_NAME, "baby", - 0, + 0L, MAX_TIME, 100, new DateHistogramAggBuilder( diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java index 1245a265ae..ff24b10841 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java @@ -47,7 +47,7 @@ public static List findAllMessages( searcher.search( dataset, query, - 0, + 0L, MAX_TIME, howMany, new DateHistogramAggBuilder( From c44937dcfc12346301c1aadb39dff381f37e0db7 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Thu, 26 Oct 2023 11:07:22 -0700 Subject: [PATCH 5/5] Fix replica assignment over-assigning (#707) Co-authored-by: Bryan Burkholder --- .../ReplicaAssignmentService.java | 11 ++- .../ReplicaAssignmentServiceTest.java | 81 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java index 4361de9acc..88f947d64f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java +++ b/kaldb/src/main/java/com/slack/kaldb/clusterManager/ReplicaAssignmentService.java @@ -187,20 +187,25 @@ protected Map assignReplicasToCacheSlots() { .stream() .flatMap( (cacheSlotsPerHost) -> { - int currentlyAssigned = + int currentlyAssignedOrLoading = cacheSlotsPerHost.stream() .filter( cacheSlotMetadata -> cacheSlotMetadata.cacheSlotState.equals( - Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED)) + Metadata.CacheSlotMetadata.CacheSlotState.ASSIGNED) + || cacheSlotMetadata.cacheSlotState.equals( + Metadata.CacheSlotMetadata.CacheSlotState.LOADING)) .toList() .size(); + return cacheSlotsPerHost.stream() .filter( cacheSlotMetadata -> cacheSlotMetadata.cacheSlotState.equals( Metadata.CacheSlotMetadata.CacheSlotState.FREE)) - .limit(Math.max(0, maxConcurrentAssignmentsPerNode - currentlyAssigned)); + .limit( + Math.max( + 0, maxConcurrentAssignmentsPerNode - currentlyAssignedOrLoading)); }) .collect(Collectors.toList()); diff --git a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java index 76c562f023..d61dfe86d4 100644 --- a/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/clusterManager/ReplicaAssignmentServiceTest.java @@ -1371,6 +1371,87 @@ public void assignmentPreservesSupportedIndexTypes() throws Exception { assertThat(assignedCacheSlot.size()).isEqualTo(1); } + @Test + public void shouldNotAssignIfAlreadyLoading() throws Exception { + MeterRegistry concurrentAssignmentsRegistry = new SimpleMeterRegistry(); + + KaldbConfigs.ManagerConfig.ReplicaAssignmentServiceConfig replicaAssignmentServiceConfig = + KaldbConfigs.ManagerConfig.ReplicaAssignmentServiceConfig.newBuilder() + .setSchedulePeriodMins(1) + .addAllReplicaSets(List.of(REPLICA_SET)) + .setMaxConcurrentPerNode(1) + .build(); + KaldbConfigs.ManagerConfig managerConfig = + KaldbConfigs.ManagerConfig.newBuilder() + .setEventAggregationSecs(2) + .setScheduleInitialDelayMins(1) + .setReplicaAssignmentServiceConfig(replicaAssignmentServiceConfig) + .build(); + + ReplicaAssignmentService replicaAssignmentService = + new ReplicaAssignmentService( + cacheSlotMetadataStore, + replicaMetadataStore, + managerConfig, + concurrentAssignmentsRegistry); + + Instant now = Instant.now(); + ReplicaMetadata expectedUnassignedMetadata = + new ReplicaMetadata( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + REPLICA_SET, + now.minus(1, ChronoUnit.HOURS).toEpochMilli(), + now.plusSeconds(60).toEpochMilli(), + false, + LOGS_LUCENE9); + replicaMetadataStore.createAsync(expectedUnassignedMetadata); + + ReplicaMetadata loadingMetadata = + new ReplicaMetadata( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + REPLICA_SET, + now.minus(1, ChronoUnit.HOURS).toEpochMilli(), + now.plusSeconds(60).toEpochMilli(), + false, + LOGS_LUCENE9); + replicaMetadataStore.createAsync(loadingMetadata); + + await().until(() -> replicaMetadataStore.listSync().size() == 2); + + CacheSlotMetadata cacheSlotMetadata = + new CacheSlotMetadata( + UUID.randomUUID().toString(), + Metadata.CacheSlotMetadata.CacheSlotState.LOADING, + loadingMetadata.snapshotId, + Instant.now().toEpochMilli(), + List.of(LOGS_LUCENE9, LOGS_LUCENE9), + HOSTNAME, + REPLICA_SET); + cacheSlotMetadataStore.createAsync(cacheSlotMetadata); + + CacheSlotMetadata freeCacheSlot = + new CacheSlotMetadata( + UUID.randomUUID().toString(), + Metadata.CacheSlotMetadata.CacheSlotState.FREE, + "", + Instant.now().toEpochMilli(), + List.of(LOGS_LUCENE9, LOGS_LUCENE9), + HOSTNAME, + REPLICA_SET); + cacheSlotMetadataStore.createAsync(freeCacheSlot); + + await().until(() -> cacheSlotMetadataStore.listSync().size() == 2); + + replicaAssignmentService.startAsync(); + replicaAssignmentService.awaitRunning(DEFAULT_START_STOP_DURATION); + + // immediately force a run + Map assignments = replicaAssignmentService.assignReplicasToCacheSlots(); + assertThat(assignments.get(REPLICA_SET)).isEqualTo(0); + } + @Test public void shouldPreventConcurrentAssignmentsExceedingLimit() throws Exception { MeterRegistry concurrentAssignmentsRegistry = new SimpleMeterRegistry();