Skip to content

Commit

Permalink
Remove Kafka stream preprocessor functionality (#930)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored May 17, 2024
1 parent ac2021f commit 3d54a6e
Show file tree
Hide file tree
Showing 26 changed files with 117 additions and 3,094 deletions.
5 changes: 0 additions & 5 deletions astra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,6 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- Jackson JSON parser dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE;
import static com.slack.astra.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE;
import static com.slack.astra.server.ManagerApiGrpc.MAX_TIME;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener;
import com.slack.astra.metadata.dataset.DatasetMetadata;
import com.slack.astra.metadata.dataset.DatasetMetadataStore;
import com.slack.astra.preprocessor.PreprocessorService;
import com.slack.astra.metadata.dataset.DatasetPartitionMetadata;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
Expand All @@ -19,17 +20,20 @@
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -331,11 +335,26 @@ private int getPartition(String index) {
if (serviceNamePattern.equals(MATCH_ALL_SERVICE)
|| serviceNamePattern.equals(MATCH_STAR_SERVICE)
|| index.equals(serviceNamePattern)) {
List<Integer> partitions = PreprocessorService.getActivePartitionList(datasetMetadata);
List<Integer> partitions = getActivePartitionList(datasetMetadata);
return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
}
}
// We don't have a provisioned service for this index
return -1;
}

/** Gets the active list of partitions from the provided dataset metadata */
private static List<Integer> getActivePartitionList(DatasetMetadata datasetMetadata) {
Optional<DatasetPartitionMetadata> datasetPartitionMetadata =
datasetMetadata.getPartitionConfigs().stream()
.filter(partitionMetadata -> partitionMetadata.getEndTimeEpochMs() == MAX_TIME)
.findFirst();

if (datasetPartitionMetadata.isEmpty()) {
return Collections.emptyList();
}
return datasetPartitionMetadata.get().getPartitions().stream()
.map(Integer::parseInt)
.collect(Collectors.toUnmodifiableList());
}
}
23 changes: 0 additions & 23 deletions astra/src/main/java/com/slack/astra/logstore/LogWireMessage.java
Original file line number Diff line number Diff line change
@@ -1,41 +1,18 @@
package com.slack.astra.logstore;

import com.slack.astra.preprocessor.AstraSerdes;
import com.slack.astra.util.JsonUtil;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* LogWireMessage is the raw message we get from Kafka. This message may be invalid or malformed.
* LogMessage is a refined form of this message.
*/
public class LogWireMessage extends Message {
private static final Logger LOG = LoggerFactory.getLogger(LogWireMessage.class);

private String index;
private String type;

/**
* Move all Kafka message serializers to common class
*
* @see AstraSerdes
*/
@Deprecated
static Optional<LogWireMessage> fromJson(String jsonStr) {
try {
LogWireMessage wireMessage = JsonUtil.read(jsonStr, LogWireMessage.class);
return Optional.of(wireMessage);
} catch (IOException e) {
LOG.error("Error parsing JSON Object from string " + jsonStr, e);
}
return Optional.empty();
}

public LogWireMessage() {
super("", Instant.now(), Collections.emptyMap());
}
Expand Down
105 changes: 0 additions & 105 deletions astra/src/main/java/com/slack/astra/preprocessor/AstraSerdes.java

This file was deleted.

This file was deleted.

Loading

0 comments on commit 3d54a6e

Please sign in to comment.