Config options moved with 3.8.1
ozangunalp committed Nov 7, 2024
1 parent dd90c67 commit 26db42c
Showing 3 changed files with 58 additions and 26 deletions.
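In short, Kafka 3.8 moves the broker configuration keys from Scala accessor methods on kafka.server.KafkaConfig to plain String constants in Java classes (ServerConfigs, ServerLogConfigs, KRaftConfigs, ReplicationConfigs, SocketServerConfigs, QuorumConfig, GroupCoordinatorConfig, CleanerConfig), hence the new optional dependencies on kafka-server, kafka-raft, kafka-group-coordinator and kafka-storage added to the pom below. A minimal before/after sketch, assuming Kafka 3.8+ on the classpath; the values are illustrative and the old accessors are shown as comments because they no longer resolve:

import java.util.Properties;

import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;

public class ConfigKeyMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Before: Scala accessors on kafka.server.KafkaConfig
        // props.put(KafkaConfig.BrokerIdProp(), "0");
        // props.put(KafkaConfig.LogDirProp(), "/tmp/kafka-logs");

        // After: String constants from the new Java config classes
        props.put(ServerConfigs.BROKER_ID_CONFIG, "0");
        props.put(ServerLogConfigs.LOG_DIR_CONFIG, "/tmp/kafka-logs");

        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}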
24 changes: 24 additions & 0 deletions smallrye-reactive-messaging-kafka-test-companion/pom.xml
@@ -59,6 +59,30 @@
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-raft</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-group-coordinator</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-storage</artifactId>
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
<!-- Optional dependencies for usage in Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
EmbeddedKafkaBroker.java
@@ -27,8 +27,16 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.jboss.logging.Logger;

import kafka.cluster.EndPoint;
@@ -194,7 +202,7 @@ public synchronized EmbeddedKafkaBroker start() {
brokerConfigModifier.accept(properties);
}

if (properties.get(KafkaConfig.LogDirProp()) == null) {
if (properties.get(ServerLogConfigs.LOG_DIR_CONFIG) == null) {
createAndSetlogDir(properties);
}

Expand Down Expand Up @@ -296,12 +304,12 @@ public static Endpoint parseEndpoint(String listenerStr) {
public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controller, Endpoint internalEndpoint,
List<Endpoint> advertisedListeners) {
Properties props = new Properties();
props.put(KafkaConfig.BrokerIdProp(), Integer.toString(nodeId));
props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(nodeId));

// Configure kraft
props.put(KafkaConfig.ProcessRolesProp(), "broker,controller");
props.put(KafkaConfig.ControllerListenerNamesProp(), listenerName(controller));
props.put(KafkaConfig.QuorumVotersProp(), nodeId + "@" + controller.host() + ":" + controller.port());
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller");
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, listenerName(controller));
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, nodeId + "@" + controller.host() + ":" + controller.port());

// Configure listeners
Map<String, Endpoint> listeners = advertisedListeners.stream()
@@ -314,7 +322,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
.map(EmbeddedKafkaBroker::toListenerString)
.distinct()
.collect(Collectors.joining(","));
props.put(KafkaConfig.ListenersProp(), listenersString);
props.put(SocketServerConfigs.LISTENERS_CONFIG, listenersString);

// Find a PLAINTEXT listener
Endpoint plaintextEndpoint = advertisedListeners.stream()
@@ -327,32 +335,32 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
.distinct()
.collect(Collectors.joining(","));
if (!Utils.isBlank(advertisedListenersString)) {
props.put(KafkaConfig.AdvertisedListenersProp(), advertisedListenersString);
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, advertisedListenersString);
}

// Configure security protocol map
String securityProtocolMap = listeners.values().stream()
.map(EmbeddedKafkaBroker::toProtocolMap)
.distinct()
.collect(Collectors.joining(","));
props.put(KafkaConfig.ListenerSecurityProtocolMapProp(), securityProtocolMap);
props.put(KafkaConfig.InterBrokerListenerNameProp(), listenerName(plaintextEndpoint));
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, securityProtocolMap);
props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, listenerName(plaintextEndpoint));

// Configure static default props
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
props.put(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(false));
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100");
props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(true));
props.put(KafkaConfig.LogDeleteDelayMsProp(), "1000");
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), String.valueOf(Long.MAX_VALUE));
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5");
props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
props.put(KafkaConfig.NumPartitionsProp(), "1");
props.put(KafkaConfig.DefaultReplicationFactorProp(), "1");
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1000");
props.put(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1000");
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(false));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, "100");
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(true));
props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");

return props;
}
@@ -406,7 +414,7 @@ private static int getUnusedPort(int port) {

private static void createAndSetlogDir(Properties properties) {
try {
properties.put(KafkaConfig.LogDirProp(),
properties.put(ServerLogConfigs.LOG_DIR_CONFIG,
Files.createTempDirectory(COMPANION_BROKER_PREFIX + "-" + UUID.randomUUID()).toString());
} catch (IOException e) {
throw new UncheckedIOException(e);
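For reference, a self-contained sketch of the single-node KRaft configuration that createDefaultBrokerConfig assembles above, written against the new constant classes; host names, ports and listener names are illustrative, not taken from this diff:

import java.util.Properties;

import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerConfigs;

public class KraftBrokerPropsSketch {
    public static void main(String[] args) {
        int nodeId = 0;
        Properties props = new Properties();
        // Combined broker+controller node, as in the embedded broker above
        props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(nodeId));
        props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller");
        props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
        props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, nodeId + "@localhost:9094");
        // One client listener and one controller listener
        props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9094");
        props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}

Because the keys are now ordinary String constants, they can be overridden through withAdditionalProperties exactly as the updated test below does with ServerLogConfigs.LOG_DIR_CONFIG.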
EmbeddedKafkaTest.java
@@ -15,13 +15,13 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import io.smallrye.reactive.messaging.kafka.companion.TestTags;
import kafka.server.KafkaConfig;

@Tag(TestTags.FLAKY)
public class EmbeddedKafkaTest {
@@ -42,7 +42,7 @@ void test() {
void testWithExistingLogDir(@TempDir File dir) {
EmbeddedKafkaBroker broker = new EmbeddedKafkaBroker()
.withNodeId(0)
.withAdditionalProperties(props -> props.put(KafkaConfig.LogDirProp(), dir.toPath().toString()))
.withAdditionalProperties(props -> props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.toPath().toString()))
.withDeleteLogDirsOnClose(false);

// format storage before starting
