Skip to content

Commit

Permalink
Update spring-boot (#12)
Browse files Browse the repository at this point in the history
* Update spring-boot

* More updates

* Improve readme
  • Loading branch information
rogervinas authored Nov 10, 2023
1 parent 7d257fa commit cd42a55
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 310 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ jobs:
- name: Set up Java
uses: actions/setup-java@v1
with:
java-version: 11
java-version: '21'
distribution: 'temurin'
- name: Gradle cache
uses: actions/cache@v2
with:
Expand Down
73 changes: 38 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
![CI](https://github.com/rogervinas/spring-cloud-stream-kafka-step-by-step/actions/workflows/gradle.yml/badge.svg)
![Java](https://img.shields.io/badge/Java-11-blue?labelColor=black)
![Kotlin](https://img.shields.io/badge/Kotlin-1.6.10-blue?labelColor=black)
![SpringBoot](https://img.shields.io/badge/SpringBoot-2.6.2-blue?labelColor=black)
![SpringCloudStream](https://img.shields.io/badge/SpringCloudStream-3.2.1-blue?labelColor=black)
![Java](https://img.shields.io/badge/Java-21-blue?labelColor=black)
![Kotlin](https://img.shields.io/badge/Kotlin-1.9.20-blue?labelColor=black)
![SpringBoot](https://img.shields.io/badge/SpringBoot-3.1.5-blue?labelColor=black)
![SpringCloudStream](https://img.shields.io/badge/SpringCloudStream-4.0.4-blue?labelColor=black)

# Spring Cloud Stream & Kafka binder step by step

Expand Down Expand Up @@ -43,10 +43,10 @@ Our final goal is to produce messages to a Kafka topic.

From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler:
```kotlin
class MyEvent(val text: String)
data class MyEvent(val text: String)

interface MyEventProducer {
fun produce(event: MyEvent)
interface MyEventProducer {
fun produce(event: MyEvent)
}
```

Expand All @@ -73,7 +73,7 @@ spring:
### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux<MyEventPayload>`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:
```kotlin
class MyStreamEventProducer : Supplier<Flux<MyEventPayload>>, MyEventProducer {
val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()
private val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()
override fun produce(event: MyEvent) {
sink.emitNext(toPayload(event), FAIL_FAST)
Expand All @@ -88,25 +88,30 @@ class MyStreamEventProducer : Supplier<Flux<MyEventPayload>>, MyEventProducer {
}
}
class MyEventPayload @JsonCreator constructor(
@JsonProperty("string") val string: String,
@JsonProperty("number") val number: Int
data class MyEventPayload(
val string: String,
val number: Int
)
```
* We use a DTO `MyEventPayload` to specify how do we want the payload to be serialized to JSON (using [Jackson](https://github.com/FasterXML/jackson) annotations).
* We use a DTO `MyEventPayload` to specify how do we want the payload to be serialized to JSON. In this case we don't need to but we could use [Jackson](https://github.com/FasterXML/jackson) annotations if we wanted to customize the JSON serialization.
* We do a simple transformation between `MyEvent` and `MyEventPayload` just as an example.
* Every time we emit a `MyEventPayload` through the `Flux`, Spring Cloud Stream will publish it to Kafka.

### 3) Finally, we create an instance of `MyStreamEventProducer` naming it `my-producer` to link it to the function definition:
### 3) Finally, we configure the beans needed to link `my-producer` function definition:
```kotlin
@Configuration
class MyConfiguration {
@Bean
fun myStreamEventProducer() = MyStreamEventProducer()
@Bean("my-producer")
fun myStreamEventProducer(): MyEventProducer {
return MyStreamEventProducer()
}
fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<Message<MyEventPayload>> =
producer::get
}
```
* We create a `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed.
* We create a lambda returning a `Flux<Message<MyEventPayload>>` that will be linked to the `my-producer` function, implemented by calling `myStreamEventProducer.get()` method.
* We do not merge both beans in one to avoid issues with `KotlinLambdaToFunctionAutoConfiguration`.

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
Expand Down Expand Up @@ -143,7 +148,7 @@ Our final goal is to consume messages from a Kafka topic.

From the point of view of the application we want an interface `MyEventConsumer` to be called every time an event is consumed from a generic messaging system. These events will be of type `MyEvent` like in the producer example:
```kotlin
class MyEvent(val text: String)
data class MyEvent(val text: String)

interface MyEventConsumer {
fun consume(event: MyEvent)
Expand All @@ -152,7 +157,7 @@ interface MyEventConsumer {

Then we follow these steps:

### 1) We declare the binding `my-consumer` in application.yml:
### 1) We configure the binding `my-consumer` in application.yml declaring it as a function:
```yaml
spring:
cloud:
Expand All @@ -171,10 +176,10 @@ spring:
* We configure a `group` because we want the application to consume from Kafka identifiying itself as a consumer group so if there were to be more than one instance of the application every message will be delivered to only one of the instances.
* As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function.

### 2) We create the same class `MyStreamEventConsumer` implementing `Consumer<MyEventPayload>` to fulfill the interface required by Spring Cloud Stream:
### 2) We create `MyStreamEventConsumer` to fulfill the interface required by Spring Cloud Stream:
```kotlin
class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer<MyEventPayload> {
override fun accept(payload: MyEventPayload) {
class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {
override fun invoke(payload: MyEventPayload) {
consumer.consume(fromPayload(payload))
}
Expand All @@ -183,28 +188,26 @@ class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer<My
}
}
```
* Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` (applying [Jackson](https://github.com/FasterXML/jackson) annotations) and the `consume` method will we called.
* Every time a new message is received in the Kafka topic, its payload will be deserialized to a `MyEventPayload` and the `invoke` method will we called.
* Then the only thing we have to do is to tranform the `MyEventPayload` to a `MyEvent` and callback the generic `MyEventConsumer`.

### 3) Finally, we create an instance of `MyStreamEventConsumer` naming it `my-consumer` to link it to the function definition:
### 3) Finally, we configure the beans needed to link `my-consumer` function definition:
```kotlin
@Configuration
class MyConfiguration {
@Bean("my-consumer")
fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
return MyStreamEventConsumer(consumer)
}
@Bean
fun myEventConsumer(): MyEventConsumer {
return object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
}
fun myEventConsumer() = object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
}
}
@Bean("my-consumer")
fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
MyStreamEventConsumer(consumer)
}
```
* We create a lambda receiving a `MyEventPayload` that will be linked to the `my-consumer` function, implemented by a `MyStreamEventConsumer`.
* We create a simple implementation of `MyEventConsumer` that justs prints the event.

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
Expand Down Expand Up @@ -242,14 +245,14 @@ When a message is sent to a topic, Kafka chooses randomly the destination partit

This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (i.e. for messages of a *user*, we use the *user* id as the message key).

To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and inform the `KafkaHeaders.MESSAGE_KEY` header:
To specify the message key in `MyStreamEventProducer` we can produce `Message<MyEventPayload>` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header:
```kotlin
class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {
// ...
override fun produce(event: MyEvent) {
val message = MessageBuilder
.withPayload(MyEventPayload(event.text, event.text.length))
.setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}")
.setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
.build()
sink.emitNext(message, FAIL_FAST)
}
Expand Down
17 changes: 9 additions & 8 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("org.springframework.boot") version "2.6.2"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.6.10"
kotlin("plugin.spring") version "1.6.10"
id("org.springframework.boot") version "3.1.5"
id("io.spring.dependency-management") version "1.1.3"
kotlin("jvm") version "1.9.20"
kotlin("plugin.spring") version "1.9.20"
}

group = "com.rogervinas"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
java.sourceCompatibility = JavaVersion.VERSION_21
java.targetCompatibility = JavaVersion.VERSION_21

repositories {
mavenCentral()
}

extra["springCloudVersion"] = "2021.0.0"
extra["springCloudVersion"] = "2022.0.4"

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
Expand All @@ -43,7 +44,7 @@ dependencyManagement {
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
jvmTarget = "21"
}
}

Expand All @@ -55,5 +56,5 @@ tasks.withType<Test> {
showExceptions = true
showCauses = true
showStackTraces = true
}
}
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 3 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading

0 comments on commit cd42a55

Please sign in to comment.