Commit fdb8cff: Improve readme

rogervinas committed Nov 10, 2023
1 parent f6b8cdb

Showing 2 changed files with 41 additions and 38 deletions.

README.md

Our final goal is to produce messages to a Kafka topic.

From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler:
```kotlin
data class MyEvent(val text: String)

interface MyEventProducer {
  fun produce(event: MyEvent)
}
```

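Then we follow these steps:

### 1) We configure the binding `my-producer` in application.yml declaring it as a function:

A minimal sketch of this configuration (the topic name and broker address are illustrative):
```yaml
spring:
  cloud:
    function:
      definition: my-producer
    stream:
      bindings:
        # "-out-0" marks the output binding of the "my-producer" function
        my-producer-out-0:
          destination: my.topic # illustrative topic name
      kafka:
        binder:
          brokers: localhost:9092 # illustrative broker address
```
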
### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux<MyEventPayload>`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:
```kotlin
class MyStreamEventProducer : Supplier<Flux<MyEventPayload>>, MyEventProducer {

  private val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()

  override fun produce(event: MyEvent) {
    sink.emitNext(toPayload(event), FAIL_FAST)
  }

  // Supplier side: Spring Cloud Stream subscribes to this flux
  override fun get(): Flux<MyEventPayload> = sink.asFlux()

  private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length)
}

data class MyEventPayload(
  val string: String,
  val number: Int
)
```
* We use a DTO `MyEventPayload` to specify how we want the payload to be serialized to JSON. In this case we don't need to, but we could use [Jackson](https://github.com/FasterXML/jackson) annotations to customize the JSON serialization.
* We do a simple transformation between `MyEvent` and `MyEventPayload` just as an example.
* Every time we emit a `MyEventPayload` through the `Flux`, Spring Cloud Stream will publish it to Kafka (the standalone sketch below illustrates this sink-to-flux bridge).
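
As a standalone illustration of that bridge, outside Spring and with a plain `String` flux: the unicast sink buffers emissions until its single subscriber (in our case Spring Cloud Stream) attaches.
```kotlin
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST

fun main() {
  val sink = Sinks.many().unicast().onBackpressureBuffer<String>()
  sink.emitNext("hello", FAIL_FAST) // buffered: nobody has subscribed yet

  // Spring Cloud Stream would subscribe and publish each element to Kafka;
  // here we just print to show the flow
  sink.asFlux().subscribe { println("emitted: $it") }
  sink.emitNext("world", FAIL_FAST) // delivered immediately

  // prints "emitted: hello" then "emitted: world"
}
```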

### 3) Finally, we configure the beans needed to link the `my-producer` function definition:
```kotlin
@Configuration
class MyConfiguration {

  @Bean
  fun myStreamEventProducer() = MyStreamEventProducer()

  @Bean("my-producer")
  fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> =
    producer::get
}
```
* We create a `MyStreamEventProducer` bean that will be injected wherever a `MyEventProducer` is needed (see the usage sketch below).
* We create a lambda returning a `Flux<Message<MyEventPayload>>` that will be linked to the `my-producer` function, implemented by calling the `myStreamEventProducer.get()` method.
* We do not merge both beans into one to avoid issues with `KotlinLambdaToFunctionAutoConfiguration`.
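
For illustration, a hypothetical application service would depend only on the generic interface (the service and its method are made up):
```kotlin
import org.springframework.stereotype.Service

// Hypothetical caller: it only sees MyEventProducer,
// unaware that events end up in Kafka
@Service
class MyService(private val producer: MyEventProducer) {
  fun doSomething(text: String) {
    producer.produce(MyEvent(text))
  }
}
```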

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
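
A minimal sketch of such a test helper, assuming the Testcontainers Kafka module and kafka-clients on the test classpath (image tag, group id and timeout are illustrative):
```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.time.Duration

// Disposable Kafka broker for the test JVM (illustrative image tag)
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.1")).apply { start() }

// Plain Kafka consumer used to assert what the application produced on the topic
fun consumeOne(topic: String): String? {
  val props = mapOf<String, Any>(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
    ConsumerConfig.GROUP_ID_CONFIG to "test-consumer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
  )
  KafkaConsumer<String, String>(props).use { consumer ->
    consumer.subscribe(listOf(topic))
    return consumer.poll(Duration.ofSeconds(10)).firstOrNull()?.value()
  }
}
```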

Our final goal is to consume messages from a Kafka topic.

From the point of view of the application we want an interface `MyEventConsumer` to be called every time an event is consumed from a generic messaging system. These events will be of type `MyEvent` like in the producer example:
```kotlin
data class MyEvent(val text: String)

interface MyEventConsumer {
  fun consume(event: MyEvent)
}
```

Then we follow these steps:

### 1) We configure the binding `my-consumer` in application.yml declaring it as a function:
A minimal sketch of this configuration (topic, group and broker values are illustrative):
```yaml
spring:
  cloud:
    function:
      definition: my-consumer
    stream:
      bindings:
        # "-in-0" marks the input binding of the "my-consumer" function
        my-consumer-in-0:
          destination: my.topic # illustrative topic name
          group: my-app # illustrative consumer group
      kafka:
        binder:
          brokers: localhost:9092 # illustrative broker address
```
* We configure a `group` because we want the application to consume from Kafka identifying itself as a consumer group, so that if there were more than one instance of the application, every message would be delivered to only one of the instances.
* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function.

### 2) We create `MyStreamEventConsumer` to fulfill the interface required by Spring Cloud Stream:
```kotlin
class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {

  override fun invoke(payload: MyEventPayload) {
    consumer.consume(fromPayload(payload))
  }

  // Illustrative mapping back from the payload
  private fun fromPayload(payload: MyEventPayload) = MyEvent(payload.string)
}
```
* Every time a new message is received from the Kafka topic, its payload will be deserialized to a `MyEventPayload` and the `invoke` method will be called. For example, a JSON payload `{"string":"hello","number":5}` becomes `MyEventPayload("hello", 5)`.
* Then the only thing we have to do is transform the `MyEventPayload` into a `MyEvent` and call back the generic `MyEventConsumer`.

### 3) Finally, we configure the beans needed to link the `my-consumer` function definition:
```kotlin
@Configuration
class MyConfiguration {

  @Bean
  fun myEventConsumer() = object : MyEventConsumer {
    override fun consume(event: MyEvent) {
      println("Received ${event.text}")
    }
  }

  @Bean("my-consumer")
  fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
    MyStreamEventConsumer(consumer)
}
```
* We create a lambda receiving a `MyEventPayload` that will be linked to the `my-consumer` function, implemented by a `MyStreamEventConsumer`.
* We create a simple implementation of `MyEventConsumer` that just prints the event (the sketch below wires both manually to show the flow).
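
To see the pieces fit together without Kafka, we can invoke the function directly; the payload values are made up, and the output assumes the payload-to-event mapping sketched above:
```kotlin
fun main() {
  val eventConsumer = object : MyEventConsumer {
    override fun consume(event: MyEvent) = println("Received ${event.text}")
  }
  // This is exactly what the "my-consumer" bean returns
  val function: (MyEventPayload) -> Unit = MyStreamEventConsumer(eventConsumer)
  function(MyEventPayload("hello", 5)) // prints "Received hello"
}
```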

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
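
Again a minimal sketch, reusing the `kafka` container from the producer test sketch above (topic name and JSON payload are illustrative):
```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Plain Kafka producer used to feed the topic the application consumes from
fun produceOne(topic: String, json: String) {
  val props = mapOf<String, Any>(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
  )
  KafkaProducer<String, String>(props).use { producer ->
    producer.send(ProducerRecord(topic, json)).get() // block until the broker acks
  }
}

// e.g. produceOne("my.topic", """{"string":"hello","number":5}""")
```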

When a message is sent to a topic, Kafka chooses the destination partition randomly, unless a message key is specified: in that case the partition is derived from a hash of the key, so all messages with the same key end up in the same partition.

This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (e.g. for messages of a *user*, we use the *user* id as the message key).
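
For intuition, a sketch mirroring the kafka-clients default partitioning of keyed messages (the key and partition count are illustrative):
```kotlin
import org.apache.kafka.common.utils.Utils

// murmur2 hash of the key bytes, modulo the number of partitions
fun partitionFor(key: String, numPartitions: Int): Int =
  Utils.toPositive(Utils.murmur2(key.toByteArray())) % numPartitions

fun main() {
  println(partitionFor("user-42", 6)) // same key...
  println(partitionFor("user-42", 6)) // ...same partition, every time
}
```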

To specify the message key in `MyStreamEventProducer`, we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and set the `KafkaHeaders.KEY` header:
```kotlin
class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
  // ...
  override fun produce(event: MyEvent) {
    val message = MessageBuilder
      .withPayload(MyEventPayload(event.text, event.text.length))
      .setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
      .build()
    sink.emitNext(message, FAIL_FAST)
  }
  // ...
}
```
The second changed file is the configuration class wiring everything together:

import reactor.core.publisher.Flux

@Configuration
class MyConfiguration {

  @Bean
  fun myEventConsumer() = object : MyEventConsumer {
    override fun consume(event: MyEvent) {
      println("Received ${event.text}")
    }
  }

  @Bean("my-consumer")
  fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
    MyStreamEventConsumer(consumer)

  @Bean
  fun myStreamEventProducer() = MyStreamEventProducer()

  @Bean("my-producer")
  fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> =
    producer::get
}
