Skip to content

Commit

Permalink
Update EmbeddedKafkaBroker to Kafka 3.9 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 8, 2024
1 parent 953dac7 commit 40d2510
Showing 1 changed file with 45 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.smallrye.reactive.messaging.kafka.companion.test.EmbeddedKafkaBroker.LoggingOutputStream.loggerPrintStream;
import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
Expand All @@ -17,6 +16,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -28,8 +29,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.config.KRaftConfigs;
Expand All @@ -42,9 +42,6 @@
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.tools.StorageTool;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;
import scala.jdk.javaapi.StreamConverters;

/**
Expand Down Expand Up @@ -367,21 +364,51 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll

public static KafkaConfig formatStorageFromConfig(Properties properties, String clusterId, boolean ignoreFormatted) {
KafkaConfig config = KafkaConfig.fromProps(properties, false);
Seq<String> directories = StorageTool.configToLogDirectories(config);
MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config);
StorageTool.formatCommand(loggerPrintStream(LOGGER), directories, metaProperties, MINIMUM_BOOTSTRAP_VERSION,
ignoreFormatted);
Formatter formatter = new Formatter();
formatter.setClusterId(clusterId)
.setNodeId(config.nodeId())
.setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled())
.setIgnoreFormatted(ignoreFormatted)
.setControllerListenerName(config.controllerListenerNames().head())
.setMetadataLogDirectory(config.metadataLogDir());
configToLogDirectories(config).forEach(formatter::addDirectory);
try {
formatter.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
return config;
}

public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
MetaProperties metaProperties = new MetaProperties.Builder()
.setVersion(MetaPropertiesVersion.V1)
.setClusterId(clusterId)
static Set<String> configToLogDirectories(KafkaConfig config) {
TreeSet<String> dirs = new TreeSet<>();
config.logDirs().foreach(dirs::add);
String metadataLogDir = config.metadataLogDir();
if (metadataLogDir != null) {
dirs.add(metadataLogDir);
}
return dirs;
}

public static void formatStorage(List<String> directories, String controllerListenerName,
String metadataLogDirectory,
String clusterId, int nodeId, boolean ignoreFormatted) {
Formatter formatter = new Formatter();
formatter.setClusterId(clusterId)
.setNodeId(nodeId)
.build();
Seq<String> dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq();
StorageTool.formatCommand(loggerPrintStream(LOGGER), dirs, metaProperties, MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted);
.setIgnoreFormatted(ignoreFormatted)
.setControllerListenerName(controllerListenerName)
.setMetadataLogDirectory(metadataLogDirectory);
directories.forEach(formatter::addDirectory);
try {
formatter.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
formatStorage(directories, "CONTROLLER", directories.get(0), clusterId, nodeId, ignoreFormatted);
}

public static KafkaRaftServer createServer(final KafkaConfig config) {
Expand All @@ -391,7 +418,7 @@ public static KafkaRaftServer createServer(final KafkaConfig config) {
}

private static String getAdvertisedListeners(KafkaConfig config) {
return StreamConverters.asJavaParStream(config.effectiveAdvertisedListeners())
return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners())
.map(EndPoint::connectionString)
.collect(Collectors.joining(","));
}
Expand Down

0 comments on commit 40d2510

Please sign in to comment.