From a85a4fc35ce99ff3b8673c7d50c88dbbd94022c1 Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Mon, 9 Oct 2023 13:41:56 -0500
Subject: [PATCH] Add shutdown method to buffer API

Signed-off-by: Chase Engelbrecht
---
 .../plugins/kafka/buffer/KafkaBuffer.java     |  8 ++++-
 .../configuration/KafkaBufferConfig.java      |  2 +-
 .../kafka/consumer/KafkaCustomConsumer.java   | 34 +++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
index b139fe5db9..852892cd5c 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
@@ -40,6 +40,7 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {
     public static final int INNER_BUFFER_CAPACITY = 1000000;
     public static final int INNER_BUFFER_BATCH_SIZE = 250000;
     private final KafkaCustomProducer producer;
+    private final List<KafkaCustomConsumer> emptyCheckingConsumers;
     private final AbstractBuffer innerBuffer;
     private final ExecutorService executorService;
     private final Duration drainTimeout;
@@ -60,6 +61,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
         this.shutdownInProgress = new AtomicBoolean(false);
         final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
             innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress);
+        emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
+            innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress);
         this.executorService = Executors.newFixedThreadPool(consumers.size());
         consumers.forEach(this.executorService::submit);
 
@@ -115,8 +118,11 @@ public void doCheckpoint(CheckpointState checkpointState) {
 
     @Override
     public boolean isEmpty() {
+        final boolean areTopicsEmpty = emptyCheckingConsumers.stream()
+                .allMatch(KafkaCustomConsumer::isEmpty);
+
         // TODO: check Kafka topic is empty as well.
-        return innerBuffer.isEmpty();
+        return areTopicsEmpty && innerBuffer.isEmpty();
     }
 
     @Override
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java
index 1cab8d7133..8634ffc082 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java
@@ -116,7 +116,7 @@ public String getClientDnsLookup() {
 
     @Override
     public boolean getAcknowledgementsEnabled() {
-        return false;
+        return true;
     }
 
     public Duration getDrainTimeout() {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
index 79e50f0647..6687bc363e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
@@ -16,6 +16,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
@@ -55,6 +56,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;
 
@@ -528,4 +530,36 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
         final Long offset = offsetMap.get(topicPartition);
         return Objects.isNull(offset) ? "-" : offset.toString();
     }
+
+    public boolean isEmpty() {
+        final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
+        final List<TopicPartition> topicPartitions = partitions.stream()
+                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
+                .collect(Collectors.toList());
+
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitions));
+        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+
+        for (TopicPartition topicPartition : topicPartitions) {
+            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
+            final Long endOffset = endOffsets.get(topicPartition);
+            LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}",
+                    topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());
+
+            // If there is data in the partition
+            if (endOffset != 0L) {
+                // If there is no committed offset for the partition
+                if (Objects.isNull(offsetAndMetadata)) {
+                    return false;
+                }
+
+                // If the committed offset is behind the end offset
+                if (offsetAndMetadata.offset() < endOffset) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
 }