From df025482164e6128ebdcbbc154c141fa4f8ad271 Mon Sep 17 00:00:00 2001 From: Huilin Shi Date: Wed, 13 Mar 2024 14:59:26 -0700 Subject: [PATCH] Fix LogConfig during dynamic update --- bin/kafka-kernel-tls-perf-test.sh | 20 -- build.gradle | 4 - checkstyle/import-control.xml | 1 - .../kafka/common/config/SslConfigs.java | 6 - .../common/network/SaslChannelBuilder.java | 2 +- .../common/network/SslChannelBuilder.java | 9 +- .../common/network/SslTransportLayer.java | 74 +---- .../security/ssl/DefaultSslEngineFactory.java | 29 +- .../kafka/common/security/ssl/SslFactory.java | 6 +- .../kafka/common/network/SslSelectorTest.java | 4 +- .../common/network/SslTransportLayerTest.java | 15 +- .../ssl/DefaultSslEngineFactoryTest.java | 1 - .../org/apache/kafka/test/TestSslUtils.java | 6 - core/src/main/scala/kafka/log/Log.scala | 1 - .../scala/kafka/network/RequestChannel.scala | 13 - .../kafka/server/DynamicBrokerConfig.scala | 18 +- .../main/scala/kafka/server/KafkaConfig.scala | 4 - .../kafka/tools/KernelTLSBenchmark.scala | 270 ------------------ .../DynamicBrokerReconfigurationTest.scala | 120 +++++++- gradle/dependencies.gradle | 2 - 20 files changed, 140 insertions(+), 465 deletions(-) delete mode 100755 bin/kafka-kernel-tls-perf-test.sh delete mode 100644 core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala diff --git a/bin/kafka-kernel-tls-perf-test.sh b/bin/kafka-kernel-tls-perf-test.sh deleted file mode 100755 index b89076f31524f..0000000000000 --- a/bin/kafka-kernel-tls-perf-test.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then - export KAFKA_HEAP_OPTS="-Xmx512M" -fi -exec $(dirname $0)/kafka-run-class.sh kafka.tools.KernelTLSBenchmark "$@" diff --git a/build.gradle b/build.gradle index 921b882047398..b3b610d5c4c6f 100644 --- a/build.gradle +++ b/build.gradle @@ -57,11 +57,8 @@ spotless { allprojects { repositories { - mavenLocal() mavenCentral() maven { url "https://linkedin.jfrog.io/artifactory/zookeeper" } - maven { url "https://linkedin.jfrog.io/artifactory/ktls-jni" } - } dependencyUpdates { @@ -1247,7 +1244,6 @@ project(':clients') { implementation libs.snappy implementation libs.slf4jApi implementation libs.conscrypt - implementation libs.ktls compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing compileOnly libs.jacksonJDK8Datatypes diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 01182b44a2c36..0566fcf4e40b2 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -58,7 +58,6 @@ - diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index affa614fb614c..e986583cb0b82 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -51,10 +51,6 @@ public class SslConfigs { public static final String SSL_PROVIDER_CONFIG = "ssl.provider"; public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM."; - public static final String SSL_KERNEL_OFFLOAD_ENABLE_CONFIG = "ssl.kernel.offload.enable"; - public static final String SSL_KERNEL_OFFLOAD_ENABLE_DOC = "ssl.kernel.offload.enable"; - public static final boolean DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE = false; - public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites"; public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. " + "By default all the available cipher suites are supported."; @@ -144,7 +140,6 @@ public static void addClientSslSupport(ConfigDef config) { config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) .define(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_DOC) .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC) - .define(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, SslConfigs.DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC) .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC) .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) @@ -165,7 +160,6 @@ public static void addClientSslSupport(ConfigDef config) { } public static final Set RECONFIGURABLE_CONFIGS = Utils.mkSet( - SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 94123f8bcef50..8b390d11bc6dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -257,7 +257,7 @@ protected TransportLayer buildTransportLayer(String id, SelectionKey key, Socket if (this.securityProtocol == SecurityProtocol.SASL_SSL) { return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket()), - metadataRegistry, false); + metadataRegistry); } else { return new PlaintextTransportLayer(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 9056f23ae5df0..4dabf0a15be86 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -45,7 +45,6 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable private final boolean isInterBrokerListener; private SslFactory sslFactory; private Mode mode; - private boolean isKernelOffloadEnabled; private Map configs; private SslPrincipalMapper sslPrincipalMapper; private final Logger log; @@ -72,7 +71,6 @@ public void configure(Map configs) throws KafkaException { sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules); this.sslFactory = new SslFactory(mode, null, isInterBrokerListener); this.sslFactory.configure(this.configs); - this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG); } catch (KafkaException e) { throw e; } catch (Exception e) { @@ -92,7 +90,6 @@ public void validateReconfiguration(Map configs) { @Override public void reconfigure(Map configs) { - isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG); sslFactory.reconfigure(configs); } @@ -121,14 +118,10 @@ public void close() { if (sslFactory != null) sslFactory.close(); } - private boolean shouldEnableKernelOffload() { - return isKernelOffloadEnabled && mode == Mode.SERVER; - } - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket()), - metadataRegistry, shouldEnableKernelOffload()); + metadataRegistry); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 04d251a6eec6a..6aba13750e04c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.common.network; -import com.linkedin.ktls.KTLSEnableFailedException; -import com.linkedin.ktls.KernelTls; import java.io.IOException; import java.io.EOFException; import java.nio.ByteBuffer; @@ -81,8 +79,6 @@ private enum State { private final SocketChannel socketChannel; private final ChannelMetadataRegistry metadataRegistry; private final Logger log; - private final KernelTls kernelTLS; - private boolean shouldAttemptKtls; private HandshakeStatus handshakeStatus; private SSLEngineResult handshakeResult; @@ -93,31 +89,24 @@ private enum State { private ByteBuffer appReadBuffer; private ByteBuffer fileChannelBuffer; private boolean hasBytesBuffered; - private boolean ktlsAttempted; - private boolean ktlsEnabled; public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine, - ChannelMetadataRegistry metadataRegistry, boolean shouldAttemptKtls) - throws IOException { - return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry, shouldAttemptKtls); + ChannelMetadataRegistry metadataRegistry) throws IOException { + return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry); } // Prefer `create`, only use this in tests - SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, ChannelMetadataRegistry metadataRegistry, - boolean shouldAttemptKtls) { + SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) { this.channelId = channelId; this.key = key; this.socketChannel = (SocketChannel) key.channel(); this.sslEngine = sslEngine; - this.shouldAttemptKtls = shouldAttemptKtls; - this.kernelTLS = new KernelTls(); this.state = State.NOT_INITIALIZED; this.metadataRegistry = metadataRegistry; - final LogContext logContext = new LogContext("[SslTransportLayer channelId=" + channelId + " key=" + key + "] "); + final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key)); this.log = logContext.logger(getClass()); - - log.debug("New SSL channel created with kernel offload turned {}", shouldAttemptKtls ? "on" : "off"); } // Visible for testing @@ -191,10 +180,6 @@ public void close() throws IOException { state = State.CLOSING; sslEngine.closeOutbound(); try { - if (ktlsEnabled) { - kernelTLS.closeNotify(socketChannel); - return; - } if (prevState != State.NOT_INITIALIZED && isConnected()) { if (!flush(netWriteBuffer)) { throw new IOException("Remaining data in the network buffer, can't send SSL close message."); @@ -422,7 +407,7 @@ private void doHandshake() throws IOException { handshakeFinished(); break; default: - throw new IllegalStateException("Unexpected status [" + handshakeStatus + "]"); + throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus)); } } @@ -710,12 +695,6 @@ public int write(ByteBuffer src) throws IOException { throw closingException(); if (!ready()) return 0; - if (shouldAttemptKtls && !ktlsAttempted) { - attemptToEnableKernelTls(); - } - if (ktlsEnabled) { - return writeKernelTLS(src); - } int written = 0; while (flush(netWriteBuffer) && src.hasRemaining()) { @@ -745,24 +724,6 @@ public int write(ByteBuffer src) throws IOException { return written; } - private int writeKernelTLS(ByteBuffer src) throws IOException { - log.trace("Writing with Kernel TLS enabled"); - return socketChannel.write(src); - } - - private void attemptToEnableKernelTls() { - try { - kernelTLS.enableKernelTlsForSend(sslEngine, socketChannel); - log.debug("Kernel TLS enabled on socket on channel {}", channelId); - ktlsEnabled = true; - } catch (KTLSEnableFailedException e) { - log.warn("Attempt to enable KTLS failed with exception, falling back to userspace encryption", e); - ktlsEnabled = false; - } finally { - ktlsAttempted = true; - } - } - /** * Writes a sequence of bytes to this channel from the subsequence of the given buffers. * @@ -774,12 +735,6 @@ private void attemptToEnableKernelTls() { */ @Override public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { - if (shouldAttemptKtls && !ktlsAttempted) { - attemptToEnableKernelTls(); - } - if (ktlsEnabled) { - return writeKernelTLS(srcs, offset, length); - } if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) throw new IndexOutOfBoundsException(); int totalWritten = 0; @@ -802,11 +757,6 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException return totalWritten; } - private long writeKernelTLS(ByteBuffer[] srcs, int offset, int length) throws IOException { - log.trace("Writing with Kernel TLS enabled"); - return socketChannel.write(srcs, offset, length); - } - /** * Writes a sequence of bytes to this channel from the given buffers. * @@ -1004,12 +954,6 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr throw closingException(); if (state != State.READY) return 0; - if (shouldAttemptKtls && !ktlsAttempted) { - attemptToEnableKernelTls(); - } - if (ktlsEnabled) { - return transferFromWithKernelTLS(fileChannel, position, count); - } if (!flush(netWriteBuffer)) return 0; @@ -1065,10 +1009,4 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr throw e; } } - - private long transferFromWithKernelTLS( - FileChannel fileChannel, long position, long count) throws IOException { - log.trace("Transferring from file with Kernel TLS enabled"); - return fileChannel.transferTo(position, count, socketChannel); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java index a870cc55ae47b..f921c679a6a25 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.security.ssl; -import com.linkedin.ktls.KernelTls; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslClientAuth; import org.apache.kafka.common.config.SslConfigs; @@ -75,8 +74,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory { private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class); public static final String PEM_TYPE = "PEM"; - private final KernelTls kernelTls = new KernelTls(); - private Map configs; private SslContextProvider sslContextProvider; private String kmfAlgorithm; @@ -88,8 +85,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory { private SecureRandom secureRandomImplementation; private SSLContext sslContext; private SslClientAuth sslClientAuth; - private boolean isKernelOffloadEnabled; - private List cipherSuitesWithKernelOffload; @Override @@ -144,10 +139,6 @@ public void configure(Map configs) { this.sslContextProvider.configure(configs); SecurityUtils.addConfiguredSecurityProviders(this.configs); - this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG); - if (isKernelOffloadEnabled) { - this.cipherSuitesWithKernelOffload = kernelTls.supportedCipherSuites(); - } List cipherSuitesList = (List) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG); if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) { this.cipherSuites = cipherSuitesList.toArray(new String[0]); @@ -196,27 +187,9 @@ public SSLContext sslContext() { return this.sslContext; } - private void maybeSetSslEngineCipherSuites(SSLEngine sslEngine) { - if (cipherSuites != null) { - sslEngine.setEnabledCipherSuites(cipherSuites); - } else if (isKernelOffloadEnabled) { - final String[] cipherSuitesToEnable = sslEngine.getEnabledCipherSuites(); - - final List reOrderedCipherSuites = new ArrayList<>(); - Arrays.stream(cipherSuitesToEnable) - .filter(cipherSuitesWithKernelOffload::contains) - .forEach(reOrderedCipherSuites::add); - Arrays.stream(cipherSuitesToEnable) - .filter(cs -> !cipherSuitesWithKernelOffload.contains(cs)) - .forEach(reOrderedCipherSuites::add); - - sslEngine.setEnabledCipherSuites(reOrderedCipherSuites.toArray(new String[0])); - } - } - private SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) { SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); - maybeSetSslEngineCipherSuites(sslEngine); + if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites); if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols); if (mode == Mode.SERVER) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index f9fa097d363e7..d0cc4cc1e6951 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -204,10 +204,6 @@ public SSLEngine createSslEngine(String peerHost, int peerPort) { } } - public Mode getMode() { - return mode; - } - /** * Returns host/IP address of remote host without reverse DNS lookup to be used as the host * for creating SSL engine. This is used as a hint for session reuse strategy and also for @@ -483,4 +479,4 @@ void close() { } } } -} +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 127edffc5f2cb..ca6037d275610 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -113,8 +113,6 @@ public void testConnectionWithCustomKeyManager() throws Exception { ); sslServerConfigs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName()); sslServerConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS); - sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); - sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false); EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs); server.start(); Time time = new MockTime(); @@ -373,7 +371,7 @@ static class TestSslTransportLayer extends SslTransportLayer { public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, ChannelMetadataRegistry metadataRegistry) throws IOException { - super(channelId, key, sslEngine, metadataRegistry, false); + super(channelId, key, sslEngine, metadataRegistry); transportLayers.put(channelId, this); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index a029cba491776..fe066a83c0439 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -99,7 +99,6 @@ public Args(String tlsProtocol, boolean useInlinePem, TestSslUtils.SSLProvider p sslConfigOverrides.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); sslConfigOverrides.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsProtocol)); sslConfigOverrides.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS); - sslConfigOverrides.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); init(); } @@ -115,8 +114,6 @@ private void init() throws Exception { clientCertStores = certBuilder(false, "client", useInlinePem, provider).addHostName("localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); - sslClientConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); - sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class); sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class); } @@ -352,11 +349,11 @@ public void testListenerConfigOverride(Args args) throws Exception { ListenerName clientListenerName = new ListenerName("client"); args.sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); args.sslServerConfigs.put(clientListenerName.configPrefix() + BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - args.sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); + // `client` listener is not configured at this point, so client auth should be required server = createEchoServer(args, SecurityProtocol.SSL); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - args.sslClientConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); + // Connect with client auth should work fine createSelector(args.sslClientConfigs); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -1070,7 +1067,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder; assertEquals(listenerName, reconfigurableBuilder.listenerName()); reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs); - newKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); reconfigurableBuilder.reconfigure(newKeystoreConfigs); // Verify that new client with old truststore fails @@ -1095,7 +1091,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path"); missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password")); missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password")); - missingStoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found"); // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration @@ -1140,7 +1135,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), } ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder; reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs); - newKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); reconfigurableBuilder.reconfigure(newKeystoreConfigs); for (String propName : CertStores.TRUSTSTORE_PROPS) { @@ -1159,7 +1153,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), for (String propName : CertStores.KEYSTORE_PROPS) { invalidKeystoreConfigs.put(propName, invalidConfig.get(propName)); } - invalidKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); verifyInvalidReconfigure(reconfigurableBuilder, invalidKeystoreConfigs, "keystore without existing SubjectAltName"); String node3 = "3"; selector.connect(node3, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -1198,7 +1191,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder; assertEquals(listenerName, reconfigurableBuilder.listenerName()); reconfigurableBuilder.validateReconfiguration(newTruststoreConfigs); - newTruststoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); reconfigurableBuilder.reconfigure(newTruststoreConfigs); // Verify that new client with old truststore fails @@ -1221,7 +1213,6 @@ false, securityProtocol, config, null, null, time, new LogContext(), missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"); missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "some.truststore.path"); missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("some.truststore.password")); - missingStoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found"); // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration @@ -1396,7 +1387,7 @@ class TestSslTransportLayer extends SslTransportLayer { private final AtomicInteger numDelayedFlushesRemaining; public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) { - super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry(), false); + super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry()); this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride); this.appBufSize = new ResizeableBufferSize(appBufSizeOverride); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java index 3148b32cb4079..c737abf2ad5b9 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java @@ -204,7 +204,6 @@ public void setUp() { factory = new DefaultSslEngineFactory(); configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); configs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS); - configs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); } @Test diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index d7a35700d2151..2b56622a20576 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -184,8 +184,6 @@ public static Map createSslConfig(String keyManagerAlgorithm, St enabledProtocols.add(tlsProtocol); sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols); sslConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS); - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false); return sslConfigs; } @@ -566,8 +564,6 @@ private Map buildJks() throws IOException, GeneralSecurityExcept } else { sslConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SimpleSslContextProvider.class.getName()); } - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false); return sslConfigs; } @@ -593,8 +589,6 @@ private Map buildPem() throws IOException, GeneralSecurityExcept sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certPem); } - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false); - sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false); return sslConfigs; } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0e2fab85d49ed..4570554972e33 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2052,7 +2052,6 @@ class Log(@volatile private var _dir: File, ) } // FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft. - // XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format. warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " + s"with $messagesTruncated messages and $bytesTruncated bytes truncated") diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 591fffbb5f03e..c6b8eb5a0e646 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -314,15 +314,6 @@ object RequestChannel extends Logging { def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = { endTimeNanos = Time.SYSTEM.nanoseconds - val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos) - val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos) - val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos) - val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) - val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) - val responseSendTimeNs = endTimeNanos - responseDequeueTimeNanos - val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) - val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = if (header.apiKey == ApiKeys.FETCH) { val isFromFollower = body[FetchRequest].isFromFollower @@ -349,7 +340,6 @@ object RequestChannel extends Logging { m.throttleTimeHist.update(apiThrottleTimeMs) m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs)) m.responseSendTimeHist.update(Math.round(responseSendTimeMs)) - m.responseSendTimeNsHist.update(Math.round(responseSendTimeNs)) m.totalTimeHist.update(Math.round(totalTimeMs)) m.totalTimeBucketHist.foreach(_.update(totalTimeMs)) m.requestBytesHist.update(sizeOfBodyInBytes) @@ -690,7 +680,6 @@ object RequestMetrics { val ThrottleTimeMs = "ThrottleTimeMs" val ResponseQueueTimeMs = "ResponseQueueTimeMs" val ResponseSendTimeMs = "ResponseSendTimeMs" - val ResponseSendTimeNs = "ResponseSendTimeNs" val TotalTimeMs = "TotalTimeMs" val RequestBytes = "RequestBytes" val ResponseBytes = "ResponseBytes" @@ -745,7 +734,6 @@ class RequestMetrics(name: String, config: KafkaConfig) extends KafkaMetricsGrou val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags) // time to send the response to the requester val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags) - val responseSendTimeNsHist = newHistogram(ResponseSendTimeNs, biased = true, tags) val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags) // request size in bytes val requestBytesHist = newHistogram(RequestBytes, biased = true, tags) @@ -855,7 +843,6 @@ class RequestMetrics(name: String, config: KafkaConfig) extends KafkaMetricsGrou removeMetric(ResponseQueueTimeMs, tags) removeMetric(TotalTimeMs, tags) removeMetric(ResponseSendTimeMs, tags) - removeMetric(ResponseSendTimeNs, tags) removeMetric(RequestBytes, tags) removeMetric(ResponseBytes, tags) if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) { diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 602a23ff04d61..bbb5b72b86aa5 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -677,14 +677,15 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok val currentLogConfig = logManager.currentDefaultConfig val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals) - newConfig.valuesFromThisConfig.forEach { (k, v) => - if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { - DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => - if (v == null) - newBrokerDefaults.remove(configName) - else - newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) - } + // Call extractLogConfigMap to get the new LogConfig because there are some special handling for some configs, e.g., + // RetentionMsProp. This is to ensure that the new LogConfig is consistent with LogManager's behavior. + val newLogConfig = LogConfig.extractLogConfigMap(newConfig) + newLogConfig.forEach { (k, v) => + if (LogConfig.TopicConfigSynonyms.contains(k) && DynamicLogConfig.ReconfigurableConfigs.contains(LogConfig.TopicConfigSynonyms(k))) { + if (v == null) { + newBrokerDefaults.remove(k) + } else + newBrokerDefaults.put(k, v.asInstanceOf[AnyRef]) } } @@ -872,7 +873,6 @@ object DynamicListenerConfig { KafkaConfig.SslProviderProp, KafkaConfig.SslCipherSuitesProp, KafkaConfig.SslEnabledProtocolsProp, - KafkaConfig.SslKernelOffloadEnableProp, KafkaConfig.SslKeystoreTypeProp, KafkaConfig.SslKeystoreLocationProp, KafkaConfig.SslKeystorePasswordProp, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index eeea8cf750bd8..3ec1dab925eb8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -280,7 +280,6 @@ object Defaults { val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL val SslContextProviderClass = SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS - val SslKernelOffloadEnable = SslConfigs.DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM @@ -660,7 +659,6 @@ object KafkaConfig { val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG val SslEnabledProtocolsProp = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG val SslKeystoreTypeProp = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG - val SslKernelOffloadEnableProp = SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG @@ -1126,7 +1124,6 @@ object KafkaConfig { val SslContextProviderClassDoc = SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_DOC val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC val SslEnabledProtocolsDoc = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC - val SslKernelOffloadEnableDoc = SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC val SslKeystoreTypeDoc = SslConfigs.SSL_KEYSTORE_TYPE_DOC val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC @@ -1466,7 +1463,6 @@ object KafkaConfig { .define(PrincipalBuilderClassProp, CLASS, Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc) .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc) .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc) - .define(SslKernelOffloadEnableProp, BOOLEAN, Defaults.SslKernelOffloadEnable, MEDIUM, SslKernelOffloadEnableDoc) .define(SslContextProviderClassProp, STRING, Defaults.SslContextProviderClass, MEDIUM, SslContextProviderClassDoc) .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc) .define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc) diff --git a/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala b/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala deleted file mode 100644 index b2a71970608cd..0000000000000 --- a/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package kafka.tools - -import com.typesafe.scalalogging.LazyLogging -import joptsimple.OptionException -import kafka.utils.{CommandDefaultOptions, CommandLineUtils} -import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry} -import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.{ConfigResource, SslConfigs} -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.utils.Utils - -import java.time.Duration -import java.util -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicLong -import java.util.{Properties, UUID} -import scala.collection.JavaConverters._ - -object KernelTLSBenchmark extends LazyLogging { - - def main(args: Array[String]): Unit = { - val config = new KernelTLSBenchmarkConfig(args) - println("Warming up page cache...") - - val adminProps = filterProps(config.props, AdminClientConfig.configNames) - val adminClient = AdminClient.create(adminProps) - val (partitionCount, leaderIds) = getPartitions(config.topic, adminClient) - - config.partitions.foreach(partitions => { - if (partitions > partitionCount) { - throw new IllegalArgumentException( - s"Number of partitions of topic ${config.topic} found to " + - s"be ${partitionCount}, which is less than $partitions") - } - }) - - val partitionsToConsume: Int = config.partitions match { - case Some(p) => p - case None => partitionCount - } - - runConsume(print = false, 1, partitionsToConsume, config) - val withDisabled = multipleRuns( - print = true, kernelOffloadEnabled = false, adminClient, partitionsToConsume, leaderIds, config) - val withEnabled = multipleRuns( - print = true, kernelOffloadEnabled = true, adminClient, partitionsToConsume, leaderIds, config) - val gainPercentage = 100.0 * (withEnabled - withDisabled) / withDisabled - println("Throughput gain percentage = %.2f%%".format(gainPercentage)) - } - - private def filterProps(in: Properties, allowedKeys: util.Set[String]): Properties = { - val out = new Properties() - val map = in.asScala - .filter(entry => allowedKeys.contains(entry._1)) - .asJava - out.putAll(map) - out - } - - private def getPartitions(topicName: String, adminClient: AdminClient): (Int, Set[Int]) = { - val result = adminClient.describeTopics(Seq(topicName).asJava).all().get() - val partitionCount = result.get(topicName).partitions().size() - val leaderIds = result.get(topicName).partitions().asScala - .map(tpInfo => tpInfo.leader().id()).toSet - (partitionCount, leaderIds) - } - - private def setKernelTlsConfig( - kernelOffloadEnabled: Boolean, adminClient: AdminClient, - brokerIds: Iterable[Int], sslListenerName: String): Unit = { - val configKey = s"listener.name.${sslListenerName.toLowerCase}.${SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG}" - val configValue = if (kernelOffloadEnabled) "true" else "false" - val configEntry = new ConfigEntry(configKey, configValue) - - val configMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]] - - brokerIds.foreach(brokerId => { - val configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString) - configMap.put(configResource, Seq(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)).asJava) - }) - - val result = adminClient.incrementalAlterConfigs(configMap) - result.all().get() - } - - private def multipleRuns( - print: Boolean, kernelOffloadEnabled: Boolean, adminClient: AdminClient, - partitionsToConsume: Int, brokerIds: Iterable[Int], config: KernelTLSBenchmarkConfig): Double = { - setKernelTlsConfig(kernelOffloadEnabled, adminClient, brokerIds, config.sslListenerName) - Thread.sleep(10 * 1000) - val enableStr = if (kernelOffloadEnabled) "enabled" else "disabled" - if (print) { - println(s"Consuming with KTLS $enableStr") - } - var totalBytesRead: Long = 0 - var totalElapsedMillis: Long = 0 - for (runIndex <- 1 to config.numRuns) { - val (runBytesRead: Long, runElapsedMillis: Long) = runConsume(print, runIndex, partitionsToConsume, config) - totalBytesRead += runBytesRead - totalElapsedMillis += runElapsedMillis - } - val totalMB = totalBytesRead * 1.0 / (1024 * 1024) - val totalSec = totalElapsedMillis / 1000.0 - val totalMBPerSec = totalMB / totalSec - if (print) { - println("Total throughput with KTLS %s = %.2f MB/s, time elapsed = %d ms" - .format(enableStr, totalMBPerSec, totalElapsedMillis)) - } - totalMBPerSec - } - - private def runConsume(print: Boolean, runIndex: Int, partitionsToConsume: Int, config: KernelTLSBenchmarkConfig): (Long, Long) = { - val groupId = UUID.randomUUID.toString - val props = filterProps(config.props, ConsumerConfig.configNames) - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) - - val totalRecordsRead = new AtomicLong(0) - val totalBytesRead = new AtomicLong(0) - - var startMs, endMs = 0L - - val countDownLatch = new CountDownLatch(partitionsToConsume) - - if (print) { - printf(s"[Run $runIndex] Fetching records...") - } - startMs = System.currentTimeMillis - for (partition <- 0 to partitionsToConsume - 1) { - val runnable = new ConsumeRunnable( - config.topic, partition, props, config, countDownLatch, totalRecordsRead, totalBytesRead) - val thread = new Thread(runnable, "consumer-" + partition.toString) - thread.start() - } - - countDownLatch.await() - endMs = System.currentTimeMillis - - val elapsedMillis = endMs - startMs - val elapsedSecs = elapsedMillis / 1000.0 - - val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - val mbRate: Double = totalMBRead / elapsedSecs - val messageRate = totalRecordsRead.get / elapsedSecs - - if (print) { - println(" Throughput = %.2f MB/s".format(mbRate)) - } - return (totalBytesRead.get, elapsedMillis) - } - - class ConsumeRunnable( - topic: String, partition: Int, props: Properties, config: KernelTLSBenchmarkConfig, countDownLatch: CountDownLatch, - totalRecordsRead: AtomicLong, totalBytesRead: AtomicLong) extends Runnable { - override def run(): Unit = { - val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props) - consumer.assign(Seq(new TopicPartition(topic, partition)).asJava) - - // Now start the benchmark - var currentTimeMillis = System.currentTimeMillis - var lastConsumedTime = currentTimeMillis - - var tot: Long = 0 - while (totalRecordsRead.get < config.numRecords && currentTimeMillis - lastConsumedTime <= config.timeoutMs) { - val records = consumer.poll(Duration.ofMillis(100)).asScala - currentTimeMillis = System.currentTimeMillis - if (records.nonEmpty) - lastConsumedTime = currentTimeMillis - var bytesRead = 0L - var recordsRead = 0L - for (record <- records) { - recordsRead += 1 - if (record.key != null) - bytesRead += record.key.length - if (record.value != null) - bytesRead += record.value.length - } - totalRecordsRead.addAndGet(recordsRead) - totalBytesRead.addAndGet(bytesRead) - tot += recordsRead - } - - if (totalRecordsRead.get() < config.numRecords) { - println(s"WARNING: Exiting before consuming the expected number of records: timeout (${config.timeoutMs} ms) exceeded. ") - } - consumer.close() - countDownLatch.countDown() - } - } - - class KernelTLSBenchmarkConfig(args: Array[String]) extends CommandDefaultOptions(args) { - val consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val numRecordsOpt = parser.accepts("records", "REQUIRED: The number of records to consume") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Long]) - val partitionsOpt = parser.accepts("partitions", "REQUIRED: The number of partitions from which to consume") - .withRequiredArg - .describedAs("partitions") - .ofType(classOf[java.lang.Integer]) - val numRunsOpt = parser.accepts("runs", "Number of runs to perform during the benchmark.") - .withRequiredArg - .describedAs("runs") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val sslListenerNameOpt = parser.accepts("ssl-listener-name", - "The name of the SSL listener as configured in Kafka broker config.") - .withRequiredArg - .describedAs("ssl listener name") - .ofType(classOf[String]) - .defaultsTo("SSL") - - try - options = parser.parse(args: _*) - catch { - case e: OptionException => - CommandLineUtils.printUsageAndDie(parser, e.getMessage) - } - - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer") - - CommandLineUtils.checkRequiredArgs(parser, options, - consumerConfigOpt, topicOpt, numRecordsOpt, numRunsOpt) - - val props: Properties = Utils.loadProps(options.valueOf(consumerConfigOpt)) - - import org.apache.kafka.clients.consumer.ConsumerConfig - - // props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) - // props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString) - // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString) - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) - props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false") - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - - val topic = options.valueOf(topicOpt) - val numRecords = options.valueOf(numRecordsOpt).longValue - val numRuns = options.valueOf(numRunsOpt).intValue - val sslListenerName = options.valueOf(sslListenerNameOpt) - val timeoutMs = 10 * 1000 - val partitions : Option[Int] = if (options.has(partitionsOpt)) Some(options.valueOf(partitionsOpt).intValue()) else None - } -} diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2177b796f6361..bdf1669ab9405 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -102,6 +102,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @BeforeEach override def setUp(): Unit = { + doSetUp(true) + } + + private def doSetUp(setLogRetentionTimeMillis: Boolean): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))) super.setUp() @@ -123,8 +127,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") - props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) - props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) + if (setLogRetentionTimeMillis) { + props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) + } + props.put(KafkaConfig.LogRetentionTimeHoursProp, 137.toString) addExtraProps(props) props ++= sslProperties1 @@ -232,7 +238,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Verify a few log configs with and without synonyms val expectedProps = new Properties expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000") - expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168") + expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "137") expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168") expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1") val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp) @@ -678,6 +684,114 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + @Test + def testLogRetentionConfig(): Unit = { + // Verify that only setting LogRetentionTimeHoursProp without setting LogRetentionTimeMillisProp, the log retention is updated correctly + // after dynamic reconfigure. + + tearDown() + adminClients.clear() + servers.clear() + // Create servers without setting LogRetentionTimeMillisProp + doSetUp(false) + + val (producerThread, consumerThread) = startProduceConsume(retries = 0) + + val props = new Properties + props.put(KafkaConfig.MessageMaxBytesProp, "99800") + + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MessageMaxBytesProp, "99800")) + + // Verify that all broker defaults have been updated + servers.foreach { server => + props.forEach { (k, v) => + assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k") + } + } + + // Verify that configs of existing logs have been updated + var newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + // Verify that retentionMs is set with the value of LogRetentionTimeHoursProp + assertEquals(137 * 3600000, servers.head.logManager.currentDefaultConfig.retentionMs); + + val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == 99800, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.retentionMs == 137 * 3600000, "retenionMs is updated incorrectly"); + + props.asScala.foreach { case (k, v) => + val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k) + val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v + assertEquals(expectedValue, log.config.originals.get(logConfigName).toString, + s"Not reconfigured $logConfigName for existing log") + } + + // Verify that overridden topic configs are not updated when broker default is updated + val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) + .getOrElse(throw new IllegalStateException("Log not found")) + assertFalse(log2.config.delete, "Overridden clean up policy should not be updated") + assertEquals(ProducerCompressionCodec.name, log2.config.compressionType) + + // Verify that even though broker defaults can be defined at default cluster level for consistent + // configuration across brokers, they can also be defined at per-broker level for testing + props.clear() + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000") + props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString) + alterConfigsOnServer(servers.head, props) + assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + servers.tail.foreach { server => + assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(null, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + } + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.logManager.currentDefaultConfig.retentionMs) + + // Verify that produce/consume worked throughout this test without any retries in producer + stopAndVerifyProduceConsume(producerThread, consumerThread) + + // Verify that configuration at both per-broker level and default cluster level could be deleted and + // the default value should be restored + props.clear() + props.put(KafkaConfig.LogRetentionTimeMillisProp, "") + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, null) + } + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + + props.clear() + props.put(KafkaConfig.MessageMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.MessageMaxBytesProp, null) + } + + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + // Verify that configs of existing logs have been updated + newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == Defaults.MessageMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + } + @Test def testUncleanLeaderElectionEnable(): Unit = { val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 999b8dee9d4ae..59b4cf1efd2da 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -101,7 +101,6 @@ versions += [ kafka_26: "2.6.2", kafka_27: "2.7.1", kafka_28: "2.8.0", - ktls: "0.0.3", lz4: "1.8.0", mavenArtifact: "3.8.4", metrics: "2.2.0", @@ -177,7 +176,6 @@ libs += [ kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", kafkaStreams_27: "org.apache.kafka:kafka-streams:$versions.kafka_27", kafkaStreams_28: "org.apache.kafka:kafka-streams:$versions.kafka_28", - ktls: "com.linkedin.ktls:ktls-jni:$versions.ktls", log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics",