Skip to content

Commit

Permalink
Fixed issue for committing offsets in BatchConsumer (#1908)
Browse files Browse the repository at this point in the history
* Fixed issue for committing offsets in BatchConsumer when almost all messages are filtered

* Added explanation

* Clearing maxPendingOffsets map in BatchConsumer
  • Loading branch information
szczygiel-m authored Oct 5, 2024
1 parent 4d47803 commit 9611ede
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private void commitIfReady() {
if (!offsetsToCommit.isEmpty()) {
commit(offsetsToCommit);
}
maxPendingOffsets.clear();
lastCommitTime = Instant.now();
}
}
Expand Down Expand Up @@ -180,7 +181,8 @@ public void initialize() {
compositeMessageContentWrapper,
topic,
trackers,
loadRecorder
loadRecorder,
this::commitIfReady
);
metrics.initialize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ public class MessageBatchReceiver {
private final Topic topic;
private final SubscriptionLoadRecorder loadRecorder;
private boolean receiving = true;
private final Runnable commitIfReady;

public MessageBatchReceiver(MessageReceiver receiver,
MessageBatchFactory batchFactory,
MessageConverterResolver messageConverterResolver,
CompositeMessageContentWrapper compositeMessageContentWrapper,
Topic topic,
Trackers trackers,
SubscriptionLoadRecorder loadRecorder) {
SubscriptionLoadRecorder loadRecorder,
Runnable commitIfReady) {
this.receiver = receiver;
this.batchFactory = batchFactory;
this.messageConverterResolver = messageConverterResolver;
Expand All @@ -58,6 +60,7 @@ public MessageBatchReceiver(MessageReceiver receiver,
this.trackers = trackers;
this.loadRecorder = loadRecorder;
this.inflight = new ArrayDeque<>(1);
this.commitIfReady = commitIfReady;
}

public MessageBatchingResult next(Subscription subscription, Runnable signalsInterrupt) {
Expand All @@ -74,6 +77,9 @@ public MessageBatchingResult next(Subscription subscription, Runnable signalsInt
while (isReceiving() && !batch.isReadyForDelivery() && !Thread.currentThread().isInterrupted()) {
loadRecorder.recordSingleOperation();
signalsInterrupt.run();
// We need a commit function call here to prevent cases where it takes a long time to create a batch and messages are filtered at the same time.
// Otherwise, this would lead to ever-increasing lag despite the processing and filtering of current messages.
commitIfReady.run();
Optional<Message> maybeMessage = inflight.isEmpty()
? readAndTransform(subscription, batch.getId())
: Optional.ofNullable(inflight.poll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,44 @@ public void shouldFilterIncomingEventsForBatch() {
);
}

@Test
public void shouldCommitFilteredMessagesForBatch() {
// given
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint())
.withSubscriptionPolicy(buildBatchPolicy()
.withBatchSize(10)
.withBatchTime(Integer.MAX_VALUE)
.withBatchVolume(1024)
.build())
.withFilter(MESSAGE_NAME_FILTER)
.build());

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_filtered_out_total")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(3.0)
)
);
hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName());
}

@Test
public void shouldDeliverBatchInGivenTimePeriod() {
// given
Expand Down

0 comments on commit 9611ede

Please sign in to comment.