diff --git a/.circleci/kafka.yml b/.circleci/kafka.yml
new file mode 100644
index 0000000000..0c02b59097
--- /dev/null
+++ b/.circleci/kafka.yml
@@ -0,0 +1,24 @@
+version: '2.0'
+
+services:
+ kafka:
+ image: confluentinc/cp-kafka:7.2.6
+ depends_on:
+ - zookeeper
+ ports:
+ - 9092:9092 # used when running clients from containerized Kalix proxy
+ - 9093:9093 # used when running non-docker clients from host
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ # advertised listener running on port 9092 must be accessible to kalix-proxy container running for integration tests
+ # therefore we need to use host.testcontainers.internal (testcontainers's bridge address) to access it through the host machine
+ # advertised listener running on port 9093 is to be used by non-docker clients running on host machine
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL_DOCKER://host.testcontainers.internal:9092,HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_DOCKER:PLAINTEXT,HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ zookeeper:
+ image: zookeeper:3.9
+ ports:
+ - "2181:2181"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0700bae274..59e250f67f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -541,7 +541,7 @@ jobs:
include:
- { sample: java-protobuf-shopping-cart-quickstart, it: true }
- { sample: java-protobuf-customer-registry-quickstart, it: true }
- - { sample: java-protobuf-customer-registry-kafka-quickstart, it: true }
+ - { sample: java-protobuf-customer-registry-kafka-quickstart, pre_cmd: 'docker-compose -f ../../.circleci/kafka.yml up -d', it: true }
- { sample: java-protobuf-customer-registry-views-quickstart, it: true }
- { sample: java-spring-shopping-cart-quickstart, it: true }
@@ -652,7 +652,7 @@ jobs:
fi
if [ true == '${{matrix.it}}' ]; then
${PRE_CMD}
- mvn -Dkalix-sdk.version=${SDK_VERSION} verify -Pit
+ KALIX_TESTKIT_DEBUG=true mvn -Dkalix-sdk.version=${SDK_VERSION} verify -Pit
fi
- name: ${{ matrix.sample }} rm & test-compile
diff --git a/docs/src/modules/java-protobuf/pages/actions-publishing-subscribing.adoc b/docs/src/modules/java-protobuf/pages/actions-publishing-subscribing.adoc
index 90f422c039..d0f0b83418 100644
--- a/docs/src/modules/java-protobuf/pages/actions-publishing-subscribing.adoc
+++ b/docs/src/modules/java-protobuf/pages/actions-publishing-subscribing.adoc
@@ -639,7 +639,7 @@ NOTE: Despite the example, you are neither forced to clear all topics nor to do
=== External Broker
-To run an integration test against a real instance of Google PubSub (or its Emulator), use the TestKit settings to override the default eventing support, as shown below:
+To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:
[.tabset]
Java::
diff --git a/docs/src/modules/java/pages/actions-publishing-subscribing.adoc b/docs/src/modules/java/pages/actions-publishing-subscribing.adoc
index 4d524080f1..ccb6063b3e 100644
--- a/docs/src/modules/java/pages/actions-publishing-subscribing.adoc
+++ b/docs/src/modules/java/pages/actions-publishing-subscribing.adoc
@@ -253,7 +253,7 @@ NOTE: Despite the example, you are neither forced to clear all topics nor to do
=== External Broker
-To run an integration test against a real instance of Google PubSub (or its Emulator), use the TestKit settings to override the default eventing support, as shown below:
+To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:
[source,java]
.src/it/java/com/example/TestkitConfig.java
diff --git a/samples/java-protobuf-customer-registry-kafka-quickstart/docker-compose.yml b/samples/java-protobuf-customer-registry-kafka-quickstart/docker-compose.yml
index 3bf568a4f2..409314abbf 100644
--- a/samples/java-protobuf-customer-registry-kafka-quickstart/docker-compose.yml
+++ b/samples/java-protobuf-customer-registry-kafka-quickstart/docker-compose.yml
@@ -13,31 +13,30 @@ services:
-Dkalix.proxy.eventing.support=kafka
USER_FUNCTION_HOST: ${USER_FUNCTION_HOST:-host.docker.internal}
USER_FUNCTION_PORT: ${USER_FUNCTION_PORT:-8080}
- BROKER_CONFIG_FILE: /conf/my-local.kafka.properties
- volumes:
- - .:/conf
+ # configuring kafka broker used for eventing
+ BROKER_SERVERS: kafka:29092
kafka:
- image: confluentinc/cp-kafka:7.1.0
- # image not available for arm64 so this is needed to run it on Apple M1
- platform: linux/amd64
+ image: confluentinc/cp-kafka:7.2.6
depends_on:
- zookeeper
ports:
- - 9092:9092
+ - 9092:9092 # used when running clients from containerized Kalix proxy
+ - 9093:9093 # used when running non-docker clients from host
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- # advertised listener running on port 9092 must be accessible to kalix-proxy container run by kalix:runAll
+ # advertised listener running on port 9092 must be accessible to kalix-proxy container running for integration tests
# therefore we need to use host.docker.internal (docker's bridge address) to access it through the host machine
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://host.docker.internal:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ # advertised listener running on port 9093 is to be used by non-docker clients running on host machine
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL_DOCKER://host.docker.internal:9092,HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_DOCKER:PLAINTEXT,HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
zookeeper:
- image: wurstmeister/zookeeper
+ image: zookeeper:3.9
ports:
- "2181:2181"
diff --git a/samples/java-protobuf-customer-registry-kafka-quickstart/my-dev.kafka.properties b/samples/java-protobuf-customer-registry-kafka-quickstart/my-dev.kafka.properties
deleted file mode 100644
index 58c130af63..0000000000
--- a/samples/java-protobuf-customer-registry-kafka-quickstart/my-dev.kafka.properties
+++ /dev/null
@@ -1,6 +0,0 @@
-bootstrap.servers=[your-url]:9092
-security.protocol=SASL_SSL
-sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='[your-user]' password='[your-pass]';
-sasl.mechanism=PLAIN
-# Required for correctness in Apache Kafka clients prior to 2.6
-client.dns.lookup=use_all_dns_ips
\ No newline at end of file
diff --git a/samples/java-protobuf-customer-registry-kafka-quickstart/my-local.kafka.properties b/samples/java-protobuf-customer-registry-kafka-quickstart/my-local.kafka.properties
deleted file mode 100644
index 052232937d..0000000000
--- a/samples/java-protobuf-customer-registry-kafka-quickstart/my-local.kafka.properties
+++ /dev/null
@@ -1 +0,0 @@
-bootstrap.servers=kafka:9092
diff --git a/samples/java-protobuf-customer-registry-kafka-quickstart/pom.xml b/samples/java-protobuf-customer-registry-kafka-quickstart/pom.xml
index 0838d2ff7b..eab113450e 100644
--- a/samples/java-protobuf-customer-registry-kafka-quickstart/pom.xml
+++ b/samples/java-protobuf-customer-registry-kafka-quickstart/pom.xml
@@ -289,5 +289,11 @@
4.13.2
test
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.5.1</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/samples/java-protobuf-customer-registry-kafka-quickstart/src/it/java/customer/api/CustomerActionWithKafkaIntegrationTest.java b/samples/java-protobuf-customer-registry-kafka-quickstart/src/it/java/customer/api/CustomerActionWithKafkaIntegrationTest.java
new file mode 100644
index 0000000000..9153c0f5fd
--- /dev/null
+++ b/samples/java-protobuf-customer-registry-kafka-quickstart/src/it/java/customer/api/CustomerActionWithKafkaIntegrationTest.java
@@ -0,0 +1,118 @@
+package customer.api;
+
+import com.google.protobuf.Empty;
+import customer.Main;
+import kalix.javasdk.testkit.KalixTestKit;
+import kalix.javasdk.testkit.junit.KalixTestKitResource;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertTrue;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+// This class was initially generated based on the .proto definition by Kalix tooling.
+//
+// As long as this file exists it will not be overwritten: you can maintain it yourself,
+// or delete it so it is regenerated as needed.
+
+// Example of an integration test calling our service via the Kalix proxy
+// IMPORTANT: this test depends on an external Kafka instance. Make sure to have it running
+// Run all test classes ending with "IntegrationTest" using `mvn verify -Pit`
+public class CustomerActionWithKafkaIntegrationTest {
+
+ /**
+ * The test kit starts both the service container and the Kalix proxy.
+ */
+ @ClassRule
+ public static final KalixTestKitResource testKit =
+ new KalixTestKitResource(Main.createKalix(),
+ KalixTestKit.Settings.DEFAULT.withEventingSupport(KalixTestKit.Settings.EventingSupport.KAFKA));
+
+ /**
+ * Use the generated gRPC client to call the service through the Kalix proxy.
+ */
+ private final CustomerService client;
+
+
+ public CustomerActionWithKafkaIntegrationTest() {
+ client = testKit.getGrpcClient(CustomerService.class);
+ }
+
+ @Test
+ public void createAndPublish() throws Exception {
+ var id = UUID.randomUUID().toString();
+ var customer = buildCustomer(id, "Johanna", "foo@example.com", "Porto", "Long Road");
+ createCustomer(customer);
+
+ //change customer name
+ var completeName = "Johanna Doe";
+ client.changeName(CustomerApi.ChangeNameRequest.newBuilder()
+ .setCustomerId(id)
+ .setNewName(completeName)
+ .build())
+ .toCompletableFuture()
+ .get(5, SECONDS);
+
+
+ // Example Kafka consumer configuration
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "it-test-" + System.currentTimeMillis()); // using new consumer group every run
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Create Kafka consumer and subscribe to topic
+ KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList("customer_changes"));
+
+ await()
+ .ignoreExceptions()
+ .atMost(20, TimeUnit.of(ChronoUnit.SECONDS))
+ .untilAsserted(() -> {
+ ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(200));
+ var foundRecord = false;
+ for (ConsumerRecord<String, byte[]> r : records) {
+ var customerState = CustomerApi.Customer.parseFrom(r.value());
+ if (completeName.equals(customerState.getName())) {
+ foundRecord = true;
+ break;
+ }
+ }
+
+ assertTrue(foundRecord);
+ });
+ }
+
+ private CustomerApi.Customer buildCustomer(String id, String name, String email, String city, String street) {
+ return CustomerApi.Customer.newBuilder()
+ .setCustomerId(id)
+ .setName(name)
+ .setEmail(email)
+ .setAddress(CustomerApi.Address.newBuilder()
+ .setCity(city)
+ .setStreet(street)
+ .build())
+ .build();
+ }
+ private Empty createCustomer(CustomerApi.Customer toCreate) throws ExecutionException, InterruptedException, TimeoutException {
+ return client.create(toCreate)
+ .toCompletableFuture()
+ .get(5, SECONDS);
+ }
+
+}
diff --git a/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixProxyContainer.java b/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixProxyContainer.java
index fba2004fce..e018af415b 100644
--- a/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixProxyContainer.java
+++ b/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixProxyContainer.java
@@ -39,6 +39,8 @@ public class KalixProxyContainer extends GenericContainer {
/** Default local port where the Google Pub/Sub emulator is available (8085). */
public static final int DEFAULT_GOOGLE_PUBSUB_PORT = 8085;
+ public static final int DEFAULT_KAFKA_PORT = 9092;
+
static {
String customImage = System.getenv("KALIX_TESTKIT_PROXY_IMAGE");
if (customImage == null) {
diff --git a/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixTestKit.java b/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixTestKit.java
index a6ab5a66bb..65613ba1aa 100644
--- a/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixTestKit.java
+++ b/sdk/java-sdk-protobuf-testkit/src/main/java/kalix/javasdk/testkit/KalixTestKit.java
@@ -46,6 +46,7 @@
import java.util.stream.Collectors;
import static kalix.javasdk.testkit.KalixProxyContainer.DEFAULT_GOOGLE_PUBSUB_PORT;
+import static kalix.javasdk.testkit.KalixProxyContainer.DEFAULT_KAFKA_PORT;
/**
* Testkit for running Kalix services locally.
@@ -104,11 +105,18 @@ public enum EventingSupport {
TEST_BROKER,
/**
- * Used if you want to use an external Google PubSub Emulator on your tests.
+ * Used if you want to use an external Google PubSub (or its Emulator) on your tests.
*
- * Note: the Google PubSub Emulator need to be started independently.
+ * Note: the Google PubSub broker instance needs to be started independently.
*/
- GOOGLE_PUBSUB
+ GOOGLE_PUBSUB,
+
+ /**
+ * Used if you want to use an external Kafka broker on your tests.
+ *
+ * Note: the Kafka broker instance needs to be started independently.
+ */
+ KAFKA
}
private Settings(
@@ -301,7 +309,7 @@ public KalixTestKit start(final Config config) {
testSystem = ActorSystem.create("KalixTestkit", ConfigFactory.parseString("akka.http.server.preview.enable-http2 = true"));
- int eventingBackendPort = settings.eventingSupport.equals(Settings.EventingSupport.GOOGLE_PUBSUB) ? DEFAULT_GOOGLE_PUBSUB_PORT : startEventingTestkit();
+ int eventingBackendPort = getEventingBackendPort(settings.eventingSupport);
runProxy(useTestContainers, port, eventingBackendPort);
started = true;
@@ -312,6 +320,17 @@ public KalixTestKit start(final Config config) {
return this;
}
+ private int getEventingBackendPort(Settings.EventingSupport eventingSupport) {
+ switch (eventingSupport) {
+ case KAFKA:
+ return DEFAULT_KAFKA_PORT;
+ case GOOGLE_PUBSUB:
+ return DEFAULT_GOOGLE_PUBSUB_PORT;
+ default:
+ return startEventingTestkit();
+ }
+ }
+
private int startEventingTestkit() {
var port = availableLocalPort();
log.info("Eventing TestKit booting up on port: " + port);
@@ -335,6 +354,9 @@ private void runProxy(Boolean useTestContainers, int port, int eventingBackendPo
javaOptions.add("-Dkalix.proxy.eventing.support=grpc-backend");
javaOptions.add("-Dkalix.proxy.eventing.grpc-backend.host=host.testcontainers.internal");
javaOptions.add("-Dkalix.proxy.eventing.grpc-backend.port=" + eventingBackendPort);
+ } else if (settings.eventingSupport.equals(Settings.EventingSupport.KAFKA)) {
+ javaOptions.add("-Dkalix.proxy.eventing.support=kafka");
+ javaOptions.add("-Dkalix.proxy.eventing.kafka.bootstrap-servers=host.testcontainers.internal:" + eventingBackendPort);
}
settings.servicePortMappings.forEach((serviceName, hostPort) -> {
javaOptions.add("-Dkalix.dev-mode.service-port-mappings." + serviceName + "=" + hostPort);
diff --git a/sdk/scala-sdk-protobuf-testkit/src/main/scala/kalix/scalasdk/testkit/KalixTestKit.scala b/sdk/scala-sdk-protobuf-testkit/src/main/scala/kalix/scalasdk/testkit/KalixTestKit.scala
index 1af3e522ae..c5da4b6c9b 100644
--- a/sdk/scala-sdk-protobuf-testkit/src/main/scala/kalix/scalasdk/testkit/KalixTestKit.scala
+++ b/sdk/scala-sdk-protobuf-testkit/src/main/scala/kalix/scalasdk/testkit/KalixTestKit.scala
@@ -26,6 +26,7 @@ import kalix.javasdk.testkit.{ KalixTestKit => JTestKit }
import kalix.javasdk.testkit.KalixTestKit.Settings.{ EventingSupport => JEventingSupport }
import kalix.scalasdk.testkit.KalixTestKit.Settings.EventingSupport
import kalix.scalasdk.testkit.KalixTestKit.Settings.GooglePubSub
+import kalix.scalasdk.testkit.KalixTestKit.Settings.Kafka
import kalix.scalasdk.testkit.KalixTestKit.Settings.TestBroker
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._
@@ -65,6 +66,7 @@ object KalixTestKit {
val jEventingSupport = eventingSupport match {
case TestBroker => JEventingSupport.TEST_BROKER
case GooglePubSub => JEventingSupport.GOOGLE_PUBSUB
+ case Kafka => JEventingSupport.KAFKA
}
new Settings(jSettings.withEventingSupport(jEventingSupport))
}
@@ -84,11 +86,18 @@ object KalixTestKit {
object TestBroker extends EventingSupport
/**
- * Used if you want to use an external Google PubSub Emulator on your tests.
+ * Used if you want to use an external Google PubSub (or its Emulator) on your tests.
*
- * Note: the Google PubSub Emulator need to be started independently.
+ * Note: the Google PubSub broker instance needs to be started independently.
*/
object GooglePubSub extends EventingSupport
+
+ /**
+ * Used if you want to use an external Kafka broker on your tests.
+ *
+ * Note: the Kafka broker instance needs to be started independently.
+ */
+ object Kafka extends EventingSupport
}
val DefaultSettings: Settings = new Settings(JTestKit.Settings.DEFAULT)