diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index 5230dc967..eacad565e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -53,6 +53,7 @@ /** * @author Borahm Lee + * @author Artem Bilan * @since 3.3 */ @DirtiesContext @@ -73,48 +74,59 @@ class AbstractConsumerSeekAwareTests { @Test public void checkCallbacksAndTopicPartitions() { - await().timeout(Duration.ofSeconds(5)).untilAsserted(() -> { - Map> callbacksAndTopics = multiGroupListener.getCallbacksAndTopics(); - Set registeredCallbacks = callbacksAndTopics.keySet(); - Set registeredTopicPartitions = callbacksAndTopics.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); - - Map> topicsAndCallbacks = multiGroupListener.getTopicsAndCallbacks(); - Set getTopicPartitions = topicsAndCallbacks.keySet(); - Set getCallbacks = topicsAndCallbacks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); - - assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty(); - assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions).hasSize(3); - }); + await().timeout(Duration.ofSeconds(5)) + .untilAsserted(() -> { + Map> callbacksAndTopics = + multiGroupListener.getCallbacksAndTopics(); + Set registeredCallbacks = callbacksAndTopics.keySet(); + Set registeredTopicPartitions = + callbacksAndTopics.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + + Map> topicsAndCallbacks = + multiGroupListener.getTopicsAndCallbacks(); + Set getTopicPartitions = topicsAndCallbacks.keySet(); + Set getCallbacks = + topicsAndCallbacks.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + + assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty(); + assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions); + }); } @Test void seekForAllGroups() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); - assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue(); MultiGroupListener.latch1 = new CountDownLatch(2); MultiGroupListener.latch2 = new CountDownLatch(2); multiGroupListener.seekToBeginning(); - assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue(); } @Test void seekForSpecificGroup() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); - assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue(); MultiGroupListener.latch1 = new CountDownLatch(2); MultiGroupListener.latch2 = new CountDownLatch(2); multiGroupListener.seekToBeginningForGroup("group2"); - assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse(); + assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(1, TimeUnit.SECONDS)).isFalse(); assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); } @@ -128,7 +140,8 @@ static class Config { @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); return factory; } @@ -172,7 +185,9 @@ void seekToBeginningForGroup(String groupIdForSeek) { } }); } + } + } }