Skip to content

Commit

Permalink
Refactor gradle (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
rogervinas authored Jun 20, 2024
1 parent 9c48bea commit fb13cfa
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 62 deletions.
3 changes: 0 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,3 @@ charset=utf-8
insert_final_newline=true
trim_trailing_whitespace=true
max_line_length=120

[*.{kt,kts}]
disabled_rules=import-ordering
48 changes: 25 additions & 23 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,57 +1,59 @@
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED

plugins {
id("org.springframework.boot") version "3.3.0"
id("io.spring.dependency-management") version "1.1.5"
kotlin("jvm") version "2.0.0"
kotlin("plugin.spring") version "2.0.0"
id("org.jlleitschuh.gradle.ktlint") version "12.1.1"
}

group = "com.rogervinas"
version = "0.0.1-SNAPSHOT"

repositories {
mavenCentral()
mavenCentral()
}

val springCloudVersion = "2023.0.2"
val testContainersVersion = "1.19.8"

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion")
testImplementation("org.testcontainers:testcontainers:$testContainersVersion")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.nhaarman:mockito-kotlin:1.6.0")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.nhaarman:mockito-kotlin:1.6.0")
testImplementation("org.awaitility:awaitility:4.2.1")
}

dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion")
}
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion")
}
}

tasks.withType<KotlinCompile> {
compilerOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
}
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

kotlin {
compilerOptions {
freeCompilerArgs.addAll("-Xjsr305=strict")
}
}

tasks.withType<Test> {
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = "spring-cloud-stream-step-by-step"
rootProject.name = "spring-cloud-stream-kafka-step-by-step"
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.rogervinas.stream.domain

class MyRetryableException(message: String) : RuntimeException(message)
class MyRetryableException(message: String) : RuntimeException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import reactor.core.publisher.Flux

@Configuration
class MyConfiguration {

@Bean
fun myEventConsumer() = object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
fun myEventConsumer() =
object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
}
}
}

@Bean("my-consumer")
fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.rogervinas.stream.domain.MyEventConsumer
import com.rogervinas.stream.shared.MyEventPayload

class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {

override fun invoke(payload: MyEventPayload) {
consumer.consume(fromPayload(payload))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST

class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {

private val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()

override fun produce(event: MyEvent) {
val message = MessageBuilder
.withPayload(toPayload(event))
.setHeader(KafkaHeaders.KEY, toKey(event))
.build()
val message =
MessageBuilder
.withPayload(toPayload(event))
.setHeader(KafkaHeaders.KEY, toKey(event))
.build()
sink.emitNext(message, FAIL_FAST)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package com.rogervinas.stream.shared

data class MyEventPayload(
val string: String,
val number: Int
val number: Int,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.rogervinas.stream

import com.nhaarman.mockito_kotlin.*
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.argumentCaptor
import com.nhaarman.mockito_kotlin.doNothing
import com.nhaarman.mockito_kotlin.doThrow
import com.nhaarman.mockito_kotlin.timeout
import com.nhaarman.mockito_kotlin.verify
import com.rogervinas.stream.domain.MyEvent
import com.rogervinas.stream.domain.MyEventConsumer
import com.rogervinas.stream.domain.MyEventProducer
Expand All @@ -26,14 +31,13 @@ import org.springframework.boot.test.mock.mockito.MockReset
import org.springframework.test.context.ActiveProfiles
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import java.util.*
import java.util.UUID
import java.util.function.Consumer

@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {

companion object {
private const val TOPIC = "my.topic"
private const val TOPIC_DLQ = "my.topic.errors"
Expand Down Expand Up @@ -74,10 +78,12 @@ class MyApplicationIntegrationTest {

val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS)

assertThat(records).singleElement().satisfies(Consumer { record ->
JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true)
assertThat(record.key()).isEqualTo("key-${text.length}")
})
assertThat(records).singleElement().satisfies(
Consumer { record ->
JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true)
assertThat(record.key()).isEqualTo("key-${text.length}")
},
)
}

@Test
Expand Down Expand Up @@ -116,9 +122,11 @@ class MyApplicationIntegrationTest {
kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
assertThat(errorRecords).singleElement().satisfies(Consumer { record ->
JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true)
})
assertThat(errorRecords).singleElement().satisfies(
Consumer { record ->
JSONAssert.assertEquals(record.value(), "{\"number\":${text.length},\"string\":\"$text\"}", true)
},
)
}

@ParameterizedTest
Expand All @@ -127,8 +135,10 @@ class MyApplicationIntegrationTest {
kafkaProducerHelper.send(TOPIC, body)

val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
assertThat(errorRecords).singleElement().satisfies(Consumer { record ->
assertThat(record.value()).isEqualTo(body)
})
assertThat(errorRecords).singleElement().satisfies(
Consumer { record ->
assertThat(record.value()).isEqualTo(body)
},
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_IND
import java.io.File

class DockerComposeContainerHelper {

companion object {
private const val BROKER = "broker"
private const val BROKER_PORT = 9094
Expand All @@ -24,14 +23,14 @@ class DockerComposeContainerHelper {
BROKER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*started.*", 1))
.withStrategy(forLogMessage(".*started.*", 1)),
)
.withExposedService(
ZOOKEEPER,
ZOOKEEPER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*Started.*", 1))
.withStrategy(forLogMessage(".*Started.*", 1)),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.util.Properties
import java.util.UUID

class KafkaConsumerHelper(bootstrapServers: String, topic: String) {

private val consumer: Consumer<String, String>

companion object {
Expand All @@ -28,7 +27,10 @@ class KafkaConsumerHelper(bootstrapServers: String, topic: String) {
return consumeAtLeast(100, Duration.ofSeconds(5))
}

fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
fun consumeAtLeast(
numberOfRecords: Int,
timeout: Duration,
): List<ConsumerRecord<String, String>> {
val records: MutableList<ConsumerRecord<String, String>> = ArrayList()
var millisLeft = timeout.toMillis()
do {
Expand All @@ -44,8 +46,9 @@ class KafkaConsumerHelper(bootstrapServers: String, topic: String) {
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault())
config.setProperty(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name.lowercase(Locale.getDefault()),
)
return config
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class KafkaProducerHelper(bootstrapServers: String) {

private val producer: Producer<String, String>

init {
Expand All @@ -20,7 +19,10 @@ class KafkaProducerHelper(bootstrapServers: String) {
}

@Throws(Exception::class)
fun send(topic: String?, body: String) {
fun send(
topic: String?,
body: String,
) {
producer.send(ProducerRecord(topic, body)).get()
producer.flush()
}
Expand Down

0 comments on commit fb13cfa

Please sign in to comment.