Skip to content

Commit

Permalink
feat: support running integration tests against real kafka (#1779)
Browse files Browse the repository at this point in the history
* feat: support running integration tests against real kafka

* docs: add reference to support of kafka

* chore: add ci configs and remove old config files
  • Loading branch information
efgpinto authored Sep 4, 2023
1 parent 027816f commit 36038c8
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 29 deletions.
24 changes: 24 additions & 0 deletions .circleci/kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: '2.0'

services:
kafka:
image: confluentinc/cp-kafka:7.2.6
depends_on:
- zookeeper
ports:
- 9092:9092 # used when running clients from containerized Kalix proxy
- 9093:9093 # used when running non-docker clients from host
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# advertised listener running on port 9092 must be accessible to kalix-proxy container running for integration tests
# therefore we need to use host.testcontainers.internal (testcontainers's bridge address) to access it through the host machine
# advertised listener running on port 9093 is to be used by non-docker clients running on host machine
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL_DOCKER://host.testcontainers.internal:9092,HOST://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
zookeeper:
image: zookeeper:3.9
ports:
- "2181:2181"
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ jobs:
include:
- { sample: java-protobuf-shopping-cart-quickstart, it: true }
- { sample: java-protobuf-customer-registry-quickstart, it: true }
- { sample: java-protobuf-customer-registry-kafka-quickstart, it: true }
- { sample: java-protobuf-customer-registry-kafka-quickstart, pre_cmd: 'docker-compose -f ../../.circleci/kafka.yml up -d', it: true }
- { sample: java-protobuf-customer-registry-views-quickstart, it: true }

- { sample: java-spring-shopping-cart-quickstart, it: true }
Expand Down Expand Up @@ -652,7 +652,7 @@ jobs:
fi
if [ true == '${{matrix.it}}' ]; then
${PRE_CMD}
mvn -Dkalix-sdk.version=${SDK_VERSION} verify -Pit
KALIX_TESTKIT_DEBUG=true mvn -Dkalix-sdk.version=${SDK_VERSION} verify -Pit
fi
- name: ${{ matrix.sample }} rm & test-compile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ NOTE: Despite the example, you are neither forced to clear all topics nor to do

=== External Broker

To run an integration test against a real instance of Google PubSub (or its Emulator), use the TestKit settings to override the default eventing support, as shown below:
To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:

[.tabset]
Java::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ NOTE: Despite the example, you are neither forced to clear all topics nor to do

=== External Broker

To run an integration test against a real instance of Google PubSub (or its Emulator), use the TestKit settings to override the default eventing support, as shown below:
To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:

[source,java]
.src/it/java/com/example/TestkitConfig.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,30 @@ services:
-Dkalix.proxy.eventing.support=kafka
USER_FUNCTION_HOST: ${USER_FUNCTION_HOST:-host.docker.internal}
USER_FUNCTION_PORT: ${USER_FUNCTION_PORT:-8080}
BROKER_CONFIG_FILE: /conf/my-local.kafka.properties
volumes:
- .:/conf
# configuring kafka broker used for eventing
BROKER_SERVERS: kafka:29092

kafka:
image: confluentinc/cp-kafka:7.1.0
# image not available for arm64 so this is needed to run it on Apple M1
platform: linux/amd64
image: confluentinc/cp-kafka:7.2.6
depends_on:
- zookeeper
ports:
- 9092:9092
- 9092:9092 # used when running clients from containerized Kalix proxy
- 9093:9093 # used when running non-docker clients from host
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# advertised listener running on port 9092 must be accessible to kalix-proxy container run by kalix:runAll
# advertised listener running on port 9092 must be accessible to kalix-proxy container running for integration tests
# therefore we need to use host.docker.internal (docker's bridge address) to access it through the host machine
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://host.docker.internal:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# advertised listener running on port 9093 is to be used by non-docker clients running on host machine
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL_DOCKER://host.docker.internal:9092,HOST://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock

zookeeper:
image: wurstmeister/zookeeper
image: zookeeper:3.9
ports:
- "2181:2181"

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,11 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package customer.api;

import com.google.protobuf.Empty;
import customer.Main;
import kalix.javasdk.testkit.KalixTestKit;
import kalix.javasdk.testkit.junit.KalixTestKitResource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

// This class was initially generated based on the .proto definition by Kalix tooling.
//
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

// Example of an integration test calling our service via the Kalix proxy
// IMPORTANT: this tests depends on an external kafka instance. Make sure to have it running
// Run all test classes ending with "IntegrationTest" using `mvn verify -Pit`
public class CustomerActionWithKafkaIntegrationTest {

/**
* The test kit starts both the service container and the Kalix proxy.
*/
@ClassRule
public static final KalixTestKitResource testKit =
new KalixTestKitResource(Main.createKalix(),
KalixTestKit.Settings.DEFAULT.withEventingSupport(KalixTestKit.Settings.EventingSupport.KAFKA));

/**
* Use the generated gRPC client to call the service through the Kalix proxy.
*/
private final CustomerService client;


public CustomerActionWithKafkaIntegrationTest() {
client = testKit.getGrpcClient(CustomerService.class);
}

@Test
public void createAndPublish() throws Exception {
var id = UUID.randomUUID().toString();
var customer = buildCustomer(id, "Johanna", "[email protected]", "Porto", "Long Road");
createCustomer(customer);

//change customer name
var completeName = "Johanna Doe";
client.changeName(CustomerApi.ChangeNameRequest.newBuilder()
.setCustomerId(id)
.setNewName(completeName)
.build())
.toCompletableFuture()
.get(5, SECONDS);


// Example Kafka consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "0.0.0.0:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "it-test-" + System.currentTimeMillis()); // using new consumer group every run
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Create Kafka consumer and subscribe to topic
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("customer_changes"));

await()
.ignoreExceptions()
.atMost(20, TimeUnit.of(ChronoUnit.SECONDS))
.untilAsserted(() -> {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(200));
var foundRecord = false;
for (ConsumerRecord<String, byte[]> r : records) {
var customerState = CustomerApi.Customer.parseFrom(r.value());
if (completeName.equals(customerState.getName())) {
foundRecord = true;
break;
}
}

assertTrue(foundRecord);
});
}

private CustomerApi.Customer buildCustomer(String id, String name, String email, String city, String street) {
return CustomerApi.Customer.newBuilder()
.setCustomerId(id)
.setName(name)
.setEmail(email)
.setAddress(CustomerApi.Address.newBuilder()
.setCity(city)
.setStreet(street)
.build())
.build();
}
private Empty createCustomer(CustomerApi.Customer toCreate) throws ExecutionException, InterruptedException, TimeoutException {
return client.create(toCreate)
.toCompletableFuture()
.get(5, SECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class KalixProxyContainer extends GenericContainer<KalixProxyContainer> {
/** Default local port where the Google Pub/Sub emulator is available (8085). */
public static final int DEFAULT_GOOGLE_PUBSUB_PORT = 8085;

public static final int DEFAULT_KAFKA_PORT = 9092;

static {
String customImage = System.getenv("KALIX_TESTKIT_PROXY_IMAGE");
if (customImage == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.stream.Collectors;

import static kalix.javasdk.testkit.KalixProxyContainer.DEFAULT_GOOGLE_PUBSUB_PORT;
import static kalix.javasdk.testkit.KalixProxyContainer.DEFAULT_KAFKA_PORT;

/**
* Testkit for running Kalix services locally.
Expand Down Expand Up @@ -104,11 +105,18 @@ public enum EventingSupport {
TEST_BROKER,

/**
* Used if you want to use an external Google PubSub Emulator on your tests.
* Used if you want to use an external Google PubSub (or its Emulator) on your tests.
*
* Note: the Google PubSub Emulator need to be started independently.
* Note: the Google PubSub broker instance needs to be started independently.
*/
GOOGLE_PUBSUB
GOOGLE_PUBSUB,

/**
* Used if you want to use an external Kafka broker on your tests.
*
* Note: the Kafka broker instance needs to be started independently.
*/
KAFKA
}

private Settings(
Expand Down Expand Up @@ -301,7 +309,7 @@ public KalixTestKit start(final Config config) {

testSystem = ActorSystem.create("KalixTestkit", ConfigFactory.parseString("akka.http.server.preview.enable-http2 = true"));

int eventingBackendPort = settings.eventingSupport.equals(Settings.EventingSupport.GOOGLE_PUBSUB) ? DEFAULT_GOOGLE_PUBSUB_PORT : startEventingTestkit();
int eventingBackendPort = getEventingBackendPort(settings.eventingSupport);
runProxy(useTestContainers, port, eventingBackendPort);

started = true;
Expand All @@ -312,6 +320,17 @@ public KalixTestKit start(final Config config) {
return this;
}

private int getEventingBackendPort(Settings.EventingSupport eventingSupport) {
switch(eventingSupport) {
case KAFKA:
return DEFAULT_KAFKA_PORT;
case GOOGLE_PUBSUB:
return DEFAULT_GOOGLE_PUBSUB_PORT;
default:
return startEventingTestkit();
}
}

private int startEventingTestkit() {
var port = availableLocalPort();
log.info("Eventing TestKit booting up on port: " + port);
Expand All @@ -335,6 +354,9 @@ private void runProxy(Boolean useTestContainers, int port, int eventingBackendPo
javaOptions.add("-Dkalix.proxy.eventing.support=grpc-backend");
javaOptions.add("-Dkalix.proxy.eventing.grpc-backend.host=host.testcontainers.internal");
javaOptions.add("-Dkalix.proxy.eventing.grpc-backend.port=" + eventingBackendPort);
} else if (settings.eventingSupport.equals(Settings.EventingSupport.KAFKA)) {
javaOptions.add("-Dkalix.proxy.eventing.support=kafka");
javaOptions.add("-Dkalix.proxy.eventing.kafka.bootstrap-servers=host.testcontainers.internal:"+eventingBackendPort);
}
settings.servicePortMappings.forEach((serviceName, hostPort) -> {
javaOptions.add("-Dkalix.dev-mode.service-port-mappings." + serviceName + "=" + hostPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kalix.javasdk.testkit.{ KalixTestKit => JTestKit }
import kalix.javasdk.testkit.KalixTestKit.Settings.{ EventingSupport => JEventingSupport }
import kalix.scalasdk.testkit.KalixTestKit.Settings.EventingSupport
import kalix.scalasdk.testkit.KalixTestKit.Settings.GooglePubSub
import kalix.scalasdk.testkit.KalixTestKit.Settings.Kafka
import kalix.scalasdk.testkit.KalixTestKit.Settings.TestBroker
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._
Expand Down Expand Up @@ -65,6 +66,7 @@ object KalixTestKit {
val jEventingSupport = eventingSupport match {
case TestBroker => JEventingSupport.TEST_BROKER
case GooglePubSub => JEventingSupport.GOOGLE_PUBSUB
case Kafka => JEventingSupport.KAFKA
}
new Settings(jSettings.withEventingSupport(jEventingSupport))
}
Expand All @@ -84,11 +86,18 @@ object KalixTestKit {
object TestBroker extends EventingSupport

/**
* Used if you want to use an external Google PubSub Emulator on your tests.
* Used if you want to use an external Google PubSub (or its Emulator) on your tests.
*
* Note: the Google PubSub Emulator need to be started independently.
* Note: the Google PubSub broker instance needs to be started independently.
*/
object GooglePubSub extends EventingSupport

/**
* Used if you want to use an external Kafka broker on your tests.
*
* Note: the Kafka broker instance needs to be started independently.
*/
object Kafka extends EventingSupport
}

val DefaultSettings: Settings = new Settings(JTestKit.Settings.DEFAULT)
Expand Down

0 comments on commit 36038c8

Please sign in to comment.