Commit b0e7747: More readme

rogervinas committed Dec 1, 2023
1 parent c06479d commit b0e7747
Showing 1 changed file with 8 additions and 9 deletions.
README.md: 8 additions & 9 deletions
@@ -15,10 +15,11 @@ It offers an abstraction (the **binding**) that works the same whatever undernea
* **Amazon Kinesis**
* ...

-Let's try to setup a simple example step by step and see how it works!
+Let's try to set up a simple example step by step and see how it works!

This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web), adding the Kafka binder dependency **spring-cloud-starter-stream-kafka**.
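Not shown in the diff, but for orientation: a minimal sketch of the Gradle (Kotlin DSL) build such a setup might produce. The plugin and BOM versions are assumptions; the starter artifact ids come from the initializr configuration above.

```kotlin
// build.gradle.kts: a sketch, not the repo's actual build file.
// All versions below are assumptions.
plugins {
  id("org.springframework.boot") version "3.2.0"
  id("io.spring.dependency-management") version "1.1.4"
  kotlin("jvm") version "1.9.20"
}

dependencies {
  implementation("org.springframework.boot:spring-boot-starter-web")
  // Kafka binder; brings in spring-cloud-stream transitively.
  implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
}

dependencyManagement {
  imports {
    // Spring Cloud BOM; the exact release train is an assumption.
    mavenBom("org.springframework.cloud:spring-cloud-dependencies:2023.0.0")
  }
}
```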

+Step by step:
* [Producer with functional programming model](#producer-with-functional-programming-model)
* [Consumer with functional programming model](#consumer-with-functional-programming-model)
* [Extras](#extras)
@@ -41,7 +42,7 @@ You can browse older versions of this repo:

Our final goal is to produce messages to a Kafka topic.

-From the point of view of the application we want an interface **MyEventProducer** to produce events to a generic messaging system. These events will be of type **MyEvent**, just containing a **text** field to make it simpler:
+From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler:
```kotlin
data class MyEvent(val text: String)
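
// The diff collapses the rest of this code block. What follows is a sketch
// of the interface the prose above describes: produce(event: MyEvent) is
// taken from the producer snippet further down; anything else is an assumption.
interface MyEventProducer {
  fun produce(event: MyEvent)
}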

```

@@ -171,7 +172,7 @@ spring:
definition: "my-consumer"
```
* Remember that everything under `spring.cloud.stream.kafka.binder` is related to the Kafka binder implementation, and we can use all these extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties). Everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction, and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties).
-* We configure a `group` because we want the application to consume from Kafka identifiying itself as a consumer group so if there were to be more than one instance of the application every message will be delivered to only one of the instances.
+* We configure a `group` because we want the application to consume from Kafka identifying itself as a consumer group, so that if there is more than one instance of the application, every message is delivered to only one of the instances.
* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function.
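For orientation, the `definition` fragment above plausibly sits in an `application.yml` shaped like this sketch. Only the binding name `my-consumer-in-0` and the function name come from the text; the topic and group values, and the exact nesting of `function.definition` (its location has moved between `spring.cloud.stream.function` and `spring.cloud.function` across versions), are assumptions:

```yaml
spring:
  cloud:
    stream:
      bindings:
        my-consumer-in-0:
          destination: "my.topic" # assumed topic name
          group: "my-group"       # assumed consumer group
      function:
        definition: "my-consumer"
```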

### 2) We create `MyStreamEventConsumer` to fulfill the interface required by Spring Cloud Stream:
@@ -198,11 +199,9 @@ class MyConfiguration {
}
}
-@Bean
-fun myStreamEventConsumer(consumer: MyEventConsumer) = MyStreamEventConsumer(consumer)
@Bean("my-consumer")
-fun myStreamEventConsumerFunction(consumer: myStreamEventConsumer): (MyEventPayload) -> Unit = consumer
+fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
+  MyStreamEventConsumer(consumer)
}
```
* We need an instance with type `(MyEventPayload) -> Unit` that will be bound to the `my-consumer` function; a sketch of such a class follows.
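The class body is collapsed in this diff; here is a minimal sketch of what `MyStreamEventConsumer` plausibly looks like. The `consume` method name and the payload field `text` are assumptions; the types are the ones named in the bullet above:

```kotlin
// Sketch under stated assumptions; not the repo's actual code.
class MyStreamEventConsumer(
  private val consumer: MyEventConsumer
) : (MyEventPayload) -> Unit {
  override fun invoke(payload: MyEventPayload) {
    // Map the transport payload back to a domain event and delegate.
    consumer.consume(MyEvent(payload.text))
  }
}
```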
@@ -245,14 +244,14 @@ When a message is sent to a topic, Kafka chooses randomly the destination partit

This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (e.g. for messages of a *user*, we use the *user* id as the message key).

-To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header:
+To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and set the `KafkaHeaders.MESSAGE_KEY` header:
```kotlin
class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {
// ...
override fun produce(event: MyEvent) {
val message = MessageBuilder
.withPayload(MyEventPayload(event.text, event.text.length))
-.setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
+.setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}")
.build()
sink.emitNext(message, FAIL_FAST)
}
  // ...
}
```
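As a side note on why a fixed key preserves ordering: for non-null keys, Kafka's default partitioner picks the partition deterministically from a hash of the serialized key (murmur2 in the real implementation). A conceptual sketch, not Kafka's actual code:

```kotlin
// Same key -> same partition, hence per-key ordering on the consumer side.
fun partitionFor(key: String, numPartitions: Int): Int =
  (key.hashCode() and Int.MAX_VALUE) % numPartitions
```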
