From fb13cfad1558ea383850c9ed9eae3cd233bdfa17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roger=20Vi=C3=B1as=20Alcon?= Date: Thu, 20 Jun 2024 13:11:50 +0200 Subject: [PATCH] Refactor gradle (#55) --- .editorconfig | 3 -- build.gradle.kts | 48 ++++++++++--------- settings.gradle.kts | 2 +- .../stream/domain/MyRetryableException.kt | 2 +- .../stream/functional/MyConfiguration.kt | 10 ++-- .../functional/MyStreamEventConsumer.kt | 1 - .../functional/MyStreamEventProducer.kt | 10 ++-- .../stream/shared/MyEventPayload.kt | 2 +- .../stream/MyApplicationIntegrationTest.kt | 36 +++++++++----- .../helper/DockerComposeContainerHelper.kt | 5 +- .../stream/helper/KafkaConsumerHelper.kt | 11 +++-- .../stream/helper/KafkaProducerHelper.kt | 6 ++- 12 files changed, 74 insertions(+), 62 deletions(-) diff --git a/.editorconfig b/.editorconfig index 3cc9bcb..8efe39a 100644 --- a/.editorconfig +++ b/.editorconfig @@ -5,6 +5,3 @@ charset=utf-8 insert_final_newline=true trim_trailing_whitespace=true max_line_length=120 - -[*.{kt,kts}] -disabled_rules=import-ordering \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 87671a6..fc384cb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,57 +1,59 @@ import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL -import org.gradle.api.tasks.testing.logging.TestLogEvent.* -import org.jetbrains.kotlin.gradle.tasks.KotlinCompile +import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED +import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED +import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED plugins { id("org.springframework.boot") version "3.3.0" id("io.spring.dependency-management") version "1.1.5" kotlin("jvm") version "2.0.0" kotlin("plugin.spring") version "2.0.0" + id("org.jlleitschuh.gradle.ktlint") version "12.1.1" } group = "com.rogervinas" version = "0.0.1-SNAPSHOT" repositories { - mavenCentral() + mavenCentral() } val springCloudVersion = "2023.0.2" val testContainersVersion = "1.19.8" -java { - toolchain { - languageVersion = JavaLanguageVersion.of(21) - } -} - dependencies { - implementation("org.springframework.boot:spring-boot-starter-web") - implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka") + implementation("org.springframework.boot:spring-boot-starter-web") + implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka") - implementation("com.fasterxml.jackson.module:jackson-module-kotlin") + implementation("com.fasterxml.jackson.module:jackson-module-kotlin") - implementation("org.jetbrains.kotlin:kotlin-reflect") - implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") + implementation("org.jetbrains.kotlin:kotlin-reflect") + implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion") testImplementation("org.testcontainers:testcontainers:$testContainersVersion") - testImplementation("org.springframework.boot:spring-boot-starter-test") - testImplementation("com.nhaarman:mockito-kotlin:1.6.0") + testImplementation("org.springframework.boot:spring-boot-starter-test") + testImplementation("com.nhaarman:mockito-kotlin:1.6.0") testImplementation("org.awaitility:awaitility:4.2.1") } dependencyManagement { - imports { - mavenBom("org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion") - } + imports { + mavenBom("org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion") + } } -tasks.withType { - compilerOptions { - freeCompilerArgs = listOf("-Xjsr305=strict") - } +java { + toolchain { + languageVersion = JavaLanguageVersion.of(21) + } +} + +kotlin { + compilerOptions { + freeCompilerArgs.addAll("-Xjsr305=strict") + } } tasks.withType { diff --git a/settings.gradle.kts b/settings.gradle.kts index e855b22..28b0b9c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1 +1 @@ -rootProject.name = "spring-cloud-stream-step-by-step" +rootProject.name = "spring-cloud-stream-kafka-step-by-step" diff --git a/src/main/kotlin/com/rogervinas/stream/domain/MyRetryableException.kt b/src/main/kotlin/com/rogervinas/stream/domain/MyRetryableException.kt index 3ac592e..16c4d0e 100644 --- a/src/main/kotlin/com/rogervinas/stream/domain/MyRetryableException.kt +++ b/src/main/kotlin/com/rogervinas/stream/domain/MyRetryableException.kt @@ -1,3 +1,3 @@ package com.rogervinas.stream.domain -class MyRetryableException(message: String) : RuntimeException(message) \ No newline at end of file +class MyRetryableException(message: String) : RuntimeException(message) diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt index a5d9d2b..2f159e2 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt @@ -10,13 +10,13 @@ import reactor.core.publisher.Flux @Configuration class MyConfiguration { - @Bean - fun myEventConsumer() = object : MyEventConsumer { - override fun consume(event: MyEvent) { - println("Received ${event.text}") + fun myEventConsumer() = + object : MyEventConsumer { + override fun consume(event: MyEvent) { + println("Received ${event.text}") + } } - } @Bean("my-consumer") fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit = diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt index 874802a..8766dd5 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt @@ -5,7 +5,6 @@ import com.rogervinas.stream.domain.MyEventConsumer import com.rogervinas.stream.shared.MyEventPayload class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit { - override fun invoke(payload: MyEventPayload) { consumer.consume(fromPayload(payload)) } diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt index 12d69d0..5b0d2ba 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt @@ -11,14 +11,14 @@ import reactor.core.publisher.Sinks import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST class MyStreamEventProducer : () -> Flux>, MyEventProducer { - private val sink = Sinks.many().unicast().onBackpressureBuffer>() override fun produce(event: MyEvent) { - val message = MessageBuilder - .withPayload(toPayload(event)) - .setHeader(KafkaHeaders.KEY, toKey(event)) - .build() + val message = + MessageBuilder + .withPayload(toPayload(event)) + .setHeader(KafkaHeaders.KEY, toKey(event)) + .build() sink.emitNext(message, FAIL_FAST) } diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt index a27972f..ac6ce61 100644 --- a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt +++ b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt @@ -2,5 +2,5 @@ package com.rogervinas.stream.shared data class MyEventPayload( val string: String, - val number: Int + val number: Int, ) diff --git a/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt b/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt index 29ebc8a..9cc201d 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyApplicationIntegrationTest.kt @@ -1,6 +1,11 @@ package com.rogervinas.stream -import com.nhaarman.mockito_kotlin.* +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.argumentCaptor +import com.nhaarman.mockito_kotlin.doNothing +import com.nhaarman.mockito_kotlin.doThrow +import com.nhaarman.mockito_kotlin.timeout +import com.nhaarman.mockito_kotlin.verify import com.rogervinas.stream.domain.MyEvent import com.rogervinas.stream.domain.MyEventConsumer import com.rogervinas.stream.domain.MyEventProducer @@ -26,14 +31,13 @@ import org.springframework.boot.test.mock.mockito.MockReset import org.springframework.test.context.ActiveProfiles import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers -import java.util.* +import java.util.UUID import java.util.function.Consumer @SpringBootTest(webEnvironment = NONE) @Testcontainers @ActiveProfiles("test") class MyApplicationIntegrationTest { - companion object { private const val TOPIC = "my.topic" private const val TOPIC_DLQ = "my.topic.errors" @@ -74,10 +78,12 @@ class MyApplicationIntegrationTest { val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS) - assertThat(records).singleElement().satisfies(Consumer { record -> - JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true) - assertThat(record.key()).isEqualTo("key-${text.length}") - }) + assertThat(records).singleElement().satisfies( + Consumer { record -> + JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true) + assertThat(record.key()).isEqualTo("key-${text.length}") + }, + ) } @Test @@ -116,9 +122,11 @@ class MyApplicationIntegrationTest { kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}") val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS) - assertThat(errorRecords).singleElement().satisfies(Consumer { record -> - JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true) - }) + assertThat(errorRecords).singleElement().satisfies( + Consumer { record -> + JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true) + }, + ) } @ParameterizedTest @@ -127,8 +135,10 @@ class MyApplicationIntegrationTest { kafkaProducerHelper.send(TOPIC, body) val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS) - assertThat(errorRecords).singleElement().satisfies(Consumer { record -> - assertThat(record.value()).isEqualTo(body) - }) + assertThat(errorRecords).singleElement().satisfies( + Consumer { record -> + assertThat(record.value()).isEqualTo(body) + }, + ) } } diff --git a/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt index a7a002c..9b69df9 100644 --- a/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/helper/DockerComposeContainerHelper.kt @@ -8,7 +8,6 @@ import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_IND import java.io.File class DockerComposeContainerHelper { - companion object { private const val BROKER = "broker" private const val BROKER_PORT = 9094 @@ -24,14 +23,14 @@ class DockerComposeContainerHelper { BROKER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) .withStrategy(forListeningPort()) - .withStrategy(forLogMessage(".*started.*", 1)) + .withStrategy(forLogMessage(".*started.*", 1)), ) .withExposedService( ZOOKEEPER, ZOOKEEPER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) .withStrategy(forListeningPort()) - .withStrategy(forLogMessage(".*Started.*", 1)) + .withStrategy(forLogMessage(".*Started.*", 1)), ) } } diff --git a/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt index ae1100e..eba88fb 100644 --- a/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/helper/KafkaConsumerHelper.kt @@ -12,7 +12,6 @@ import java.util.Properties import java.util.UUID class KafkaConsumerHelper(bootstrapServers: String, topic: String) { - private val consumer: Consumer companion object { @@ -28,7 +27,10 @@ class KafkaConsumerHelper(bootstrapServers: String, topic: String) { return consumeAtLeast(100, Duration.ofSeconds(5)) } - fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List> { + fun consumeAtLeast( + numberOfRecords: Int, + timeout: Duration, + ): List> { val records: MutableList> = ArrayList() var millisLeft = timeout.toMillis() do { @@ -44,8 +46,9 @@ class KafkaConsumerHelper(bootstrapServers: String, topic: String) { config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) - config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault()) + config.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault()), ) return config } diff --git a/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt b/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt index bbacc3e..a4c7df6 100644 --- a/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/helper/KafkaProducerHelper.kt @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringSerializer import java.util.Properties class KafkaProducerHelper(bootstrapServers: String) { - private val producer: Producer init { @@ -20,7 +19,10 @@ class KafkaProducerHelper(bootstrapServers: String) { } @Throws(Exception::class) - fun send(topic: String?, body: String) { + fun send( + topic: String?, + body: String, + ) { producer.send(ProducerRecord(topic, body)).get() producer.flush() }