Skip to content

Commit

Permalink
Fix AbstractConsumerSeekAwareTests for race condition
Browse files Browse the repository at this point in the history
Looks like there is some delay between calling seek and actual consume.

* Just increase latch timeout for 30 second
  • Loading branch information
artembilan committed Jul 11, 2024
1 parent 1ab37ab commit bfe6125
Showing 1 changed file with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

/**
* @author Borahm Lee
* @author Artem Bilan
* @since 3.3
*/
@DirtiesContext
Expand All @@ -73,48 +74,59 @@ class AbstractConsumerSeekAwareTests {

@Test
public void checkCallbacksAndTopicPartitions() {
await().timeout(Duration.ofSeconds(5)).untilAsserted(() -> {
Map<ConsumerSeekCallback, List<TopicPartition>> callbacksAndTopics = multiGroupListener.getCallbacksAndTopics();
Set<ConsumerSeekCallback> registeredCallbacks = callbacksAndTopics.keySet();
Set<TopicPartition> registeredTopicPartitions = callbacksAndTopics.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());

Map<TopicPartition, List<ConsumerSeekCallback>> topicsAndCallbacks = multiGroupListener.getTopicsAndCallbacks();
Set<TopicPartition> getTopicPartitions = topicsAndCallbacks.keySet();
Set<ConsumerSeekCallback> getCallbacks = topicsAndCallbacks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());

assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty();
assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions).hasSize(3);
});
await().timeout(Duration.ofSeconds(5))
.untilAsserted(() -> {
Map<ConsumerSeekCallback, List<TopicPartition>> callbacksAndTopics =
multiGroupListener.getCallbacksAndTopics();
Set<ConsumerSeekCallback> registeredCallbacks = callbacksAndTopics.keySet();
Set<TopicPartition> registeredTopicPartitions =
callbacksAndTopics.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());

Map<TopicPartition, List<ConsumerSeekCallback>> topicsAndCallbacks =
multiGroupListener.getTopicsAndCallbacks();
Set<TopicPartition> getTopicPartitions = topicsAndCallbacks.keySet();
Set<ConsumerSeekCallback> getCallbacks =
topicsAndCallbacks.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());

assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty();
assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions);
});
}

@Test
void seekForAllGroups() throws Exception {
template.send(TOPIC, "test-data");
template.send(TOPIC, "test-data");
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();

MultiGroupListener.latch1 = new CountDownLatch(2);
MultiGroupListener.latch2 = new CountDownLatch(2);

multiGroupListener.seekToBeginning();
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
}

@Test
void seekForSpecificGroup() throws Exception {
template.send(TOPIC, "test-data");
template.send(TOPIC, "test-data");
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();

MultiGroupListener.latch1 = new CountDownLatch(2);
MultiGroupListener.latch2 = new CountDownLatch(2);

multiGroupListener.seekToBeginningForGroup("group2");
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse();
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(1, TimeUnit.SECONDS)).isFalse();
assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2);
}

Expand All @@ -128,7 +140,8 @@ static class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
Expand Down Expand Up @@ -172,7 +185,9 @@ void seekToBeginningForGroup(String groupIdForSeek) {
}
});
}

}

}

}

0 comments on commit bfe6125

Please sign in to comment.