diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index f83080e2a7..03d67d1615 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -139,6 +139,7 @@ private void commitIfReady() { if (!offsetsToCommit.isEmpty()) { commit(offsetsToCommit); } + maxPendingOffsets.clear(); lastCommitTime = Instant.now(); } } @@ -180,7 +181,8 @@ public void initialize() { compositeMessageContentWrapper, topic, trackers, - loadRecorder + loadRecorder, + this::commitIfReady ); metrics.initialize(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index f85236ea36..66fccbacc7 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -42,6 +42,7 @@ public class MessageBatchReceiver { private final Topic topic; private final SubscriptionLoadRecorder loadRecorder; private boolean receiving = true; + private final Runnable commitIfReady; public MessageBatchReceiver(MessageReceiver receiver, MessageBatchFactory batchFactory, @@ -49,7 +50,8 @@ public MessageBatchReceiver(MessageReceiver receiver, CompositeMessageContentWrapper compositeMessageContentWrapper, Topic topic, Trackers trackers, - SubscriptionLoadRecorder loadRecorder) { + SubscriptionLoadRecorder loadRecorder, + Runnable commitIfReady) { this.receiver = receiver; this.batchFactory = batchFactory; this.messageConverterResolver = messageConverterResolver; @@ -58,6 +60,7 @@ public MessageBatchReceiver(MessageReceiver receiver, this.trackers = trackers; this.loadRecorder = loadRecorder; this.inflight = new ArrayDeque<>(1); + this.commitIfReady = commitIfReady; } public MessageBatchingResult next(Subscription subscription, Runnable signalsInterrupt) { @@ -74,6 +77,9 @@ public MessageBatchingResult next(Subscription subscription, Runnable signalsInt while (isReceiving() && !batch.isReadyForDelivery() && !Thread.currentThread().isInterrupted()) { loadRecorder.recordSingleOperation(); signalsInterrupt.run(); + // We need a commit function call here to prevent cases where it takes a long time to create a batch and messages are filtered at the same time. + // Otherwise, this would lead to ever-increasing lag despite the processing and filtering of current messages. + commitIfReady.run(); Optional maybeMessage = inflight.isEmpty() ? readAndTransform(subscription, batch.getId()) : Optional.ofNullable(inflight.poll()); diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java index e3ead4bd1b..e994641533 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/BatchDeliveryTest.java @@ -118,6 +118,44 @@ public void shouldFilterIncomingEventsForBatch() { ); } + @Test + public void shouldCommitFilteredMessagesForBatch() { + // given + TestSubscriber subscriber = subscribers.createSubscriber(); + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()) + .withSubscriptionPolicy(buildBatchPolicy() + .withBatchSize(10) + .withBatchTime(Integer.MAX_VALUE) + .withBatchVolume(1024) + .build()) + .withFilter(MESSAGE_NAME_FILTER) + .build()); + + // when + hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson()); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson()); + hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson()); + + // then + waitAtMost(Duration.ofSeconds(10)).untilAsserted(() -> + hermes.api().getConsumersMetrics() + .expectStatus() + .isOk() + .expectBody(String.class) + .value((body) -> assertThatMetrics(body) + .contains("hermes_consumers_subscription_filtered_out_total") + .withLabels( + "group", topic.getName().getGroupName(), + "subscription", subscription.getName(), + "topic", topic.getName().getName() + ) + .withValue(3.0) + ) + ); + hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + } + @Test public void shouldDeliverBatchInGivenTimePeriod() { // given