Skip to content

Commit

Permalink
Add ConfluentKafkaContainer (#9139)
Browse files Browse the repository at this point in the history
* Deprecate `org.testcontainers.containers.KafkaContainer`
* Add `org.testcontainers.kafka.ConfluentKafkaContainer` which works with `confluentinc/cp-kafka` images with version `7.4.0` or later.
* `KafkaHelper` container common env vars, command, wait strategy to be shared with `org.testcontainers.kafka.KafkaContainer`

---------

Co-authored-by: Kevin Wittek <[email protected]>
  • Loading branch information
eddumelendez and kiview authored Aug 23, 2024
1 parent c1f4796 commit ba22a7b
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 9 deletions.
38 changes: 29 additions & 9 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@ Testcontainers can be used to automatically instantiate and manage [Apache Kafka

Currently, two different Kafka images are supported:

* `org.testcontainers.containers.KafkaContainer` supports
* `org.testcontainers.kafka.ConfluentKafkaContainer` supports
[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
* `org.testcontainers.kafka.KafkaContainer` supports [apache/kafka](https://hub.docker.com/r/apache/kafka/) and [apache/kafka-native](https://hub.docker.com/r/apache/kafka-native/)

!!! note
`org.testcontainers.containers.KafkaContainer` is deprecated.
Please use `org.testcontainers.kafka.ConfluentKafkaContainer` or `org.testcontainers.kafka.KafkaContainer` instead, depending on the used image.

## Benefits

* Running a single node Kafka installation with just one line of code
* No need to manage external Zookeeper installation, required by Kafka. But see [below](#zookeeper)

## Example

### Using org.testcontainers.containers.KafkaContainer

Create a `KafkaContainer` to use it in your tests:

<!--codeinclude-->
[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->
Expand All @@ -28,33 +35,46 @@ Now your tests or any other process running on your machine can get access to ru
[Bootstrap Servers](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:getBootstrapServers
<!--/codeinclude-->

## Options
### Using org.testcontainers.kafka.ConfluentKafkaContainer

!!! note
Compatible with `confluentinc/cp-kafka` images version `7.4.0` and later.

Create a `ConfluentKafkaContainer` to use it in your tests:

!!! note
The options below are only available for `org.testcontainers.containers.KafkaContainer`
<!--codeinclude-->
[Creating a ConlfuentKafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

## Options
### <a name="zookeeper"></a> Using external Zookeeper

!!! note
Only available for `org.testcontainers.containers.KafkaContainer`

If for some reason you want to use an externally running Zookeeper, then just pass its location during construction:
<!--codeinclude-->
[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper
<!--/codeinclude-->

### Using Kraft mode

KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"
!!! note
Only available for `org.testcontainers.containers.KafkaContainer`

KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)

<!--codeinclude-->
[Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode
<!--/codeinclude-->

See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.
See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.

## Register listeners
### Register listeners

There are scenarios where additional listeners are needed because the consumer/producer can be in another
container in the same network or a different process where the port to connect differs from the default
exposed port `9093`. E.g [Toxiproxy](../../modules/toxiproxy/).
container in the same network or a different process where the port to connect differs from the default exposed port. E.g [Toxiproxy](../../modules/toxiproxy/).

<!--codeinclude-->
[Register additional listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:registerListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
* <li>Kafka: 9093</li>
* <li>Zookeeper: 2181</li>
* </ul>
*
* @deprecated use {@link org.testcontainers.kafka.ConfluentKafkaContainer} or
* {@link org.testcontainers.kafka.KafkaContainer} instead
*/
@Deprecated
public class KafkaContainer extends GenericContainer<KafkaContainer> {

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package org.testcontainers.kafka;

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
* Testcontainers implementation for Confluent Kafka.
* <p>
* Supported image: {@code confluentinc/cp-kafka}
* <p>
* Exposed ports: 9092
*/
public class ConfluentKafkaContainer extends GenericContainer<ConfluentKafkaContainer> {

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");

private final Set<String> listeners = new HashSet<>();

private final Set<Supplier<String>> advertisedListeners = new HashSet<>();

public ConfluentKafkaContainer(String imageName) {
this(DockerImageName.parse(imageName));
}

public ConfluentKafkaContainer(DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

withExposedPorts(KafkaHelper.KAFKA_PORT);
withEnv(KafkaHelper.envVars());

withCommand(KafkaHelper.COMMAND);
waitingFor(KafkaHelper.WAIT_STRATEGY);
}

@Override
protected void configure() {
KafkaHelper.resolveListeners(this, this.listeners);

String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);
}

@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
String brokerAdvertisedListener = String.format(
"BROKER://%s:%s",
containerInfo.getConfig().getHostName(),
"9093"
);
List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener);

advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

command += "/etc/confluent/docker/run \n";
copyFileToContainer(Transferable.of(command, 0777), KafkaHelper.STARTER_SCRIPT);
}

/**
* Add a listener in the format {@code host:port}.
* Host will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker within the same container network.
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* <li>0.0.0.0:9094</li>
* </ul>
* <p>
* The listener will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a listener with format {@code host:port}
* @return this {@link ConfluentKafkaContainer} instance
*/
public ConfluentKafkaContainer withListener(String listener) {
this.listeners.add(listener);
this.advertisedListeners.add(() -> listener);
return this;
}

/**
* Add a listener in the format {@code host:port} and a {@link Supplier} for the advertised listener.
* Host from listener will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker from outside the container network
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* <li>0.0.0.0:9094</li>
* </ul>
* <p>
* The {@link Supplier} will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a supplier that will provide a listener
* @param advertisedListener a supplier that will provide a listener
* @return this {@link ConfluentKafkaContainer} instance
*/
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener) {
this.listeners.add(listener);
this.advertisedListeners.add(advertisedListener);
return this;
}

public String getBootstrapServers() {
return String.format("%s:%s", getHost(), getMappedPort(KafkaHelper.KAFKA_PORT));
}
}
104 changes: 104 additions & 0 deletions modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package org.testcontainers.kafka;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class KafkaHelper {

private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

private static final String PROTOCOL_PREFIX = "TC";

static final int KAFKA_PORT = 9092;

static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";

static final String[] COMMAND = {
"sh",
"-c",
"while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT,
};

static final WaitStrategy WAIT_STRATEGY = Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1);

static Map<String, String> envVars() {
Map<String, String> envVars = new HashMap<>();
envVars.put("CLUSTER_ID", DEFAULT_CLUSTER_ID);

envVars.put(
"KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094"
);
envVars.put(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
);
envVars.put("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
envVars.put("KAFKA_PROCESS_ROLES", "broker,controller");
envVars.put("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

envVars.put("KAFKA_NODE_ID", "1");
envVars.put("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
envVars.put("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
envVars.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
envVars.put("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
envVars.put("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
envVars.put("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
return envVars;
}

static void resolveListeners(GenericContainer<?> kafkaContainer, Set<String> listenersSuppliers) {
Set<String> listeners = Arrays
.stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENERS").split(","))
.collect(Collectors.toSet());
Set<String> listenerSecurityProtocolMap = Arrays
.stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP").split(","))
.collect(Collectors.toSet());

List<String> listenersToTransform = new ArrayList<>(listenersSuppliers);
for (int i = 0; i < listenersToTransform.size(); i++) {
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenersToTransform.get(i);
String listenerHost = listener.split(":")[0];
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://%s:%s", protocol, listenerHost, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
kafkaContainer.withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

kafkaContainer.getEnvMap().put("KAFKA_LISTENERS", kafkaListeners);
kafkaContainer.getEnvMap().put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);
}

static List<String> resolveAdvertisedListeners(Set<Supplier<String>> listenerSuppliers) {
List<String> advertisedListeners = new ArrayList<>();
List<Supplier<String>> listenersToTransform = new ArrayList<>(listenerSuppliers);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerProtocol = String.format("%s://%s", protocol, listener);
advertisedListeners.add(listenerProtocol);
}
return advertisedListeners;
}
}
16 changes: 16 additions & 0 deletions modules/kafka/src/test/java/org/testcontainers/KCatContainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.testcontainers;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.images.builder.Transferable;

public class KCatContainer extends GenericContainer<KCatContainer> {

public KCatContainer() {
super("confluentinc/cp-kcat:7.4.1");
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
});
withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt");
withCommand("-c", "tail -f /dev/null");
}
}
Loading

0 comments on commit ba22a7b

Please sign in to comment.