diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java index 8a40032993..8361748021 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java @@ -21,7 +21,9 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; + import java.util.Arrays; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -53,7 +55,10 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) { } /** - * Compose several {@link AsyncCloseable} into a single {@link AsyncCloseable}. One close failure will cause the whole close to fail. + * Compose several {@link AsyncCloseable}s into a single {@link AsyncCloseable}. + * One close failure will cause the whole close to fail. + *

+ * It filters null futures returned by individual {@link AsyncCloseable} on close. * * @param closeables the closeables to compose * @return the composed closeables @@ -61,7 +66,9 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) { static AsyncCloseable compose(AsyncCloseable... closeables) { return () -> CompositeFuture.all( Arrays.stream(closeables) + .filter(Objects::nonNull) .map(AsyncCloseable::close) + .filter(Objects::nonNull) .collect(Collectors.toList()) ).mapEmpty(); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java index ec5d4745bf..7898c15ff5 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java @@ -15,13 +15,13 @@ */ package dev.knative.eventing.kafka.broker.dispatcher; -import io.vertx.core.Future; +import dev.knative.eventing.kafka.broker.core.AsyncCloseable; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; /** * This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle. */ -public interface RecordDispatcherListener { +public interface RecordDispatcherListener extends AsyncCloseable { /** * The given record has been received. diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 7d059f4618..7e9757eae1 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -81,7 +81,7 @@ public RecordDispatcherImpl( this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber"); this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink"); this.recordDispatcherListener = recordDispatcherListener; - this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender); + this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener); this.consumerTracer = consumerTracer; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java index 12fac0eb29..572f94f2bf 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java @@ -67,11 +67,15 @@ public void start(Promise startPromise) { public void stop(Promise stopPromise) { logger.info("Stopping consumer"); - AsyncCloseable.compose( - this.consumer::close, - this.recordDispatcher, - this.closeable - ).close(stopPromise); + final Promise dependenciesClosedPromise = Promise.promise(); + + // Close consumer after other objects have been closed. + dependenciesClosedPromise.future() + .onComplete(r -> this.consumer.close().onComplete(stopPromise)); + + AsyncCloseable + .compose(this.recordDispatcher, this.closeable) + .close(dependenciesClosedPromise); } public void setConsumer(KafkaConsumer consumer) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java index 38a019319d..ce6cd36cc5 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java @@ -16,6 +16,8 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumer; @@ -30,6 +32,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered. @@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx, this.offsetTrackers = new HashMap<>(); this.onCommit = onCommit; - vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit)); + vertx.setPeriodic(commitIntervalMs, l -> commitAll()); } /** @@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord record) { .recordNewOffset(record.offset()); } - private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) { + private synchronized Future commit(final TopicPartition topicPartition, final OffsetTracker tracker) { long newOffset = tracker.offsetToCommit(); if (newOffset > tracker.getCommitted()) { // Reset the state @@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs logger.debug("Committing offset for {} offset {}", topicPartition, newOffset); // Execute the actual commit - consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, ""))) + return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, ""))) .onSuccess(ignored -> { if (onCommit != null) { onCommit.accept((int) newOffset); @@ -133,6 +136,27 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs .onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause)) .mapEmpty(); } + return null; + } + + /** + * Commit all tracked offsets by colling commit on every offsetTracker entry. + * + * @return succeeded or failed future. + */ + private Future commitAll() { + return CompositeFuture.all( + this.offsetTrackers.entrySet() + .stream() + .map(e -> commit(e.getKey(), e.getValue())) + .filter(Objects::nonNull) + .collect(Collectors.toList()) + ).mapEmpty(); + } + + @Override + public Future close() { + return commitAll(); } /** diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index ca78dab17a..2743f51887 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -111,6 +111,9 @@ public void stop(Promise stopPromise) { } void recordsHandler(KafkaConsumerRecords records) { + if (records == null) { + return; + } // Put records in queues // I assume the records are ordered per topic-partition for (int i = 0; i < records.size(); i++) { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java index 971ca11b05..e8136d32cd 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java @@ -100,13 +100,15 @@ protected MapAssert assertThatOffsetCommittedWithFailures( .when(vertxConsumer) .commit(any(Map.class)); - testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag); + final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer); + testExecutor.accept(offsetManager, failureFlag); try { Thread.sleep(1000); } catch (final InterruptedException e) { throw new RuntimeException(e); } + assertThat(offsetManager.close().succeeded()).isTrue(); final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed)); return assertThat(