From 3d54a6ecf771203444a625ae7fd0ac053528d9db Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Fri, 17 May 2024 11:47:39 -0700 Subject: [PATCH] Remove Kafka stream preprocessor functionality (#930) Co-authored-by: Bryan Burkholder --- astra/pom.xml | 5 - .../BulkIngestKafkaProducer.java | 23 +- .../slack/astra/logstore/LogWireMessage.java | 23 - .../slack/astra/preprocessor/AstraSerdes.java | 105 ---- .../preprocessor/PreprocessorPartitioner.java | 112 ---- .../preprocessor/PreprocessorRateLimiter.java | 83 +-- .../preprocessor/PreprocessorService.java | 323 ----------- .../preprocessor/PreprocessorValueMapper.java | 102 ---- .../java/com/slack/astra/server/Astra.java | 51 +- .../slack/astra/writer/JsonLogFormatter.java | 59 -- .../astra/writer/MurronLogFormatter.java | 93 --- .../com/slack/astra/writer/SpanFormatter.java | 77 --- astra/src/main/proto/astra_configs.proto | 36 -- .../BulkIngestKafkaProducerTest.java | 1 - .../astra/preprocessor/AstraSerdesTest.java | 158 ------ .../PreprocessorBulkRateLimiterTest.java | 448 --------------- .../PreprocessorRateLimiterTest.java | 122 ++-- .../PreprocessorServiceIntegrationTest.java | 328 ----------- .../PreprocessorServiceUnitTest.java | 537 ------------------ .../PreprocessorValueMapperTest.java | 166 ------ .../slack/astra/server/AstraConfigTest.java | 40 -- .../slack/astra/server/BulkIngestApiTest.java | 1 - .../astra/writer/JsonLogFormatterTest.java | 67 --- .../astra/writer/MurronLogFormatterTest.java | 182 ------ config/config.yaml | 11 - docs/topics/Config-options.md | 58 -- 26 files changed, 117 insertions(+), 3094 deletions(-) delete mode 100644 astra/src/main/java/com/slack/astra/preprocessor/AstraSerdes.java delete mode 100644 astra/src/main/java/com/slack/astra/preprocessor/PreprocessorPartitioner.java delete mode 100644 astra/src/main/java/com/slack/astra/preprocessor/PreprocessorService.java delete mode 100644 astra/src/main/java/com/slack/astra/preprocessor/PreprocessorValueMapper.java delete mode 100644 astra/src/main/java/com/slack/astra/writer/JsonLogFormatter.java delete mode 100644 astra/src/main/java/com/slack/astra/writer/MurronLogFormatter.java delete mode 100644 astra/src/test/java/com/slack/astra/preprocessor/AstraSerdesTest.java delete mode 100644 astra/src/test/java/com/slack/astra/preprocessor/PreprocessorBulkRateLimiterTest.java delete mode 100644 astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceIntegrationTest.java delete mode 100644 astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceUnitTest.java delete mode 100644 astra/src/test/java/com/slack/astra/preprocessor/PreprocessorValueMapperTest.java delete mode 100644 astra/src/test/java/com/slack/astra/writer/JsonLogFormatterTest.java delete mode 100644 astra/src/test/java/com/slack/astra/writer/MurronLogFormatterTest.java diff --git a/astra/pom.xml b/astra/pom.xml index f2bfb2edbf..eee932d1a4 100644 --- a/astra/pom.xml +++ b/astra/pom.xml @@ -203,11 +203,6 @@ kafka-clients ${kafka.version} - - org.apache.kafka - kafka-streams - ${kafka.version} - diff --git a/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducer.java b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducer.java index a41ab81f70..0f15e068c3 100644 --- a/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducer.java +++ b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducer.java @@ -3,12 +3,13 @@ import static 
com.google.common.base.Preconditions.checkArgument; import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE; import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE; +import static com.slack.astra.server.ManagerApiGrpc.MAX_TIME; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener; import com.slack.astra.metadata.dataset.DatasetMetadata; import com.slack.astra.metadata.dataset.DatasetMetadataStore; -import com.slack.astra.preprocessor.PreprocessorService; +import com.slack.astra.metadata.dataset.DatasetPartitionMetadata; import com.slack.astra.proto.config.AstraConfigs; import com.slack.astra.writer.KafkaUtils; import com.slack.service.murron.trace.Trace; @@ -19,10 +20,12 @@ import io.micrometer.prometheus.PrometheusMeterRegistry; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -30,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -331,11 +335,26 @@ private int getPartition(String index) { if (serviceNamePattern.equals(MATCH_ALL_SERVICE) || serviceNamePattern.equals(MATCH_STAR_SERVICE) || index.equals(serviceNamePattern)) { - List partitions = PreprocessorService.getActivePartitionList(datasetMetadata); + List partitions = getActivePartitionList(datasetMetadata); return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())); } } // We don't have a provisioned service for this index return -1; } + + /** Gets the active list of partitions from the provided dataset metadata */ + private static List getActivePartitionList(DatasetMetadata datasetMetadata) { + Optional datasetPartitionMetadata = + datasetMetadata.getPartitionConfigs().stream() + .filter(partitionMetadata -> partitionMetadata.getEndTimeEpochMs() == MAX_TIME) + .findFirst(); + + if (datasetPartitionMetadata.isEmpty()) { + return Collections.emptyList(); + } + return datasetPartitionMetadata.get().getPartitions().stream() + .map(Integer::parseInt) + .collect(Collectors.toUnmodifiableList()); + } } diff --git a/astra/src/main/java/com/slack/astra/logstore/LogWireMessage.java b/astra/src/main/java/com/slack/astra/logstore/LogWireMessage.java index c95af0c223..147777891f 100644 --- a/astra/src/main/java/com/slack/astra/logstore/LogWireMessage.java +++ b/astra/src/main/java/com/slack/astra/logstore/LogWireMessage.java @@ -1,41 +1,18 @@ package com.slack.astra.logstore; -import com.slack.astra.preprocessor.AstraSerdes; -import com.slack.astra.util.JsonUtil; -import java.io.IOException; import java.time.Instant; import java.util.Collections; import java.util.Map; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * LogWireMessage is the raw message we get from Kafka. This message may be invalid or malformed. * LogMessage is a refined form of this message. 
*/ public class LogWireMessage extends Message { - private static final Logger LOG = LoggerFactory.getLogger(LogWireMessage.class); private String index; private String type; - /** - * Move all Kafka message serializers to common class - * - * @see AstraSerdes - */ - @Deprecated - static Optional fromJson(String jsonStr) { - try { - LogWireMessage wireMessage = JsonUtil.read(jsonStr, LogWireMessage.class); - return Optional.of(wireMessage); - } catch (IOException e) { - LOG.error("Error parsing JSON Object from string " + jsonStr, e); - } - return Optional.empty(); - } - public LogWireMessage() { super("", Instant.now(), Collections.emptyMap()); } diff --git a/astra/src/main/java/com/slack/astra/preprocessor/AstraSerdes.java b/astra/src/main/java/com/slack/astra/preprocessor/AstraSerdes.java deleted file mode 100644 index b3beaf0674..0000000000 --- a/astra/src/main/java/com/slack/astra/preprocessor/AstraSerdes.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.slack.astra.preprocessor; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory for creating Astra specific serializers / deserializers. Based off of - * org.apache.kafka.common.serialization.Serdes - */ -public class AstraSerdes { - private static final Logger LOG = LoggerFactory.getLogger(AstraSerdes.class); - - public static Serde MurronMurronMessage() { - return new Serde<>() { - @Override - public Serializer serializer() { - return (topic, data) -> { - if (data == null) { - return null; - } - return data.toByteArray(); - }; - } - - @Override - public Deserializer deserializer() { - return (topic, data) -> { - Murron.MurronMessage murronMsg = null; - if (data == null || data.length == 0) return null; - - try { - murronMsg = Murron.MurronMessage.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error parsing byte string into MurronMessage: {}", new String(data), e); - } - return murronMsg; - }; - } - }; - } - - public static Serde TraceSpan() { - return new Serde<>() { - @Override - public Serializer serializer() { - return (topic, data) -> { - if (data == null) { - return null; - } - return data.toByteArray(); - }; - } - - @Override - public Deserializer deserializer() { - return (topic, data) -> { - Trace.Span span = null; - if (data == null || data.length == 0) return null; - - try { - span = Trace.Span.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error parsing byte string into Trace.Span: {}", new String(data), e); - } - return span; - }; - } - }; - } - - public static Serde TraceListOfSpans() { - return new Serde<>() { - @Override - public Serializer serializer() { - return (topic, data) -> { - if (data == null) { - return null; - } - return data.toByteArray(); - }; - } - - @Override - public Deserializer deserializer() { - return (topic, data) -> { - Trace.ListOfSpans listOfSpans = null; - if (data == null || data.length == 0) return null; - - try { - listOfSpans = Trace.ListOfSpans.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - LOG.error("Error parsing byte string into Trace.ListOfSpans: {}", new String(data), e); - } - return listOfSpans; - }; - } - }; - } -} diff --git 
a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorPartitioner.java b/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorPartitioner.java deleted file mode 100644 index 10603c047c..0000000000 --- a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorPartitioner.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE; -import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE; -import static com.slack.astra.preprocessor.PreprocessorService.sortDatasetsOnThroughput; - -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.slack.astra.metadata.dataset.DatasetMetadata; -import com.slack.service.murron.trace.Trace; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.streams.processor.StreamPartitioner; - -public class PreprocessorPartitioner implements StreamPartitioner { - - final List throughputSortedDatasets; - ConcurrentHashMap> datasetPartitionSuppliers; - - public PreprocessorPartitioner( - List datasetMetadataList, int kafkaPartitionStickyTimeoutMs) { - this.throughputSortedDatasets = sortDatasetsOnThroughput(datasetMetadataList); - this.datasetPartitionSuppliers = - getDatasetPartitionSuppliers(datasetMetadataList, kafkaPartitionStickyTimeoutMs); - } - - /** - * When kafka streams produces data to write to the preprocessor, instead of choosing partitions - * at random for each dataset we want to route all documents to a single partition for a set - * time(stickyTimeoutMs). We pick the partition at random and doesn't have smart routing like - * picking partitions which have lesser load etc. 
The motivation here is we create better batches - * while producing data into kafka and thereby improving the efficiency We use Guava's - * Suppliers.memoizeWithExpiration library which gives us a nice construct to achieve this - */ - private ConcurrentHashMap> getDatasetPartitionSuppliers( - List datasetMetadataList, int kafkaPartitionStickyTimeoutMs) { - ConcurrentHashMap> datasetPartitionSuppliers = - new ConcurrentHashMap<>(); - for (DatasetMetadata datasetMetadata : datasetMetadataList) { - if (kafkaPartitionStickyTimeoutMs > 0) { - datasetPartitionSuppliers.put( - datasetMetadata.name, - Suppliers.memoizeWithExpiration( - () -> { - List partitions = - PreprocessorService.getActivePartitionList(datasetMetadata); - return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())); - }, - kafkaPartitionStickyTimeoutMs, - TimeUnit.MILLISECONDS)); - } else { - datasetPartitionSuppliers.put(datasetMetadata.name, new NonStickySupplier(datasetMetadata)); - } - } - return datasetPartitionSuppliers; - } - - /** - * Implements the Supplier interface to not cache partitions i.e if - * kafkaPartitionStickyTimeoutMs=0 we will return a new partition at random on every invocation - */ - protected static class NonStickySupplier implements Supplier { - - private final DatasetMetadata datasetMetadata; - - NonStickySupplier(DatasetMetadata datasetMetadata) { - this.datasetMetadata = datasetMetadata; - } - - @Override - public Integer get() { - List partitions = PreprocessorService.getActivePartitionList(datasetMetadata); - return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())); - } - } - - /** - * Returns a partition from the provided list of dataset metadata. If no valid dataset metadata - * are provided throws an exception. The partition is cached for a set period ( stickyTimeoutMs ) - */ - @Override - public Integer partition( - final String topic, final String key, final Trace.Span value, final int numPartitions) { - String serviceName = PreprocessorValueMapper.getServiceName(value); - if (serviceName == null) { - // this also should not happen since we drop messages with empty service names in the rate - // limiter - throw new IllegalStateException( - String.format("Service name not found within the message '%s'", value)); - } - - for (DatasetMetadata datasetMetadata : throughputSortedDatasets) { - String serviceNamePattern = datasetMetadata.getServiceNamePattern(); - // back-compat since this is a new field - if (serviceNamePattern == null) { - serviceNamePattern = datasetMetadata.getName(); - } - - if (serviceNamePattern.equals(MATCH_ALL_SERVICE) - || serviceNamePattern.equals(MATCH_STAR_SERVICE) - || serviceName.equals(serviceNamePattern)) { - return datasetPartitionSuppliers.get(datasetMetadata.name).get(); - } - } - // this shouldn't happen, as we should have filtered all the missing datasets in the value - // mapper stage - throw new IllegalStateException( - String.format("Service name '%s' was not found in dataset metadata", serviceName)); - } -} diff --git a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorRateLimiter.java b/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorRateLimiter.java index 9a153027c6..b403648473 100644 --- a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorRateLimiter.java +++ b/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorRateLimiter.java @@ -2,7 +2,6 @@ import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE; import static 
com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE; -import static com.slack.astra.preprocessor.PreprocessorService.sortDatasetsOnThroughput; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; @@ -13,12 +12,12 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.function.BiPredicate; import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; -import org.apache.kafka.streams.kstream.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +41,9 @@ public class PreprocessorRateLimiter { public static final String MESSAGES_DROPPED = "preprocessor_rate_limit_messages_dropped"; public static final String BYTES_DROPPED = "preprocessor_rate_limit_bytes_dropped"; + /** Span key for KeyValue pair to use as the service name */ + public static String SERVICE_NAME_KEY = "service_name"; + public enum MessageDropReason { MISSING_SERVICE_NAME, NOT_PROVISIONED, @@ -196,75 +198,16 @@ public Map getRateLimiterMap( })); } - public Predicate createRateLimiter( - List datasetMetadataList) { - - List throughputSortedDatasets = sortDatasetsOnThroughput(datasetMetadataList); - - Map rateLimiterMap = getRateLimiterMap(throughputSortedDatasets); - - return (key, value) -> { - if (value == null) { - LOG.warn("Message was dropped, was null span"); - return false; - } - - String serviceName = PreprocessorValueMapper.getServiceName(value); - int bytes = value.getSerializedSize(); - if (serviceName == null || serviceName.isEmpty()) { - // service name wasn't provided - LOG.debug("Message was dropped due to missing service name - '{}'", value); - // todo - we may consider adding a logging BurstFilter so that a bad actor cannot - // inadvertently swamp the system if we want to increase this logging level - // https://logging.apache.org/log4j/2.x/manual/filters.html#BurstFilter - meterRegistry - .counter(MESSAGES_DROPPED, getMeterTags("", MessageDropReason.MISSING_SERVICE_NAME)) - .increment(); - meterRegistry - .counter(BYTES_DROPPED, getMeterTags("", MessageDropReason.MISSING_SERVICE_NAME)) - .increment(bytes); - return false; - } - - for (DatasetMetadata datasetMetadata : throughputSortedDatasets) { - String serviceNamePattern = datasetMetadata.getServiceNamePattern(); - // back-compat since this is a new field - if (serviceNamePattern == null) { - serviceNamePattern = datasetMetadata.getName(); - } - if (serviceNamePattern.equals(MATCH_ALL_SERVICE) - || serviceNamePattern.equals(MATCH_STAR_SERVICE) - || serviceName.equals(serviceNamePattern)) { - RateLimiter rateLimiter = rateLimiterMap.get(datasetMetadata.getName()); - if (rateLimiter.tryAcquire(bytes)) { - return true; - } - // message should be dropped due to rate limit - meterRegistry - .counter(MESSAGES_DROPPED, getMeterTags(serviceName, MessageDropReason.OVER_LIMIT)) - .increment(); - meterRegistry - .counter(BYTES_DROPPED, getMeterTags(serviceName, MessageDropReason.OVER_LIMIT)) - .increment(bytes); - LOG.debug( - "Message was dropped for dataset '{}' due to rate limiting ({} bytes per second)", - serviceName, - rateLimiter.getRate()); - return false; - } - } - // message should be dropped due to no matching service name being provisioned - meterRegistry - .counter(MESSAGES_DROPPED, getMeterTags(serviceName, MessageDropReason.NOT_PROVISIONED)) - .increment(); - meterRegistry - .counter(BYTES_DROPPED, 
getMeterTags(serviceName, MessageDropReason.NOT_PROVISIONED)) - .increment(bytes); - return false; - }; - } - private static List getMeterTags(String serviceName, MessageDropReason reason) { return List.of(Tag.of("service", serviceName), Tag.of("reason", reason.toString())); } + + // we sort the datasets to rank from which dataset do we start matching candidate service names + // in the future we can change the ordering from sort to something else + public static List sortDatasetsOnThroughput( + List datasetMetadataList) { + return datasetMetadataList.stream() + .sorted(Comparator.comparingLong(DatasetMetadata::getThroughputBytes).reversed()) + .collect(Collectors.toList()); + } } diff --git a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorService.java b/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorService.java deleted file mode 100644 index 9da15afccf..0000000000 --- a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorService.java +++ /dev/null @@ -1,323 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.slack.astra.server.AstraConfig.DEFAULT_START_STOP_DURATION; - -import com.google.common.util.concurrent.AbstractService; -import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener; -import com.slack.astra.metadata.dataset.DatasetMetadata; -import com.slack.astra.metadata.dataset.DatasetMetadataStore; -import com.slack.astra.metadata.dataset.DatasetPartitionMetadata; -import com.slack.astra.proto.config.AstraConfigs; -import com.slack.astra.writer.KafkaUtils; -import com.slack.service.murron.trace.Trace; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.processor.StreamPartitioner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The PreprocessorService consumes from multiple upstream topics, applies rate limiting, transforms - * the data format, and then writes out the new message to a common downstream topic targeting - * specific partitions. The rate limits, and target partitions are read in via the - * DatasetMetadataStore, with the upstream topic, downstream topic, and transforms stored in the - * dataset config. - * - *

Changes to the DatasetMetadata will cause the existing Kafka Stream topology to be closed, and - * this service will restart consumption with a new stream topology representing the newly updated - * metadata. This class implements a doStart/doStop similar to the AbstractIdleService, but by - * extending an AbstractService we also gain access to notifyFailed() in the event a subsequent load - * were to fail. - */ -public class PreprocessorService extends AbstractService { - private static final Logger LOG = LoggerFactory.getLogger(PreprocessorService.class); - private static final long MAX_TIME = Long.MAX_VALUE; - - public static final boolean INITIALIZE_RATE_LIMIT_WARM = true; - - private final DatasetMetadataStore datasetMetadataStore; - private final PreprocessorRateLimiter rateLimiter; - private final Properties kafkaProperties; - private final List upstreamTopics; - private final String downstreamTopic; - private final String dataTransformer; - private final MeterRegistry meterRegistry; - private final int kafkaPartitionStickyTimeoutMs; - - protected KafkaStreams kafkaStreams; - private KafkaStreamsMetrics kafkaStreamsMetrics; - - private final Timer configReloadTimer; - public static final String CONFIG_RELOAD_TIMER = "preprocessor_config_reload_timer"; - - private final AstraMetadataStoreChangeListener datasetListener = - (datasetMetadata) -> load(); - - private final ReentrantLock schemaLoadingLock = new ReentrantLock(); - - public PreprocessorService( - DatasetMetadataStore datasetMetadataStore, - AstraConfigs.PreprocessorConfig preprocessorConfig, - MeterRegistry meterRegistry) { - this.datasetMetadataStore = datasetMetadataStore; - this.meterRegistry = meterRegistry; - this.configReloadTimer = meterRegistry.timer(CONFIG_RELOAD_TIMER); - - this.kafkaProperties = makeKafkaStreamsProps(preprocessorConfig.getKafkaStreamConfig()); - this.downstreamTopic = preprocessorConfig.getDownstreamTopic(); - this.upstreamTopics = Collections.unmodifiableList(preprocessorConfig.getUpstreamTopicsList()); - this.dataTransformer = preprocessorConfig.getDataTransformer(); - this.rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, - preprocessorConfig.getPreprocessorInstanceCount(), - preprocessorConfig.getRateLimiterMaxBurstSeconds(), - INITIALIZE_RATE_LIMIT_WARM); - this.kafkaPartitionStickyTimeoutMs = preprocessorConfig.getKafkaPartitionStickyTimeoutMs(); - } - - @Override - protected void doStart() { - try { - startUp(); - notifyStarted(); - } catch (Throwable t) { - notifyFailed(t); - } - } - - @Override - protected void doStop() { - try { - shutDown(); - notifyStopped(); - } catch (Throwable t) { - notifyFailed(t); - } - } - - private void startUp() { - LOG.info("Starting preprocessor service"); - load(); - datasetMetadataStore.addListener(datasetListener); - LOG.info("Preprocessor service started"); - } - - private void shutDown() { - LOG.info("Stopping preprocessor service"); - if (kafkaStreams != null) { - kafkaStreams.close(DEFAULT_START_STOP_DURATION); - } - if (kafkaStreamsMetrics != null) { - kafkaStreamsMetrics.close(); - } - datasetMetadataStore.removeListener(datasetListener); - LOG.info("Preprocessor service closed"); - } - - /** - * Configures and starts a KafkaStream processor, based off of the cached DatasetMetadataStore. - * This method is reentrant, and will restart any existing KafkaStream processors. Access to this - * must be guarded with ReentrantLock if using this method as part of a listener. 
- */ - public void load() { - schemaLoadingLock.lock(); - try { - Timer.Sample loadTimer = Timer.start(meterRegistry); - LOG.info("Loading new Kafka stream processor config"); - if (kafkaStreams != null) { - LOG.info("Closing existing Kafka stream processor"); - kafkaStreams.close(); - kafkaStreams.cleanUp(); - } - if (kafkaStreamsMetrics != null) { - kafkaStreamsMetrics.close(); - } - - // only attempt to register stream processing on valid dataset configurations - List datasetMetadataToProcesses = - filterValidDatasetMetadata(datasetMetadataStore.listSync()); - - if (datasetMetadataToProcesses.size() > 0) { - Topology topology = - buildTopology( - datasetMetadataToProcesses, - rateLimiter, - upstreamTopics, - downstreamTopic, - dataTransformer, - kafkaPartitionStickyTimeoutMs); - kafkaStreams = new KafkaStreams(topology, kafkaProperties); - kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams); - kafkaStreamsMetrics.bindTo(meterRegistry); - kafkaStreams.setStateListener( - (newState, oldState) -> { - if (newState == KafkaStreams.State.ERROR) { - LOG.warn("Kafka stream has shutdown unexpectedly, notifying Guava service"); - notifyFailed( - new IllegalStateException("Unexpected error in Kafka stream application")); - } - }); - kafkaStreams.start(); - LOG.info("Kafka stream processor config loaded successfully"); - } else { - LOG.info( - "No valid dataset configurations found to process - will retry on next dataset configuration update"); - } - loadTimer.stop(configReloadTimer); - } catch (Exception e) { - notifyFailed(e); - } finally { - schemaLoadingLock.unlock(); - } - } - - /** - * Builds a KafkaStream Topology with multiple source topics, targeting a single sink. Applies - * rate limits per-dataset if required, and uses the dataset configured target topic sink and data - * transformer. - */ - protected static Topology buildTopology( - List datasetMetadataList, - PreprocessorRateLimiter rateLimiter, - List upstreamTopics, - String downstreamTopic, - String dataTransformer, - int kafkaPartitionStickyTimeoutMs) { - checkArgument(!datasetMetadataList.isEmpty(), "dataset metadata list must not be empty"); - checkArgument(upstreamTopics.size() > 0, "upstream topic list must not be empty"); - checkArgument(!downstreamTopic.isEmpty(), "downstream topic must not be empty"); - checkArgument(!dataTransformer.isEmpty(), "data transformer must not be empty"); - checkArgument(kafkaPartitionStickyTimeoutMs >= 0, "kafkaPartitionStickyTimeoutMs must be >=0"); - - StreamsBuilder builder = new StreamsBuilder(); - - ValueMapper> valueMapper = - PreprocessorValueMapper.byteArrayToTraceSpans(dataTransformer); - - StreamPartitioner streamPartitioner = - new PreprocessorPartitioner<>(datasetMetadataList, kafkaPartitionStickyTimeoutMs); - - Predicate rateLimitPredicate = - rateLimiter.createRateLimiter(datasetMetadataList); - - upstreamTopics.forEach( - upstreamTopic -> - builder.stream(upstreamTopic, Consumed.with(Serdes.String(), Serdes.ByteArray())) - .flatMapValues(valueMapper) - .filter(rateLimitPredicate) - .peek( - (key, span) -> - LOG.debug( - "Processed span {} from topic {} to topic {}", - span, - upstreamTopic, - downstreamTopic)) - .to( - downstreamTopic, - Produced.with(Serdes.String(), AstraSerdes.TraceSpan(), streamPartitioner))); - - return builder.build(); - } - - /** - * Filters the provided list of dataset metadata to those that are valid. This includes correctly - * defined throughput and partition configurations. 
- */ - public static List filterValidDatasetMetadata( - List datasetMetadataList) { - return datasetMetadataList.stream() - .filter(datasetMetadata -> datasetMetadata.getThroughputBytes() > 0) - .filter(datasetMetadata -> getActivePartitionList(datasetMetadata).size() > 0) - .collect(Collectors.toList()); - } - - /** Gets the active list of partitions from the provided dataset metadata */ - public static List getActivePartitionList(DatasetMetadata datasetMetadata) { - Optional datasetPartitionMetadata = - datasetMetadata.getPartitionConfigs().stream() - .filter(partitionMetadata -> partitionMetadata.getEndTimeEpochMs() == MAX_TIME) - .findFirst(); - - if (datasetPartitionMetadata.isEmpty()) { - return Collections.emptyList(); - } - return datasetPartitionMetadata.get().getPartitions().stream() - .map(Integer::parseInt) - .collect(Collectors.toUnmodifiableList()); - } - - // we sort the datasets to rank from which dataset do we start matching candidate service names - // in the future we can change the ordering from sort to something else - public static List sortDatasetsOnThroughput( - List datasetMetadataList) { - return datasetMetadataList.stream() - .sorted(Comparator.comparingLong(DatasetMetadata::getThroughputBytes).reversed()) - .collect(Collectors.toList()); - } - - /** Builds a Properties hashtable using the provided config, and sensible defaults */ - protected static Properties makeKafkaStreamsProps( - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig) { - checkArgument( - !kafkaStreamConfig.getApplicationId().isEmpty(), - "Kafka stream applicationId must be provided"); - checkArgument( - !kafkaStreamConfig.getBootstrapServers().isEmpty(), - "Kafka stream bootstrapServers must be provided"); - checkArgument( - kafkaStreamConfig.getNumStreamThreads() > 0, - "Kafka stream numStreamThreads must be greater than 0"); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamConfig.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaStreamConfig.getBootstrapServers()); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaStreamConfig.getNumStreamThreads()); - - // These props allow using brokers versions back to 2.0, by reverting breaking changes - // introduced in the client versions 3.0+ - // https://www.confluent.io/blog/apache-kafka-3-0-major-improvements-and-new-features/ - - // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050750 - props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1); - - // https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default - props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), false); - props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "1"); - - // This will allow parallel processing up to the amount of upstream partitions. 
You cannot have - // more threads than you have upstreams due to how the work is partitioned - props.put( - StreamsConfig.PROCESSING_GUARANTEE_CONFIG, kafkaStreamConfig.getProcessingGuarantee()); - - // don't override any property we already set - for (Map.Entry additionalProp : - kafkaStreamConfig.getAdditionalPropsMap().entrySet()) { - props = - KafkaUtils.maybeOverrideProps( - props, additionalProp.getKey(), additionalProp.getValue(), false); - } - - return props; - } -} diff --git a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorValueMapper.java b/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorValueMapper.java deleted file mode 100644 index 2c35e09659..0000000000 --- a/astra/src/main/java/com/slack/astra/preprocessor/PreprocessorValueMapper.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.slack.astra.preprocessor; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.slack.astra.writer.JsonLogFormatter; -import com.slack.astra.writer.MurronLogFormatter; -import com.slack.astra.writer.SpanFormatter; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * KafkaStream value mappers for transforming upstream byte array messages into Iterable - */ -public class PreprocessorValueMapper { - private static final Logger LOG = LoggerFactory.getLogger(PreprocessorValueMapper.class); - - private static Deserializer murronMessageDeserializer = - AstraSerdes.MurronMurronMessage().deserializer(); - - @FunctionalInterface - private interface MessageTransformer { - List toTraceSpans(byte[] record) throws Exception; - } - - // An apiLog message is a json blob wrapped in a murron message. - public static final MessageTransformer apiLogTransformer = - record -> { - final Murron.MurronMessage murronMsg = murronMessageDeserializer.deserialize("", record); - return List.of(MurronLogFormatter.fromApiLog(murronMsg)); - }; - - // An envoy log message is a json blob wrapped in a murron message. - public static final MessageTransformer envoyLogTransformer = - record -> { - final Murron.MurronMessage murronMsg = murronMessageDeserializer.deserialize("", record); - return List.of(MurronLogFormatter.fromEnvoyLog(murronMsg)); - }; - - // JSON log - public static final MessageTransformer jsonLogTransformer = - record -> List.of(JsonLogFormatter.fromJsonLog(record)); - - // A single trace record consists of a list of spans wrapped in a murron message. - public static final MessageTransformer spanTransformer = - record -> { - Murron.MurronMessage murronMsg = murronMessageDeserializer.deserialize("", record); - return SpanFormatter.fromMurronMessage(murronMsg).getSpansList(); - }; - - // todo - add a json blob transformer, ie LogMessageWriterImpl.jsonLogMessageTransformer - - private static final Map PRE_PROCESSOR_DATA_TRANSFORMER_MAP = - ImmutableMap.of( - "api_log", - apiLogTransformer, - "spans", - spanTransformer, - "envoy_log", - envoyLogTransformer, - "json", - jsonLogTransformer); - - /** Span key for KeyValue pair to use as the service name */ - public static String SERVICE_NAME_KEY = "service_name"; - - /** - * Helper method to extract the service name from a Span - * - *

todo - consider putting the service name into a top-level Trace.Span property - */ - public static String getServiceName(Trace.Span span) { - return span.getTagsList().stream() - .filter(tag -> tag.getKey().equals(SERVICE_NAME_KEY)) - .map(Trace.KeyValue::getVStr) - .findFirst() - .orElse(null); - } - - /** KafkaStream ValueMapper for transforming upstream sources to target Trace.ListOfSpans */ - public static ValueMapper> byteArrayToTraceSpans( - String dataTransformer) { - Preconditions.checkArgument( - PRE_PROCESSOR_DATA_TRANSFORMER_MAP.containsKey(dataTransformer), - "Invalid data transformer provided, must be one of %s", - PRE_PROCESSOR_DATA_TRANSFORMER_MAP.toString()); - return messageBytes -> { - try { - return PRE_PROCESSOR_DATA_TRANSFORMER_MAP.get(dataTransformer).toTraceSpans(messageBytes); - } catch (Exception e) { - LOG.error("Error converting byte array to Trace.ListOfSpans", e); - return Collections.emptyList(); - } - }; - } -} diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java index 87a4b092a1..d788d482a9 100644 --- a/astra/src/main/java/com/slack/astra/server/Astra.java +++ b/astra/src/main/java/com/slack/astra/server/Astra.java @@ -35,7 +35,6 @@ import com.slack.astra.metadata.schema.SchemaUtil; import com.slack.astra.metadata.search.SearchMetadataStore; import com.slack.astra.metadata.snapshot.SnapshotMetadataStore; -import com.slack.astra.preprocessor.PreprocessorService; import com.slack.astra.proto.config.AstraConfigs; import com.slack.astra.proto.metadata.Metadata; import com.slack.astra.proto.schema.Schema; @@ -393,36 +392,30 @@ private static Set getServices( new CloseableLifecycleManager( AstraConfigs.NodeRole.PREPROCESSOR, List.of(datasetMetadataStore))); - if (preprocessorConfig.getUseBulkApi()) { - BulkIngestKafkaProducer bulkIngestKafkaProducer = - new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry); - services.add(bulkIngestKafkaProducer); - DatasetRateLimitingService datasetRateLimitingService = - new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry); - services.add(datasetRateLimitingService); - - Schema.IngestSchema schema = Schema.IngestSchema.getDefaultInstance(); - if (!preprocessorConfig.getSchemaFile().isEmpty()) { - LOG.info("Loading schema file: {}", preprocessorConfig.getSchemaFile()); - schema = SchemaUtil.parseSchema(Path.of(preprocessorConfig.getSchemaFile())); - LOG.info("Loaded schema with total fields: {}", schema.getFieldsCount()); - } else { - LOG.info("No schema file provided, using default schema"); - } - schema = ReservedFields.addPredefinedFields(schema); - BulkIngestApi openSearchBulkApiService = - new BulkIngestApi( - bulkIngestKafkaProducer, - datasetRateLimitingService, - meterRegistry, - preprocessorConfig.getRateLimitExceededErrorCode(), - schema); - armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService); + BulkIngestKafkaProducer bulkIngestKafkaProducer = + new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry); + services.add(bulkIngestKafkaProducer); + DatasetRateLimitingService datasetRateLimitingService = + new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry); + services.add(datasetRateLimitingService); + + Schema.IngestSchema schema = Schema.IngestSchema.getDefaultInstance(); + if (!preprocessorConfig.getSchemaFile().isEmpty()) { + LOG.info("Loading schema file: {}", preprocessorConfig.getSchemaFile()); + schema 
= SchemaUtil.parseSchema(Path.of(preprocessorConfig.getSchemaFile())); + LOG.info("Loaded schema with total fields: {}", schema.getFieldsCount()); } else { - PreprocessorService preprocessorService = - new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); - services.add(preprocessorService); + LOG.info("No schema file provided, using default schema"); } + schema = ReservedFields.addPredefinedFields(schema); + BulkIngestApi openSearchBulkApiService = + new BulkIngestApi( + bulkIngestKafkaProducer, + datasetRateLimitingService, + meterRegistry, + preprocessorConfig.getRateLimitExceededErrorCode(), + schema); + armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService); services.add(armeriaServiceBuilder.build()); } diff --git a/astra/src/main/java/com/slack/astra/writer/JsonLogFormatter.java b/astra/src/main/java/com/slack/astra/writer/JsonLogFormatter.java deleted file mode 100644 index 618ac6f01a..0000000000 --- a/astra/src/main/java/com/slack/astra/writer/JsonLogFormatter.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.slack.astra.writer; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.slack.astra.logstore.LogMessage; -import com.slack.astra.util.JsonUtil; -import com.slack.service.murron.trace.Trace; -import java.io.IOException; -import java.time.Instant; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -/* - Utility classes to take a byte array with a Json payload and converts to a Trace.Span. - If the Json does not have an "id" we generate a random UUID. - The payload Json is expected to contain a "service_name" and a "@timestamp" field otherwise the document will fail to index -*/ -public class JsonLogFormatter { - - public static Trace.Span fromJsonLog(byte[] data) throws IOException { - TypeReference> mapTypeRef = new TypeReference<>() {}; - Map jsonMsgMap = JsonUtil.read(data, mapTypeRef); - - String id = - (String) jsonMsgMap.getOrDefault(MurronLogFormatter.ID, UUID.randomUUID().toString()); - - String serviceName = - (String) jsonMsgMap.getOrDefault(LogMessage.ReservedField.SERVICE_NAME.fieldName, ""); - if (serviceName == null || serviceName.isEmpty()) { - throw new IllegalArgumentException("Document must contain service_name key"); - } - String name = - (String) jsonMsgMap.getOrDefault(LogMessage.ReservedField.NAME.fieldName, serviceName); - - long duration = - Long.parseLong( - String.valueOf( - jsonMsgMap.getOrDefault(LogMessage.ReservedField.DURATION_MS.fieldName, "1"))); - - String dateStr = (String) jsonMsgMap.getOrDefault("@timestamp", ""); - if (dateStr == null || dateStr.isEmpty()) { - throw new IllegalArgumentException("Document must contain timestamp key"); - } - long timestamp = Instant.parse(dateStr).toEpochMilli(); - - String traceId = (String) jsonMsgMap.get(LogMessage.ReservedField.TRACE_ID.fieldName); - String host = (String) jsonMsgMap.get(LogMessage.ReservedField.HOSTNAME.fieldName); - - return SpanFormatter.toSpan( - jsonMsgMap, - id, - name, - name, - timestamp, - duration, - Optional.ofNullable(host), - Optional.ofNullable(traceId)); - } -} diff --git a/astra/src/main/java/com/slack/astra/writer/MurronLogFormatter.java b/astra/src/main/java/com/slack/astra/writer/MurronLogFormatter.java deleted file mode 100644 index 13c2b7d841..0000000000 --- a/astra/src/main/java/com/slack/astra/writer/MurronLogFormatter.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.slack.astra.writer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import 
com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableSet; -import com.slack.astra.logstore.LogMessage; -import com.slack.astra.util.JsonUtil; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MurronLogFormatter { - public static final Logger LOG = LoggerFactory.getLogger(MurronLogFormatter.class); - - public static final String API_LOG_DURATION_FIELD = "microtime_elapsed"; - public static final String ENVOY_DURATION_FIELD = "duration"; - public static final String ENVOY_REQUEST_ID = "request_id"; - public static final String ID = "id"; - protected static final Set nonTagFields = - ImmutableSet.of( - LogMessage.ReservedField.PARENT_ID.fieldName, - LogMessage.ReservedField.TRACE_ID.fieldName, - API_LOG_DURATION_FIELD); - private static final String TYPE_TAG = "type"; - - public static Trace.Span fromEnvoyLog(Murron.MurronMessage murronMsg) - throws JsonProcessingException { - return toSpan( - murronMsg, TYPE_TAG, ENVOY_DURATION_FIELD, 1000, ENVOY_REQUEST_ID, ENVOY_REQUEST_ID); - } - - public static Trace.Span fromApiLog(Murron.MurronMessage murronMsg) - throws JsonProcessingException { - return toSpan( - murronMsg, - TYPE_TAG, - API_LOG_DURATION_FIELD, - 1, - LogMessage.ReservedField.TRACE_ID.fieldName, - ""); - } - - private static Trace.Span toSpan( - Murron.MurronMessage murronMsg, - String typeTag, - String durationField, - int durationTimeMuiltiplier, - String traceIdFieldName, - String idField) - throws JsonProcessingException { - if (murronMsg == null) return null; - - LOG.trace( - "{} {} {} {}", - murronMsg.getTimestamp(), - murronMsg.getHost(), - murronMsg.getType(), - murronMsg.getMessage().toStringUtf8()); - - TypeReference> mapTypeRef = new TypeReference<>() {}; - Map jsonMsgMap = - JsonUtil.read(murronMsg.getMessage().toStringUtf8(), mapTypeRef); - - String name = (String) jsonMsgMap.getOrDefault(typeTag, murronMsg.getType()); - - String id = ""; - if (!idField.isEmpty()) { - id = (String) jsonMsgMap.getOrDefault(idField, ""); - } - if (id.isEmpty()) { - id = murronMsg.getHost() + ":" + murronMsg.getPid() + ":" + murronMsg.getOffset(); - } - long timestamp = murronMsg.getTimestamp() / 1000; - long duration = - Long.parseLong(String.valueOf(jsonMsgMap.getOrDefault(durationField, "1"))) - * durationTimeMuiltiplier; - String traceId = (String) jsonMsgMap.get(traceIdFieldName); - - return SpanFormatter.toSpan( - jsonMsgMap, - id, - name, - murronMsg.getType(), - timestamp, - duration, - Optional.of(murronMsg.getHost()), - Optional.ofNullable(traceId)); - } -} diff --git a/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java b/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java index 419d0c9fac..b0ef67151b 100644 --- a/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java +++ b/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java @@ -1,18 +1,14 @@ package com.slack.astra.writer; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Timestamp; -import com.slack.astra.logstore.LogMessage; import com.slack.astra.proto.schema.Schema; -import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; -import 
java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,74 +20,6 @@ public class SpanFormatter { public static final String DEFAULT_LOG_MESSAGE_TYPE = "INFO"; public static final String DEFAULT_INDEX_NAME = "unknown"; - // TODO: Take duration unit as input. - // TODO: Take a generic field mapping dictionary as input for fields. - public static Trace.Span toSpan( - Map jsonMsgMap, - String id, - String name, - String serviceName, - long timestamp, - long duration, - Optional host, - Optional traceId) { - - Trace.Span.Builder spanBuilder = Trace.Span.newBuilder(); - - spanBuilder.setName(name); - - String parentId = - (String) jsonMsgMap.getOrDefault(LogMessage.ReservedField.PARENT_ID.fieldName, ""); - spanBuilder.setParentId(ByteString.copyFrom(parentId.getBytes())); - - traceId.ifPresent(s -> spanBuilder.setTraceId(ByteString.copyFromUtf8(s))); - spanBuilder.setTimestamp(timestamp); - spanBuilder.setDuration(duration); - - List tags = new ArrayList<>(jsonMsgMap.size()); - for (Map.Entry entry : jsonMsgMap.entrySet()) { - String key = entry.getKey(); - if (MurronLogFormatter.nonTagFields.contains(key)) { - continue; - } - tags.add(convertKVtoProto(key, entry.getValue())); - } - - // Add missing fields from murron message. - boolean containsHostName = - tags.stream() - .anyMatch( - keyValue -> keyValue.getKey().equals(LogMessage.ReservedField.HOSTNAME.fieldName)); - if (!containsHostName) { - host.ifPresent( - s -> - tags.add( - Trace.KeyValue.newBuilder() - .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setFieldType(Schema.SchemaFieldType.KEYWORD) - .setVStr(s) - .build())); - } - - spanBuilder.setId(ByteString.copyFrom(id.getBytes())); - - boolean containsServiceName = - tags.stream() - .anyMatch( - keyValue -> - keyValue.getKey().equals(LogMessage.ReservedField.SERVICE_NAME.fieldName)); - if (!containsServiceName) { - tags.add( - Trace.KeyValue.newBuilder() - .setKey(LogMessage.ReservedField.SERVICE_NAME.fieldName) - .setFieldType(Schema.SchemaFieldType.KEYWORD) - .setVStr(serviceName) - .build()); - } - spanBuilder.addAllTags(tags); - return spanBuilder.build(); - } - public static Timestamp parseDate(String dateStr, Schema.SchemaFieldType type) { Instant instant; try { @@ -242,11 +170,6 @@ private static Trace.KeyValue convertKVtoProto(String key, Object value) { return tagBuilder.build(); } - public static Trace.ListOfSpans fromMurronMessage(Murron.MurronMessage message) - throws InvalidProtocolBufferException { - return Trace.ListOfSpans.parseFrom(message.getMessage()); - } - /** * Determines if provided timestamp is a reasonable value, or is too far in the past/future for * use. This can happen when using user-provided timestamp (such as on a mobile client). diff --git a/astra/src/main/proto/astra_configs.proto b/astra/src/main/proto/astra_configs.proto index 5496261efc..ccb3b7965c 100644 --- a/astra/src/main/proto/astra_configs.proto +++ b/astra/src/main/proto/astra_configs.proto @@ -223,35 +223,7 @@ message RecoveryConfig { // Config for the preprocessor node. message PreprocessorConfig { - // Configuration for the kafka stream processor - message KafkaStreamConfig { - string bootstrap_servers = 1; - // An identifier for the stream processing application. Must be unique within the Kafka cluster - string application_id = 2; - // The number of threads to execute stream processing - int32 num_stream_threads = 3; - // This will allow parallel processing up to the amount of upstream partitions. 
You cannot have - // more threads than you have upstreams due to how the work is partitioned. E - string processing_guarantee = 4; - // any additional kafka props that we will add when creating the KafkaStreams object - // There are three sets of configs under this - // 1. StreamsConfig - // 2. ProducerConfig - These properties must be prefixed with "producer." - // 2. ConsumerConfig - These properties must be prefixed with "consumer." - map additional_props = 5; - } - ServerConfig server_config = 1; - KafkaStreamConfig kafka_stream_config = 2; - - // Upstream topics to consume from - repeated string upstream_topics = 3; - - // Downstream topic to write to - string downstream_topic = 4; - - // Name of the data transformation pipeline to use when ingesting the data. - string data_transformer = 5; // The number of preprocessor instances // Used for calculating target throughput per instance @@ -260,15 +232,7 @@ message PreprocessorConfig { // Amount of time in seconds the rate limiter can burst int32 rate_limiter_max_burst_seconds = 7; - // used to route all preprocessor data produced to the same kafka partition for the duration of time provided - // value must be more than 0 - // more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers - int32 kafka_partition_sticky_timeout_ms = 8; - - // Kafka config needs to be set if the bulk API is used to bootstrap the producer kafka - // we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future KafkaConfig kafka_config = 9; - bool use_bulk_api = 10; // Make the rate limit exceeded error code configurable // We default to 400 to prioritize fresh logs and drop excess logs diff --git a/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java b/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java index 2b6bcd79fd..1e26958af0 100644 --- a/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java +++ b/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java @@ -85,7 +85,6 @@ public void bootstrapCluster() throws Exception { preprocessorConfig = AstraConfigs.PreprocessorConfig.newBuilder() .setKafkaConfig(kafkaConfig) - .setUseBulkApi(true) .setServerConfig(serverConfig) .setPreprocessorInstanceCount(1) .setRateLimiterMaxBurstSeconds(1) diff --git a/astra/src/test/java/com/slack/astra/preprocessor/AstraSerdesTest.java b/astra/src/test/java/com/slack/astra/preprocessor/AstraSerdesTest.java deleted file mode 100644 index ef83214787..0000000000 --- a/astra/src/test/java/com/slack/astra/preprocessor/AstraSerdesTest.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.slack.astra.preprocessor; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.protobuf.ByteString; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import java.time.Instant; -import org.apache.kafka.common.serialization.Serde; -import org.junit.jupiter.api.Test; - -public class AstraSerdesTest { - - @Test - public void shouldSerializeAndDeserializeMurronMessage() { - Serde serdes = AstraSerdes.MurronMurronMessage(); - - String topic = "topic"; - String message = "hello message"; - String host = "host"; - String type = "type"; - long timestamp = Instant.now().toEpochMilli(); - - Murron.MurronMessage messageToSerialize = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFromUtf8(message)) - .setHost(host) - .setType(type) - .setTimestamp(timestamp) - .build(); - - byte[] 
serializedBytes = serdes.serializer().serialize(topic, messageToSerialize); - Murron.MurronMessage deserializedMessage = - serdes.deserializer().deserialize(topic, serializedBytes); - - assertThat(deserializedMessage.getMessage()).isEqualTo(ByteString.copyFromUtf8(message)); - assertThat(deserializedMessage.getHost()).isEqualTo(host); - assertThat(deserializedMessage.getType()).isEqualTo(type); - assertThat(deserializedMessage.getTimestamp()).isEqualTo(timestamp); - } - - @Test - public void shouldSerializeAndDeserializeMurronMessageNullsCorrectly() { - Serde serdes = AstraSerdes.MurronMurronMessage(); - - byte[] serializedBytes = serdes.serializer().serialize("topic", null); - assertThat(serializedBytes).isNull(); - - Murron.MurronMessage deserializedMessage = serdes.deserializer().deserialize("topic", null); - assertThat(deserializedMessage).isNull(); - } - - @Test - public void shouldSerializeAndDeserializeTraceListOfSpans() { - Serde serdes = AstraSerdes.TraceListOfSpans(); - - String topic = "topic"; - String id = "id"; - String traceId = "traceId"; - String name = "name"; - long timestamp = Instant.now().toEpochMilli() * 1000; - long duration = 10; - Trace.Span span = - Trace.Span.newBuilder() - .setId(ByteString.copyFromUtf8(id)) - .setTraceId(ByteString.copyFromUtf8(traceId)) - .setName(name) - .setTimestamp(timestamp) - .setDuration(duration) - .build(); - Trace.ListOfSpans listOfSpans = Trace.ListOfSpans.newBuilder().addSpans(span).build(); - - byte[] serializedBytes = serdes.serializer().serialize(topic, listOfSpans); - Trace.ListOfSpans deserializedMessage = - serdes.deserializer().deserialize(topic, serializedBytes); - - assertThat(deserializedMessage.getSpansList().size()).isEqualTo(1); - assertThat(deserializedMessage.getTagsList().size()).isEqualTo(0); - - assertThat(deserializedMessage.getSpansList().get(0).getId()) - .isEqualTo(ByteString.copyFromUtf8(id)); - assertThat(deserializedMessage.getSpansList().get(0).getTraceId()) - .isEqualTo(ByteString.copyFromUtf8(traceId)); - assertThat(deserializedMessage.getSpansList().get(0).getName()).isEqualTo(name); - assertThat(deserializedMessage.getSpansList().get(0).getTimestamp()).isEqualTo(timestamp); - assertThat(deserializedMessage.getSpansList().get(0).getDuration()).isEqualTo(duration); - assertThat(deserializedMessage.getSpansList().get(0).getTagsList().size()).isEqualTo(0); - } - - @Test - public void shouldSerializeAndDeserializeTraceListOfSpansNullsCorrectly() { - Serde serdes = AstraSerdes.TraceListOfSpans(); - - byte[] serializedBytes = serdes.serializer().serialize("topic", null); - assertThat(serializedBytes).isNull(); - - Trace.ListOfSpans deserializedMessage = serdes.deserializer().deserialize("topic", null); - assertThat(deserializedMessage).isNull(); - } - - @Test - public void shouldSerializeAndDeserializeTraceSpan() { - Serde serdes = AstraSerdes.TraceSpan(); - - String topic = "topic"; - String id = "id"; - String traceId = "traceId"; - String name = "name"; - long timestamp = Instant.now().toEpochMilli() * 1000; - long duration = 10; - Trace.Span span = - Trace.Span.newBuilder() - .setId(ByteString.copyFromUtf8(id)) - .setTraceId(ByteString.copyFromUtf8(traceId)) - .setName(name) - .setTimestamp(timestamp) - .setDuration(duration) - .build(); - - byte[] serializedBytes = serdes.serializer().serialize(topic, span); - Trace.Span deserializedMessage = serdes.deserializer().deserialize(topic, serializedBytes); - - assertThat(deserializedMessage.getId()).isEqualTo(ByteString.copyFromUtf8(id)); - 
assertThat(deserializedMessage.getTraceId()).isEqualTo(ByteString.copyFromUtf8(traceId)); - assertThat(deserializedMessage.getName()).isEqualTo(name); - assertThat(deserializedMessage.getTimestamp()).isEqualTo(timestamp); - assertThat(deserializedMessage.getDuration()).isEqualTo(duration); - assertThat(deserializedMessage.getTagsList().size()).isEqualTo(0); - } - - @Test - public void shouldSerializeAndDeserializeTraceSpanNullsCorrectly() { - Serde serdes = AstraSerdes.TraceSpan(); - - byte[] serializedBytes = serdes.serializer().serialize("topic", null); - assertThat(serializedBytes).isNull(); - - Trace.Span deserializedMessage = serdes.deserializer().deserialize("topic", null); - assertThat(deserializedMessage).isNull(); - } - - @Test - public void shouldGracefullyHandleWrongMessageTypes() { - byte[] malformedData = "malformed data".getBytes(); - - Trace.Span deserializedTraceSpan = - AstraSerdes.TraceSpan().deserializer().deserialize("topic", malformedData); - assertThat(deserializedTraceSpan).isEqualTo(null); - - Trace.ListOfSpans deserializedTraceListOfSpans = - AstraSerdes.TraceListOfSpans().deserializer().deserialize("topic", malformedData); - assertThat(deserializedTraceListOfSpans).isEqualTo(null); - - Murron.MurronMessage deserializedMurronMessage = - AstraSerdes.MurronMurronMessage().deserializer().deserialize("topic", malformedData); - assertThat(deserializedMurronMessage).isEqualTo(null); - } -} diff --git a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorBulkRateLimiterTest.java b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorBulkRateLimiterTest.java deleted file mode 100644 index 632a3a1240..0000000000 --- a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorBulkRateLimiterTest.java +++ /dev/null @@ -1,448 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.slack.astra.preprocessor.PreprocessorRateLimiter.BYTES_DROPPED; -import static com.slack.astra.preprocessor.PreprocessorRateLimiter.MESSAGES_DROPPED; -import static com.slack.astra.preprocessor.PreprocessorRateLimiter.getSpanBytes; -import static com.slack.astra.preprocessor.PreprocessorValueMapper.SERVICE_NAME_KEY; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; - -import com.slack.astra.metadata.dataset.DatasetMetadata; -import com.slack.astra.metadata.dataset.DatasetPartitionMetadata; -import com.slack.service.murron.trace.Trace; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.util.List; -import java.util.function.BiPredicate; -import org.apache.kafka.streams.kstream.Predicate; -import org.junit.jupiter.api.Test; - -public class PreprocessorBulkRateLimiterTest { - - @Test - public void shouldApplyScaledRateLimit() throws InterruptedException { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 2; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - - String name = "rateLimiter"; - Trace.Span span1 = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - Trace.Span span2 = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - List spans = List.of(span1, span2); - - 
// set the target so that we pass the first add, then fail the second - long targetThroughput = ((long) getSpanBytes(spans) * preprocessorCount) + 1; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - BiPredicate> predicate = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate.test(name, spans)).isTrue(); - assertThat(predicate.test(name, spans)).isFalse(); - - // rate limit is targetThroughput per second, so 1 second should refill our limit - Thread.sleep(1000); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate.test(name, spans)).isTrue(); - assertThat(predicate.test(name, spans)).isFalse(); - - assertThat( - meterRegistry - .get(MESSAGES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(2); - assertThat( - meterRegistry - .get(BYTES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(getSpanBytes(spans) * 2); - } - - @Test - public void shouldApplyScaledRateLimitWithAllServices() throws InterruptedException { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 2; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - - String name = "rateLimiter"; - Trace.Span span = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - // set the target so that we pass the first add, then fail the second - long targetThroughput = ((long) span.toByteArray().length * preprocessorCount) + 1; - - // Check if _all works - DatasetMetadata datasetMetadata = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - DatasetMetadata.MATCH_ALL_SERVICE); - BiPredicate> predicate = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isFalse(); - - // rate limit is targetThroughput per second, so 1 second should refill our limit - Thread.sleep(1000); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isFalse(); - - assertThat( - meterRegistry - .get(MESSAGES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(2); - assertThat( - meterRegistry - .get(BYTES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(span.toByteArray().length * 2); - - // Check if * works - MeterRegistry meterRegistry1 = new SimpleMeterRegistry(); - PreprocessorRateLimiter rateLimiter1 = - new PreprocessorRateLimiter( - meterRegistry1, preprocessorCount, maxBurstSeconds, initializeWarm); - DatasetMetadata datasetMetadata1 = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 
200, List.of("0"))), - DatasetMetadata.MATCH_STAR_SERVICE); - BiPredicate> predicate1 = - rateLimiter1.createBulkIngestRateLimiter(List.of(datasetMetadata1)); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate1.test(name, List.of(span))).isTrue(); - assertThat(predicate1.test(name, List.of(span))).isFalse(); - - // rate limit is targetThroughput per second, so 1 second should refill our limit - Thread.sleep(1000); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate1.test(name, List.of(span))).isTrue(); - assertThat(predicate1.test(name, List.of(span))).isFalse(); - - assertThat( - meterRegistry1 - .get(MESSAGES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(2); - assertThat( - meterRegistry1 - .get(BYTES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(span.toByteArray().length * 2); - - // check back compat where service name will be null - MeterRegistry meterRegistry2 = new SimpleMeterRegistry(); - PreprocessorRateLimiter rateLimiter2 = - new PreprocessorRateLimiter( - meterRegistry2, preprocessorCount, maxBurstSeconds, initializeWarm); - DatasetMetadata datasetMetadata2 = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - null); - BiPredicate> predicate2 = - rateLimiter2.createBulkIngestRateLimiter(List.of(datasetMetadata2)); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test(name, List.of(span))).isTrue(); - assertThat(predicate2.test(name, List.of(span))).isFalse(); - - // rate limit is targetThroughput per second, so 1 second should refill our limit - Thread.sleep(1000); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test(name, List.of(span))).isTrue(); - assertThat(predicate2.test(name, List.of(span))).isFalse(); - - assertThat( - meterRegistry2 - .get(MESSAGES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(2); - assertThat( - meterRegistry2 - .get(BYTES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(span.toByteArray().length * 2); - } - - @Test - public void shouldApplyRateLimitsAgainstMultipleDatasets() throws InterruptedException { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 2; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - - String name1 = "rateLimiter1"; - String name2 = "rateLimiter2"; - Trace.Span span = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name2).build()) - .build(); - - // set the target so that we pass the first add, then fail the second - long targetThroughput = ((long) span.toByteArray().length * preprocessorCount) + 1; - - // Dataset 1 has service name will never match the record while dataset 2 will match - DatasetMetadata datasetMetadata1 = - new DatasetMetadata( - name1, - name1, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - "no_service_matching_docs"); - DatasetMetadata datasetMetadata2 = - new 
DatasetMetadata( - name2, - name2, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - DatasetMetadata.MATCH_ALL_SERVICE); - - // ensure we always drop for dataset1 - BiPredicate> predicate1 = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata1)); - assertThat(predicate1.test(name1, List.of(span))).isFalse(); - - BiPredicate> predicate2 = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata1, datasetMetadata2)); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test(name2, List.of(span))).isTrue(); - assertThat(predicate2.test(name2, List.of(span))).isFalse(); - - // rate limit is targetThroughput per second, so 1 second should refill our limit - Thread.sleep(1000); - - // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test(name2, List.of(span))).isTrue(); - assertThat(predicate2.test(name2, List.of(span))).isFalse(); - - assertThat( - meterRegistry - .get(MESSAGES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(2); - assertThat( - meterRegistry - .get(BYTES_DROPPED) - .tag("reason", String.valueOf(PreprocessorRateLimiter.MessageDropReason.OVER_LIMIT)) - .counter() - .count()) - .isEqualTo(span.toByteArray().length * 2); - } - - @Test - public void shouldDropMessagesWithNoConfiguration() { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - PreprocessorRateLimiter rateLimiter = new PreprocessorRateLimiter(meterRegistry, 1, 1, false); - - Trace.Span span = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder() - .setKey(SERVICE_NAME_KEY) - .setVStr("unprovisioned_service") - .build()) - .build(); - - DatasetMetadata datasetMetadata = - new DatasetMetadata( - "wrong_service", - "wrong_service", - Long.MAX_VALUE, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - "wrong_service"); - BiPredicate> predicate = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - - // this should be immediately dropped - assertThat(predicate.test("unprovisioned_service", List.of(span))).isFalse(); - - assertThat( - meterRegistry - .get(MESSAGES_DROPPED) - .tag( - "reason", - String.valueOf(PreprocessorRateLimiter.MessageDropReason.NOT_PROVISIONED)) - .counter() - .count()) - .isEqualTo(1); - assertThat( - meterRegistry - .get(BYTES_DROPPED) - .tag( - "reason", - String.valueOf(PreprocessorRateLimiter.MessageDropReason.NOT_PROVISIONED)) - .counter() - .count()) - .isEqualTo(span.toByteArray().length); - } - - @Test - public void shouldHandleEmptyMessages() { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 1; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - - String name = "rateLimiter"; - long targetThroughput = 1000; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); - - assertThat(predicate.test("key", Trace.Span.newBuilder().build())).isFalse(); - assertThat(predicate.test("key", null)).isFalse(); - } - - @Test - public void shouldThrowOnInvalidConfigurations() { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - 
assertThatIllegalArgumentException() - .isThrownBy(() -> new PreprocessorRateLimiter(meterRegistry, 0, 1, true)); - assertThatIllegalArgumentException() - .isThrownBy(() -> new PreprocessorRateLimiter(meterRegistry, 1, 0, true)); - } - - @Test - public void shouldAllowBurstingOverLimitWarm() throws InterruptedException { - String name = "rateLimiter"; - Trace.Span span = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - PreprocessorRateLimiter rateLimiter = new PreprocessorRateLimiter(meterRegistry, 1, 3, true); - - long targetThroughput = span.getSerializedSize() - 1; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - BiPredicate> predicate = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isFalse(); - - Thread.sleep(2000); - - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isTrue(); - assertThat(predicate.test(name, List.of(span))).isFalse(); - } - - @Test - public void shouldAllowBurstingOverLimitCold() throws InterruptedException { - String name = "rateLimiter"; - Trace.Span span1 = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - Trace.Span span2 = - Trace.Span.newBuilder() - .addTags(Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(name).build()) - .build(); - - List spans = List.of(span1, span2); - - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - PreprocessorRateLimiter rateLimiter = new PreprocessorRateLimiter(meterRegistry, 1, 2, false); - - long targetThroughput = getSpanBytes(spans) - 1; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - name, - name, - targetThroughput, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - BiPredicate> predicate = - rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - - assertThat(predicate.test(name, spans)).isTrue(); - assertThat(predicate.test(name, spans)).isFalse(); - - Thread.sleep(2500); - - assertThat(predicate.test(name, spans)).isTrue(); - assertThat(predicate.test(name, spans)).isTrue(); - assertThat(predicate.test(name, spans)).isFalse(); - } -} diff --git a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorRateLimiterTest.java b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorRateLimiterTest.java index afea1923de..43feafc76d 100644 --- a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorRateLimiterTest.java +++ b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorRateLimiterTest.java @@ -2,7 +2,7 @@ import static com.slack.astra.preprocessor.PreprocessorRateLimiter.BYTES_DROPPED; import static com.slack.astra.preprocessor.PreprocessorRateLimiter.MESSAGES_DROPPED; -import static com.slack.astra.preprocessor.PreprocessorValueMapper.SERVICE_NAME_KEY; +import static com.slack.astra.preprocessor.PreprocessorRateLimiter.SERVICE_NAME_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -12,7 
+12,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.util.List; -import org.apache.kafka.streams.kstream.Predicate; +import java.util.function.BiPredicate; import org.junit.jupiter.api.Test; public class PreprocessorRateLimiterTest { @@ -41,20 +41,20 @@ public void shouldApplyScaledRateLimit() throws InterruptedException { name, targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + DatasetMetadata.MATCH_ALL_SERVICE); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); // try to get just below the scaled limit, then try to go over - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); // rate limit is targetThroughput per second, so 1 second should refill our limit Thread.sleep(1000); // try to get just below the scaled limit, then try to go over - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); assertThat( meterRegistry @@ -99,19 +99,19 @@ public void shouldApplyScaledRateLimitWithAllServices() throws InterruptedExcept targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), DatasetMetadata.MATCH_ALL_SERVICE); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); // try to get just below the scaled limit, then try to go over - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); // rate limit is targetThroughput per second, so 1 second should refill our limit Thread.sleep(1000); // try to get just below the scaled limit, then try to go over - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); assertThat( meterRegistry @@ -140,19 +140,19 @@ public void shouldApplyScaledRateLimitWithAllServices() throws InterruptedExcept targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), DatasetMetadata.MATCH_STAR_SERVICE); - Predicate predicate1 = - rateLimiter1.createRateLimiter(List.of(datasetMetadata1)); + BiPredicate> predicate1 = + rateLimiter1.createBulkIngestRateLimiter(List.of(datasetMetadata1)); // try to get just below the scaled limit, then try to go over - assertThat(predicate1.test("key", span)).isTrue(); - assertThat(predicate1.test("key", span)).isFalse(); + assertThat(predicate1.test("key", List.of(span))).isTrue(); + assertThat(predicate1.test("key", List.of(span))).isFalse(); // rate limit is targetThroughput per second, so 1 second should refill our limit Thread.sleep(1000); // try to get just below the scaled limit, then try to go over - assertThat(predicate1.test("key", span)).isTrue(); - assertThat(predicate1.test("key", span)).isFalse(); + assertThat(predicate1.test("key", 
List.of(span))).isTrue(); + assertThat(predicate1.test("key", List.of(span))).isFalse(); assertThat( meterRegistry1 @@ -180,20 +180,20 @@ public void shouldApplyScaledRateLimitWithAllServices() throws InterruptedExcept name, targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - null); - Predicate predicate2 = - rateLimiter2.createRateLimiter(List.of(datasetMetadata2)); + "key"); + BiPredicate> predicate2 = + rateLimiter2.createBulkIngestRateLimiter(List.of(datasetMetadata2)); // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test("key", span)).isTrue(); - assertThat(predicate2.test("key", span)).isFalse(); + assertThat(predicate2.test("key", List.of(span))).isTrue(); + assertThat(predicate2.test("key", List.of(span))).isFalse(); // rate limit is targetThroughput per second, so 1 second should refill our limit Thread.sleep(1000); // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test("key", span)).isTrue(); - assertThat(predicate2.test("key", span)).isFalse(); + assertThat(predicate2.test("key", List.of(span))).isTrue(); + assertThat(predicate2.test("key", List.of(span))).isFalse(); assertThat( meterRegistry2 @@ -248,23 +248,23 @@ public void shouldApplyRateLimitsAgainstMultipleDatasets() throws InterruptedExc DatasetMetadata.MATCH_ALL_SERVICE); // ensure we always drop for dataset1 - Predicate predicate1 = - rateLimiter.createRateLimiter(List.of(datasetMetadata1)); - assertThat(predicate1.test("key", span)).isFalse(); + BiPredicate> predicate1 = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata1)); + assertThat(predicate1.test("key", List.of(span))).isFalse(); - Predicate predicate2 = - rateLimiter.createRateLimiter(List.of(datasetMetadata1, datasetMetadata2)); + BiPredicate> predicate2 = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata1, datasetMetadata2)); // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test("key", span)).isTrue(); - assertThat(predicate2.test("key", span)).isFalse(); + assertThat(predicate2.test("key", List.of(span))).isTrue(); + assertThat(predicate2.test("key", List.of(span))).isFalse(); // rate limit is targetThroughput per second, so 1 second should refill our limit Thread.sleep(1000); // try to get just below the scaled limit, then try to go over - assertThat(predicate2.test("key", span)).isTrue(); - assertThat(predicate2.test("key", span)).isFalse(); + assertThat(predicate2.test("key", List.of(span))).isTrue(); + assertThat(predicate2.test("key", List.of(span))).isFalse(); assertThat( meterRegistry @@ -303,11 +303,11 @@ public void shouldDropMessagesWithNoConfiguration() { Long.MAX_VALUE, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), "wrong_service"); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); // this should be immediately dropped - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isFalse(); assertThat( meterRegistry @@ -348,10 +348,10 @@ public void shouldHandleEmptyMessages() { targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), name); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - assertThat(predicate.test("key", 
Trace.Span.newBuilder().build())).isFalse(); + assertThat(predicate.test("key", List.of(Trace.Span.newBuilder().build()))).isFalse(); assertThat(predicate.test("key", null)).isFalse(); } @@ -382,20 +382,20 @@ public void shouldAllowBurstingOverLimitWarm() throws InterruptedException { name, targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + DatasetMetadata.MATCH_ALL_SERVICE); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); Thread.sleep(2000); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); } @Test @@ -416,17 +416,17 @@ public void shouldAllowBurstingOverLimitCold() throws InterruptedException { name, targetThroughput, List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - name); - Predicate predicate = - rateLimiter.createRateLimiter(List.of(datasetMetadata)); + DatasetMetadata.MATCH_ALL_SERVICE); + BiPredicate> predicate = + rateLimiter.createBulkIngestRateLimiter(List.of(datasetMetadata)); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); Thread.sleep(2500); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isTrue(); - assertThat(predicate.test("key", span)).isFalse(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isTrue(); + assertThat(predicate.test("key", List.of(span))).isFalse(); } } diff --git a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceIntegrationTest.java b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceIntegrationTest.java deleted file mode 100644 index 0d31b25d4d..0000000000 --- a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceIntegrationTest.java +++ /dev/null @@ -1,328 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.slack.astra.server.AstraConfig.DEFAULT_START_STOP_DURATION; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.awaitility.Awaitility.await; - -import com.google.common.util.concurrent.Service; -import com.slack.astra.metadata.core.AstraMetadataTestUtils; -import com.slack.astra.metadata.core.CuratorBuilder; -import com.slack.astra.metadata.dataset.DatasetMetadata; -import com.slack.astra.metadata.dataset.DatasetMetadataStore; -import com.slack.astra.metadata.dataset.DatasetPartitionMetadata; -import com.slack.astra.proto.config.AstraConfigs; -import com.slack.astra.testlib.MetricsUtil; 
-import com.slack.astra.testlib.TestKafkaServer; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.List; -import java.util.Properties; -import org.apache.curator.test.TestingServer; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.KafkaStreams; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PreprocessorServiceIntegrationTest { - private static final Logger LOG = - LoggerFactory.getLogger(PreprocessorServiceIntegrationTest.class); - - private TestKafkaServer kafkaServer; - private TestingServer zkServer; - - @BeforeEach - public void setUp() throws Exception { - zkServer = new TestingServer(); - kafkaServer = new TestKafkaServer(); - } - - @AfterEach - public void teardown() throws Exception { - kafkaServer.close(); - zkServer.close(); - } - - @Test - @Disabled("ZK reconnect support currently disabled") - public void shouldHandleStreamError() throws Exception { - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - AstraConfigs.ZookeeperConfig zkConfig = - AstraConfigs.ZookeeperConfig.newBuilder() - .setZkConnectString(zkServer.getConnectString()) - .setZkPathPrefix("test") - .setZkSessionTimeoutMs(100) - .setZkConnectionTimeoutMs(100) - .setSleepBetweenRetriesMs(100) - .build(); - AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); - - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId("applicationId") - .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get()) - .setNumStreamThreads(1) - .setProcessingGuarantee("at_least_once") - .build(); - AstraConfigs.ServerConfig serverConfig = - AstraConfigs.ServerConfig.newBuilder() - .setServerPort(8080) - .setServerAddress("localhost") - .build(); - AstraConfigs.PreprocessorConfig preprocessorConfig = - AstraConfigs.PreprocessorConfig.newBuilder() - .setKafkaStreamConfig(kafkaStreamConfig) - .setServerConfig(serverConfig) - .setPreprocessorInstanceCount(1) - .setDataTransformer("api_log") - .setRateLimiterMaxBurstSeconds(1) - .addAllUpstreamTopics(List.of("foo")) - .setDownstreamTopic("bar") - .build(); - - PreprocessorService preprocessorService = - new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); - - datasetMetadataStore.createSync( - new DatasetMetadata( - "name", - "owner", - 1, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1"))), - "name")); - await().until(() -> datasetMetadataStore.listSync().size(), (size) -> size == 1); - - preprocessorService.startAsync(); - preprocessorService.awaitRunning(DEFAULT_START_STOP_DURATION); - assertThat(MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry)) - .isEqualTo(1); - - // restarting ZK should cause a stream application error due to missing source topics - zkServer.restart(); - - 
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> zkServer.restart()); - - await() - .until( - () -> preprocessorService.kafkaStreams.state(), - KafkaStreams.State::hasCompletedShutdown); - await().until(preprocessorService::state, (state) -> state.equals(Service.State.FAILED)); - } - - @Test - public void shouldLoadConfigOnStartAndReloadOnMetadataChange() throws Exception { - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - AstraConfigs.ZookeeperConfig zkConfig = - AstraConfigs.ZookeeperConfig.newBuilder() - .setZkConnectString(zkServer.getConnectString()) - .setZkPathPrefix("test") - .setZkSessionTimeoutMs(30000) - .setZkConnectionTimeoutMs(30000) - .setSleepBetweenRetriesMs(30000) - .build(); - AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); - - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId("applicationId") - .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get()) - .setNumStreamThreads(1) - .build(); - AstraConfigs.ServerConfig serverConfig = - AstraConfigs.ServerConfig.newBuilder() - .setServerPort(8080) - .setServerAddress("localhost") - .build(); - AstraConfigs.PreprocessorConfig preprocessorConfig = - AstraConfigs.PreprocessorConfig.newBuilder() - .setKafkaStreamConfig(kafkaStreamConfig) - .setServerConfig(serverConfig) - .setPreprocessorInstanceCount(1) - .setDataTransformer("api_log") - .setRateLimiterMaxBurstSeconds(1) - .build(); - - PreprocessorService preprocessorService = - new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); - - preprocessorService.startAsync(); - preprocessorService.awaitRunning(DEFAULT_START_STOP_DURATION); - - assertThat(MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry)) - .isEqualTo(1); - datasetMetadataStore.createSync(new DatasetMetadata("name", "owner", 0, List.of(), "name")); - - // wait for the cache to be updated - await().until(() -> AstraMetadataTestUtils.listSyncUncached(datasetMetadataStore).size() == 1); - assertThat(MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry)) - .isEqualTo(2); - - preprocessorService.stopAsync(); - preprocessorService.awaitTerminated(); - - // close out the metadata stores - datasetMetadataStore.close(); - curatorFramework.unwrap().close(); - } - - // Ignore flaky test. This test can be potentially merged with the above test. 
- @Disabled - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void shouldProcessMessageStartToFinish() throws Exception { - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - AstraConfigs.ZookeeperConfig zkConfig = - AstraConfigs.ZookeeperConfig.newBuilder() - .setZkConnectString(zkServer.getConnectString()) - .setZkPathPrefix("test") - .setZkSessionTimeoutMs(30000) - .setZkConnectionTimeoutMs(30000) - .setSleepBetweenRetriesMs(30000) - .build(); - AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); - - // initialize the downstream topic - String downstreamTopic = "test-topic-out"; - kafkaServer.createTopicWithPartitions(downstreamTopic, 3); - - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId("applicationId") - .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get()) - .setNumStreamThreads(1) - .setProcessingGuarantee("at_least_once") - .build(); - AstraConfigs.ServerConfig serverConfig = - AstraConfigs.ServerConfig.newBuilder() - .setServerPort(8080) - .setServerAddress("localhost") - .build(); - - List upstreamTopics = List.of("test-topic"); - AstraConfigs.PreprocessorConfig preprocessorConfig = - AstraConfigs.PreprocessorConfig.newBuilder() - .setKafkaStreamConfig(kafkaStreamConfig) - .setServerConfig(serverConfig) - .setPreprocessorInstanceCount(1) - .addAllUpstreamTopics(upstreamTopics) - .setDownstreamTopic(downstreamTopic) - .setDataTransformer("api_log") - .build(); - - PreprocessorService preprocessorService = - new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); - - preprocessorService.startAsync(); - preprocessorService.awaitRunning(DEFAULT_START_STOP_DURATION); - - assertThat(MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry)) - .isEqualTo(1); - - // create a new service config with dummy data - String serviceName = "testindex"; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - serviceName, - "owner", - 100, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("3"))), - serviceName); - datasetMetadataStore.createSync(datasetMetadata); - - // wait for the cache to be updated - await().until(() -> AstraMetadataTestUtils.listSyncUncached(datasetMetadataStore).size() == 1); - await() - .until( - () -> - MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry) - == 2); - - // update the service config with our desired configuration - DatasetMetadata updatedDatasetMetadata = - new DatasetMetadata( - datasetMetadata.getName(), - datasetMetadata.getOwner(), - Long.MAX_VALUE, - List.of( - new DatasetPartitionMetadata(1, 10000, List.of("3")), - new DatasetPartitionMetadata(10001, Long.MAX_VALUE, List.of("2"))), - datasetMetadata.getName()); - datasetMetadataStore.updateSync(updatedDatasetMetadata); - - // wait for the cache to be updated - await() - .until( - () -> - MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry) - == 3); - assertThat(AstraMetadataTestUtils.listSyncUncached(datasetMetadataStore).size()).isEqualTo(1); - assertThat( - AstraMetadataTestUtils.listSyncUncached(datasetMetadataStore) - .get(0) - .getThroughputBytes()) - .isEqualTo(Long.MAX_VALUE); - - // produce messages to upstream - final Instant startTime = Instant.now(); - 
TestKafkaServer.produceMessagesToKafka(kafkaServer.getBroker(), startTime); - - // verify the message exist on the downstream - Properties properties = kafkaServer.getBroker().consumerConfig(); - properties.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); - KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); - kafkaConsumer.subscribe(List.of(downstreamTopic)); - - // wait until we see an offset of 100 on our target partition before we poll - await() - .until( - () -> { - @SuppressWarnings("OptionalGetWithoutIsPresent") - Long partition2Offset = - ((Long) - kafkaConsumer - .endOffsets(List.of(new TopicPartition(downstreamTopic, 2))) - .values() - .stream() - .findFirst() - .get()); - LOG.debug("Current partition2Offset - {}", partition2Offset); - return partition2Offset == 100; - }); - - // double check that only 100 records were fetched and all are on partition 2 - ConsumerRecords records = - kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS)); - assertThat(records.count()).isEqualTo(100); - records.forEach(record -> assertThat(record.partition()).isEqualTo(2)); - - // close the kafka consumer used in the test - kafkaConsumer.close(); - - // close the preprocessor - preprocessorService.stopAsync(); - preprocessorService.awaitTerminated(); - - // close out the metadata stores - datasetMetadataStore.close(); - curatorFramework.unwrap().close(); - } -} diff --git a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceUnitTest.java b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceUnitTest.java deleted file mode 100644 index 2109cabd42..0000000000 --- a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorServiceUnitTest.java +++ /dev/null @@ -1,537 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.slack.astra.preprocessor.PreprocessorValueMapper.SERVICE_NAME_KEY; -import static com.slack.astra.server.AstraConfig.DEFAULT_START_STOP_DURATION; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.assertj.core.api.Assertions.assertThatNullPointerException; -import static org.assertj.core.api.Assertions.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.slack.astra.metadata.dataset.DatasetMetadata; -import com.slack.astra.metadata.dataset.DatasetMetadataStore; -import com.slack.astra.metadata.dataset.DatasetPartitionMetadata; -import com.slack.astra.proto.config.AstraConfigs; -import com.slack.astra.testlib.MetricsUtil; -import com.slack.service.murron.trace.Trace; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.processor.StreamPartitioner; -import org.junit.jupiter.api.Test; - -public class PreprocessorServiceUnitTest { - - @Test 
- public void shouldBuildValidPropsFromStreamConfig() { - String applicationId = "applicationId"; - String bootstrapServers = "bootstrap"; - String processingGuarantee = "at_least_once"; - int replicationFactor = 1; - boolean enableIdempotence = false; - String acksConfig = "1"; - int numStreamThreads = 1; - - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId(applicationId) - .setBootstrapServers(bootstrapServers) - .setNumStreamThreads(numStreamThreads) - .setProcessingGuarantee(processingGuarantee) - .build(); - - Properties properties = PreprocessorService.makeKafkaStreamsProps(kafkaStreamConfig); - assertThat(properties.size()).isEqualTo(7); - - assertThat(properties.get(StreamsConfig.APPLICATION_ID_CONFIG)).isEqualTo(applicationId); - - assertThat(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo(bootstrapServers); - - assertThat(properties.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG)).isEqualTo(numStreamThreads); - assertThat(properties.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)) - .isEqualTo(processingGuarantee); - assertThat(properties.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)) - .isEqualTo(replicationFactor); - assertThat( - properties.get(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))) - .isEqualTo(enableIdempotence); - assertThat(properties.get(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG))) - .isEqualTo(acksConfig); - } - - @Test - public void shouldPreventInvalidStreamPropsConfig() { - String applicationId = "applicationId"; - String bootstrapServers = "bootstrap"; - int numStreamThreads = 1; - - assertThatIllegalArgumentException() - .isThrownBy( - () -> { - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setBootstrapServers(bootstrapServers) - .setNumStreamThreads(numStreamThreads) - .build(); - PreprocessorService.makeKafkaStreamsProps(kafkaStreamConfig); - }); - - assertThatIllegalArgumentException() - .isThrownBy( - () -> { - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId(applicationId) - .setNumStreamThreads(numStreamThreads) - .build(); - PreprocessorService.makeKafkaStreamsProps(kafkaStreamConfig); - }); - - assertThatIllegalArgumentException() - .isThrownBy( - () -> { - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId(applicationId) - .setBootstrapServers(bootstrapServers) - .build(); - PreprocessorService.makeKafkaStreamsProps(kafkaStreamConfig); - }); - - assertThatIllegalArgumentException() - .isThrownBy( - () -> { - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId(applicationId) - .setBootstrapServers(bootstrapServers) - .setNumStreamThreads(0) - .build(); - PreprocessorService.makeKafkaStreamsProps(kafkaStreamConfig); - }); - } - - @Test - public void shouldCorrectlyThroughputSortDatasets() { - DatasetMetadata datasetMetadata1 = - new DatasetMetadata( - "service1", - "service1", - 1, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - "no_service_matching_docs"); - DatasetMetadata datasetMetadata2 = - new DatasetMetadata( - "service2", - "service2", - 3, - List.of(new 
DatasetPartitionMetadata(100, 200, List.of("0"))), - DatasetMetadata.MATCH_ALL_SERVICE); - DatasetMetadata datasetMetadata3 = - new DatasetMetadata( - "service3", - "service3", - 2, - List.of(new DatasetPartitionMetadata(100, 200, List.of("0"))), - DatasetMetadata.MATCH_ALL_SERVICE); - - List throughputSortedDatasets = - PreprocessorService.sortDatasetsOnThroughput( - List.of(datasetMetadata1, datasetMetadata2, datasetMetadata3)); - assertThat(throughputSortedDatasets.size()).isEqualTo(3); - assertThat(throughputSortedDatasets.get(0).getName()).isEqualTo("service2"); - assertThat(throughputSortedDatasets.get(1).getName()).isEqualTo("service3"); - assertThat(throughputSortedDatasets.get(2).getName()).isEqualTo("service1"); - } - - @Test - public void testValidDatasetMetadataMappingAndStreamPartition() { - String datasetName1 = "datasetName1"; - List partitionList1 = List.of(33, 44, 55); - DatasetMetadata datasetMetadata1 = - new DatasetMetadata( - datasetName1, - datasetName1, - 1, - List.of(new DatasetPartitionMetadata(100, Long.MAX_VALUE, List.of("33", "44", "55"))), - datasetName1); - - String datasetName2 = "datasetName2"; - List partitionList2 = List.of(1, 2, 3); - DatasetMetadata datasetMetadata2 = - new DatasetMetadata( - datasetName2, - datasetName2, - 1, - List.of(new DatasetPartitionMetadata(500, Long.MAX_VALUE, List.of("1", "2", "3"))), - datasetName2); - - StreamPartitioner streamPartitioner = - new PreprocessorPartitioner<>(List.of(datasetMetadata1, datasetMetadata2), 100); - - Trace.Span span = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(datasetName1).build()) - .build(); - - // all arguments except value are currently unused for determining the partition to assign, as - // this comes the internal partition list that is set on stream partitioner initialization - assertThat(partitionList1.contains(streamPartitioner.partition("topic", null, span, 0))) - .isTrue(); - assertThat(partitionList1.contains(streamPartitioner.partition("topic", null, span, 1))) - .isTrue(); - assertThat(partitionList1.contains(streamPartitioner.partition("topic", "", span, 0))).isTrue(); - assertThat(partitionList1.contains(streamPartitioner.partition("", null, span, 0))).isTrue(); - - StreamPartitioner streamPartitioner2 = - new PreprocessorPartitioner<>(List.of(datasetMetadata1, datasetMetadata2), 100); - - Trace.Span span2 = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(datasetName2).build()) - .build(); - - // all arguments except value are currently unused for determining the partition to assign, as - // this comes the internal partition list that is set on stream partitioner initialization - assertThat(partitionList2.contains(streamPartitioner2.partition("topic", null, span2, 0))) - .isTrue(); - assertThat(partitionList2.contains(streamPartitioner2.partition("topic", null, span2, 1))) - .isTrue(); - assertThat(partitionList2.contains(streamPartitioner2.partition("topic", "", span2, 0))) - .isTrue(); - assertThat(partitionList2.contains(streamPartitioner2.partition("", null, span2, 0))).isTrue(); - } - - @Test - public void shouldReturnRandomPartitionFromStreamPartitioner() { - String datasetName = "datasetName"; - List partitionList = List.of(33, 44, 55); - DatasetMetadata datasetMetadata = - new DatasetMetadata( - datasetName, - datasetName, - 1, - List.of(new DatasetPartitionMetadata(100, Long.MAX_VALUE, List.of("33", "44", "55"))), - datasetName); - StreamPartitioner streamPartitioner 
= - new PreprocessorPartitioner<>(List.of(datasetMetadata), 100); - - Trace.Span span = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(datasetName).build()) - .build(); - - // all arguments except value are currently unused for determining the partition to assign, as - // this comes the internal partition list that is set on stream partitioner initialization - assertThat(partitionList.contains(streamPartitioner.partition("topic", null, span, 0))) - .isTrue(); - assertThat(partitionList.contains(streamPartitioner.partition("topic", null, span, 1))) - .isTrue(); - assertThat(partitionList.contains(streamPartitioner.partition("topic", "", span, 0))).isTrue(); - assertThat(partitionList.contains(streamPartitioner.partition("", null, span, 0))).isTrue(); - } - - @Test - public void shouldNotCachePartitionBeyondStickyTimeout() throws InterruptedException { - int stickyTimeoutMs = 1; - List partitions = - IntStream.range(0, 100).mapToObj(String::valueOf).collect(Collectors.toList()); - - String datasetName = "datasetName"; - DatasetMetadata datasetMetadata = - new DatasetMetadata( - datasetName, - datasetName, - 1, - List.of(new DatasetPartitionMetadata(100, Long.MAX_VALUE, partitions)), - datasetName); - StreamPartitioner streamPartitioner = - new PreprocessorPartitioner<>(List.of(datasetMetadata), stickyTimeoutMs); - - Trace.Span span = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder().setKey(SERVICE_NAME_KEY).setVStr(datasetName).build()) - .build(); - - int partition = streamPartitioner.partition("topic", null, span, Integer.MAX_VALUE); - int count = 0; - int nextPartition = -1; - while (count++ < 5) { - Thread.sleep(stickyTimeoutMs); - nextPartition = streamPartitioner.partition("topic", null, span, Integer.MAX_VALUE); - if (nextPartition != partition) { - break; - } - } - if (count == 5 && partition == nextPartition) { - fail("Should not have gotten the same partition number"); - } - } - - @Test - public void shouldFilterInvalidConfigurationsFromServiceMetadata() { - DatasetMetadata validDatasetMetadata = - new DatasetMetadata( - "valid", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1"))), - "valid"); - - List datasetMetadataList = - List.of( - new DatasetMetadata( - "invalidServicePartitionList", - "owner1", - 1000, - List.of(), - "invalidServicePartitionList"), - new DatasetMetadata( - "invalidThroughputBytes", - "owner1", - 0, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1"))), - "invalidThroughputBytes"), - new DatasetMetadata( - "invalidActivePartitions", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of())), - "invalidActivePartitions"), - new DatasetMetadata( - "invalidNoActivePartitions", - "owner1", - 1000, - List.of( - new DatasetPartitionMetadata(1, Instant.now().toEpochMilli(), List.of("1"))), - "invalidNoActivePartitions"), - validDatasetMetadata); - - List datasetMetadata1 = - PreprocessorService.filterValidDatasetMetadata(datasetMetadataList); - - assertThat(datasetMetadata1.size()).isEqualTo(1); - assertThat(datasetMetadata1.contains(validDatasetMetadata)).isTrue(); - - Collections.shuffle(datasetMetadata1); - - List datasetMetadata2 = - PreprocessorService.filterValidDatasetMetadata(datasetMetadataList); - - assertThat(datasetMetadata2.size()).isEqualTo(1); - assertThat(datasetMetadata2.contains(validDatasetMetadata)).isTrue(); - - List datasetMetadata3 = - 
PreprocessorService.filterValidDatasetMetadata(List.of()); - assertThat(datasetMetadata3.size()).isEqualTo(0); - } - - @Test - public void shouldGetActivePartitionsFromServiceMetadata() { - DatasetMetadata datasetMetadataEmptyPartitions = - new DatasetMetadata("empty", "owner1", 1000, List.of(), "empty"); - DatasetMetadata datasetMetadataNoActivePartitions = - new DatasetMetadata( - "empty", - "owner1", - 1000, - List.of( - new DatasetPartitionMetadata(1, Instant.now().toEpochMilli(), List.of("1", "2"))), - "empty"); - - DatasetMetadata datasetMetadataNoPartitions = - new DatasetMetadata( - "empty", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of())), - "empty"); - - DatasetMetadata datasetMetadataMultiplePartitions = - new DatasetMetadata( - "empty", - "owner1", - 1000, - List.of( - new DatasetPartitionMetadata(1, 10000, List.of("3", "4")), - new DatasetPartitionMetadata(10001, Long.MAX_VALUE, List.of("5", "6"))), - "empty"); - - assertThat(PreprocessorService.getActivePartitionList(datasetMetadataEmptyPartitions)) - .isEqualTo(List.of()); - assertThat(PreprocessorService.getActivePartitionList(datasetMetadataNoActivePartitions)) - .isEqualTo(List.of()); - assertThat(PreprocessorService.getActivePartitionList(datasetMetadataNoPartitions)) - .isEqualTo(List.of()); - assertThat(PreprocessorService.getActivePartitionList(datasetMetadataMultiplePartitions)) - .isEqualTo(List.of(5, 6)); - } - - @Test - public void shouldBuildStreamTopology() { - List datasetMetadata = - List.of( - new DatasetMetadata( - "dataset1", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1", "2"))), - "dataset1"), - new DatasetMetadata( - "dataset2", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1", "2"))), - "dataset2")); - - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 1; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - - List upstreamTopics = List.of("upstream1", "upstream2", "upstream3"); - String downstreamTopic = "downstream"; - String dataTransformer = "api_log"; - Topology topology = - PreprocessorService.buildTopology( - datasetMetadata, rateLimiter, upstreamTopics, downstreamTopic, dataTransformer, 100); - - // we have limited visibility into the topology, so we just verify we have the correct number of - // stream processors as we expect - assertThat(topology.describe().subtopologies().size()).isEqualTo(upstreamTopics.size()); - } - - @Test - public void shouldThrowOnInvalidTopologyConfigs() { - DatasetMetadata datasetMetadata = - new DatasetMetadata( - "dataset1", - "owner1", - 1000, - List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("1", "2"))), - "dataset1"); - - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - int preprocessorCount = 1; - int maxBurstSeconds = 1; - boolean initializeWarm = false; - PreprocessorRateLimiter rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, preprocessorCount, maxBurstSeconds, initializeWarm); - List upstreamTopics = List.of("upstream"); - String downstreamTopic = "downstream"; - String dataTransformer = "api_log"; - - assertThatIllegalArgumentException() - .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(), rateLimiter, upstreamTopics, downstreamTopic, dataTransformer, 100)); - assertThatIllegalArgumentException() 
- .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(datasetMetadata), - rateLimiter, - List.of(), - downstreamTopic, - dataTransformer, - 100)); - assertThatNullPointerException() - .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(datasetMetadata), - null, - upstreamTopics, - downstreamTopic, - dataTransformer, - 100)); - assertThatIllegalArgumentException() - .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(datasetMetadata), - rateLimiter, - upstreamTopics, - "", - dataTransformer, - 100)); - assertThatIllegalArgumentException() - .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(datasetMetadata), - rateLimiter, - upstreamTopics, - downstreamTopic, - "", - 100)); - assertThatIllegalArgumentException() - .isThrownBy( - () -> - PreprocessorService.buildTopology( - List.of(datasetMetadata), - rateLimiter, - upstreamTopics, - downstreamTopic, - "invalid", - 100)); - } - - @Test - public void shouldHandleEmptyDatasetMetadata() throws TimeoutException { - DatasetMetadataStore datasetMetadataStore = mock(DatasetMetadataStore.class); - when(datasetMetadataStore.listSync()).thenReturn(List.of()); - - AstraConfigs.PreprocessorConfig.KafkaStreamConfig kafkaStreamConfig = - AstraConfigs.PreprocessorConfig.KafkaStreamConfig.newBuilder() - .setApplicationId("applicationId") - .setBootstrapServers("bootstrap") - .setNumStreamThreads(1) - .build(); - AstraConfigs.ServerConfig serverConfig = - AstraConfigs.ServerConfig.newBuilder() - .setServerPort(8080) - .setServerAddress("localhost") - .build(); - AstraConfigs.PreprocessorConfig preprocessorConfig = - AstraConfigs.PreprocessorConfig.newBuilder() - .setKafkaStreamConfig(kafkaStreamConfig) - .setServerConfig(serverConfig) - .setPreprocessorInstanceCount(1) - .setDataTransformer("api_log") - .setRateLimiterMaxBurstSeconds(1) - .build(); - - SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); - PreprocessorService preprocessorService = - new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); - - preprocessorService.startAsync(); - preprocessorService.awaitRunning(DEFAULT_START_STOP_DURATION); - - assertThat(MetricsUtil.getTimerCount(PreprocessorService.CONFIG_RELOAD_TIMER, meterRegistry)) - .isEqualTo(1); - - preprocessorService.stopAsync(); - preprocessorService.awaitTerminated(); - } -} diff --git a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorValueMapperTest.java b/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorValueMapperTest.java deleted file mode 100644 index 50c5cd4810..0000000000 --- a/astra/src/test/java/com/slack/astra/preprocessor/PreprocessorValueMapperTest.java +++ /dev/null @@ -1,166 +0,0 @@ -package com.slack.astra.preprocessor; - -import static com.slack.astra.testlib.SpanUtil.makeSpan; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; - -import com.google.protobuf.ByteString; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.junit.jupiter.api.Test; - -public class PreprocessorValueMapperTest { - - @Test - public void shouldValueMapApiLog() { - // Make a test message - String message = - "{\"http_method\":\"POST\",\"method\":\"verifyToken\",\"type\":\"api_log\",\"level\":\"info\"}"; - String indexName = "api_log"; - String host = 
"www-host"; - long timestamp = 1612550512340953000L; - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom(message.getBytes(StandardCharsets.UTF_8))) - .setType(indexName) - .setHost(host) - .setTimestamp(timestamp) - .build(); - - ValueMapper> valueMapper = - PreprocessorValueMapper.byteArrayToTraceSpans("api_log"); - byte[] inputBytes = - AstraSerdes.MurronMurronMessage().serializer().serialize(indexName, testMurronMsg); - - Iterable spanIterable = valueMapper.apply(inputBytes); - Iterator spanIterator = spanIterable.iterator(); - Trace.Span span = spanIterator.next(); - - assertThat(span.getTimestamp()).isEqualTo(timestamp / 1000); - assertThat(span.getDuration()).isEqualTo(1); - - assertThat(span.getTagsList().size()).isEqualTo(6); - assertThat( - span.getTagsList() - .contains( - Trace.KeyValue.newBuilder().setKey("http_method").setVStr("POST").build())) - .isTrue(); - assertThat( - span.getTagsList() - .contains( - Trace.KeyValue.newBuilder().setKey("method").setVStr("verifyToken").build())) - .isTrue(); - assertThat( - span.getTagsList() - .contains(Trace.KeyValue.newBuilder().setKey("type").setVStr(indexName).build())) - .isTrue(); - assertThat( - span.getTagsList() - .contains(Trace.KeyValue.newBuilder().setKey("level").setVStr("info").build())) - .isTrue(); - assertThat( - span.getTagsList() - .contains(Trace.KeyValue.newBuilder().setKey("hostname").setVStr(host).build())) - .isTrue(); - assertThat( - span.getTagsList() - .contains( - Trace.KeyValue.newBuilder().setKey("service_name").setVStr(indexName).build())) - .isTrue(); - assertThat(spanIterator.hasNext()).isFalse(); - } - - @Test - public void shouldValueMapSpans() { - final String traceId = "t1"; - final String id = "i1"; - final String parentId = "p2"; - final long timestampMicros = 1612550512340953L; - final long durationMicros = 500000L; - final String serviceName = "test_service"; - final String name = "testSpanName"; - final String msgType = "test_message_type"; - final Trace.Span inputSpan = - makeSpan( - traceId, id, parentId, timestampMicros, durationMicros, name, serviceName, msgType); - - final String type = "testIndex"; - final String host = "testHost"; - final Murron.MurronMessage murronMessage = - Murron.MurronMessage.newBuilder() - .setMessage(Trace.ListOfSpans.newBuilder().addSpans(inputSpan).build().toByteString()) - .setType(type) - .setHost(host) - .setTimestamp(timestampMicros * 1000 + 1) - .build(); - - ValueMapper> valueMapper = - PreprocessorValueMapper.byteArrayToTraceSpans("spans"); - byte[] inputBytes = - AstraSerdes.MurronMurronMessage().serializer().serialize(serviceName, murronMessage); - - Iterable spanIterable = valueMapper.apply(inputBytes); - Iterator spanIterator = spanIterable.iterator(); - Trace.Span mappedSpan = spanIterator.next(); - - assertThat(mappedSpan.getTimestamp()).isEqualTo(timestampMicros); - assertThat(mappedSpan.getDuration()).isEqualTo(durationMicros); - - assertThat(mappedSpan.getTagsList().size()).isEqualTo(8); - assertThat( - mappedSpan - .getTagsList() - .contains( - Trace.KeyValue.newBuilder() - .setKey("service_name") - .setVStr("test_service") - .build())) - .isTrue(); - assertThat( - mappedSpan - .getTagsList() - .contains( - Trace.KeyValue.newBuilder().setKey("http_method").setVStr("POST").build())) - .isTrue(); - assertThat( - mappedSpan - .getTagsList() - .contains( - Trace.KeyValue.newBuilder() - .setKey("method") - .setVStr("callbacks.flannel") - .build())) - .isTrue(); - 
assertThat(spanIterator.hasNext()).isFalse(); - } - - @Test - public void shouldPreventInvalidTransform() { - assertThatIllegalArgumentException() - .isThrownBy( - () -> { - PreprocessorValueMapper.byteArrayToTraceSpans("invalid"); - }); - } - - @Test - public void shouldExtractServiceName() { - String service1 = "service1"; - Trace.Span span1 = - Trace.Span.newBuilder() - .addTags( - Trace.KeyValue.newBuilder() - .setKey(PreprocessorValueMapper.SERVICE_NAME_KEY) - .setVStr(service1) - .build()) - .build(); - - assertThat(PreprocessorValueMapper.getServiceName(span1)).isEqualTo(service1); - - Trace.Span span2 = Trace.Span.newBuilder().build(); - assertThat(PreprocessorValueMapper.getServiceName(span2)).isNull(); - } -} diff --git a/astra/src/test/java/com/slack/astra/server/AstraConfigTest.java b/astra/src/test/java/com/slack/astra/server/AstraConfigTest.java index 2a29c158ec..5dfffb4b52 100644 --- a/astra/src/test/java/com/slack/astra/server/AstraConfigTest.java +++ b/astra/src/test/java/com/slack/astra/server/AstraConfigTest.java @@ -270,10 +270,7 @@ public void testParseAstraJsonConfigFile() throws IOException { final AstraConfigs.PreprocessorConfig preprocessorConfig = config.getPreprocessorConfig(); assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isEqualTo(1); - assertThat(preprocessorConfig.getUpstreamTopicsList()).isEqualTo(List.of("test-topic")); - assertThat(preprocessorConfig.getDownstreamTopic()).isEqualTo("test-topic-out"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); - assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false); assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400); assertThat(preprocessorConfig.getSchemaFile()).isEqualTo("schema/test_schema.yaml"); @@ -285,13 +282,6 @@ public void testParseAstraJsonConfigFile() throws IOException { final AstraConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085); assertThat(preprocessorServerConfig.getServerAddress()).isEqualTo("localhost"); - - final AstraConfigs.PreprocessorConfig.KafkaStreamConfig preprocessorKafkaStreamConfig = - preprocessorConfig.getKafkaStreamConfig(); - assertThat(preprocessorKafkaStreamConfig.getBootstrapServers()).isEqualTo("localhost:9092"); - assertThat(preprocessorKafkaStreamConfig.getApplicationId()).isEqualTo("astra_preprocessor"); - assertThat(preprocessorKafkaStreamConfig.getNumStreamThreads()).isEqualTo(2); - assertThat(preprocessorKafkaStreamConfig.getProcessingGuarantee()).isEqualTo("at_least_once"); } @Test @@ -446,8 +436,6 @@ public void testParseAstraYamlConfigFile() throws IOException { final AstraConfigs.PreprocessorConfig preprocessorConfig = config.getPreprocessorConfig(); assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isEqualTo(1); - assertThat(preprocessorConfig.getUpstreamTopicsList()).isEqualTo(List.of("test-topic")); - assertThat(preprocessorConfig.getDownstreamTopic()).isEqualTo("test-topic-out"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); final AstraConfigs.KafkaConfig preprocessorKafkaConfig = @@ -455,20 +443,12 @@ public void testParseAstraYamlConfigFile() throws IOException { assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092"); assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic"); - assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true); 
assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(429); assertThat(preprocessorConfig.getSchemaFile()).isEqualTo("schema/test_schema.yaml"); final AstraConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085); assertThat(preprocessorServerConfig.getServerAddress()).isEqualTo("localhost"); - - final AstraConfigs.PreprocessorConfig.KafkaStreamConfig preprocessorKafkaStreamConfig = - preprocessorConfig.getKafkaStreamConfig(); - assertThat(preprocessorKafkaStreamConfig.getBootstrapServers()).isEqualTo("localhost:9092"); - assertThat(preprocessorKafkaStreamConfig.getApplicationId()).isEqualTo("astra_preprocessor"); - assertThat(preprocessorKafkaStreamConfig.getNumStreamThreads()).isEqualTo(2); - assertThat(preprocessorKafkaStreamConfig.getProcessingGuarantee()).isEqualTo("at_least_once"); } @Test @@ -615,21 +595,11 @@ public void testEmptyJsonStringInit() throws InvalidProtocolBufferException { final AstraConfigs.PreprocessorConfig preprocessorConfig = config.getPreprocessorConfig(); assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isZero(); - assertThat(preprocessorConfig.getUpstreamTopicsList()).isEmpty(); - assertThat(preprocessorConfig.getDownstreamTopic()).isEmpty(); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isZero(); - assertThat(preprocessorConfig.getUseBulkApi()).isFalse(); final AstraConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isZero(); assertThat(preprocessorServerConfig.getServerAddress()).isEmpty(); - - final AstraConfigs.PreprocessorConfig.KafkaStreamConfig preprocessorKafkaStreamConfig = - preprocessorConfig.getKafkaStreamConfig(); - assertThat(preprocessorKafkaStreamConfig.getBootstrapServers()).isEmpty(); - assertThat(preprocessorKafkaStreamConfig.getApplicationId()).isEmpty(); - assertThat(preprocessorKafkaStreamConfig.getNumStreamThreads()).isZero(); - assertThat(preprocessorKafkaStreamConfig.getProcessingGuarantee()).isEmpty(); } @Test @@ -743,21 +713,11 @@ public void testEmptyYamlStringInit() final AstraConfigs.PreprocessorConfig preprocessorConfig = config.getPreprocessorConfig(); assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isZero(); - assertThat(preprocessorConfig.getUpstreamTopicsList()).isEmpty(); - assertThat(preprocessorConfig.getDownstreamTopic()).isEmpty(); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isZero(); final AstraConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isZero(); assertThat(preprocessorServerConfig.getServerAddress()).isEmpty(); - assertThat(preprocessorConfig.getUseBulkApi()).isFalse(); - - final AstraConfigs.PreprocessorConfig.KafkaStreamConfig preprocessorKafkaStreamConfig = - preprocessorConfig.getKafkaStreamConfig(); - assertThat(preprocessorKafkaStreamConfig.getBootstrapServers()).isEmpty(); - assertThat(preprocessorKafkaStreamConfig.getApplicationId()).isEmpty(); - assertThat(preprocessorKafkaStreamConfig.getNumStreamThreads()).isZero(); - assertThat(preprocessorKafkaStreamConfig.getProcessingGuarantee()).isEmpty(); } @Test diff --git a/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java b/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java index 1c6ae9a252..43b10377ea 100644 --- a/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java 
+++ b/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java @@ -98,7 +98,6 @@ public void bootstrapCluster() throws Exception { preprocessorConfig = AstraConfigs.PreprocessorConfig.newBuilder() .setKafkaConfig(kafkaConfig) - .setUseBulkApi(true) .setServerConfig(serverConfig) .setPreprocessorInstanceCount(1) .setRateLimiterMaxBurstSeconds(1) diff --git a/astra/src/test/java/com/slack/astra/writer/JsonLogFormatterTest.java b/astra/src/test/java/com/slack/astra/writer/JsonLogFormatterTest.java deleted file mode 100644 index 4a5e014b47..0000000000 --- a/astra/src/test/java/com/slack/astra/writer/JsonLogFormatterTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.slack.astra.writer; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIOException; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; - -import com.slack.service.murron.trace.Trace; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Set; -import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; - -public class JsonLogFormatterTest { - - @Test - public void testJsonByteArrayToTraceSpan() throws IOException { - String json = - """ - { - "service_name": "my-service", - "@timestamp": "2007-12-03T10:15:30.00Z" - }"""; - Trace.Span span = JsonLogFormatter.fromJsonLog(json.getBytes(StandardCharsets.UTF_8)); - assertThat(span.getName()).isEqualTo("my-service"); - assertThat(span.getTimestamp()).isEqualTo(1196676930000L); // milliseconds - - json = - """ - { - "service_name": "my-service", - "@timestamp": "2007-12-03T10:15:30.00Z", - "field1": "value1" - }"""; - span = JsonLogFormatter.fromJsonLog(json.getBytes(StandardCharsets.UTF_8)); - assertThat(span.getTagsList().size()).isEqualTo(3); - Set tags = - span.getTagsList().stream() - .filter(kv -> kv.getKey().equals("field1")) - .collect(Collectors.toSet()); - assertThat(tags.size()).isEqualTo(1); - assertThat(tags.iterator().next().getVStr()).isEqualTo("value1"); - assertThat(span.getTimestamp()).isEqualTo(1196676930000L); // milliseconds - } - - @Test - public void testMissingJsonKeys() { - assertThatIllegalArgumentException() - .isThrownBy(() -> JsonLogFormatter.fromJsonLog("{}}".getBytes(StandardCharsets.UTF_8))); - assertThatIOException() - .isThrownBy(() -> JsonLogFormatter.fromJsonLog("{".getBytes(StandardCharsets.UTF_8))); - assertThatIllegalArgumentException() - .isThrownBy( - () -> - JsonLogFormatter.fromJsonLog( - """ - {"service_name": "my-service"}""" - .getBytes(StandardCharsets.UTF_8))); - assertThatIllegalArgumentException() - .isThrownBy( - () -> - JsonLogFormatter.fromJsonLog( - """ - {"@timestamp": "2007-12-03T10:15:30.00Z"}""" - .getBytes(StandardCharsets.UTF_8))); - } -} diff --git a/astra/src/test/java/com/slack/astra/writer/MurronLogFormatterTest.java b/astra/src/test/java/com/slack/astra/writer/MurronLogFormatterTest.java deleted file mode 100644 index 3e809cc76d..0000000000 --- a/astra/src/test/java/com/slack/astra/writer/MurronLogFormatterTest.java +++ /dev/null @@ -1,182 +0,0 @@ -package com.slack.astra.writer; - -import static com.slack.astra.writer.MurronLogFormatter.API_LOG_DURATION_FIELD; -import static com.slack.astra.writer.MurronLogFormatter.ENVOY_DURATION_FIELD; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; - -import com.fasterxml.jackson.core.JsonProcessingException; -import 
com.fasterxml.jackson.databind.exc.MismatchedInputException; -import com.google.protobuf.ByteString; -import com.slack.astra.logstore.LogMessage; -import com.slack.astra.proto.schema.Schema; -import com.slack.service.murron.Murron; -import com.slack.service.murron.trace.Trace; -import java.nio.charset.StandardCharsets; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class MurronLogFormatterTest { - public Object getTagValue(List tags, String key) { - for (Trace.KeyValue tag : tags) { - if (tag.getKey().equals(key)) { - Schema.SchemaFieldType schemaFieldType = tag.getFieldType(); - if (schemaFieldType.equals(Schema.SchemaFieldType.STRING) - || schemaFieldType.equals(Schema.SchemaFieldType.KEYWORD)) { - return tag.getVStr(); - } - if (schemaFieldType.equals(Schema.SchemaFieldType.INTEGER)) { - return tag.getVInt32(); - } - if (schemaFieldType.equals(Schema.SchemaFieldType.LONG)) { - return tag.getVInt64(); - } - if (schemaFieldType.equals(Schema.SchemaFieldType.DOUBLE)) { - return tag.getVFloat64(); - } - if (schemaFieldType.equals(Schema.SchemaFieldType.BOOLEAN)) { - return tag.getVBool(); - } - if (schemaFieldType.equals(Schema.SchemaFieldType.BINARY)) { - return tag.getVBinary(); - } - } - } - return null; - } - - @Test - public void testApiSpanConversion() throws JsonProcessingException { - String message = - "{\"ip_address\":\"3.86.63.133\",\"http_method\":\"POST\",\"method\":true,\"enterprise\":\"E012Y1ZD5PU\",\"team\":\"T012YTS8XKM\",\"user\":\"U012YTS942X\",\"status\":\"ok\",\"http_params\":\"flannel_host=flannelbe-dev-iad-iaz2&include_permissions=falseth\",\"ua\":\"Slack-Flannel-Web\\/vef2bd:4046\",\"unique_id\":\"YB2RcPgcUv7PCuIbo8posQAAoDg\",\"request_queue_time\":2262,\"microtime_elapsed\":1418,\"mysql_query_count\":0,\"mysql_query_time\":0,\"mysql_conns_count\":0,\"mysql_conns_time\":0,\"mysql_rows_count\":0,\"mysql_rows_affected\":0,\"mc_queries_count\":11,\"mc_queries_time\":6782,\"frl_time\":0,\"init_time\":1283,\"api_dispatch_time\":0,\"api_output_time\":0,\"api_output_size\":0,\"api_strict\":false,\"ekm_decrypt_reqs_time\":0,\"ekm_decrypt_reqs_count\":0,\"ekm_encrypt_reqs_time\":0,\"ekm_encrypt_reqs_count\":0,\"grpc_req_count\":0,\"grpc_req_time\":0,\"agenda_req_count\":0,\"agenda_req_time\":0,\"trace\":\"#route_main() -> lib_controller.php:69#Controller::handlePost() -> Controller.hack:58#CallbackApiController::handleRequest() -> api.php:45#local_callbacks_api_main_inner() -> api.php:250#api_dispatch() -> lib_api.php:179#api_callbacks_flannel_verifyToken() -> api__callbacks_flannel.php:1714#api_output_fb_thrift() -> lib_api_output.php:390#_api_output_log_call()\",\"client_connection_state\":\"unset\",\"ms_requests_count\":0,\"ms_requests_time\":0,\"token_type\":\"cookie\",\"limited_access_requester_workspace\":\"\",\"limited_access_allowed_workspaces\":\"\",\"repo_auth\":true,\"cf_id\":\"6999afc7b6:haproxy-edge-dev-iad-igu2\",\"external_user\":\"W012XXXFC\",\"timestamp\":\"2021-02-05 10:41:52.340\",\"git_sha\":\"unknown\",\"hhvm_version\":\"4.39.0\",\"slath\":\"callbacks.flannel.verifyToken\",\"php_type\":\"api\",\"webapp_cluster_pbucket\":0,\"webapp_cluster_name\":\"callbacks\",\"webapp_cluster_nest\":\"normal\",\"dev_env\":\"dev-main\",\"pay_product_level\":\"enterprise\",\"level\":\"info\"}"; - String indexName = "hhvm-api_log"; - String host = "slack-www-hhvm-dev-dev-callbacks-iad-j8zj"; - long timestamp = 1612550512340953000L; - - // Make a test message - Murron.MurronMessage.Builder testMurronMsgBuilder = Murron.MurronMessage.newBuilder(); - 
testMurronMsgBuilder - .setMessage(ByteString.copyFrom(message.getBytes(StandardCharsets.UTF_8))) - .setType(indexName) - .setHost(host) - .setTimestamp(timestamp); - Murron.MurronMessage testMurronMsg = testMurronMsgBuilder.build(); - - Trace.Span apiSpan = MurronLogFormatter.fromApiLog(testMurronMsg); - assertThat(apiSpan.getDuration()).isEqualTo(1418L); - assertThat(apiSpan.getName()).isEqualTo(indexName); - assertThat(apiSpan.getId().toString().contains(host)).isTrue(); - assertThat(apiSpan.getTraceId().isEmpty()).isTrue(); - assertThat(apiSpan.getParentId().isEmpty()).isTrue(); - assertThat(apiSpan.getTimestamp()).isEqualTo(timestamp / 1000); - List tags = apiSpan.getTagsList(); - assertThat(getTagValue(tags, "http_method")).isEqualTo("POST"); - assertThat(getTagValue(tags, "mysql_conns_count")).isEqualTo(0); - assertThat((boolean) getTagValue(tags, "method")).isTrue(); - assertThat(getTagValue(tags, API_LOG_DURATION_FIELD)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.PARENT_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.TRACE_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.HOSTNAME.fieldName)).isEqualTo(host); - assertThat(getTagValue(tags, LogMessage.ReservedField.SERVICE_NAME.fieldName)) - .isEqualTo("hhvm-api_log"); - } - - @Test - public void testApiSpanWithTraceContextConversion() throws JsonProcessingException { - final String message = - "{\"ip_address\":\"3.86.63.133\",\"trace_id\":\"123\",\"parent_id\":\"4567\",\"http_method\":\"POST\",\"method\":true,\"enterprise\":\"E012Y1ZD5PU\",\"team\":\"T012YTS8XKM\",\"user\":\"U012YTS942X\",\"status\":\"ok\",\"http_params\":\"flannel_host=flannelbe-dev-iad-iaz2&include_permissions=falseth\",\"ua\":\"Slack-Flannel-Web\\/vef2bd:4046\",\"unique_id\":\"YB2RcPgcUv7PCuIbo8posQAAoDg\",\"request_queue_time\":2262,\"microtime_elapsed\":14168,\"mysql_query_count\":0,\"mysql_query_time\":0,\"mysql_conns_count\":0,\"mysql_conns_time\":0,\"mysql_rows_count\":0,\"mysql_rows_affected\":0,\"mc_queries_count\":11,\"mc_queries_time\":6782,\"frl_time\":0,\"init_time\":1283,\"api_dispatch_time\":0,\"api_output_time\":0,\"api_output_size\":0,\"api_strict\":false,\"ekm_decrypt_reqs_time\":0,\"ekm_decrypt_reqs_count\":0,\"ekm_encrypt_reqs_time\":0,\"ekm_encrypt_reqs_count\":0,\"grpc_req_count\":0,\"grpc_req_time\":0,\"agenda_req_count\":0,\"agenda_req_time\":0,\"trace\":\"#route_main() -> lib_controller.php:69#Controller::handlePost() -> Controller.hack:58#CallbackApiController::handleRequest() -> api.php:45#local_callbacks_api_main_inner() -> api.php:250#api_dispatch() -> lib_api.php:179#api_callbacks_flannel_verifyToken() -> api__callbacks_flannel.php:1714#api_output_fb_thrift() -> lib_api_output.php:390#_api_output_log_call()\",\"client_connection_state\":\"unset\",\"ms_requests_count\":0,\"ms_requests_time\":0,\"token_type\":\"cookie\",\"limited_access_requester_workspace\":\"\",\"limited_access_allowed_workspaces\":\"\",\"repo_auth\":true,\"cf_id\":\"6999afc7b6:haproxy-edge-dev-iad-igu2\",\"external_user\":\"W012XXXFC\",\"timestamp\":\"2021-02-05 10:41:52.340\",\"git_sha\":\"unknown\",\"hhvm_version\":\"4.39.0\",\"slath\":\"callbacks.flannel.verifyToken\",\"php_type\":\"api\",\"webapp_cluster_pbucket\":0,\"webapp_cluster_name\":\"callbacks\",\"webapp_cluster_nest\":\"normal\",\"dev_env\":\"dev-main\",\"pay_product_level\":\"enterprise\",\"type\":\"api_log\",\"level\":\"info\"}"; - final String indexName = "hhvm-api_log"; - final String host = 
"slack-www-hhvm-dev-dev-callbacks-iad-j8zj"; - final long timestamp = 1612550512340953000L; - - // Make a test message - Murron.MurronMessage.Builder testMurronMsgBuilder = Murron.MurronMessage.newBuilder(); - testMurronMsgBuilder - .setMessage(ByteString.copyFrom(message.getBytes(StandardCharsets.UTF_8))) - .setType(indexName) - .setHost(host) - .setTimestamp(timestamp); - Murron.MurronMessage testMurronMsg = testMurronMsgBuilder.build(); - - Trace.Span apiSpan = MurronLogFormatter.fromApiLog(testMurronMsg); - assertThat(apiSpan.getName()).isEqualTo("api_log"); - assertThat(apiSpan.getDuration()).isEqualTo(14168L); - assertThat(apiSpan.getId().toString().contains(host)).isTrue(); - assertThat(apiSpan.getTraceId().toStringUtf8()).isEqualTo("123"); - assertThat(apiSpan.getParentId().toStringUtf8()).isEqualTo("4567"); - assertThat(apiSpan.getTimestamp()).isEqualTo(timestamp / 1000); - List tags = apiSpan.getTagsList(); - assertThat(getTagValue(tags, "http_method")).isEqualTo("POST"); - assertThat(getTagValue(tags, "mysql_conns_count")).isEqualTo(0); - assertThat((boolean) getTagValue(tags, "method")).isTrue(); - assertThat(getTagValue(tags, API_LOG_DURATION_FIELD)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.PARENT_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.TRACE_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.HOSTNAME.fieldName)).isEqualTo(host); - assertThat(getTagValue(tags, LogMessage.ReservedField.SERVICE_NAME.fieldName)) - .isEqualTo("hhvm-api_log"); - } - - @Test - public void testNullMurronMessage() throws JsonProcessingException { - MurronLogFormatter.fromApiLog(null); - } - - @Test - public void testEmptyMurronMessage() { - assertThatExceptionOfType(MismatchedInputException.class) - .isThrownBy(() -> MurronLogFormatter.fromApiLog(Murron.MurronMessage.newBuilder().build())); - } - - @Test - public void testEnvoySpanWithTraceContextConversion() throws JsonProcessingException { - // duration as a string - testEnvoySpanWithTraceContextConversion( - "{\"unique_id\":\"d009c63c-be2f-47fa-8c53\",\"http_header_upgrade\":null,\"grpc_status\":null,\"tcpip_remote_ip\":\"10.232.200.31\",\"duration\":\"36\",\"req_method\":\"POST\",\"tcpip_local_with_port\":\"10.219.290.121:81\",\"tcpip_remote_with_port\":\"101.22.228.30:36884\",\"upstream_hostname\":\"slack-www-hhvm-main-iad-sss2\",\"downstream_host_with_port\":\"10.47.47.46:0\",\"request_id\":\"d009c63c-be2f-47fa-8c53\",\"path\":\"/api/eventlog.history\",\"bytes_sent\":69,\"region\":\"us-east-1\",\"http_header_connection\":null,\"zone\":\"us-east-1g\",\"timestamp\":\"2022-05-27T18:15:10.971Z\",\"authority\":\"slack.com\",\"bytes_received\":325,\"listener\":\"nebula\",\"forwarder\":\"127.47.47.40\",\"app\":\"envoy-www\",\"response_code\":200,\"service_time\":\"35\",\"protocol\":\"HTTP/1.1\",\"platform\":\"chef\",\"tcpip_local_ip\":\"10.1.1.1\",\"user_agent\":\"com.t.co/22.05.10 (iPhone; iOS 15.4.1; Scale/3.00)\",\"route_name\":\"main_default\",\"upstream_cluster\":\"main_normal\",\"zone_id\":\"use1-az4\",\"response_flags\":\"via_upstream:-\",\"grpc_unique_id\":null}", - true); - // duration as a int - testEnvoySpanWithTraceContextConversion( - 
"{\"unique_id\":\"d009c63c-be2f-47fa-8c53\",\"http_header_upgrade\":null,\"grpc_status\":null,\"tcpip_remote_ip\":\"10.232.200.31\",\"duration\":36,\"req_method\":\"POST\",\"tcpip_local_with_port\":\"10.219.290.121:81\",\"tcpip_remote_with_port\":\"101.22.228.30:36884\",\"upstream_hostname\":\"slack-www-hhvm-main-iad-sss2\",\"downstream_host_with_port\":\"10.47.47.46:0\",\"request_id\":\"d009c63c-be2f-47fa-8c53\",\"path\":\"/api/eventlog.history\",\"bytes_sent\":69,\"region\":\"us-east-1\",\"http_header_connection\":null,\"zone\":\"us-east-1g\",\"timestamp\":\"2022-05-27T18:15:10.971Z\",\"authority\":\"slack.com\",\"bytes_received\":325,\"listener\":\"nebula\",\"forwarder\":\"127.47.47.40\",\"app\":\"envoy-www\",\"response_code\":200,\"service_time\":\"35\",\"protocol\":\"HTTP/1.1\",\"platform\":\"chef\",\"tcpip_local_ip\":\"10.1.1.1\",\"user_agent\":\"com.t.co/22.05.10 (iPhone; iOS 15.4.1; Scale/3.00)\",\"route_name\":\"main_default\",\"upstream_cluster\":\"main_normal\",\"zone_id\":\"use1-az4\",\"response_flags\":\"via_upstream:-\",\"grpc_unique_id\":null}", - false); - } - - private void testEnvoySpanWithTraceContextConversion(String message, boolean isDurationAsString) - throws JsonProcessingException { - final String indexName = "envoy"; - final String host = "myserver-server-j8zj"; - final long timestamp = 1612550512340953000L; - final String requestId = "d009c63c-be2f-47fa-8c53"; - - // Make a test message - Murron.MurronMessage.Builder testMurronMsgBuilder = Murron.MurronMessage.newBuilder(); - testMurronMsgBuilder - .setMessage(ByteString.copyFrom(message.getBytes(StandardCharsets.UTF_8))) - .setType(indexName) - .setHost(host) - .setTimestamp(timestamp); - Murron.MurronMessage testMurronMsg = testMurronMsgBuilder.build(); - - Trace.Span apiSpan = MurronLogFormatter.fromEnvoyLog(testMurronMsg); - assertThat(apiSpan.getName()).isEqualTo(indexName); - assertThat(apiSpan.getDuration()).isEqualTo(36000L); - assertThat(apiSpan.getId().toStringUtf8()).isEqualTo(requestId); - assertThat(apiSpan.getTraceId().toStringUtf8()).isEqualTo(requestId); - assertThat(apiSpan.getParentId().toStringUtf8()).isEmpty(); - assertThat(apiSpan.getTimestamp()).isEqualTo(timestamp / 1000); - List tags = apiSpan.getTagsList(); - assertThat(getTagValue(tags, "tcpip_remote_ip")).isEqualTo("10.232.200.31"); - assertThat(getTagValue(tags, "req_method")).isEqualTo("POST"); - assertThat(getTagValue(tags, "bytes_sent")).isEqualTo(69); - assertThat(getTagValue(tags, "path")).isEqualTo("/api/eventlog.history"); - if (isDurationAsString) { - assertThat(getTagValue(tags, ENVOY_DURATION_FIELD)).isEqualTo("36"); - } else { - assertThat(getTagValue(tags, ENVOY_DURATION_FIELD)).isEqualTo(36); - } - assertThat(getTagValue(tags, LogMessage.ReservedField.PARENT_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.TRACE_ID.fieldName)).isNull(); - assertThat(getTagValue(tags, LogMessage.ReservedField.HOSTNAME.fieldName)).isEqualTo(host); - assertThat(getTagValue(tags, LogMessage.ReservedField.SERVICE_NAME.fieldName)) - .isEqualTo(indexName); - } -} diff --git a/config/config.yaml b/config/config.yaml index 5e0b0e1029..e5b174ce8d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -119,12 +119,6 @@ recoveryConfig: additionalProps: ${KAFKA_ADDITIONAL_PROPS:-} preprocessorConfig: - kafkaStreamConfig: - bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} - applicationId: ${KAFKA_APPLICATION_ID:-ASTRA_preprocessor} - numStreamThreads: ${KAFKA_STREAM_THREADS:-2} - processingGuarantee: 
${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once} - additionalProps: ${KAFKA_ADDITIONAL_PROPS:-} kafkaConfig: kafkaTopic: ${KAFKA_TOPIC:-test-topic} kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} @@ -135,11 +129,6 @@ preprocessorConfig: serverPort: ${ASTRA_PREPROCESSOR_SERVER_PORT:-8086} serverAddress: ${ASTRA_PREPROCESSOR_SERVER_ADDRESS:-localhost} requestTimeoutMs: ${ASTRA_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000} - upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}] - downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic} preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1} - dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json} rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1} - kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0} - useBulkApi: ${ASTRA_PREPROCESSOR_USE_BULK_API:-true} rateLimitExceededErrorCode: ${ASTRA_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400} diff --git a/docs/topics/Config-options.md b/docs/topics/Config-options.md index 5546e8271f..0a25977b93 100644 --- a/docs/topics/Config-options.md +++ b/docs/topics/Config-options.md @@ -685,18 +685,6 @@ recoveryConfig: Configuration options for the preprocessor node. -### kafkaStreamConfig -```yaml -preprocessorConfig: - kafkaStreamConfig: - bootstrapServers: localhost:9092 - applicationId: astra_preprocessor - numStreamThreads: 2 - processingGuarantee: at_least_once - additionalProps: "" -``` -kafkaStreamConfig is deprecated and unsupported. - ### kafkaConfig {id=kafka-preprocessor} ```yaml @@ -739,22 +727,6 @@ preprocessorConfig: -### upstreamTopics -```yaml -preprocessorConfig: - upstreamTopics: "" -``` -upstreamTopics is deprecated. -Should always be set to "" - -### downstreamTopic -```yaml -preprocessorConfig: - downstreamTopic: "" -``` -downstreamTopic is deprecated. -Should always be set to "" - ### preprocessorInstanceCount ```yaml preprocessorConfig: @@ -763,15 +735,6 @@ preprocessorConfig: Indicates how many instances of the preprocessor are currently deployed. Used for scaling rate limiters such that each preprocessor instance will allow the `total rate limit / preprocessor instance count` through before applying. -### dataTransformer -dataTransformer is deprecated. - -```yaml -preprocessorConfig: - dataTransformer: json -``` -Should always be set to json - ### rateLimiterMaxBurstSeconds ```yaml preprocessorConfig: @@ -781,27 +744,6 @@ Defines how many seconds rate limiting unused permits can be accumulated before Must be greater than or equal to 1. -### kafkaPartitionStickyTimeoutMs - -```yaml -preprocessorConfig: - kafkaPartitionStickyTimeoutMs: 0 -``` - -kafkaPartitionStickyTimeoutMs is deprecated. -Should always be set to 0 - -### useBulkApi - -```yaml -preprocessorConfig: - useBulkApi: true -``` -useBulkApi is deprecated. -Should always be set to true - -Enable bulk ingest API, replacing the Kafka Streams API _(deprecated)_. - ### rateLimitExceededErrorCode ```yaml
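# Illustrative sketch only, not part of this patch: assumed example values showing how the two
# rate-limiter settings that remain after this change interact. With N deployed instances
# (preprocessorInstanceCount), each instance admits roughly (total rate limit / N) before
# throttling, and rateLimiterMaxBurstSeconds bounds how many seconds of unused permits may
# accumulate as burst capacity.
preprocessorConfig:
  preprocessorInstanceCount: 2      # assumed example: 2 instances, so each allows half the total limit
  rateLimiterMaxBurstSeconds: 1     # must be >= 1; permits up to 1 second of accumulated burst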