diff --git a/README.md b/README.md
index 8c44944..f465afa 100644
--- a/README.md
+++ b/README.md
@@ -15,10 +15,11 @@ It offers an abstraction (the **binding**) that works the same whatever undernea
 * **Amazon Kinesis**
 * ...
 
-Let's try to setup a simple example step by step and see how it works!
+Let's try to set up a simple example step by step and see how it works!
 
 This demo has been created using this [spring initializr configuration](https://start.spring.io/#!type=gradle-project&language=kotlin&packaging=jar&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=cloud-stream,web) adding the Kafka binder dependency **spring-cloud-starter-stream-kafka**.
 
+Step by step:
 * [Producer with functional programming model](#producer-with-functional-programming-model)
 * [Consumer with functional programming model](#consumer-with-functional-programming-model)
 * [Extras](#extras)
@@ -41,7 +42,7 @@ You can browse older versions of this repo:
 
 Our final goal is to produce messages to a Kafka topic.
 
-From the point of view of the application we want an interface **MyEventProducer** to produce events to a generic messaging system. These events will be of type **MyEvent**, just containing a **text** field to make it simpler:
+From the point of view of the application, we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to keep it simple:
 
 ```kotlin
 data class MyEvent(val text: String)
@@ -171,7 +172,7 @@ spring:
           definition: "my-consumer"
 ```
 * Remember that everything under `spring.cloud.kafka.binder` is related to the Kafka binder implementation, and we can use all these extra [Kafka binder properties](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties); everything under `spring.cloud.stream.bindings` is related to the Spring Cloud Stream binding abstraction, and we can use all these extra [binding properties](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties).
-* We configure a `group` because we want the application to consume from Kafka identifiying itself as a consumer group so if there were to be more than one instance of the application every message will be delivered to only one of the instances.
+* We configure a `group` because we want the application to consume from Kafka identifying itself as a consumer group, so that if there were more than one instance of the application, every message would be delivered to only one of them.
 * As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function.
 
 ### 2) We create `MyStreamEventConsumer` to fulfill the interface required by Spring Cloud Stream:
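Side note (not part of the diff above): the `MyEventProducer` interface is only referenced in prose so far. A minimal sketch consistent with that description, assuming the single `produce` method the text implies, could be:

```kotlin
// Hypothetical sketch (assumed from the prose above, not taken from the diff):
// the messaging-agnostic producer contract for MyEvent.
interface MyEventProducer {
  fun produce(event: MyEvent)
}
```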
@@ -198,11 +199,9 @@ class MyConfiguration {
     }
   }
 
-  @Bean
-  fun myStreamEventConsumer(consumer: MyEventConsumer) = MyStreamEventConsumer(consumer)
-
   @Bean("my-consumer")
-  fun myStreamEventConsumerFunction(consumer: myStreamEventConsumer): (MyEventPayload) -> Unit = consumer
+  fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
+    MyStreamEventConsumer(consumer)
 }
 ```
 * We need an instance with type `(MyEventPayload) -> Unit` that will be bound to the `my-consumer` function.
@@ -245,14 +244,14 @@ When a message is sent to a topic, Kafka chooses randomly the destination partit
 
 This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (e.g. for the messages of a *user*, we use the *user* id as the message key).
 
-To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header:
+To specify the message key in `MyStreamEventProducer`, we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and set the `KafkaHeaders.MESSAGE_KEY` header:
 ```kotlin
 class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {
   // ...
   override fun produce(event: MyEvent) {
     val message = MessageBuilder
       .withPayload(MyEventPayload(event.text, event.text.length))
-      .setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
+      .setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}")
       .build()
     sink.emitNext(message, FAIL_FAST)
   }
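Side note (not part of the diff above): on the consumer side the key travels as a header too. A hedged sketch, assuming Spring Kafka's consumer-side `KafkaHeaders.RECEIVED_MESSAGE_KEY` constant (the counterpart of `MESSAGE_KEY` in the same Spring Kafka versions) and a hypothetical `MyKeyAwareConsumer` class name:

```kotlin
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message

// Illustrative sketch, not from the demo: consuming Message<MyEventPayload>
// instead of the bare payload exposes the Kafka headers, including the record key.
// MyKeyAwareConsumer is a hypothetical name.
class MyKeyAwareConsumer : (Message<MyEventPayload>) -> Unit {
  override fun invoke(message: Message<MyEventPayload>) {
    // RECEIVED_MESSAGE_KEY is the consumer-side counterpart of the
    // MESSAGE_KEY header set by the producer.
    val key = message.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]
    println("Received ${message.payload} with key $key")
  }
}
```

Registering it as a `@Bean` and binding it works exactly like `my-consumer` above, since Spring Cloud Stream accepts both payload-typed and `Message`-typed functional consumers.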