From d21d376601b6bd2d86fb38e082c2ff16b86382eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roger=20Vi=C3=B1as=20Alcon?= Date: Wed, 19 Jun 2024 18:05:04 +0200 Subject: [PATCH] Use confluent containers (#54) * Use confluent containers * Fix README * Fix consumeAll * Try DirtiesContext * Reset mock --- README.md | 23 +++++----- build.gradle.kts | 4 +- docker-compose.yml | 37 ++++++++------- .../rogervinas/stream/shared/MyContainers.kt | 33 -------------- ...uld.kt => MyApplicationIntegrationTest.kt} | 45 +++++++++++-------- .../helper/DockerComposeContainerHelper.kt | 37 +++++++++++++++ .../KafkaConsumerHelper.kt} | 11 +++-- .../KafkaProducerHelper.kt} | 4 +- 8 files changed, 107 insertions(+), 87 deletions(-) delete mode 100644 src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt rename src/test/kotlin/com/rogervinas/stream/{MyApplicationShould.kt => MyApplicationIntegrationTest.kt} (75%) create mode 100644 src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt rename src/test/kotlin/com/rogervinas/stream/{MyKafkaConsumerHelper.kt => helper/KafkaConsumerHelper.kt} (83%) rename src/test/kotlin/com/rogervinas/stream/{MyKafkaProducerHelper.kt => helper/KafkaProducerHelper.kt} (90%) diff --git a/README.md b/README.md index 217bc2d..2e4a9e2 100644 --- a/README.md +++ b/README.md @@ -113,14 +113,16 @@ class MyConfiguration { ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin -@SpringBootTest -class MyApplicationShould { +@SpringBootTest(webEnvironment = NONE) +@Testcontainers +@ActiveProfiles("test") +class MyApplicationIntegrationTest { @Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer) @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2 lateinit var eventProducer: MyEventProducer @Test - fun `produce event`() { + fun `should produce event`() { // We produce an event using MyEventProducer val text = "hello ${UUID.randomUUID()}" eventProducer.produce(MyEvent(text)) @@ -139,7 +141,7 @@ class MyApplicationShould { } } ``` -* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt). +* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt). ## Consumer with functional programming model @@ -211,8 +213,10 @@ class MyConfiguration { ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin -@SpringBootTest -class MyApplicationShould { +@SpringBootTest(webEnvironment = NONE) +@Testcontainers +@ActiveProfiles("test") +class MyApplicationIntegrationTest { @MockBean // We mock MyEventConsumer lateinit var eventConsumer: MyEventConsumer @@ -232,7 +236,7 @@ class MyApplicationShould { } } ``` -* Check the complete test in [MyApplicationShould.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt). +* Check the complete test in [MyApplicationIntegrationTest.kt](src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt). ## Extras @@ -436,11 +440,6 @@ docker-compose up -d docker-compose down ``` -Run with testcontainers: -```shell -SPRING_PROFILES_ACTIVE=docker-compose ./gradlew bootRun -``` - Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from Kafka: ```shell # consume diff --git a/build.gradle.kts b/build.gradle.kts index 1420f46..e8548ae 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,6 +19,7 @@ repositories { } val springCloudVersion = "2023.0.2" +val testContainersVersion = "1.19.8" dependencies { implementation("org.springframework.boot:spring-boot-starter-web") @@ -29,7 +30,8 @@ dependencies { implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") - implementation("org.testcontainers:testcontainers:1.19.8") + testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion") + testImplementation("org.testcontainers:testcontainers:$testContainersVersion") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("com.nhaarman:mockito-kotlin:1.6.0") diff --git a/docker-compose.yml b/docker-compose.yml index 5221294..37c7346 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,23 +2,26 @@ version: "3" services: - kafka: - image: wurstmeister/kafka:2.12-2.5.0 - environment: - - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9092,OUTSIDE://localhost:9094 - - KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:9094 - - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT - - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE - - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_BROKER_ID=0 - - KAFKA_AUTO_LEADER_REBALANCE_ENABLE=false - - KAFKA_CREATE_TOPICS=my.topic:3:1,my.topic.errors:1:1 + zookeeper: + image: confluentinc/cp-zookeeper:7.6.1 + hostname: zookeeper ports: - - "9094:9094" - links: - - zookeeper + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 - zookeeper: - image: wurstmeister/zookeeper:3.4.6 + broker: + image: confluentinc/cp-kafka:7.6.1 + hostname: broker + depends_on: + - zookeeper ports: - - "2181" + - "9094:9094" + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker:29094,LISTENER_DOCKER_EXTERNAL://localhost:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt deleted file mode 100644 index 8f270b6..0000000 --- a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt +++ /dev/null @@ -1,33 +0,0 @@ -package com.rogervinas.stream.shared - -import jakarta.annotation.PostConstruct -import jakarta.annotation.PreDestroy -import org.springframework.context.annotation.Profile -import org.springframework.stereotype.Component -import org.testcontainers.containers.ComposeContainer -import org.testcontainers.containers.wait.strategy.Wait.forLogMessage -import java.io.File - -@Component -@Profile("docker-compose") -class MyContainers { - - companion object { - private const val KAFKA = "kafka" - private const val KAFKA_PORT = 9094 - - private const val ZOOKEEPER = "zookeeper" - private const val ZOOKEEPER_PORT = 2181 - } - - private val container = ComposeContainer(File("docker-compose.yml")) - .withLocalCompose(true) - .withExposedService(KAFKA, KAFKA_PORT, forLogMessage(".*creating topics.*", 1)) - .withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, forLogMessage(".*binding to port.*", 1)) - - @PostConstruct - fun start() = container.start() - - @PreDestroy - fun stop() = container.stop() -} diff --git a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt b/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt similarity index 75% rename from src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt rename to src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt index 26ba5e7..d16cefc 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt @@ -5,6 +5,9 @@ import com.nhaarman.mockito_kotlin.argumentCaptor import com.nhaarman.mockito_kotlin.doThrow import com.nhaarman.mockito_kotlin.timeout import com.nhaarman.mockito_kotlin.verify +import com.rogervinas.stream.helper.DockerComposeContainerHelper +import com.rogervinas.stream.helper.KafkaConsumerHelper +import com.rogervinas.stream.helper.KafkaProducerHelper import com.rogervinas.stream.domain.MyEvent import com.rogervinas.stream.domain.MyEventConsumer import com.rogervinas.stream.domain.MyEventProducer @@ -14,21 +17,25 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import org.mockito.Mockito.reset import org.skyscreamer.jsonassert.JSONAssert import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE import org.springframework.boot.test.mock.mockito.MockBean -import org.springframework.core.env.Environment import org.springframework.test.context.ActiveProfiles +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers import java.time.Duration import java.util.UUID import java.util.function.Consumer @SpringBootTest(webEnvironment = NONE) -@ActiveProfiles("docker-compose") -class MyApplicationShould { +@Testcontainers +@ActiveProfiles("test") +class MyApplicationIntegrationTest { companion object { private const val TOPIC = "my.topic" @@ -36,10 +43,10 @@ class MyApplicationShould { private val TEN_SECONDS = Duration.ofSeconds(10) private const val FIVE = 5 - } - @Autowired - lateinit var env: Environment + @Container + val container = DockerComposeContainerHelper().createContainer() + } @Autowired @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2 @@ -48,22 +55,24 @@ class MyApplicationShould { @MockBean lateinit var eventConsumer: MyEventConsumer - lateinit var kafkaProducerHelper: MyKafkaProducerHelper - lateinit var kafkaConsumerHelper: MyKafkaConsumerHelper - lateinit var kafkaDLQConsumerHelper: MyKafkaConsumerHelper + @Value("\${spring.cloud.stream.kafka.binder.brokers}") + lateinit var kafkaBroker: String + lateinit var kafkaProducerHelper: KafkaProducerHelper + lateinit var kafkaConsumerHelper: KafkaConsumerHelper + lateinit var kafkaDLQConsumerHelper: KafkaConsumerHelper @BeforeEach fun setUp() { - val bootstrapServers = env.getProperty("spring.cloud.stream.kafka.binder.brokers")!! - kafkaConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC) + reset(eventConsumer) + kafkaConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC) kafkaConsumerHelper.consumeAll() - kafkaDLQConsumerHelper = MyKafkaConsumerHelper(bootstrapServers, TOPIC_DLQ) + kafkaDLQConsumerHelper = KafkaConsumerHelper(kafkaBroker, TOPIC_DLQ) kafkaDLQConsumerHelper.consumeAll() - kafkaProducerHelper = MyKafkaProducerHelper(bootstrapServers) + kafkaProducerHelper = KafkaProducerHelper(kafkaBroker) } @Test - fun `produce event`() { + fun `should produce event`() { val text = "hello ${UUID.randomUUID()}" eventProducer.produce(MyEvent(text)) @@ -76,7 +85,7 @@ class MyApplicationShould { } @Test - fun `consume event`() { + fun `should consume event`() { val text = "hello ${UUID.randomUUID()}" kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}") @@ -87,7 +96,7 @@ class MyApplicationShould { } @Test - fun `retry consume event 5 times`() { + fun `should retry consume event 5 times`() { doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any()) val text = "hello ${UUID.randomUUID()}" @@ -100,7 +109,7 @@ class MyApplicationShould { } @Test - fun `send to DLQ rejected messages`() { + fun `should send to DLQ rejected messages`() { doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any()) val text = "hello ${UUID.randomUUID()}" @@ -114,7 +123,7 @@ class MyApplicationShould { @ParameterizedTest @ValueSource(strings = ["plain text", "{\"unknownField\":\"not expected\"}"]) - fun `send to DLQ undeserializable messages`(body: String) { + fun `should send to DLQ undeserializable messages`(body: String) { kafkaProducerHelper.send(TOPIC, body) val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS) diff --git a/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt new file mode 100644 index 0000000..a7a002c --- /dev/null +++ b/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt @@ -0,0 +1,37 @@ +package com.rogervinas.stream.helper + +import org.testcontainers.containers.ComposeContainer +import org.testcontainers.containers.wait.strategy.Wait.forListeningPort +import org.testcontainers.containers.wait.strategy.Wait.forLogMessage +import org.testcontainers.containers.wait.strategy.WaitAllStrategy +import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_INDIVIDUAL_TIMEOUTS_ONLY +import java.io.File + +class DockerComposeContainerHelper { + + companion object { + private const val BROKER = "broker" + private const val BROKER_PORT = 9094 + private const val ZOOKEEPER = "zookeeper" + private const val ZOOKEEPER_PORT = 2181 + } + + fun createContainer(): ComposeContainer { + return ComposeContainer(File("docker-compose.yml")) + .withLocalCompose(true) + .withExposedService( + BROKER, + BROKER_PORT, + WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) + .withStrategy(forListeningPort()) + .withStrategy(forLogMessage(".*started.*", 1)) + ) + .withExposedService( + ZOOKEEPER, + ZOOKEEPER_PORT, + WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) + .withStrategy(forListeningPort()) + .withStrategy(forLogMessage(".*Started.*", 1)) + ) + } +} diff --git a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt similarity index 83% rename from src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt rename to src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt index 0b5b136..ae1100e 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt @@ -1,4 +1,4 @@ -package com.rogervinas.stream +package com.rogervinas.stream.helper import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig @@ -7,10 +7,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.consumer.OffsetResetStrategy import org.apache.kafka.common.serialization.StringDeserializer import java.time.Duration +import java.util.Locale import java.util.Properties import java.util.UUID -class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) { +class KafkaConsumerHelper(bootstrapServers: String, topic: String) { private val consumer: Consumer @@ -24,7 +25,7 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) { } fun consumeAll(): List> { - return consumeAtLeast(1, Duration.ofSeconds(1)) + return consumeAtLeast(100, Duration.ofSeconds(5)) } fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List> { @@ -43,7 +44,9 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) { config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) - config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.lowercase()) + config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault()) + ) return config } } diff --git a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt similarity index 90% rename from src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt rename to src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt index 6baf6a7..bbacc3e 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt @@ -1,4 +1,4 @@ -package com.rogervinas.stream +package com.rogervinas.stream.helper import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.Producer @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.StringSerializer import java.util.Properties -class MyKafkaProducerHelper(bootstrapServers: String) { +class KafkaProducerHelper(bootstrapServers: String) { private val producer: Producer