More readme (#23)
* More readme

* Improve things

* More things

* More things
rogervinas authored Dec 1, 2023
1 parent ff34f20 commit ccf9e5e
Showing 5 changed files with 49 additions and 48 deletions.
66 changes: 33 additions & 33 deletions README.md
@@ -17,7 +17,7 @@ It offers an abstraction (the **binding**) that works the same whatever underneath…

Let's try to set up a simple example step by step and see how it works!

- This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding Kafka binder dependency `spring-cloud-starter-stream-kafka`.
+ This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding Kafka binder dependency **spring-cloud-starter-stream-kafka**.

* [Producer with functional programming model](#producer-with-functional-programming-model)
* [Consumer with functional programming model](#consumer-with-functional-programming-model)
@@ -41,7 +41,7 @@ You can browse older versions of this repo:

Our final goal is to produce messages to a Kafka topic.

- From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler:
+ From the point of view of the application we want an interface **MyEventProducer** to produce events to a generic messaging system. These events will be of type **MyEvent**, just containing a **text** field to make it simpler:
```kotlin
data class MyEvent(val text: String)

```

@@ -70,22 +70,18 @@ spring:
* Everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties).
* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-producer` is the function name, `out` is for output bindings and `0` is the index we have to use if we have a single function.
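As a quick illustration of that naming convention, here is a tiny, dependency-free Kotlin sketch (the `bindingName` helper is hypothetical, not part of Spring Cloud Stream):

```kotlin
// Hypothetical helper that mimics Spring Cloud Stream's
// <functionName>-<out|in>-<index> functional binding naming convention.
fun bindingName(function: String, output: Boolean, index: Int = 0): String =
    "$function-${if (output) "out" else "in"}-$index"

fun main() {
    println(bindingName("my-producer", output = true))   // my-producer-out-0
    println(bindingName("my-consumer", output = false))  // my-consumer-in-0
}
```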

- ### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux<MyEventPayload>`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:
+ ### 2) We create an implementation of `MyEventProducer` as a Kotlin lambda `() -> Flux<MyEventPayload>`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:
```kotlin
-class MyStreamEventProducer : Supplier<Flux<MyEventPayload>>, MyEventProducer {
+class MyStreamEventProducer : () -> Flux<MyEventPayload>, MyEventProducer {
private val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()
override fun produce(event: MyEvent) {
sink.emitNext(toPayload(event), FAIL_FAST)
}
-override fun get(): Flux<MyEventPayload> {
-return sink.asFlux()
-}
+override fun invoke() = sink.asFlux()
-private fun toPayload(event: MyEvent): MyEventPayload {
-return MyEventPayload(event.text, event.text.length)
-}
+private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length)
}
data class MyEventPayload(
```

@@ -105,31 +101,33 @@
```kotlin
class MyConfiguration {
fun myStreamEventProducer() = MyStreamEventProducer()
@Bean("my-producer")
-fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> =
-producer::get
+fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<MyEventPayload> = producer
}
```
- * We create a `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed.
- * We create a lambda returning a `Flux<Message<MyEventPayload>>` that will be linked to the `my-producer` function, implemented by calling the `myStreamEventProducer.get()` method.
- * We do not merge both beans into one to avoid issues with `KotlinLambdaToFunctionAutoConfiguration`.
+ * Both beans return the same instance ... why?
+   * We need an instance of type `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed.
+   * We need an instance of type `() -> Flux<MyEventPayload>` that will be bound to the `my-producer` function.
+   * As we are using **Kotlin** we need to define it as a lambda (required by **KotlinLambdaToFunctionAutoConfiguration**).
+   * If we were using **Java** we would define it as a `Supplier<Flux<MyEventPayload>>`.
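The dual-interface trick above can be sketched without Spring or Reactor at all. The following is a hypothetical, dependency-free analogue: one object implements both the domain-facing interface and the function type the binder would call (a plain `List` stands in for the `Flux`):

```kotlin
// Hypothetical analogue: the same instance serves the application
// (EventProducer) and the binder (() -> List<String>).
interface EventProducer {
    fun produce(text: String)
}

class StreamEventProducer : () -> List<String>, EventProducer {
    private val buffer = mutableListOf<String>()

    // Application side: accept a domain event.
    override fun produce(text: String) {
        buffer.add(text)
    }

    // Binder side: expose everything produced so far.
    override fun invoke(): List<String> = buffer.toList()
}

fun main() {
    val producer = StreamEventProducer()
    producer.produce("hello")   // injected as EventProducer
    println(producer.invoke())  // called as () -> List<String>, prints [hello]
}
```

The key point is that a single Kotlin class can implement a function type (`() -> T`) directly by overriding `invoke`, which is what makes binding one bean to both roles possible.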

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
-// we inject MyEventProducer (it should be a MyStreamEventProducer)
-@Autowired lateinit var eventProducer: MyEventProducer
+@Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer)
+@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
+lateinit var eventProducer: MyEventProducer
@Test
fun `produce event`() {
-// we produce using MyEventProducer
+// We produce an event using MyEventProducer
val text = "hello ${UUID.randomUUID()}"
eventProducer.produce(MyEvent(text))

-// we consume from Kafka using a helper
+// We consume from Kafka using a helper
val records = consumerHelper.consumeAtLeast(1, FIVE_SECONDS)

-// we verify the received json
+// We verify the received json
assertThat(records).singleElement().satisfies { record ->
JSONAssert.assertEquals(
record.value(),
```

@@ -183,13 +181,11 @@
```kotlin
class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {
consumer.consume(fromPayload(payload))
}
-private fun fromPayload(payload: MyEventPayload): MyEvent {
-return MyEvent(payload.string)
-}
+private fun fromPayload(payload: MyEventPayload) = MyEvent(payload.string)
}
```
* Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` and the `invoke` method will be called.
- * Then the only thing we have to do is to tranform the `MyEventPayload` to a `MyEvent` and callback the generic `MyEventConsumer`.
+ * Then the only thing we have to do is to transform the `MyEventPayload` into a `MyEvent` and call back the generic `MyEventConsumer`.

### 3) Finally, we configure the beans needed to link `my-consumer` function definition:
@@ -202,28 +198,32 @@
```kotlin
class MyConfiguration {
}
}
@Bean
fun myStreamEventConsumer(consumer: MyEventConsumer) = MyStreamEventConsumer(consumer)
@Bean("my-consumer")
-fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
-MyStreamEventConsumer(consumer)
+fun myStreamEventConsumerFunction(consumer: MyStreamEventConsumer): (MyEventPayload) -> Unit = consumer
}
```
- * We create a lambda receiving a `MyEventPayload` that will be linked to the `my-consumer` function, implemented by a `MyStreamEventConsumer`.
- * We create a simple implementation of `MyEventConsumer` that justs prints the event.
+ * We need an instance of type `(MyEventPayload) -> Unit` that will be bound to the `my-consumer` function.
+   * As we are using **Kotlin** we need to define it as a lambda (required by **KotlinLambdaToFunctionAutoConfiguration**).
+   * If we were using **Java** we would define it as a `Consumer<MyEventPayload>`.
+ * We create a simple implementation of `MyEventConsumer` that just prints the event.
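The consumer-side wiring can be sketched the same dependency-free way (hypothetical names, no Spring or Kafka involved): the binder-facing function type `(Payload) -> Unit` simply delegates to the domain-facing `EventConsumer`:

```kotlin
// Hypothetical, dependency-free analogue of MyStreamEventConsumer.
data class Payload(val string: String, val number: Int)
data class Event(val text: String)

interface EventConsumer {
    fun consume(event: Event)
}

// Implements the function type the binder would invoke for each message.
class StreamEventConsumer(private val consumer: EventConsumer) : (Payload) -> Unit {
    override fun invoke(payload: Payload) = consumer.consume(Event(payload.string))
}

fun main() {
    val received = mutableListOf<Event>()
    val function = StreamEventConsumer(object : EventConsumer {
        override fun consume(event: Event) {
            received.add(event)
        }
    })
    function(Payload("hello", 5)) // what the binder does per incoming message
    println(received)             // prints [Event(text=hello)]
}
```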

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
-// we mock MyEventConsumer
-@MockBean lateinit var eventConsumer: MyEventConsumer
+@MockBean // We mock MyEventConsumer
+lateinit var eventConsumer: MyEventConsumer
@Test
fun `consume event`() {
-// we send a Kafka message using a helper
+// We send a Kafka message using a helper
val text = "hello ${UUID.randomUUID()}"
kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

-// we wait at most 5 seconds to receive the expected MyEvent in the MyEventConsumer mock
+// We wait at most 5 seconds to receive the expected MyEvent in MyEventConsumer mock
val eventCaptor = argumentCaptor<MyEvent>()
verify(eventConsumer, timeout(FIVE_SECONDS.toMillis())).consume(eventCaptor.capture())

```

@@ -247,7 +247,7 @@
This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**.

To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header:
```kotlin
-class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
+class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {
// ...
override fun produce(event: MyEvent) {
val message = MessageBuilder
```
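To see why a stable key matters, here is a dependency-free sketch. It assumes Kafka's default partitioner behaves roughly like `hash(key) % partitions` (the real one uses murmur2, so this is an approximation, not Kafka's actual code):

```kotlin
// Approximation of key-based partitioning: the same key always maps to the
// same partition, so all events sharing a key keep their relative order.
fun partitionFor(key: String, partitions: Int): Int =
    Math.floorMod(key.hashCode(), partitions)

fun main() {
    val key = "key-5" // e.g. what toKey(event) would return
    val first = partitionFor(key, 3)
    val second = partitionFor(key, 3)
    println(first == second) // prints true: routing is deterministic
}
```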
7 changes: 7 additions & 0 deletions src/main/kotlin/com/rogervinas/stream/MyApplication.kt
@@ -1,11 +1,18 @@
package com.rogervinas.stream

+import com.rogervinas.stream.domain.MyEventProducer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
+import org.springframework.stereotype.Component

@SpringBootApplication
class MyApplication

+@Component
+class MyApplicationUseCase(val producer: MyEventProducer) {
+    // This class just ensures that in a real scenario we can inject a MyEventProducer
+}

fun main(args: Array<String>) {
runApplication<MyApplication>(*args)
}
@@ -26,6 +26,5 @@ class MyConfiguration {
fun myStreamEventProducer() = MyStreamEventProducer()

@Bean("my-producer")
-fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> =
-producer::get
+fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> = producer
}
@@ -9,9 +9,8 @@ import org.springframework.messaging.support.MessageBuilder
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
-import java.util.function.Supplier

-class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
+class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {

private val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()

@@ -23,15 +22,9 @@ class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
sink.emitNext(message, FAIL_FAST)
}

-override fun get(): Flux<Message<MyEventPayload>> {
-return sink.asFlux()
-}
+override fun invoke() = sink.asFlux()

-private fun toPayload(event: MyEvent): MyEventPayload {
-return MyEventPayload(event.text, event.text.length)
-}
+private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length)

-private fun toKey(event: MyEvent): String {
-return "key-${event.text.length}"
-}
+private fun toKey(event: MyEvent) = "key-${event.text.length}"
}
6 changes: 4 additions & 2 deletions src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt
@@ -16,16 +16,17 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.core.env.Environment
import org.springframework.test.context.ActiveProfiles
import java.time.Duration
import java.util.UUID
import java.util.function.Consumer

-@SpringBootTest(webEnvironment = DEFINED_PORT)
+@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles("docker-compose")
class MyApplicationShould {

@@ -41,6 +42,7 @@ class MyApplicationShould {
lateinit var env: Environment

@Autowired
+@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
lateinit var eventProducer: MyEventProducer

@MockBean
