Skip to content

Commit

Permalink
Merge branch 'feature/test-improvements' into feature/awaitility
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 20, 2025
2 parents a4b4c33 + 87e21cd commit 89c2edf
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

/**
* Client that supports communication with Kafka clusters in test setups, including topic management, reading from
Expand All @@ -41,17 +43,16 @@
@RequiredArgsConstructor
public class KafkaTestClient {

private static final TopicSettingsBuilder DEFAULT_TOPIC_SETTINGS = TopicSettings.builder()
.partitions(1)
.replicationFactor((short) 1);
private final @NonNull KafkaEndpointConfig endpointConfig;

/**
* Create q new {@code TopicSettingsBuilder} which uses a single partition and no replicas
* Create a new {@code TopicSettingsBuilder} which uses a single partition and no replicas
* @return default topic settings
*/
public static TopicSettingsBuilder defaultTopicSettings() {
return DEFAULT_TOPIC_SETTINGS;
return TopicSettings.builder()
.partitions(1)
.replicationFactor((short) 1);
}

/**
Expand All @@ -63,11 +64,13 @@ public SenderBuilder send() {
}

/**
* Prepare reading data from the cluster
* Prepare reading data from the cluster. {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} is configured to
* {@link OffsetResetStrategy#EARLIEST}
* @return configured {@code ReaderBuilder}
*/
public ReaderBuilder read() {
return new ReaderBuilder(this.endpointConfig.createKafkaProperties());
return new ReaderBuilder(this.endpointConfig.createKafkaProperties())
.with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ private static <K, V> List<ConsumerRecord<K, V>> readAll(final Consumer<K, V> co
.map(ReaderBuilder::toTopicPartition)
.collect(Collectors.toList());
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
return pollAll(consumer, timeout);
}

Expand All @@ -85,7 +84,8 @@ public ReaderBuilder with(final String key, final Object value) {
}

/**
* Read data from a topic
* Read all data from a topic. This method is idempotent, meaning calling it multiple times will read the same
* data unless the data in the topic changes.
* @param topic topic to read from
* @param timeout consumer poll timeout
* @return consumed records
Expand Down

0 comments on commit 89c2edf

Please sign in to comment.