Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use confluent containers #54

Merged
merged 5 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,16 @@ class MyConfiguration {

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
@Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer)
@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
lateinit var eventProducer: MyEventProducer

@Test
fun `produce event`() {
fun `should produce event`() {
// We produce an event using MyEventProducer
val text = "hello ${UUID.randomUUID()}"
eventProducer.produce(MyEvent(text))
Expand All @@ -139,7 +141,7 @@ class MyApplicationShould {
}
}
```
* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt).
* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt).

## Consumer with functional programming model

Expand Down Expand Up @@ -211,8 +213,10 @@ class MyConfiguration {

### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/):
```kotlin
@SpringBootTest
class MyApplicationShould {
@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
@MockBean // We mock MyEventConsumer
lateinit var eventConsumer: MyEventConsumer

Expand All @@ -232,7 +236,7 @@ class MyApplicationShould {
}
}
```
* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt).
* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt).

## Extras

Expand Down Expand Up @@ -436,11 +440,6 @@ docker-compose up -d
docker-compose down
```

Run with testcontainers:
```shell
SPRING_PROFILES_ACTIVE=docker-compose ./gradlew bootRun
```

Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from Kafka:
```shell
# consume
Expand Down
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ repositories {
}

val springCloudVersion = "2023.0.2"
val testContainersVersion = "1.19.8"

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
Expand All @@ -29,7 +30,8 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

implementation("org.testcontainers:testcontainers:1.19.8")
testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion")
testImplementation("org.testcontainers:testcontainers:$testContainersVersion")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.nhaarman:mockito-kotlin:1.6.0")
Expand Down
37 changes: 20 additions & 17 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ version: "3"

services:

kafka:
image: wurstmeister/kafka:2.12-2.5.0
environment:
- KAFKA_ADVERTISED_LISTENERS=INSIDE://:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKER_ID=0
- KAFKA_AUTO_LEADER_REBALANCE_ENABLE=false
- KAFKA_CREATE_TOPICS=my.topic:3:1,my.topic.errors:1:1
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
hostname: zookeeper
ports:
- "9094:9094"
links:
- zookeeper
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

zookeeper:
image: wurstmeister/zookeeper:3.4.6
broker:
image: confluentinc/cp-kafka:7.6.1
hostname: broker
depends_on:
- zookeeper
ports:
- "2181"
- "9094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker:29094,LISTENER_DOCKER_EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
33 changes: 0 additions & 33 deletions src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import com.nhaarman.mockito_kotlin.argumentCaptor
import com.nhaarman.mockito_kotlin.doThrow
import com.nhaarman.mockito_kotlin.timeout
import com.nhaarman.mockito_kotlin.verify
import com.rogervinas.stream.helper.DockerComposeContainerHelper
import com.rogervinas.stream.helper.KafkaConsumerHelper
import com.rogervinas.stream.helper.KafkaProducerHelper
import com.rogervinas.stream.domain.MyEvent
import com.rogervinas.stream.domain.MyEventConsumer
import com.rogervinas.stream.domain.MyEventProducer
Expand All @@ -14,32 +17,36 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.reset
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.core.env.Environment
import org.springframework.test.context.ActiveProfiles
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.Duration
import java.util.UUID
import java.util.function.Consumer

@SpringBootTest(webEnvironment = NONE)
@ActiveProfiles("docker-compose")
class MyApplicationShould {
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {

companion object {
private const val TOPIC = "my.topic"
private const val TOPIC_DLQ = "my.topic.errors"

private val TEN_SECONDS = Duration.ofSeconds(10)
private const val FIVE = 5
}

@Autowired
lateinit var env: Environment
@Container
val container = DockerComposeContainerHelper().createContainer()
}

@Autowired
@Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2
Expand All @@ -48,22 +55,24 @@ class MyApplicationShould {
@MockBean
lateinit var eventConsumer: MyEventConsumer

lateinit var kafkaProducerHelper: MyKafkaProducerHelper
lateinit var kafkaConsumerHelper: MyKafkaConsumerHelper
lateinit var kafkaDLQConsumerHelper: MyKafkaConsumerHelper
@Value("\${spring.cloud.stream.kafka.binder.brokers}")
lateinit var kafkaBroker: String
lateinit var kafkaProducerHelper: KafkaProducerHelper
lateinit var kafkaConsumerHelper: KafkaConsumerHelper
lateinit var kafkaDLQConsumerHelper: KafkaConsumerHelper

@BeforeEach
fun setUp() {
val bootstrapServers = env.getProperty("spring.cloud.stream.kafka.binder.brokers")!!
kafkaConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC)
reset(eventConsumer)
kafkaConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC)
kafkaConsumerHelper.consumeAll()
kafkaDLQConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC_DLQ)
kafkaDLQConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC_DLQ)
kafkaDLQConsumerHelper.consumeAll()
kafkaProducerHelper = MyKafkaProducerHelper(bootstrapServers)
kafkaProducerHelper = KafkaProducerHelper(kafkaBroker)
}

@Test
fun `produce event`() {
fun `should produce event`() {
val text = "hello ${UUID.randomUUID()}"
eventProducer.produce(MyEvent(text))

Expand All @@ -76,7 +85,7 @@ class MyApplicationShould {
}

@Test
fun `consume event`() {
fun `should consume event`() {
val text = "hello ${UUID.randomUUID()}"
kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

Expand All @@ -87,7 +96,7 @@ class MyApplicationShould {
}

@Test
fun `retry consume event 5 times`() {
fun `should retry consume event 5 times`() {
doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

val text = "hello ${UUID.randomUUID()}"
Expand All @@ -100,7 +109,7 @@ class MyApplicationShould {
}

@Test
fun `send to DLQ rejected messages`() {
fun `should send to DLQ rejected messages`() {
doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

val text = "hello ${UUID.randomUUID()}"
Expand All @@ -114,7 +123,7 @@ class MyApplicationShould {

@ParameterizedTest
@ValueSource(strings = ["plain text", "{\"unknownField\":\"not expected\"}"])
fun `send to DLQ undeserializable messages`(body: String) {
fun `should send to DLQ undeserializable messages`(body: String) {
kafkaProducerHelper.send(TOPIC, body)

val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.rogervinas.stream.helper

import org.testcontainers.containers.ComposeContainer
import org.testcontainers.containers.wait.strategy.Wait.forListeningPort
import org.testcontainers.containers.wait.strategy.Wait.forLogMessage
import org.testcontainers.containers.wait.strategy.WaitAllStrategy
import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_INDIVIDUAL_TIMEOUTS_ONLY
import java.io.File

class DockerComposeContainerHelper {

companion object {
private const val BROKER = "broker"
private const val BROKER_PORT = 9094
private const val ZOOKEEPER = "zookeeper"
private const val ZOOKEEPER_PORT = 2181
}

fun createContainer(): ComposeContainer {
return ComposeContainer(File("docker-compose.yml"))
.withLocalCompose(true)
.withExposedService(
BROKER,
BROKER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*started.*", 1))
)
.withExposedService(
ZOOKEEPER,
ZOOKEEPER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*Started.*", 1))
)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rogervinas.stream
package com.rogervinas.stream.helper

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand All @@ -7,10 +7,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Locale
import java.util.Properties
import java.util.UUID

class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
class KafkaConsumerHelper(bootstrapServers: String, topic: String) {

private val consumer: Consumer<String, String>

Expand All @@ -24,7 +25,7 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
}

fun consumeAll(): List<ConsumerRecord<String, String>> {
return consumeAtLeast(1, Duration.ofSeconds(1))
return consumeAtLeast(100, Duration.ofSeconds(5))
}

fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
Expand All @@ -43,7 +44,9 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.lowercase())
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault())
)
return config
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rogervinas.stream
package com.rogervinas.stream.helper

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
Expand All @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class MyKafkaProducerHelper(bootstrapServers: String) {
class KafkaProducerHelper(bootstrapServers: String) {

private val producer: Producer<String, String>

Expand Down