Skip to content

Commit

Permalink
Fix LogConfig during dynamic update
Browse files Browse the repository at this point in the history
  • Loading branch information
hshi2022 committed Mar 13, 2024
1 parent e72c1bf commit df02548
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 465 deletions.
20 changes: 0 additions & 20 deletions bin/kafka-kernel-tls-perf-test.sh

This file was deleted.

4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,8 @@ spotless {
allprojects {

repositories {
mavenLocal()
mavenCentral()
maven { url "https://linkedin.jfrog.io/artifactory/zookeeper" }
maven { url "https://linkedin.jfrog.io/artifactory/ktls-jni" }

}

dependencyUpdates {
Expand Down Expand Up @@ -1247,7 +1244,6 @@ project(':clients') {
implementation libs.snappy
implementation libs.slf4jApi
implementation libs.conscrypt
implementation libs.ktls

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
Expand Down
1 change: 0 additions & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
<allow pkg="org.apache.kafka.common.internals" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.conscrypt" />
<allow pkg="com.linkedin.ktls"/>

<subpackage name="acl">
<allow pkg="org.apache.kafka.common.annotation" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public class SslConfigs {
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";

public static final String SSL_KERNEL_OFFLOAD_ENABLE_CONFIG = "ssl.kernel.offload.enable";
public static final String SSL_KERNEL_OFFLOAD_ENABLE_DOC = "ssl.kernel.offload.enable";
public static final boolean DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE = false;

public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. "
+ "By default all the available cipher suites are supported.";
Expand Down Expand Up @@ -144,7 +140,6 @@ public static void addClientSslSupport(ConfigDef config) {
config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
.define(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, SslConfigs.DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
Expand All @@ -165,7 +160,6 @@ public static void addClientSslSupport(ConfigDef config) {
}

public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG,
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ protected TransportLayer buildTransportLayer(String id, SelectionKey key, Socket
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
return SslTransportLayer.create(id, key,
sslFactory.createSslEngine(socketChannel.socket()),
metadataRegistry, false);
metadataRegistry);
} else {
return new PlaintextTransportLayer(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
private final boolean isInterBrokerListener;
private SslFactory sslFactory;
private Mode mode;
private boolean isKernelOffloadEnabled;
private Map<String, ?> configs;
private SslPrincipalMapper sslPrincipalMapper;
private final Logger log;
Expand All @@ -72,7 +71,6 @@ public void configure(Map<String, ?> configs) throws KafkaException {
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
this.sslFactory.configure(this.configs);
this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
} catch (KafkaException e) {
throw e;
} catch (Exception e) {
Expand All @@ -92,7 +90,6 @@ public void validateReconfiguration(Map<String, ?> configs) {

@Override
public void reconfigure(Map<String, ?> configs) {
isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
sslFactory.reconfigure(configs);
}

Expand Down Expand Up @@ -121,14 +118,10 @@ public void close() {
if (sslFactory != null) sslFactory.close();
}

private boolean shouldEnableKernelOffload() {
return isKernelOffloadEnabled && mode == Mode.SERVER;
}

protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, ChannelMetadataRegistry metadataRegistry) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket()),
metadataRegistry, shouldEnableKernelOffload());
metadataRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.common.network;

import com.linkedin.ktls.KTLSEnableFailedException;
import com.linkedin.ktls.KernelTls;
import java.io.IOException;
import java.io.EOFException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -81,8 +79,6 @@ private enum State {
private final SocketChannel socketChannel;
private final ChannelMetadataRegistry metadataRegistry;
private final Logger log;
private final KernelTls kernelTLS;
private boolean shouldAttemptKtls;

private HandshakeStatus handshakeStatus;
private SSLEngineResult handshakeResult;
Expand All @@ -93,31 +89,24 @@ private enum State {
private ByteBuffer appReadBuffer;
private ByteBuffer fileChannelBuffer;
private boolean hasBytesBuffered;
private boolean ktlsAttempted;
private boolean ktlsEnabled;

public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine,
ChannelMetadataRegistry metadataRegistry, boolean shouldAttemptKtls)
throws IOException {
return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry, shouldAttemptKtls);
ChannelMetadataRegistry metadataRegistry) throws IOException {
return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry);
}

// Prefer `create`, only use this in tests
SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, ChannelMetadataRegistry metadataRegistry,
boolean shouldAttemptKtls) {
SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
ChannelMetadataRegistry metadataRegistry) {
this.channelId = channelId;
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
this.shouldAttemptKtls = shouldAttemptKtls;
this.kernelTLS = new KernelTls();
this.state = State.NOT_INITIALIZED;
this.metadataRegistry = metadataRegistry;

final LogContext logContext = new LogContext("[SslTransportLayer channelId=" + channelId + " key=" + key + "] ");
final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
this.log = logContext.logger(getClass());

log.debug("New SSL channel created with kernel offload turned {}", shouldAttemptKtls ? "on" : "off");
}

// Visible for testing
Expand Down Expand Up @@ -191,10 +180,6 @@ public void close() throws IOException {
state = State.CLOSING;
sslEngine.closeOutbound();
try {
if (ktlsEnabled) {
kernelTLS.closeNotify(socketChannel);
return;
}
if (prevState != State.NOT_INITIALIZED && isConnected()) {
if (!flush(netWriteBuffer)) {
throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
Expand Down Expand Up @@ -422,7 +407,7 @@ private void doHandshake() throws IOException {
handshakeFinished();
break;
default:
throw new IllegalStateException("Unexpected status [" + handshakeStatus + "]");
throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
}
}

Expand Down Expand Up @@ -710,12 +695,6 @@ public int write(ByteBuffer src) throws IOException {
throw closingException();
if (!ready())
return 0;
if (shouldAttemptKtls && !ktlsAttempted) {
attemptToEnableKernelTls();
}
if (ktlsEnabled) {
return writeKernelTLS(src);
}

int written = 0;
while (flush(netWriteBuffer) && src.hasRemaining()) {
Expand Down Expand Up @@ -745,24 +724,6 @@ public int write(ByteBuffer src) throws IOException {
return written;
}

private int writeKernelTLS(ByteBuffer src) throws IOException {
log.trace("Writing with Kernel TLS enabled");
return socketChannel.write(src);
}

private void attemptToEnableKernelTls() {
try {
kernelTLS.enableKernelTlsForSend(sslEngine, socketChannel);
log.debug("Kernel TLS enabled on socket on channel {}", channelId);
ktlsEnabled = true;
} catch (KTLSEnableFailedException e) {
log.warn("Attempt to enable KTLS failed with exception, falling back to userspace encryption", e);
ktlsEnabled = false;
} finally {
ktlsAttempted = true;
}
}

/**
* Writes a sequence of bytes to this channel from the subsequence of the given buffers.
*
Expand All @@ -774,12 +735,6 @@ private void attemptToEnableKernelTls() {
*/
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
if (shouldAttemptKtls && !ktlsAttempted) {
attemptToEnableKernelTls();
}
if (ktlsEnabled) {
return writeKernelTLS(srcs, offset, length);
}
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
int totalWritten = 0;
Expand All @@ -802,11 +757,6 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
return totalWritten;
}

private long writeKernelTLS(ByteBuffer[] srcs, int offset, int length) throws IOException {
log.trace("Writing with Kernel TLS enabled");
return socketChannel.write(srcs, offset, length);
}

/**
* Writes a sequence of bytes to this channel from the given buffers.
*
Expand Down Expand Up @@ -1004,12 +954,6 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr
throw closingException();
if (state != State.READY)
return 0;
if (shouldAttemptKtls && !ktlsAttempted) {
attemptToEnableKernelTls();
}
if (ktlsEnabled) {
return transferFromWithKernelTLS(fileChannel, position, count);
}

if (!flush(netWriteBuffer))
return 0;
Expand Down Expand Up @@ -1065,10 +1009,4 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr
throw e;
}
}

private long transferFromWithKernelTLS(
FileChannel fileChannel, long position, long count) throws IOException {
log.trace("Transferring from file with Kernel TLS enabled");
return fileChannel.transferTo(position, count, socketChannel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.security.ssl;

import com.linkedin.ktls.KernelTls;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
Expand Down Expand Up @@ -75,8 +74,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory {
private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class);
public static final String PEM_TYPE = "PEM";

private final KernelTls kernelTls = new KernelTls();

private Map<String, ?> configs;
private SslContextProvider sslContextProvider;
private String kmfAlgorithm;
Expand All @@ -88,8 +85,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory {
private SecureRandom secureRandomImplementation;
private SSLContext sslContext;
private SslClientAuth sslClientAuth;
private boolean isKernelOffloadEnabled;
private List<String> cipherSuitesWithKernelOffload;


@Override
Expand Down Expand Up @@ -144,10 +139,6 @@ public void configure(Map<String, ?> configs) {
this.sslContextProvider.configure(configs);
SecurityUtils.addConfiguredSecurityProviders(this.configs);

this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
if (isKernelOffloadEnabled) {
this.cipherSuitesWithKernelOffload = kernelTls.supportedCipherSuites();
}
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) {
this.cipherSuites = cipherSuitesList.toArray(new String[0]);
Expand Down Expand Up @@ -196,27 +187,9 @@ public SSLContext sslContext() {
return this.sslContext;
}

private void maybeSetSslEngineCipherSuites(SSLEngine sslEngine) {
if (cipherSuites != null) {
sslEngine.setEnabledCipherSuites(cipherSuites);
} else if (isKernelOffloadEnabled) {
final String[] cipherSuitesToEnable = sslEngine.getEnabledCipherSuites();

final List<String> reOrderedCipherSuites = new ArrayList<>();
Arrays.stream(cipherSuitesToEnable)
.filter(cipherSuitesWithKernelOffload::contains)
.forEach(reOrderedCipherSuites::add);
Arrays.stream(cipherSuitesToEnable)
.filter(cs -> !cipherSuitesWithKernelOffload.contains(cs))
.forEach(reOrderedCipherSuites::add);

sslEngine.setEnabledCipherSuites(reOrderedCipherSuites.toArray(new String[0]));
}
}

private SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
maybeSetSslEngineCipherSuites(sslEngine);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);

if (mode == Mode.SERVER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ public SSLEngine createSslEngine(String peerHost, int peerPort) {
}
}

public Mode getMode() {
return mode;
}

/**
* Returns host/IP address of remote host without reverse DNS lookup to be used as the host
* for creating SSL engine. This is used as a hint for session reuse strategy and also for
Expand Down Expand Up @@ -483,4 +479,4 @@ void close() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ public void testConnectionWithCustomKeyManager() throws Exception {
);
sslServerConfigs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
sslServerConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS);
sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false);
EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
server.start();
Time time = new MockTime();
Expand Down Expand Up @@ -373,7 +371,7 @@ static class TestSslTransportLayer extends SslTransportLayer {

public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
ChannelMetadataRegistry metadataRegistry) throws IOException {
super(channelId, key, sslEngine, metadataRegistry, false);
super(channelId, key, sslEngine, metadataRegistry);
transportLayers.put(channelId, this);
}

Expand Down
Loading

0 comments on commit df02548

Please sign in to comment.