Skip to content

Commit

Permalink
Merge branch 'master' into update-gradle-wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
rogervinas committed Jun 19, 2024
2 parents 329045f + d21d376 commit 3b5c2ad
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 87 deletions.
23 changes: 11 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,16 @@ class MyConfiguration {

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
@Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer)
@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
lateinit var eventProducer: MyEventProducer
@Test
fun `produce event`() {
fun `should produce event`() {
// We produce an event using MyEventProducer
val text = "hello ${UUID.randomUUID()}"
eventProducer.produce(MyEvent(text))
Expand All @@ -139,7 +141,7 @@ class MyApplicationShould {
}
}
```
* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt).
* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt).

## Consumer with functional programming model

Expand Down Expand Up @@ -211,8 +213,10 @@ class MyConfiguration {

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
@MockBean // We mock MyEventConsumer
lateinit var eventConsumer: MyEventConsumer
Expand All @@ -232,7 +236,7 @@ class MyApplicationShould {
}
}
```
* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt).
* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt).

## Extras

Expand Down Expand Up @@ -436,11 +440,6 @@ docker-compose up -d
docker-compose down
```

Run with testcontainers:
```shell
SPRING_PROFILES_ACTIVE=docker-compose ./gradlew bootRun
```

Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from Kafka:
```shell
# consume
Expand Down
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ repositories {
}

val springCloudVersion = "2023.0.2"
val testContainersVersion = "1.19.8"

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
Expand All @@ -29,7 +30,8 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

implementation("org.testcontainers:testcontainers:1.19.8")
testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion")
testImplementation("org.testcontainers:testcontainers:$testContainersVersion")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.nhaarman:mockito-kotlin:1.6.0")
Expand Down
37 changes: 20 additions & 17 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ version: "3"

services:

kafka:
image: wurstmeister/kafka:2.12-2.5.0
environment:
- KAFKA_ADVERTISED_LISTENERS=INSIDE://:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKER_ID=0
- KAFKA_AUTO_LEADER_REBALANCE_ENABLE=false
- KAFKA_CREATE_TOPICS=my.topic:3:1,my.topic.errors:1:1
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
hostname: zookeeper
ports:
- "9094:9094"
links:
- zookeeper
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

zookeeper:
image: wurstmeister/zookeeper:3.4.6
broker:
image: confluentinc/cp-kafka:7.6.1
hostname: broker
depends_on:
- zookeeper
ports:
- "2181"
- "9094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker:29094,LISTENER_DOCKER_EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
33 changes: 0 additions & 33 deletions src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import com.nhaarman.mockito_kotlin.argumentCaptor
import com.nhaarman.mockito_kotlin.doThrow
import com.nhaarman.mockito_kotlin.timeout
import com.nhaarman.mockito_kotlin.verify
import com.rogervinas.stream.helper.DockerComposeContainerHelper
import com.rogervinas.stream.helper.KafkaConsumerHelper
import com.rogervinas.stream.helper.KafkaProducerHelper
import com.rogervinas.stream.domain.MyEvent
import com.rogervinas.stream.domain.MyEventConsumer
import com.rogervinas.stream.domain.MyEventProducer
Expand All @@ -14,32 +17,36 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.reset
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.core.env.Environment
import org.springframework.test.context.ActiveProfiles
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.Duration
import java.util.UUID
import java.util.function.Consumer

@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles("docker-compose")
class MyApplicationShould {
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {

companion object {
private const val TOPIC = "my.topic"
private const val TOPIC_DLQ = "my.topic.errors"

private val TEN_SECONDS = Duration.ofSeconds(10)
private const val FIVE = 5
}

@Autowired
lateinit var env: Environment
@Container
val container = DockerComposeContainerHelper().createContainer()
}

@Autowired
@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
Expand All @@ -48,22 +55,24 @@ class MyApplicationShould {
@MockBean
lateinit var eventConsumer: MyEventConsumer

lateinit var kafkaProducerHelper: MyKafkaProducerHelper
lateinit var kafkaConsumerHelper: MyKafkaConsumerHelper
lateinit var kafkaDLQConsumerHelper: MyKafkaConsumerHelper
@Value("\${spring.cloud.stream.kafka.binder.brokers}")
lateinit var kafkaBroker: String
lateinit var kafkaProducerHelper: KafkaProducerHelper
lateinit var kafkaConsumerHelper: KafkaConsumerHelper
lateinit var kafkaDLQConsumerHelper: KafkaConsumerHelper

@BeforeEach
fun setUp() {
val bootstrapServers = env.getProperty("spring.cloud.stream.kafka.binder.brokers")!!
kafkaConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC)
reset(eventConsumer)
kafkaConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC)
kafkaConsumerHelper.consumeAll()
kafkaDLQConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC_DLQ)
kafkaDLQConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC_DLQ)
kafkaDLQConsumerHelper.consumeAll()
kafkaProducerHelper = MyKafkaProducerHelper(bootstrapServers)
kafkaProducerHelper = KafkaProducerHelper(kafkaBroker)
}

@Test
fun `produce event`() {
fun `should produce event`() {
val text = "hello ${UUID.randomUUID()}"
eventProducer.produce(MyEvent(text))

Expand All @@ -76,7 +85,7 @@ class MyApplicationShould {
}

@Test
fun `consume event`() {
fun `should consume event`() {
val text = "hello ${UUID.randomUUID()}"
kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

Expand All @@ -87,7 +96,7 @@ class MyApplicationShould {
}

@Test
fun `retry consume event 5 times`() {
fun `should retry consume event 5 times`() {
doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

val text = "hello ${UUID.randomUUID()}"
Expand All @@ -100,7 +109,7 @@ class MyApplicationShould {
}

@Test
fun `send to DLQ rejected messages`() {
fun `should send to DLQ rejected messages`() {
doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

val text = "hello ${UUID.randomUUID()}"
Expand All @@ -114,7 +123,7 @@ class MyApplicationShould {

@ParameterizedTest
@ValueSource(strings = ["plain text", "{\"unknownField\":\"not expected\"}"])
fun `send to DLQ undeserializable messages`(body: String) {
fun `should send to DLQ undeserializable messages`(body: String) {
kafkaProducerHelper.send(TOPIC, body)

val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.rogervinas.stream.helper

import org.testcontainers.containers.ComposeContainer
import org.testcontainers.containers.wait.strategy.Wait.forListeningPort
import org.testcontainers.containers.wait.strategy.Wait.forLogMessage
import org.testcontainers.containers.wait.strategy.WaitAllStrategy
import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_INDIVIDUAL_TIMEOUTS_ONLY
import java.io.File

class DockerComposeContainerHelper {

companion object {
private const val BROKER = "broker"
private const val BROKER_PORT = 9094
private const val ZOOKEEPER = "zookeeper"
private const val ZOOKEEPER_PORT = 2181
}

fun createContainer(): ComposeContainer {
return ComposeContainer(File("docker-compose.yml"))
.withLocalCompose(true)
.withExposedService(
BROKER,
BROKER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*started.*", 1))
)
.withExposedService(
ZOOKEEPER,
ZOOKEEPER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*Started.*", 1))
)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rogervinas.stream
package com.rogervinas.stream.helper

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand All @@ -7,10 +7,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Locale
import java.util.Properties
import java.util.UUID

class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
class KafkaConsumerHelper(bootstrapServers: String, topic: String) {

private val consumer: Consumer<String, String>

Expand All @@ -24,7 +25,7 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
}

fun consumeAll(): List<ConsumerRecord<String, String>> {
return consumeAtLeast(1, Duration.ofSeconds(1))
return consumeAtLeast(100, Duration.ofSeconds(5))
}

fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
Expand All @@ -43,7 +44,9 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.lowercase())
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault())
)
return config
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rogervinas.stream
package com.rogervinas.stream.helper

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
Expand All @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class MyKafkaProducerHelper(bootstrapServers: String) {
class KafkaProducerHelper(bootstrapServers: String) {

private val producer: Producer<String, String>

Expand Down

0 comments on commit 3b5c2ad

Please sign in to comment.