diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 1d0ae22..69efc82 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -16,7 +16,8 @@ jobs: - name: Set up Java uses: actions/setup-java@v1 with: - java-version: 11 + java-version: '21' + distribution: 'temurin' - name: Gradle cache uses: actions/cache@v2 with: diff --git a/README.md b/README.md index 16454e3..7406410 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ ![CI](https://github.com/rogervinas/spring-cloud-stream-kafka-step-by-step/actions/workflows/gradle.yml/badge.svg) -![Java](https://img.shields.io/badge/Java-11-blue?labelColor=black) -![Kotlin](https://img.shields.io/badge/Kotlin-1.6.10-blue?labelColor=black) -![SpringBoot](https://img.shields.io/badge/SpringBoot-2.6.2-blue?labelColor=black) -![SpringCloudStream](https://img.shields.io/badge/SpringCloudStream-3.2.1-blue?labelColor=black) +![Java](https://img.shields.io/badge/Java-21-blue?labelColor=black) +![Kotlin](https://img.shields.io/badge/Kotlin-1.9.20-blue?labelColor=black) +![SpringBoot](https://img.shields.io/badge/SpringBoot-3.1.5-blue?labelColor=black) +![SpringCloudStream](https://img.shields.io/badge/SpringCloudStream-4.0.4-blue?labelColor=black) # Spring Cloud Stream & Kafka binder step by step @@ -43,10 +43,10 @@ Our final goal is to produce messages to a Kafka topic. From the point of view of the application we want an interface `MyEventProducer` to produce events to a generic messaging system. These events will be of type `MyEvent`, just containing a `text` field to make it simpler: ```kotlin -class MyEvent(val text: String) +data class MyEvent(val text: String) -interface MyEventProducer { - fun produce(event: MyEvent) +interface MyEventProducer { + fun produce(event: MyEvent) } ``` @@ -73,7 +73,7 @@ spring: ### 2) We create an implementation of `MyEventProducer` as a `Supplier` of `Flux`, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting: ```kotlin class MyStreamEventProducer : Supplier>, MyEventProducer { - val sink = Sinks.many().unicast().onBackpressureBuffer() + private val sink = Sinks.many().unicast().onBackpressureBuffer() override fun produce(event: MyEvent) { sink.emitNext(toPayload(event), FAIL_FAST) @@ -88,25 +88,30 @@ class MyStreamEventProducer : Supplier>, MyEventProducer { } } -class MyEventPayload @JsonCreator constructor( - @JsonProperty("string") val string: String, - @JsonProperty("number") val number: Int +data class MyEventPayload( + val string: String, + val number: Int ) ``` -* We use a DTO `MyEventPayload` to specify how do we want the payload to be serialized to JSON (using [Jackson](https://github.com/FasterXML/jackson) annotations). +* We use a DTO `MyEventPayload` to specify how do we want the payload to be serialized to JSON. In this case we don't need to but we could use [Jackson](https://github.com/FasterXML/jackson) annotations if we wanted to customize the JSON serialization. * We do a simple transformation between `MyEvent` and `MyEventPayload` just as an example. * Every time we emit a `MyEventPayload` through the `Flux`, Spring Cloud Stream will publish it to Kafka. -### 3) Finally, we create an instance of `MyStreamEventProducer` naming it `my-producer` to link it to the function definition: +### 3) Finally, we configure the beans needed to link `my-producer` function definition: ```kotlin @Configuration class MyConfiguration { + @Bean + fun myStreamEventProducer() = MyStreamEventProducer() + @Bean("my-producer") - fun myStreamEventProducer(): MyEventProducer { - return MyStreamEventProducer() - } + fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux> = + producer::get } ``` +* We create a `MyStreamEventProducer` that will be injected wherever a `MyEventProducer` is needed. +* We create a lambda returning a `Flux>` that will be linked to the `my-producer` function, implemented by calling `myStreamEventProducer.get()` method. +* We do not merge both beans in one to avoid issues with `KotlinLambdaToFunctionAutoConfiguration`. ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): ```kotlin @@ -143,7 +148,7 @@ Our final goal is to consume messages from a Kafka topic. From the point of view of the application we want an interface `MyEventConsumer` to be called every time an event is consumed from a generic messaging system. These events will be of type `MyEvent` like in the producer example: ```kotlin -class MyEvent(val text: String) +data class MyEvent(val text: String) interface MyEventConsumer { fun consume(event: MyEvent) @@ -152,7 +157,7 @@ interface MyEventConsumer { Then we follow these steps: -### 1) We declare the binding `my-consumer` in application.yml: +### 1) We configure the binding `my-consumer` in application.yml declaring it as a function: ```yaml spring: cloud: @@ -171,10 +176,10 @@ spring: * We configure a `group` because we want the application to consume from Kafka identifiying itself as a consumer group so if there were to be more than one instance of the application every message will be delivered to only one of the instances. * As stated in [functional binding names](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names): `my-consumer` is the function name, `in` is for input bindings and `0` is the index we have to use if we have a single function. -### 2) We create the same class `MyStreamEventConsumer` implementing `Consumer` to fulfill the interface required by Spring Cloud Stream: +### 2) We create `MyStreamEventConsumer` to fulfill the interface required by Spring Cloud Stream: ```kotlin -class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer { - override fun accept(payload: MyEventPayload) { +class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit { + override fun invoke(payload: MyEventPayload) { consumer.consume(fromPayload(payload)) } @@ -183,28 +188,26 @@ class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer Unit = + MyStreamEventConsumer(consumer) } ``` +* We create a lambda receiving a `MyEventPayload` that will be linked to the `my-consumer` function, implemented by a `MyStreamEventConsumer`. * We create a simple implementation of `MyEventConsumer` that justs prints the event. ### 4) For testing we start a Kafka container using [Testcontainers](https://www.testcontainers.org/): @@ -242,14 +245,14 @@ When a message is sent to a topic, Kafka chooses randomly the destination partit This is important on the consumer side, because **chronological order of messages is only guaranteed within the same partition**, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (i.e. for messages of a *user*, we use the *user* id as the message key). -To specify the message key in `MyStreamEventProducer` we can produce `Message` instead of `MyEventPayload` and inform the `KafkaHeaders.MESSAGE_KEY` header: +To specify the message key in `MyStreamEventProducer` we can produce `Message` instead of `MyEventPayload` and inform the `KafkaHeaders.KEY` header: ```kotlin class MyStreamEventProducer : Supplier>>, MyEventProducer { // ... override fun produce(event: MyEvent) { val message = MessageBuilder .withPayload(MyEventPayload(event.text, event.text.length)) - .setHeader(KafkaHeaders.MESSAGE_KEY, "key-${event.text.length}") + .setHeader(KafkaHeaders.KEY, "key-${event.text.length}") .build() sink.emitNext(message, FAIL_FAST) } diff --git a/build.gradle.kts b/build.gradle.kts index 1bdcba8..6e91f77 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,21 +3,22 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent.* import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { - id("org.springframework.boot") version "2.6.2" - id("io.spring.dependency-management") version "1.0.11.RELEASE" - kotlin("jvm") version "1.6.10" - kotlin("plugin.spring") version "1.6.10" + id("org.springframework.boot") version "3.1.5" + id("io.spring.dependency-management") version "1.1.3" + kotlin("jvm") version "1.9.20" + kotlin("plugin.spring") version "1.9.20" } group = "com.rogervinas" version = "0.0.1-SNAPSHOT" -java.sourceCompatibility = JavaVersion.VERSION_11 +java.sourceCompatibility = JavaVersion.VERSION_21 +java.targetCompatibility = JavaVersion.VERSION_21 repositories { mavenCentral() } -extra["springCloudVersion"] = "2021.0.0" +extra["springCloudVersion"] = "2022.0.4" dependencies { implementation("org.springframework.boot:spring-boot-starter-web") @@ -43,7 +44,7 @@ dependencyManagement { tasks.withType { kotlinOptions { freeCompilerArgs = listOf("-Xjsr305=strict") - jvmTarget = "11" + jvmTarget = "21" } } @@ -55,5 +56,5 @@ tasks.withType { showExceptions = true showCauses = true showStackTraces = true - } + } } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index e708b1c..033e24c 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 2e6e589..3fa8f86 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 4f906e0..fcb6fca 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ -#!/usr/bin/env sh +#!/bin/sh # -# Copyright 2015 the original author or authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,67 +17,98 @@ # ############################################################################## -## -## Gradle start up script for UN*X -## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# ############################################################################## # Attempt to set APP_HOME + # Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" +MAX_FD=maximum warn () { echo "$*" -} +} >&2 die () { echo echo "$*" echo exit 1 -} +} >&2 # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false nonstop=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MINGW* ) - msys=true - ;; - NONSTOP* ) - nonstop=true - ;; +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -87,9 +118,9 @@ CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACMD=$JAVA_HOME/jre/sh/java else - JAVACMD="$JAVA_HOME/bin/java" + JAVACMD=$JAVA_HOME/bin/java fi if [ ! -x "$JAVACMD" ] ; then die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME @@ -98,88 +129,120 @@ Please set the JAVA_HOME variable in your environment to match the location of your Java installation." fi else - JAVACMD="java" - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac fi -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. # For Cygwin or MSYS, switch paths to Windows format before running java -if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) fi - i=`expr $i + 1` + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg done - case $i in - 0) set -- ;; - 1) set -- "$args0" ;; - 2) set -- "$args0" "$args1" ;; - 3) set -- "$args0" "$args1" "$args2" ;; - 4) set -- "$args0" "$args1" "$args2" "$args3" ;; - 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac fi -# Escape application args -save () { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " -} -APP_ARGS=`save "$@"` -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 107acd3..6689b85 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,89 +1,92 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/src/main/kotlin/com/rogervinas/stream/domain/MyEvent.kt b/src/main/kotlin/com/rogervinas/stream/domain/MyEvent.kt index 6c3e73e..f6feb45 100644 --- a/src/main/kotlin/com/rogervinas/stream/domain/MyEvent.kt +++ b/src/main/kotlin/com/rogervinas/stream/domain/MyEvent.kt @@ -1,3 +1,3 @@ package com.rogervinas.stream.domain -class MyEvent(val text: String) +data class MyEvent(val text: String) diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt index 1d2f1ac..85778bf 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt @@ -2,29 +2,30 @@ package com.rogervinas.stream.functional import com.rogervinas.stream.domain.MyEvent import com.rogervinas.stream.domain.MyEventConsumer -import com.rogervinas.stream.domain.MyEventProducer +import com.rogervinas.stream.shared.MyEventPayload import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import reactor.core.publisher.Flux @Configuration class MyConfiguration { - @Bean("my-producer") - fun myStreamEventProducer(): MyEventProducer { - return MyStreamEventProducer() + @Bean + fun myEventConsumer() = object : MyEventConsumer { + override fun consume(event: MyEvent) { + println("Received ${event.text}") + } } @Bean("my-consumer") - fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer { - return MyStreamEventConsumer(consumer) - } + fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit = + MyStreamEventConsumer(consumer) @Bean - fun myEventConsumer(): MyEventConsumer { - return object : MyEventConsumer { - override fun consume(event: MyEvent) { - println("Received ${event.text}") - } - } - } + fun myStreamEventProducer() = MyStreamEventProducer() + + @Bean("my-producer") + fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux> = + producer::get } diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt index 8952cd9..874802a 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventConsumer.kt @@ -3,11 +3,10 @@ package com.rogervinas.stream.functional import com.rogervinas.stream.domain.MyEvent import com.rogervinas.stream.domain.MyEventConsumer import com.rogervinas.stream.shared.MyEventPayload -import java.util.function.Consumer -class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer { +class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit { - override fun accept(payload: MyEventPayload) { + override fun invoke(payload: MyEventPayload) { consumer.consume(fromPayload(payload)) } diff --git a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt index 7417afc..ec5de3f 100644 --- a/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt +++ b/src/main/kotlin/com/rogervinas/stream/functional/MyStreamEventProducer.kt @@ -13,12 +13,12 @@ import java.util.function.Supplier class MyStreamEventProducer : Supplier>>, MyEventProducer { - val sink = Sinks.many().unicast().onBackpressureBuffer>() + private val sink = Sinks.many().unicast().onBackpressureBuffer>() override fun produce(event: MyEvent) { val message = MessageBuilder .withPayload(toPayload(event)) - .setHeader(KafkaHeaders.MESSAGE_KEY, toKey(event)) + .setHeader(KafkaHeaders.KEY, toKey(event)) .build() sink.emitNext(message, FAIL_FAST) } diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt index 7a33949..8f270b6 100644 --- a/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt +++ b/src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt @@ -1,43 +1,29 @@ package com.rogervinas.stream.shared +import jakarta.annotation.PostConstruct +import jakarta.annotation.PreDestroy import org.springframework.context.annotation.Profile import org.springframework.stereotype.Component -import org.testcontainers.containers.DockerComposeContainer -import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy +import org.testcontainers.containers.ComposeContainer import org.testcontainers.containers.wait.strategy.Wait.forLogMessage -import org.testcontainers.containers.wait.strategy.WaitAllStrategy -import org.testcontainers.containers.wait.strategy.WaitAllStrategy.Mode.WITH_INDIVIDUAL_TIMEOUTS_ONLY import java.io.File -import java.util.stream.Collectors -import javax.annotation.PostConstruct -import javax.annotation.PreDestroy @Component @Profile("docker-compose") class MyContainers { - private val KAFKA = "kafka" - private val KAFKA_PORT = 9094 + companion object { + private const val KAFKA = "kafka" + private const val KAFKA_PORT = 9094 - private val ZOOKEEPER = "zookeeper" - private val ZOOKEEPER_PORT = 2181 - - private val container = DockerComposeContainer(File("docker-compose.yml")) - .apply { withLocalCompose(true) } - .apply { - withExposedService(KAFKA, KAFKA_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) - .apply { withStrategy(forListeningPortFixDockerDesktop322()) } - .apply { withStrategy(forLogMessage(".*creating topics.*", 1)) } - ) - } - .apply { - withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) - .apply { withStrategy(forListeningPortFixDockerDesktop322()) } - .apply { withStrategy(forLogMessage(".*binding to port.*", 1)) } - ) - } + private const val ZOOKEEPER = "zookeeper" + private const val ZOOKEEPER_PORT = 2181 + } - private fun forListeningPortFixDockerDesktop322() = HostPortWaitStrategyFixDockerDesktop322() + private val container = ComposeContainer(File("docker-compose.yml")) + .withLocalCompose(true) + .withExposedService(KAFKA, KAFKA_PORT, forLogMessage(".*creating topics.*", 1)) + .withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, forLogMessage(".*binding to port.*", 1)) @PostConstruct fun start() = container.start() @@ -45,11 +31,3 @@ class MyContainers { @PreDestroy fun stop() = container.stop() } - -class HostPortWaitStrategyFixDockerDesktop322 : HostPortWaitStrategy() { - override fun getLivenessCheckPorts(): MutableSet { - return super.getLivenessCheckPorts().stream() - .filter { port -> port > 0 } - .collect(Collectors.toSet()) - } -} diff --git a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt index 0c47977..a27972f 100644 --- a/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt +++ b/src/main/kotlin/com/rogervinas/stream/shared/MyEventPayload.kt @@ -1,9 +1,6 @@ package com.rogervinas.stream.shared -import com.fasterxml.jackson.annotation.JsonCreator -import com.fasterxml.jackson.annotation.JsonProperty - -class MyEventPayload @JsonCreator constructor( - @JsonProperty("string") val string: String, - @JsonProperty("number") val number: Int +data class MyEventPayload( + val string: String, + val number: Int ) diff --git a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt index addf41c..eef57fb 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt @@ -29,11 +29,13 @@ import java.util.function.Consumer @ActiveProfiles("docker-compose") class MyApplicationShould { - val TOPIC = "my.topic" - val TOPIC_DLQ = "my.topic.errors" + companion object { + private const val TOPIC = "my.topic" + private const val TOPIC_DLQ = "my.topic.errors" - val TEN_SECONDS = Duration.ofSeconds(10) - val FIVE = 5 + private val TEN_SECONDS = Duration.ofSeconds(10) + private const val FIVE = 5 + } @Autowired lateinit var env: Environment diff --git a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt b/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt index 99edba6..0b5b136 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt @@ -43,7 +43,7 @@ class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) { config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) - config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.toLowerCase()) + config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.lowercase()) return config } } diff --git a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt b/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt index ffa50bb..6baf6a7 100644 --- a/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt +++ b/src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt @@ -5,7 +5,7 @@ import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.StringSerializer -import java.util.* +import java.util.Properties class MyKafkaProducerHelper(bootstrapServers: String) {