diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java index 7d3ea30b..1d1a68c1 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java @@ -33,6 +33,8 @@ import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; /** * Client that supports communication with Kafka clusters in test setups, including topic management, reading from @@ -41,17 +43,16 @@ @RequiredArgsConstructor public class KafkaTestClient { - private static final TopicSettingsBuilder DEFAULT_TOPIC_SETTINGS = TopicSettings.builder() - .partitions(1) - .replicationFactor((short) 1); private final @NonNull KafkaEndpointConfig endpointConfig; /** - * Create q new {@code TopicSettingsBuilder} which uses a single partition and no replicas + * Create a new {@code TopicSettingsBuilder} which uses a single partition and no replicas * @return default topic settings */ public static TopicSettingsBuilder defaultTopicSettings() { - return DEFAULT_TOPIC_SETTINGS; + return TopicSettings.builder() + .partitions(1) + .replicationFactor((short) 1); } /** @@ -63,11 +64,13 @@ public SenderBuilder send() { } /** - * Prepare reading data from the cluster + * Prepare reading data from the cluster. {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} is configured to + * {@link OffsetResetStrategy#EARLIEST} * @return configured {@code ReaderBuilder} */ public ReaderBuilder read() { - return new ReaderBuilder(this.endpointConfig.createKafkaProperties()); + return new ReaderBuilder(this.endpointConfig.createKafkaProperties()) + .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString()); } /** diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java index 4bd134f4..0d867dac 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java @@ -64,7 +64,6 @@ private static List> readAll(final Consumer co .map(ReaderBuilder::toTopicPartition) .collect(Collectors.toList()); consumer.assign(topicPartitions); - consumer.seekToBeginning(topicPartitions); return pollAll(consumer, timeout); } @@ -85,7 +84,8 @@ public ReaderBuilder with(final String key, final Object value) { } /** - * Read data from a topic + * Read all data from a topic. This method is idempotent, meaning calling it multiple times will read the same + * data unless the data in the topic changes. * @param topic topic to read from * @param timeout consumer poll timeout * @return consumed records