Skip to content

Commit

Permalink
Add shutdown method to buffer API
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 23, 2023
1 parent d118cf6 commit a85a4fc
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {
public static final int INNER_BUFFER_CAPACITY = 1000000;
public static final int INNER_BUFFER_BATCH_SIZE = 250000;
private final KafkaCustomProducer producer;
private final List<KafkaCustomConsumer> emptyCheckingConsumers;
private final AbstractBuffer innerBuffer;
private final ExecutorService executorService;
private final Duration drainTimeout;
Expand All @@ -60,6 +61,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
this.shutdownInProgress = new AtomicBoolean(false);
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress);
emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress);
this.executorService = Executors.newFixedThreadPool(consumers.size());
consumers.forEach(this.executorService::submit);

Expand Down Expand Up @@ -115,8 +118,11 @@ public void doCheckpoint(CheckpointState checkpointState) {

@Override
public boolean isEmpty() {
final boolean areTopicsEmpty = emptyCheckingConsumers.stream()
.allMatch(KafkaCustomConsumer::isEmpty);

// TODO: check Kafka topic is empty as well.
return innerBuffer.isEmpty();
return areTopicsEmpty && innerBuffer.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public String getClientDnsLookup() {

@Override
public boolean getAcknowledgementsEnabled() {
return false;
return true;
}

public Duration getDrainTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
Expand Down Expand Up @@ -55,6 +56,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;

Expand Down Expand Up @@ -528,4 +530,36 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
final Long offset = offsetMap.get(topicPartition);
return Objects.isNull(offset) ? "-" : offset.toString();
}

public boolean isEmpty() {
final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toList());

final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitions));
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

for (TopicPartition topicPartition : topicPartitions) {
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}",
topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());

// If there is data in the partition
if (endOffset != 0L) {
// If there is no committed offset for the partition
if (Objects.isNull(offsetAndMetadata)) {
return false;
}

// If the committed offset is behind the end offset
if (offsetAndMetadata.offset() < endOffset) {
return false;
}
}
}

return true;
}
}

0 comments on commit a85a4fc

Please sign in to comment.