diff --git a/README.md b/README.md index 26a6358..db2ca99 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ It offers an abstraction (the **binding**) that works the same whatever undernea Let's try to setup a simple example step by step and see how it works! -This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding Kafka binder dependency `spring-cloud-starter-stream-kafka`. +This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding Kafka binder dependency **spring-cloud-starter-stream-kafka**. * [Producer with functional programming model](#producer-with-functional-programming-model) * [Consumer with functional programming model](#consumer-with-functional-programming-model) @@ -41,7 +41,7 @@ You can browse older versions of this repo: Our final goal is to produce messages to a Kafka topic. -From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler: +From the point of view of the application we want an interface **MyEventProducer** to produce events to a generic messaging system. These events will be of type **MyEvent**, just containing a **text** field to make it simpler: ```kotlin data class MyEvent(val text: String) @@ -70,22 +70,18 @@ spring: * Everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties). * As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-producer` is the function name, `out` is for output bindings and `0` is the index we have to use if we have a single function. -### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting: +### 2) We create an implementation of `MyEventProducer` as a Kotlin lambda `() -> Flux`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting: ```kotlin -class MyStreamEventProducer : Supplier>, MyEventProducer { +class MyStreamEventProducer : () -> Flux, MyEventProducer { private val sink = Sinks.many().unicast().onBackpressureBuffer() override fun produce(event: MyEvent) { sink.emitNext(toPayload(event), FAIL_FAST) } - override fun get(): Flux { - return sink.asFlux() - } + override fun invoke() = sink.asFlux() - private fun toPayload(event: MyEvent): MyEventPayload { - return MyEventPayload(event.text, event.text.length) - } + private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length) } data class MyEventPayload( @@ -105,31 +101,33 @@ class MyConfiguration { fun myStreamEventProducer() = MyStreamEventProducer() @Bean("my-producer") - fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux> = - producer::get + fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux = producer } ``` -* We create a `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed. -* We create a lambda returning a `Flux>` that will be linked to the `my-producer` function, implemented by calling `myStreamEventProducer.get()` method. -* We do not merge both beans in one to avoid issues with `KotlinLambdaToFunctionAutoConfiguration`. +* Both beans return the same instance ... why? + * We need an instance with type `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed. + * We need an instance with type `() -> Flux` that will be bound to `my-producer` function. + * As we are using **Kotlin** we need to define it as a lambda (required by **KotlinLambdaToFunctionAutoConfiguration**). + * If we were using **Java** we should define it as `Supplier>`. ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin @SpringBootTest class MyApplicationShould { - // we inject MyEventProducer (it should be a MyStreamEventProducer) - @Autowired lateinit var eventProducer: MyEventProducer + @Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer) + @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2 + lateinit var eventProducer: MyEventProducer @Test fun `produce event`() { - // we produce using MyEventProducer + // We produce an event using MyEventProducer val text = "hello ${UUID.randomUUID()}" eventProducer.produce(MyEvent(text)) - // we consume from Kafka using a helper + // We consume from Kafka using a helper val records = consumerHelper.consumeAtLeast(1, FIVE_SECONDS) - // we verify the received json + // We verify the received json assertThat(records).singleElement().satisfies { record -> JSONAssert.assertEquals( record.value(), @@ -183,13 +181,11 @@ class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPay consumer.consume(fromPayload(payload)) } - private fun fromPayload(payload: MyEventPayload): MyEvent { - return MyEvent(payload.string) - } + private fun fromPayload(payload: MyEventPayload) = MyEvent(payload.string) } ``` * Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` and the `invoke` method will we called. -* Then the only thing we have to do is to tranform the `MyEventPayload` to a `MyEvent` and callback the generic `MyEventConsumer`. +* Then the only thing we have to do is to transform the `MyEventPayload` to a `MyEvent` and callback the generic `MyEventConsumer`. ### 3) Finally, we configure the beans needed to link `my-consumer` function definition: ```kotlin @@ -202,28 +198,32 @@ class MyConfiguration { } } + @Bean + fun myStreamEventConsumer(consumer: MyEventConsumer) = MyStreamEventConsumer(consumer) + @Bean("my-consumer") - fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit = - MyStreamEventConsumer(consumer) + fun myStreamEventConsumerFunction(consumer: myStreamEventConsumer): (MyEventPayload) -> Unit = consumer } ``` -* We create a lambda receiving a `MyEventPayload` that will be linked to the `my-consumer` function, implemented by a `MyStreamEventConsumer`. -* We create a simple implementation of `MyEventConsumer` that justs prints the event. +* We need an instance with type `(MyEventPayload) -> Unit` that will be bound to `my-consumer` function. + * As we are using **Kotlin** we need to define it as a lambda (required by **KotlinLambdaToFunctionAutoConfiguration**). + * If we were using **Java** we should define it as `Consumer`. +* We create a simple implementation of `MyEventConsumer` that just prints the event. ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin @SpringBootTest class MyApplicationShould { - // we mock MyEventConsumer - @MockBean lateinit var eventConsumer: MyEventConsumer + @MockBean // We mock MyEventConsumer + lateinit var eventConsumer: MyEventConsumer @Test fun `consume event`() { - // we send a Kafka message using a helper + // We send a Kafka message using a helper val text = "hello ${UUID.randomUUID()}" kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}") - // we wait at most 5 seconds to receive the expected MyEvent in the MyEventConsumer mock + // We wait at most 5 seconds to receive the expected MyEvent in MyEventConsumer mock val eventCaptor = argumentCaptor() verify(eventConsumer, timeout(FIVE_SECONDS.toMillis())).consume(eventCaptor.capture()) @@ -247,7 +247,7 @@ This is important on the consumer side, because **chronological order of message To specify the message key in `MyStreamEventProducer` we can produce `Message` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header: ```kotlin -class MyStreamEventProducer : Supplier>>, MyEventProducer { +class MyStreamEventProducer : () -> Flux>, MyEventProducer { // ... override fun produce(event: MyEvent) { val message = MessageBuilder diff --git a/src/main/kotlin/com/rogervinas/stream/MyApplication.kt b/src/main/kotlin/com/rogervinas/stream/MyApplication.kt index 3d8afd6..3be3b13 100644 --- a/src/main/kotlin/com/rogervinas/stream/MyApplication.kt +++ b/src/main/kotlin/com/rogervinas/stream/MyApplication.kt @@ -1,11 +1,18 @@ package com.rogervinas.stream +import com.rogervinas.stream.domain.MyEventProducer import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication +import org.springframework.stereotype.Component @SpringBootApplication class MyApplication +@Component +class MyApplicationUseCase(val producer: MyEventProducer) { + // This class just ensures that in a real scenario we can inject a MyEventProducer +} + fun main(args: Array) { runApplication(*args) } diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt index 85778bf..a5d9d2b 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt @@ -26,6 +26,5 @@ class MyConfiguration { fun myStreamEventProducer() = MyStreamEventProducer() @Bean("my-producer") - fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux> = - producer::get + fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux> = producer } diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt index ec5de3f..12d69d0 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt @@ -9,9 +9,8 @@ import org.springframework.messaging.support.MessageBuilder import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST -import java.util.function.Supplier -class MyStreamEventProducer : Supplier>>, MyEventProducer { +class MyStreamEventProducer : () -> Flux>, MyEventProducer { private val sink = Sinks.many().unicast().onBackpressureBuffer>() @@ -23,15 +22,9 @@ class MyStreamEventProducer : Supplier>>, MyEventPr sink.emitNext(message, FAIL_FAST) } - override fun get(): Flux> { - return sink.asFlux() - } + override fun invoke() = sink.asFlux() - private fun toPayload(event: MyEvent): MyEventPayload { - return MyEventPayload(event.text, event.text.length) - } + private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length) - private fun toKey(event: MyEvent): String { - return "key-${event.text.length}" - } + private fun toKey(event: MyEvent) = "key-${event.text.length}" } diff --git a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt index eef57fb..26ba5e7 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt @@ -16,8 +16,9 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.skyscreamer.jsonassert.JSONAssert import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.test.context.SpringBootTest -import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.core.env.Environment import org.springframework.test.context.ActiveProfiles @@ -25,7 +26,7 @@ import java.time.Duration import java.util.UUID import java.util.function.Consumer -@SpringBootTest(webEnvironment = DEFINED_PORT) +@SpringBootTest(webEnvironment = NONE) @ActiveProfiles("docker-compose") class MyApplicationShould { @@ -41,6 +42,7 @@ class MyApplicationShould { lateinit var env: Environment @Autowired + @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2 lateinit var eventProducer: MyEventProducer @MockBean