Skip to content

Commit

Permalink
Use lambdas
Browse files Browse the repository at this point in the history
  • Loading branch information
rogervinas committed Dec 1, 2023
1 parent 18f90cb commit 7d45160
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 27 deletions.
28 changes: 16 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,25 @@ spring:
cloud:
stream:
bindings:
supplier-out-0:
myProducer-out-0:
destination: sensor-topic
producer:
useNativeEncoding: true
kafka:
binder:
brokers: localhost:9092
bindings:
supplier-out-0:
myProducer-out-0:
producer:
configuration:
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
```
Then Spring Cloud Stream:
* Will expect us to implement a @Bean named `supplier` of type `java.util.function.Supplier` returning a value or a `reactor.core.publisher.Flux` of values.
* Will expect us to implement a @Bean named `myProducer` returning a value or a `Flux` of values:
* In **Kotlin** we can use a lambda `() -> Value` or `() -> Flux<Value>`.
* In **Java** we can use a `Supplier<Value>` or `Supplier<Flux<Value>>`.
* Will call this @Bean one or many times to retrieve the values to be published.
* Will connect to a Kafka broker on `localhost:9092`.
* Will use the confluent `KafkaAvroSerializer` and connect to the schema registry server on `localhost:8081`.
Expand All @@ -117,7 +119,7 @@ class Application {
@Autowired private lateinit var random: Random
@Bean
fun supplier() = Supplier { unbounded.poll() }
fun myProducer(): () -> Sensor = { unbounded.poll() }
@RequestMapping(value = ["/messages"], method = [RequestMethod.POST])
fun sendMessage(): String {
Expand Down Expand Up @@ -256,15 +258,15 @@ spring:
cloud:
stream:
bindings:
process-in-0:
myConsumer-in-0:
destination: sensor-topic
consumer:
useNativeDecoding: true
kafka:
binder:
brokers: localhost:9092
bindings:
process-in-0:
myConsumer-in-0:
consumer:
configuration:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
Expand All @@ -273,7 +275,9 @@ spring:
```
Spring Cloud Stream:
* Will expect us to implement a @Bean named `process` of type `java.util.function.Consumer` accepting a value.
* Will expect us to implement a @Bean named `myConsumer` accepting a value:
* In **Kotlin** we can use a lambda `(Value) -> Unit`.
* In **Java** we can use a `Consumer<Value>`.
* Will call this @Bean every time a value is consumed.
* Will connect to a Kafka broker on `localhost:9092`.
* Will use the confluent `KafkaAvroDeserializer` and connect to the schema registry server on `localhost:8081`.
Expand All @@ -283,7 +287,7 @@ So for the sake of a demo the implementation can be as simple as:
@SpringBootApplication
class Application {
@Bean
fun process() = Consumer { input: Sensor -> println("Consumed $input") }
fun myConsumer(): (Sensor) -> Unit = { println("Consumed $it") }
}
fun main(args: Array<String>) {
Expand All @@ -299,8 +303,8 @@ To produce test messages we will use a simple [KafkaProducer using the Avro Seri

First of all we will mock the `myConsumer` @Bean so we can verify it has been called:
```kotlin
@MockBean(name = "process")
private lateinit var process: Consumer<Sensor>
@MockBean(name = "myConsumer")
private lateinit var myConsumer: Consumer<Sensor>
```

Then we test that we can consume Sensor v1 messages:
Expand Down Expand Up @@ -335,7 +339,7 @@ fun `should consume sensor v1 message`() {

produceRecord(id, recordV1)

verify(process, timeout(TIMEOUT.toMillis()))
verify(myConsumer, timeout(TIMEOUT.toMillis()))
.accept(Sensor(id, temperature, 0f, acceleration, velocity))
}
```
Expand Down Expand Up @@ -379,7 +383,7 @@ fun `should consume sensor v2 message`() {

produceRecord(id, recordV2)

verify(process, timeout(TIMEOUT.toMillis()))
verify(myConsumer, timeout(TIMEOUT.toMillis()))
.accept(Sensor(id, internalTemperature, externalTemperature, acceleration, velocity))
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.function.Consumer
@SpringBootApplication
class Application {
@Bean
fun process() = Consumer { input: Sensor -> println("Consumed $input") }
fun myConsumer(): (Sensor) -> Unit = { println("Consumed $it") }
}

fun main(args: Array<String>) {
Expand Down
4 changes: 2 additions & 2 deletions consumer/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ spring:
cloud:
stream:
bindings:
process-in-0:
myConsumer-in-0:
destination: sensor-topic
consumer:
useNativeDecoding: true
kafka:
binder:
brokers: localhost:9092
bindings:
process-in-0:
myConsumer-in-0:
consumer:
configuration:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class ConsumerApplicationTest {
.withExposedService("schema-registry", SCHEMA_REGISTRY_PORT, forListeningPort())
}

@MockBean(name = "process")
private lateinit var process: Consumer<Sensor>
@MockBean(name = "myConsumer")
private lateinit var myConsumer: Consumer<Sensor>

@Test
fun `should consume sensor v1 message`() {
Expand Down Expand Up @@ -76,7 +76,7 @@ class ConsumerApplicationTest {

produceRecord(id, recordV1)

verify(process, timeout(TIMEOUT.toMillis()))
verify(myConsumer, timeout(TIMEOUT.toMillis()))
.accept(Sensor(id, temperature, 0f, acceleration, velocity))
}

Expand Down Expand Up @@ -117,7 +117,7 @@ class ConsumerApplicationTest {

produceRecord(id, recordV2)

verify(process, timeout(TIMEOUT.toMillis()))
verify(myConsumer, timeout(TIMEOUT.toMillis()))
.accept(Sensor(id, internalTemperature, externalTemperature, acceleration, velocity))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RestController
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.function.Supplier
import kotlin.random.Random

@SpringBootApplication
Expand All @@ -23,7 +22,7 @@ class Application {
@Autowired private lateinit var random: Random

@Bean
fun supplier() = Supplier { unbounded.poll() }
fun myProducer(): () -> Sensor = { unbounded.poll() }

@RequestMapping(value = ["/messages"], method = [RequestMethod.POST])
fun sendMessage(): String {
Expand Down
4 changes: 2 additions & 2 deletions producer1/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ spring:
cloud:
stream:
bindings:
supplier-out-0:
myProducer-out-0:
destination: sensor-topic
producer:
useNativeEncoding: true
kafka:
binder:
brokers: localhost:9092
bindings:
supplier-out-0:
myProducer-out-0:
producer:
configuration:
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RestController
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.function.Supplier
import kotlin.random.Random

@SpringBootApplication
Expand All @@ -23,7 +22,7 @@ class Application {
@Autowired private lateinit var random: Random

@Bean
fun supplier() = Supplier { unbounded.poll() }
fun myProducer(): () -> Sensor = { unbounded.poll() }

@RequestMapping(value = ["/messages"], method = [RequestMethod.POST])
fun sendMessage(): String {
Expand Down
4 changes: 2 additions & 2 deletions producer2/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ spring:
cloud:
stream:
bindings:
supplier-out-0:
myProducer-out-0:
destination: sensor-topic
producer:
useNativeEncoding: true
kafka:
binder:
brokers: localhost:9092
bindings:
supplier-out-0:
myProducer-out-0:
producer:
configuration:
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Expand Down

0 comments on commit 7d45160

Please sign in to comment.