Kafka Test Companion feedback and improvements #1852
Replies: 1 comment 1 reply
-
Hi @loicmathieu,
You can also use the companion.registerSerde(Person.class, new GenericSerializer<>(), new GenericDeserializer<>());
ProducerRecord<String, Person> record = new ProducerRecord<>(topic, new Person("1", 20));
companion.produce(Person.class)
.fromRecords(record)
.awaitCompletion();
ConsumerBuilder<String, Person> consumer = companion.consume(Person.class);
ConsumerTask<String, Person> task = consumer.fromTopics(topic, 1).awaitCompletion();
assertThat(task.getRecords()).hasSize(1); Note that you need to call
|
Beta Was this translation helpful? Give feedback.
-
I recently migrated a bunch of integration tests from raw Kafka consumer/producer API to the Kafka Companion.
The API is very convenient but here a some feedback and possible improvements.
There is shortcut methods for consuming / producing strings and primitive but as soon as you want to use custom serializers and deserializers you're ending up to write a lot of
companion.consumeWithDeserializer(StringDeserializer.class, MyObjectDeserializer.class)
andcompanion.produceWithSerializer(StringSerializer.class, MyObjectSerializer.class)
.As String is often used as keys, providing
companion.consumeWithDeserializer(MyObjectDeserializer.class)
with a defaultStringDeserializer
for the key andcompanion.produceWithSerializer(MyObjectSerializer.class)
with a defaultStringSerializer
for the key will be convenient. I can propose a PR for that.When working with AVRO (or other generic serializer / deserializer) the bound of the serializer will be object wich is not very elegant. For example:
A solution would be to add a type hint to the method:
I don't know if something better can be done, we can always add an
as(Class, Class)
method but I'm not sure it's better. Maybe removing the bound to<K,T>
would do the trick.Maybe we can have a
companion.consumeStrings().fromTopics("my_topic").awaitNoRecords(Duration)
that will throws an exception or return a boolean to false if it receive a list of record (or return the received record). I can propose a PR for that.companion.consume()
) on the same test case, the second one receive the messages from the begining, so if we want to have several test methods based on the same topic, we will need to reuse the same consumer for all test methods, due to how it is integrated in Quarkus (via injection), we will need to switch to PER_CLASS lifecycle and things get complicated a lot. I'm not sure if the issue is that each consumer task get it's own group-id or if the offset was not commited so I don't know if there is an easy way to workaround this. I'll comment this discussion with example that will explain more this issue.Beta Was this translation helpful? Give feedback.
All reactions