diff --git a/README.md b/README.md
index ce37dbc..46a7991 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,7 @@
 ![Java](https://img.shields.io/badge/Java-11-blue?labelColor=black)
 ![Kotlin](https://img.shields.io/badge/Kotlin-1.6.10-blue?labelColor=black)
 ![SpringBoot](https://img.shields.io/badge/SpringBoot-2.6.2-blue?labelColor=black)
+![SpringCloudStream](https://img.shields.io/badge/SpringCloudStream-3.2.1-blue?labelColor=black)
 
 # Spring Cloud Stream & Kafka binder step by step
 
@@ -14,33 +15,29 @@ It offers an abstraction (the **binding**) that works the same whatever undernea
 * **Amazon Kinesis**
 * ...
 
-As of today there are two ways to configure Spring Cloud Stream:
-* With annotations (legacy since 3.1)
-* With functional programming model
-
 Let's try to set up a simple example step by step and see how it works!
 
-This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&platformVersion=2.4.4.RELEASE&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding Kafka binder dependency `spring-cloud-starter-stream-kafka`.
+This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding the Kafka binder dependency `spring-cloud-starter-stream-kafka`.
 
-* [Producer](#producer)
-  * [Producer with annotations (legacy)](#producer-with-annotations-legacy)
-  * [Producer with functional programming model](#producer-with-functional-programming-model)
-* [Consumer](#consumer)
-  * [Consumer with annotations (legacy)](#consumer-with-annotations-legacy)
-  * [Consumer with functional programming model](#consumer-with-functional-programming-model)
+* [Producer with functional programming model](#producer-with-functional-programming-model)
+* [Consumer with functional programming model](#consumer-with-functional-programming-model)
 * [Extras](#extras)
   * [Kafka Message Key](#kafka-message-key)
   * [Retries](#retries)
   * [Dead Letter Queue](#dead-letter-queue)
 * [Test this demo](#test-this-demo)
 * [Run this demo](#run-this-demo)
-* See also
+* See also:
   * :octocat: [Spring Cloud Stream & Kafka Confluent Avro Schema Registry](https://github.com/rogervinas/spring-cloud-stream-kafka-confluent-avro-schema-registry)
   * :octocat: [Spring Cloud Stream & Kafka Streams Binder first steps](https://github.com/rogervinas/spring-cloud-stream-kafka-streams-first-steps)
   * :octocat: [Spring Cloud Stream Multibinder](https://github.com/rogervinas/spring-cloud-stream-multibinder)
   * :octocat: [Spring Cloud Stream & Kafka Streams Binder + Processor API](https://github.com/rogervinas/spring-cloud-stream-kafka-streams-processor)
 
-## Producer
+You can browse older versions of this repo:
+* [Spring 2.x with legacy annotations](https://github.com/rogervinas/spring-cloud-stream-kafka-step-by-step/tree/spring-2.x-legacy-annotations) (deprecated since spring-cloud-stream:3.1)
+* [Spring 2.x with functional programming model](https://github.com/rogervinas/spring-cloud-stream-kafka-step-by-step/tree/spring-2.x)
+
+## Producer with functional programming model
 
 Our final goal is to produce messages to a Kafka topic.
@@ -53,11 +50,9 @@ interface MyEventProducer {
 }
 ```
 
-We can first configure the binding using annotations (legacy way) and later we can change it to use functional interfaces. Checkout **legacy** tag in this repository if you want to go back to the legacy version.
-
-### Producer with annotations (legacy)
+Then we follow these steps:
 
-#### 1) We configure the binding `my-producer` in application.yml:
+### 1) We configure the binding `my-producer` in application.yml:
 ```yaml
 spring:
   cloud:
@@ -66,144 +61,83 @@ spring:
       binder:
         brokers: "localhost:9094"
     bindings:
-      my-producer:
+      my-producer-out-0:
         destination: "my.topic"
+    function:
+      definition: "my-producer"
 ```
-Everything under `spring.cloud.kafka.binder` is related to the Kafka binder implementation and we can use all these extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.1/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties).
-
-Everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#binding-properties).
+* Everything under `spring.cloud.stream.kafka.binder` is related to the Kafka binder implementation and we can use all these extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties).
+* Everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties).
+* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-producer` is the function name, `out` means it is an output binding and `0` is the index, which is always `0` for a function with a single output.
 
-Now we have to link the binding `my-producer` with our implementation using annotations... ready?
-
-#### 2) We create an interface with a method annotated with `@Output` and returning a `MessageChannel`:
+### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux<MyEventPayload>`, to fulfill the interfaces that both our application and Spring Cloud Stream expect:
 ```kotlin
-interface MyProducerBinding {
-    @Output("my-producer")
-    fun myProducer(): MessageChannel
-}
-```
-We use the name of the binding `my-producer` as the value of the `@Output` annotation.
+class MyStreamEventProducer : Supplier<Flux<MyEventPayload>>, MyEventProducer {
+  val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()
 
-#### 3) We create an implementation of `MyEventProducer` using a `MessageChannel`:
-```kotlin
-class MyStreamEventProducer(private val messageChannel: MessageChannel) : MyEventProducer {
-    override fun produce(event: MyEvent) {
-        val message = MessageBuilder.createMessage(toPayload(event), MessageHeaders(emptyMap()))
-        messageChannel.send(message)
-    }
+  override fun produce(event: MyEvent) {
+    sink.emitNext(toPayload(event), FAIL_FAST)
+  }
 
-    private fun toPayload(event: MyEvent): MyEventPayload {
-        return MyEventPayload(event.text, event.text.length)
-    }
+  override fun get(): Flux<MyEventPayload> {
+    return sink.asFlux()
+  }
+
+  private fun toPayload(event: MyEvent): MyEventPayload {
+    return MyEventPayload(event.text, event.text.length)
+  }
 }
 
 class MyEventPayload @JsonCreator constructor(
-    @JsonProperty("string") val string: String,
-    @JsonProperty("number") val number: Int
+  @JsonProperty("string") val string: String,
+  @JsonProperty("number") val number: Int
 )
 ```
-We use a DTO `MyEventPayload` to specify how do we want the payload to be serialized to JSON (using [Jackson](https://github.com/FasterXML/jackson) annotations).
-
-We do a simple transformation between `MyEvent` and `MyEventPayload` just as an example.
-
-We use the `MessageChannel` to send the message.
+* We use a DTO `MyEventPayload` to specify how we want the payload to be serialized to JSON (using [Jackson](https://github.com/FasterXML/jackson) annotations).
+* We do a simple transformation between `MyEvent` and `MyEventPayload` just as an example.
+* Every time we emit a `MyEventPayload` through the `Flux`, Spring Cloud Stream will publish it to Kafka.
 
-#### 4) We wire everything together:
+### 3) Finally, we create an instance of `MyStreamEventProducer` naming it `my-producer` to link it to the function definition:
 ```kotlin
-@EnableBinding(MyProducerBinding::class)
-class MyConfiguration {
-    @Bean
-    fun myStreamEventProducer(binding: MyProducerBinding): MyEventProducer {
-        return MyStreamEventProducer(binding.myProducer())
-    }
+@Configuration
+class MyConfiguration {
+  @Bean("my-producer")
+  fun myStreamEventProducer(): MyEventProducer {
+    return MyStreamEventProducer()
+  }
 }
 ```
-`@EnableBinding` annotation makes Spring to create an instance of `MyProducerBinding` with a `myProducer()` method returning a `MessageChannel` that will be linked to the `my-producer` binding thanks to the `@Output("my-producer")` annotation.
-Then we create an instance of type `MyEventProducer` that we can use in our code. This instance is implemented by a `MyStreamEventProducer` that will use the `MessageChannel` linked to the `my-producer` binding.
- -#### 5) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): +### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin @SpringBootTest class MyApplicationShould { - // we inject MyEventProducer (it should be a MyStreamEventProducer) - @Autowired lateinit var eventProducer: MyEventProducer + // we inject MyEventProducer (it should be a MyStreamEventProducer) + @Autowired lateinit var eventProducer: MyEventProducer - @Test - fun `produce event`() { - // we produce using MyEventProducer - val text = "hello ${UUID.randomUUID()}" - eventProducer.produce(MyEvent(text)) - - // we consume from Kafka using a helper - val records = consumerHelper.consumeAtLeast(1, FIVE_SECONDS) - - // we verify the received json - assertThat(records) - .singleElement().satisfies { record -> - JSONAssert.assertEquals( - record.value(), - "{\"number\":${text.length},\"string\":\"$text\"}", - true - ) - } - } -} -``` -Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt). - -### Producer with functional programming model - -#### 1) We configure the binding `my-producer` in application.yml but declaring it as a function: -```yaml -spring: - cloud: - stream: - kafka: - binder: - brokers: "localhost:9094" - bindings: - my-producer-out-0: - destination: "my.topic" - function: - definition: "my-producer" -``` -As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-producer` is the function name, `out` is for output bindings and `0` is the index we have to use if we have a single function. - -#### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting: -```kotlin -class MyStreamEventProducer : Supplier>, MyEventProducer { - val sink = Sinks.many().unicast().onBackpressureBuffer() - - override fun produce(event: MyEvent) { - sink.emitNext(toPayload(event), FAIL_FAST) - } + @Test + fun `produce event`() { + // we produce using MyEventProducer + val text = "hello ${UUID.randomUUID()}" + eventProducer.produce(MyEvent(text)) - override fun get(): Flux { - return sink.asFlux() - } + // we consume from Kafka using a helper + val records = consumerHelper.consumeAtLeast(1, FIVE_SECONDS) - private fun toPayload(event: MyEvent): MyEventPayload { - return MyEventPayload(event.text, event.text.length) - } -} -``` -Every time we emit a `MyEventPayload` through the `Flux`, Spring Cloud Stream will publish it to Kafka. - -#### 3) Finally, we create an instance of `MyStreamEventProducer` naming it `my-producer` to link it to the function definition: -```kotlin -@Configuration -class MyConfiguration { - @Bean("my-producer") - fun myStreamEventProducer(): MyEventProducer { - return MyStreamEventProducer() + // we verify the received json + assertThat(records).singleElement().satisfies { record -> + JSONAssert.assertEquals( + record.value(), + "{\"number\":${text.length},\"string\":\"$text\"}", + true + ) } + } } ``` +* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt). -#### 4) Test [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt) should work the same! 
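+
+Once everything is wired, any application component can depend on the domain interface `MyEventProducer` and stay unaware of Kafka. As an illustration only (this controller is not part of the demo), a hypothetical REST endpoint could publish events like this:
+```kotlin
+@RestController
+class MyHypotheticalController(private val eventProducer: MyEventProducer) {
+  // the controller only knows the domain interface;
+  // MyStreamEventProducer and Spring Cloud Stream do the Kafka part
+  @PostMapping("/event")
+  fun publish(@RequestBody text: String) {
+    eventProducer.produce(MyEvent(text))
+  }
+}
+```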
-
-## Consumer
+## Consumer with functional programming model
 
 Our final goal is to consume messages from a Kafka topic.
 
@@ -212,15 +146,13 @@ From the point of view of the application we want an interface `MyEventConsumer`
 class MyEvent(val text: String)
 
 interface MyEventConsumer {
-    fun consume(event: MyEvent)
+  fun consume(event: MyEvent)
 }
 ```
 
-Again we can first configure the binding using annotations (legacy way) and later we can change it to use functional interfaces. Checkout **legacy** tag in this repository if you want to go back to the legacy version.
-
-### Consumer with annotations (legacy)
+Then we follow these steps:
 
-#### 1) We configure the binding `my-consumer` in application.yml:
+### 1) We configure the binding `my-consumer` in application.yml, declaring it as a function:
 ```yaml
 spring:
   cloud:
@@ -229,146 +161,76 @@ spring:
       binder:
         brokers: "localhost:9094"
     bindings:
-      my-consumer:
+      my-consumer-in-0:
         destination: "my.topic"
         group: "${spring.application.name}"
-```
-Remember that everything under `spring.cloud.kafka.binder` is related to the Kafka binder implementation and we can use all these extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.1/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties) and everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#binding-properties).
-
-We configure a `group` because we want the application to consume from Kafka identifiying itself as a consumer group so if there were to be more than one instance of the application every message will be delivered to only one of the instances.
-
-Now we have to link the binding `my-consumer` with our implementation using annotations... ready?
-
-#### 2) We create an interface with a method annotated with `@Input` and returning a `SubscribableChannel`:
-```kotlin
-interface MyConsumerBinding {
-    @Input("my-consumer")
-    fun myConsumer(): SubscribableChannel
-}
+    function:
+      definition: "my-consumer"
 ```
-We use the name of the binding `my-consumer` as the value of the `@Input` annotation.
+* Remember that everything under `spring.cloud.stream.kafka.binder` is related to the Kafka binder implementation (see the extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties)), and everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction (see the extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties)).
+* We configure a `group` because we want the application to consume from Kafka identifying itself as a consumer group, so that if there is more than one instance of the application, every message is delivered to only one of them.
+* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` means it is an input binding and `0` is the index, which is always `0` for a function with a single input.
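+
+Note that this application declares both bindings, and Spring Cloud Stream expects all the active function names in a single `spring.cloud.stream.function.definition` property, separated by `;`. A minimal sketch of the merged configuration, assuming the two bindings described in this README:
+```yaml
+spring:
+  cloud:
+    stream:
+      kafka:
+        binder:
+          brokers: "localhost:9094"
+      bindings:
+        my-producer-out-0:
+          destination: "my.topic"
+        my-consumer-in-0:
+          destination: "my.topic"
+          group: "${spring.application.name}"
+      function:
+        definition: "my-producer;my-consumer"
+```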
 
-#### 3) We create a class `MyStreamEventConsumer` that will receive `MyEventPayload`, transform to `MyEvent` and redirect to a `MyEventConsumer`:
+### 2) We create a class `MyStreamEventConsumer` implementing `Consumer<MyEventPayload>`, to fulfill the interface required by Spring Cloud Stream:
 ```kotlin
-class MyStreamEventConsumer(private val consumer: MyEventConsumer) {
-    @StreamListener("my-consumer")
-    fun consume(payload: MyEventPayload) {
-        consumer.consume(fromPayload(payload))
-    }
-
-    private fun fromPayload(payload: MyEventPayload): MyEvent {
-        return MyEvent(payload.string)
-    }
+class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer<MyEventPayload> {
+  override fun accept(payload: MyEventPayload) {
+    consumer.consume(fromPayload(payload))
+  }
+
+  private fun fromPayload(payload: MyEventPayload): MyEvent {
+    return MyEvent(payload.string)
+  }
 }
 ```
-* `MyStreamEventConsumer` has a method `consume` annotated with `@StreamListener` linking it to the `my-consumer` binding. This means that every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` (applying [Jackson](https://github.com/FasterXML/jackson) annotations) and the `consume` method will we called.
+* Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` (applying [Jackson](https://github.com/FasterXML/jackson) annotations) and the `accept` method will be called.
 * Then the only thing we have to do is to transform the `MyEventPayload` into a `MyEvent` and call back the generic `MyEventConsumer`.
 
-#### 4) We wire everything together:
+### 3) Finally, we create an instance of `MyStreamEventConsumer` naming it `my-consumer` to link it to the function definition:
 ```kotlin
-@EnableBinding(MyConsumerBinding::class)
+@Configuration
 class MyConfiguration {
-    @Bean
-    fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
-        return MyStreamEventConsumer(consumer)
-    }
-
-    @Bean
-    fun myEventConsumer(): MyEventConsumer {
-        return object : MyEventConsumer {
-            override fun consume(event: MyEvent) {
-                println("Received ${event.text}")
-            }
+  @Bean("my-consumer")
+  fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
+    return MyStreamEventConsumer(consumer)
+  }
+
+  @Bean
+  fun myEventConsumer(): MyEventConsumer {
+    return object : MyEventConsumer {
+      override fun consume(event: MyEvent) {
+        println("Received ${event.text}")
      }
-    }
+    }
+  }
 }
 ```
-`@EnableBinding` annotation makes Spring to create an instance of `MyConsumerBinding` and invoke its `myConsumer` method annotated with `@Input("my-consumer")` so we will have a `SubscribableChannel` up and running, listening to messages.
+* We create a simple implementation of `MyEventConsumer` that just prints the event.
 
-We create an instance of type `MyStreamEventConsumer` and its method `consume` annotated with `@StreamListener("my-consumer")` will be linked automatically to the `SubscribableChannel`.
-
-We create a simple implementation of `MyEventConsumer` that justs prints the event.
- -#### 5) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): +### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin @SpringBootTest class MyApplicationShould { - // we mock MyEventConsumer - @MockBean lateinit var eventConsumer: MyEventConsumer - - @Test - fun `consume event`() { - // we send a Kafka message using a helper - val text = "hello ${UUID.randomUUID()}" - kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}") - - // we wait at most 5 seconds to receive the expected MyEvent in the MyEventConsumer mock - val eventCaptor = argumentCaptor() - verify(eventConsumer, timeout(FIVE_SECONDS.toMillis())).consume(eventCaptor.capture()) - - assertThat(eventCaptor.firstValue).satisfies { event -> assertThat(event.text).isEqualTo(text) } - } -} -``` -Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt). - -### Consumer with functional programming model - -#### 1) We configure the binding `my-consumer` in application.yml but declaring it as a function: -```yaml -spring: - cloud: - stream: - kafka: - binder: - brokers: "localhost:9094" - bindings: - my-consumer-in-0: - destination: "my.topic" - group: "${spring.application.name}" - function: - definition: "my-consumer" -``` -As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function. - -#### 2) We create the same class `MyStreamEventConsumer` but implementing `Consumer` to fulfill the interface required by Spring Cloud Stream: -```kotlin -class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer { - override fun accept(payload: MyEventPayload) { - consumer.consume(fromPayload(payload)) - } + // we mock MyEventConsumer + @MockBean lateinit var eventConsumer: MyEventConsumer - private fun fromPayload(payload: MyEventPayload): MyEvent { - return MyEvent(payload.string) - } -} -``` -Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` (applying [Jackson](https://github.com/FasterXML/jackson) annotations) and the `consume` method will we called. + @Test + fun `consume event`() { + // we send a Kafka message using a helper + val text = "hello ${UUID.randomUUID()}" + kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}") -Then the only thing we have to do is to tranform the `MyEventPayload` to a `MyEvent` and callback the generic `MyEventConsumer`. 
 
+        // we wait at most 5 seconds to receive the expected MyEvent in the MyEventConsumer mock
+        val eventCaptor = argumentCaptor<MyEvent>()
+        verify(eventConsumer, timeout(FIVE_SECONDS.toMillis())).consume(eventCaptor.capture())
 
-#### 3) Finally, we create an instance of `MyStreamEventConsumer` naming it `my-consumer` to link it to the function definition:
-```kotlin
-@Configuration
-class MyConfiguration {
-  @Bean("my-consumer")
-  fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
-    return MyStreamEventConsumer(consumer)
-  }
-
-  @Bean
-  fun myEventConsumer(): MyEventConsumer {
-    return object : MyEventConsumer {
-      override fun consume(event: MyEvent) {
-        println("Received ${event.text}")
-      }
-    }
-  }
-}
-```
-We create a simple implementation of `MyEventConsumer` that justs prints the event.
-
-#### 4) Test [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt) should work the same!
+        assertThat(eventCaptor.firstValue).satisfies { event ->
+          assertThat(event.text).isEqualTo(text)
+        }
+    }
 }
 ```
+* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt).
 
 ## Extras
 
@@ -386,9 +248,9 @@ class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventPr
   // ...
   override fun produce(event: MyEvent) {
    val message = MessageBuilder
-        .withPayload(MyEventPayload(event.text, event.text.length))
-        .setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}")
-        .build()
+      .withPayload(MyEventPayload(event.text, event.text.length))
+      .setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}")
+      .build()
    sink.emitNext(message, FAIL_FAST)
   }
   // ...
@@ -396,7 +258,7 @@ class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventPr
 ```
 
 As we are setting a key of type `String` we should use a `StringSerializer` as `key.serializer`:
-```
+```yaml
 spring:
   cloud:
     stream:
@@ -411,31 +273,29 @@ And we can test it like this:
 ```kotlin
 @Test
 fun `produce event`() {
-    val text = "hello ${UUID.randomUUID()}"
-    eventProducer.produce(MyEvent(text))
-
-    val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
-
-    assertThat(records)
-        .singleElement().satisfies { record ->
-            // check the message payload
-            JSONAssert.assertEquals(
-                record.value(),
-                "{\"number\":${text.length},\"string\":\"$text\"}",
-                true
-            )
-            // check the message key
-            assertThat(record.key())
-                .isEqualTo("key-${text.length}")
-        }
+  val text = "hello ${UUID.randomUUID()}"
+  eventProducer.produce(MyEvent(text))
+
+  val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
+
+  assertThat(records).singleElement().satisfies { record ->
+    // check the message payload
+    JSONAssert.assertEquals(
+      record.value(),
+      "{\"number\":${text.length},\"string\":\"$text\"}",
+      true
+    )
+    // check the message key
+    assertThat(record.key())
+      .isEqualTo("key-${text.length}")
+  }
 }
 ```
-
-Alternatively we can use `partitionKeyExpression` and other related [binding producer properties](https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#_producer_properties) to achieve the same but at the binding abstraction level of Spring Cloud Stream.
+* Alternatively we can use `partitionKeyExpression` and other related [binding producer properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties) to achieve the same at the binding abstraction level of Spring Cloud Stream, as sketched below.
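+
+For illustration, a minimal sketch of that alternative, assuming we partition by the same `number` field. `partitionKeyExpression` is a SpEL expression evaluated against each outgoing message to choose a partition, so payloads with the same `number` always land on the same partition:
+```yaml
+spring:
+  cloud:
+    stream:
+      bindings:
+        my-producer-out-0:
+          destination: "my.topic"
+          producer:
+            partitionKeyExpression: "payload.number"
+            partitionCount: 3 # hypothetical number of partitions of my.topic
+```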
 
 ### Retries
 
-If errors are thrown while consuming messages, we can tell Spring Cloud Stream what to do using the following [binding consumer properties](https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#_consumer_properties):
+If errors are thrown while consuming messages, we can tell Spring Cloud Stream what to do using the following [binding consumer properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties):
 * **maxAttempts**: number of retries
 * **backOffInitialInterval**, **backOffMaxInterval**, **backOffMultiplier**: backoff parameters to increase delay between retries
 * **defaultRetryable**, **retryableExceptions**: which exceptions to retry or not
 
@@ -461,17 +321,19 @@ And we can test it like this:
 ```kotlin
 @Test
 fun `retry consume event 5 times`() {
-    // we throw a MyRetryableException every time we receive a message
-    doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())
-
-    // we send a Kafka message using a helper
-    val text = "hello ${UUID.randomUUID()}"
-    kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")
-
-    // consumer has been called five times with the same message
-    val eventCaptor = argumentCaptor()
-    verify(eventConsumer, timeout(TEN_SECONDS.toMillis()).times(FIVE)).consume(eventCaptor.capture())
-    assertThat(eventCaptor.allValues).allSatisfy { event -> assertThat(event.text).isEqualTo(text) }
+  // we throw a MyRetryableException every time we receive a message
+  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())
+
+  // we send a Kafka message using a helper
+  val text = "hello ${UUID.randomUUID()}"
+  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")
+
+  // consumer has been called five times with the same message
+  val eventCaptor = argumentCaptor<MyEvent>()
+  verify(eventConsumer, timeout(TEN_SECONDS.toMillis()).times(FIVE)).consume(eventCaptor.capture())
+  assertThat(eventCaptor.allValues).allSatisfy { event ->
+    assertThat(event.text).isEqualTo(text)
+  }
 }
 ```
 
@@ -481,7 +343,7 @@ Additional to retries, DLQ is another mechanism we can use to deal with consumer
 
 In the case of Kafka it consists of sending to another topic all the messages that the consumer has rejected.
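+
+For illustration, this is the kind of consumer-side failure that ends up there. A hypothetical validating consumer (not part of this demo) might reject events like this:
+```kotlin
+class MyValidatingEventConsumer : MyEventConsumer {
+  override fun consume(event: MyEvent) {
+    if (event.text.isBlank()) {
+      // rejecting the message: once the configured retries are exhausted,
+      // the binder will publish the original record to the DLQ topic
+      throw MyRetryableException("blank text!")
+    }
+    println("Received ${event.text}")
+  }
+}
+```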
 
-We can configure the DLQ using these [Kafka binder consumer properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.1/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties):
+We can configure the DLQ using these [Kafka binder consumer properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties):
 * **enableDlq**: enable DLQ
 * **dlqName**:
   * **not set**: defaults to `error.<destination>.<group>`
   *
@@ -513,51 +375,49 @@ spring:
 
 And we can test it like this:
 
-Application errors:
+#### Application errors:
 ```kotlin
 @Test
 fun `send to DLQ rejected messages`() {
-    // we throw a MyRetryableException every time we receive a message
-    doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())
-
-    // we send a Kafka message using a helper
-    val text = "hello ${UUID.randomUUID()}"
-    kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")
-
-    // we check the message has been sent to the DLQ
-    val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
-    assertThat(errorRecords)
-        .singleElement().satisfies { record ->
-            JSONAssert.assertEquals(
-                record.value(),
-                "{\"number\":${text.length},\"string\":\"$text\"}",
-                true
-            )
-        }
+  // we throw a MyRetryableException every time we receive a message
+  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())
+
+  // we send a Kafka message using a helper
+  val text = "hello ${UUID.randomUUID()}"
+  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")
+
+  // we check the message has been sent to the DLQ
+  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
+  assertThat(errorRecords).singleElement().satisfies { record ->
+    JSONAssert.assertEquals(
+      record.value(),
+      "{\"number\":${text.length},\"string\":\"$text\"}",
+      true
+    )
+  }
 }
 ```
 
-Message deserialization errors:
+#### Message deserialization errors:
 ```kotlin
 @ParameterizedTest
 @ValueSource(strings = [
-    "plain text",
-    "{\"unknownField\":\"not expected\"}"
+  "plain text",
+  "{\"unknownField\":\"not expected\"}"
 ])
 fun `send to DLQ undeserializable messages`(body: String) {
-    // we send a Kafka message with an invalid body using a helper
-    kafkaProducerHelper.send(TOPIC, body)
-
-    // we check the message has been sent to the DLQ
-    val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
-    assertThat(errorRecords)
-        .singleElement().satisfies { record ->
-            assertThat(record.value()).isEqualTo(body)
-        }
+  // we send a Kafka message with an invalid body using a helper
+  kafkaProducerHelper.send(TOPIC, body)
+
+  // we check the message has been sent to the DLQ
+  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
+  assertThat(errorRecords).singleElement().satisfies { record ->
+    assertThat(record.value()).isEqualTo(body)
+  }
}
```
 
-That's it! Happy coding!
+That's it! Happy coding! 
💙
 
 ## Test this demo
 
@@ -579,15 +439,15 @@ Run with testcontainers:
 SPRING_PROFILES_ACTIVE=docker-compose ./gradlew bootRun
 ```
 
-Then you can use [kafkacat](https://github.com/edenhill/kafkacat) to produce/consume to/from Kafka:
+Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from Kafka:
 ```shell
 # consume
-kafkacat -b localhost:9094 -C -t my.topic
-kafkacat -b localhost:9094 -C -t my.topic.errors
+kcat -b localhost:9094 -C -t my.topic
+kcat -b localhost:9094 -C -t my.topic.errors
 # produce a valid message
-echo '{"string":"hello!", "number":37}' | kafkacat -b localhost:9094 -P -t my.topic
+echo '{"string":"hello!", "number":37}' | kcat -b localhost:9094 -P -t my.topic
 # produce an invalid message
-echo 'hello!' | kafkacat -b localhost:9094 -P -t my.topic
+echo 'hello!' | kcat -b localhost:9094 -P -t my.topic
 ```
diff --git a/src/main/kotlin/com/rogervinas/stream/MyApplication.kt b/src/main/kotlin/com/rogervinas/stream/MyApplication.kt
index a9c61a4..3d8afd6 100644
--- a/src/main/kotlin/com/rogervinas/stream/MyApplication.kt
+++ b/src/main/kotlin/com/rogervinas/stream/MyApplication.kt
@@ -7,5 +7,5 @@ class MyApplication
 
 fun main(args: Array<String>) {
-    runApplication<MyApplication>(*args)
-}
\ No newline at end of file
+  runApplication<MyApplication>(*args)
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/domain/MyEventConsumer.kt b/src/main/kotlin/com/rogervinas/stream/domain/MyEventConsumer.kt
index 00c7ff6..1d25c24 100644
--- a/src/main/kotlin/com/rogervinas/stream/domain/MyEventConsumer.kt
+++ b/src/main/kotlin/com/rogervinas/stream/domain/MyEventConsumer.kt
@@ -1,5 +1,5 @@
 package com.rogervinas.stream.domain
 
 interface MyEventConsumer {
-    fun consume(event: MyEvent)
-}
\ No newline at end of file
+  fun consume(event: MyEvent)
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/domain/MyEventProducer.kt b/src/main/kotlin/com/rogervinas/stream/domain/MyEventProducer.kt
index bc9ce05..e396f35 100644
--- a/src/main/kotlin/com/rogervinas/stream/domain/MyEventProducer.kt
+++ b/src/main/kotlin/com/rogervinas/stream/domain/MyEventProducer.kt
@@ -1,5 +1,5 @@
 package com.rogervinas.stream.domain
 
 interface MyEventProducer {
-    fun produce(event: MyEvent)
-}
\ No newline at end of file
+  fun produce(event: MyEvent)
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt
index a94a68e..1d2f1ac 100644
--- a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt
+++ b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt
@@ -9,22 +9,22 @@ import org.springframework.context.annotation.Configuration
 
 @Configuration
 class MyConfiguration {
 
-    @Bean("my-producer")
-    fun myStreamEventProducer(): MyEventProducer {
-        return MyStreamEventProducer()
-    }
+  @Bean("my-producer")
+  fun myStreamEventProducer(): MyEventProducer {
+    return MyStreamEventProducer()
+  }
 
-    @Bean("my-consumer")
-    fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
-        return MyStreamEventConsumer(consumer)
-    }
+  @Bean("my-consumer")
+  fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
+    return MyStreamEventConsumer(consumer)
+  }
 
-    @Bean
-    fun myEventConsumer(): MyEventConsumer {
-        return object : MyEventConsumer {
-            override fun consume(event: MyEvent) {
-                println("Received ${event.text}")
-            }
-        }
+  @Bean
+  fun myEventConsumer(): MyEventConsumer {
+    return object : MyEventConsumer {
+      override fun consume(event: 
MyEvent) {
+        println("Received ${event.text}")
+      }
    }
-}
\ No newline at end of file
+  }
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt
index 4c7d207..8952cd9 100644
--- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt
+++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt
@@ -7,11 +7,11 @@ import java.util.function.Consumer
 
 class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer<MyEventPayload> {
 
-    override fun accept(payload: MyEventPayload) {
-        consumer.consume(fromPayload(payload))
-    }
+  override fun accept(payload: MyEventPayload) {
+    consumer.consume(fromPayload(payload))
+  }
 
-    private fun fromPayload(payload: MyEventPayload): MyEvent {
-        return MyEvent(payload.string)
-    }
-}
\ No newline at end of file
+  private fun fromPayload(payload: MyEventPayload): MyEvent {
+    return MyEvent(payload.string)
+  }
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt
index 1502112..7417afc 100644
--- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt
+++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt
@@ -13,25 +13,25 @@ import java.util.function.Supplier
 
 class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
 
-    val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()
+  val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()
 
-    override fun produce(event: MyEvent) {
-        val message = MessageBuilder
-            .withPayload(toPayload(event))
-            .setHeader(KafkaHeaders.MESSAGE_KEY, toKey(event))
-            .build()
-        sink.emitNext(message, FAIL_FAST)
-    }
+  override fun produce(event: MyEvent) {
+    val message = MessageBuilder
+      .withPayload(toPayload(event))
+      .setHeader(KafkaHeaders.MESSAGE_KEY, toKey(event))
+      .build()
+    sink.emitNext(message, FAIL_FAST)
+  }
 
-    override fun get(): Flux<Message<MyEventPayload>> {
-        return sink.asFlux()
-    }
+  override fun get(): Flux<Message<MyEventPayload>> {
+    return sink.asFlux()
+  }
 
-    private fun toPayload(event: MyEvent): MyEventPayload {
-        return MyEventPayload(event.text, event.text.length)
-    }
+  private fun toPayload(event: MyEvent): MyEventPayload {
+    return MyEventPayload(event.text, event.text.length)
+  }
 
-    private fun toKey(event: MyEvent): String {
-        return "key-${event.text.length}"
-    }
-}
\ No newline at end of file
+  private fun toKey(event: MyEvent): String {
+    return "key-${event.text.length}"
+  }
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt
index d3d9b7c..7a33949 100644
--- a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt
+++ b/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt
@@ -16,40 +16,40 @@ import javax.annotation.PreDestroy
 @Profile("docker-compose")
 class MyContainers {
 
-    private val KAFKA = "kafka"
-    private val KAFKA_PORT = 9094
-
-    private val ZOOKEEPER = "zookeeper"
-    private val ZOOKEEPER_PORT = 2181
-
-    private val container = DockerComposeContainer(File("docker-compose.yml"))
-        .apply { withLocalCompose(true) }
-        .apply {
-            withExposedService(KAFKA, KAFKA_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
-                .apply { withStrategy(forListeningPortFixDockerDesktop322()) }
-                .apply { withStrategy(forLogMessage(".*creating topics.*", 1)) }
-            )
-        }
-        .apply {
-            withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, 
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
-                .apply { withStrategy(forListeningPortFixDockerDesktop322()) }
-                .apply { withStrategy(forLogMessage(".*binding to port.*", 1)) }
-            )
-        }
-
-    private fun forListeningPortFixDockerDesktop322() = HostPortWaitStrategyFixDockerDesktop322()
-
-    @PostConstruct
-    fun start() = container.start()
-
-    @PreDestroy
-    fun stop() = container.stop()
+  private val KAFKA = "kafka"
+  private val KAFKA_PORT = 9094
+
+  private val ZOOKEEPER = "zookeeper"
+  private val ZOOKEEPER_PORT = 2181
+
+  private val container = DockerComposeContainer(File("docker-compose.yml"))
+    .apply { withLocalCompose(true) }
+    .apply {
+      withExposedService(KAFKA, KAFKA_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
+        .apply { withStrategy(forListeningPortFixDockerDesktop322()) }
+        .apply { withStrategy(forLogMessage(".*creating topics.*", 1)) }
+      )
+    }
+    .apply {
+      withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
+        .apply { withStrategy(forListeningPortFixDockerDesktop322()) }
+        .apply { withStrategy(forLogMessage(".*binding to port.*", 1)) }
+      )
+    }
+
+  private fun forListeningPortFixDockerDesktop322() = HostPortWaitStrategyFixDockerDesktop322()
+
+  @PostConstruct
+  fun start() = container.start()
+
+  @PreDestroy
+  fun stop() = container.stop()
 }
 
 class HostPortWaitStrategyFixDockerDesktop322 : HostPortWaitStrategy() {
 
-    override fun getLivenessCheckPorts(): MutableSet<Int> {
-        return super.getLivenessCheckPorts().stream()
-            .filter { port -> port > 0 }
-            .collect(Collectors.toSet())
-    }
-}
\ No newline at end of file
+  override fun getLivenessCheckPorts(): MutableSet<Int> {
+    return super.getLivenessCheckPorts().stream()
+      .filter { port -> port > 0 }
+      .collect(Collectors.toSet())
+  }
+}
diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt
index 97bac95..0c47977 100644
--- a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt
+++ b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt
@@ -4,6 +4,6 @@ import com.fasterxml.jackson.annotation.JsonCreator
 import com.fasterxml.jackson.annotation.JsonProperty
 
 class MyEventPayload @JsonCreator constructor(
-    @JsonProperty("string") val string: String,
-    @JsonProperty("number") val number: Int
-)
\ No newline at end of file
+  @JsonProperty("string") val string: String,
+  @JsonProperty("number") val number: Int
+)
diff --git a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt
index 8c84e34..addf41c 100644
--- a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt
+++ b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt
@@ -1,6 +1,10 @@
 package com.rogervinas.stream
 
-import com.nhaarman.mockito_kotlin.*
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.argumentCaptor
+import com.nhaarman.mockito_kotlin.doThrow
+import com.nhaarman.mockito_kotlin.timeout
+import com.nhaarman.mockito_kotlin.verify
 import com.rogervinas.stream.domain.MyEvent
 import com.rogervinas.stream.domain.MyEventConsumer
 import com.rogervinas.stream.domain.MyEventProducer
@@ -18,7 +22,7 @@ import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.core.env.Environment
 import org.springframework.test.context.ActiveProfiles
 import java.time.Duration
-import java.util.*
+import java.util.UUID
 import java.util.function.Consumer
 
 @SpringBootTest(webEnvironment = DEFINED_PORT)
diff --git 
a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt b/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt
index db06d82..99edba6 100644
--- a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt
+++ b/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt
@@ -1,45 +1,49 @@
 package com.rogervinas.stream
 
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
 import org.apache.kafka.common.serialization.StringDeserializer
 import java.time.Duration
-import java.util.*
-import kotlin.collections.ArrayList
+import java.util.Properties
+import java.util.UUID
 
 class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
 
-    private val consumer: Consumer<String, String>
-
-    companion object {
-        private const val MILLIS_POLL: Long = 250
-    }
-
-    init {
-        consumer = KafkaConsumer(consumerConfig(bootstrapServers))
-        consumer.subscribe(arrayListOf(topic))
-    }
-
-    fun consumeAll(): List<ConsumerRecord<String, String>> {
-        return consumeAtLeast(1, Duration.ofSeconds(1))
-    }
-
-    fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
-        val records: MutableList<ConsumerRecord<String, String>> = ArrayList()
-        var millisLeft = timeout.toMillis()
-        do {
-            consumer.poll(Duration.ofMillis(MILLIS_POLL)).iterator().forEachRemaining { records.add(it) }
-            millisLeft -= MILLIS_POLL
-        } while (millisLeft > 0 && records.size < numberOfRecords)
-        return records
-    }
-
-    private fun consumerConfig(bootstrapServers: String): Properties {
-        val config = Properties()
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
-        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.toLowerCase())
-        return config
-    }
-}
\ No newline at end of file
+  private val consumer: Consumer<String, String>
+
+  companion object {
+    private const val MILLIS_POLL: Long = 250
+  }
+
+  init {
+    consumer = KafkaConsumer(consumerConfig(bootstrapServers))
+    consumer.subscribe(arrayListOf(topic))
+  }
+
+  fun consumeAll(): List<ConsumerRecord<String, String>> {
+    return consumeAtLeast(1, Duration.ofSeconds(1))
+  }
+
+  fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
+    val records: MutableList<ConsumerRecord<String, String>> = ArrayList()
+    var millisLeft = timeout.toMillis()
+    do {
+      consumer.poll(Duration.ofMillis(MILLIS_POLL)).iterator().forEachRemaining { records.add(it) }
+      millisLeft -= MILLIS_POLL
+    } while (millisLeft > 0 && records.size < numberOfRecords)
+    return records
+  }
+
+  private fun consumerConfig(bootstrapServers: String): Properties {
+    val config = Properties()
+    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
+    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
+    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
+    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.toLowerCase())
+    return config
+  }
+}
diff --git 
a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt b/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt
index be32778..ffa50bb 100644
--- a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt
+++ b/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt
@@ -9,19 +9,19 @@ import java.util.*
 
 class MyKafkaProducerHelper(bootstrapServers: String) {
 
-    private val producer: Producer<String, String>
+  private val producer: Producer<String, String>
 
-    init {
-        val config = Properties()
-        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
-        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
-        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
-        producer = KafkaProducer(config)
-    }
+  init {
+    val config = Properties()
+    config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
+    config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
+    config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+    producer = KafkaProducer(config)
+  }
 
-    @Throws(Exception::class)
-    fun send(topic: String?, body: String) {
-        producer.send(ProducerRecord(topic, body)).get()
-        producer.flush()
-    }
-}
\ No newline at end of file
+  @Throws(Exception::class)
+  fun send(topic: String?, body: String) {
+    producer.send(ProducerRecord(topic, body)).get()
+    producer.flush()
+  }
+}