Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4167] Kafka certificate authentication #4168

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
{{ if .Values.global.kafkaCertAuth }}
volumeMounts:
- name: kafka-config-certs
mountPath: /opt/kafka/certs
{{ end }}
resources:
{{ toYaml .Values.components.api.communication.resources | indent 10 }}
initContainers:
Expand All @@ -62,3 +67,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
{{ if .Values.global.kafkaCertAuth }}
- name: kafka-config-certs
configMap:
name: kafka-config-certs
{{ end }}
5 changes: 5 additions & 0 deletions infrastructure/helm-chart/templates/config/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ data:
{{- end }}
KAFKA_SCHEMA_REGISTRY_URL: {{ .Values.config.kafka.schemaRegistryUrl }}
KAFKA_COMMIT_INTERVAL_MS: "{{ .Values.config.kafka.commitInterval }}"
KAFKA_SASL_CA: |-
{{ .Values.config.kafka.saslCaCertificate | nindent 4 | trim }}
KAFKA_SASL_USERNAME: {{ .Values.config.kafka.saslUsername }}
KAFKA_SASL_PASSWORD: {{ .Values.config.kafka.saslPassword }}
KAFKA_KEY_TRUST_SECRET: {{ .Values.config.kafka.keyTrustSecret }}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ spec:
volumeMounts:
- name: kafka-create-topics
mountPath: /opt/provisioning
- name: kafka-config
mountPath: /opt/kafka/ca.pem
subPath: KAFKA_SASL_CA
initContainers:
- name: wait
image: "{{ .Values.global.busyboxImage }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ data:
REPLICAS=${KAFKA_MINIMUM_REPLICAS:-1}
AIRY_CORE_NAMESPACE=${AIRY_CORE_NAMESPACE:-}
AUTH_JAAS=${AUTH_JAAS:-}
KAFKA_SASL_USERNAME=${KAFKA_SASL_USERNAME:-}
KAFKA_SASL_PASSWORD=${KAFKA_SASL_PASSWORD:-}
KAFKA_SASL_CA=${KAFKA_SASL_CA:-}

if [ -n "${AIRY_CORE_NAMESPACE}" ]; then
AIRY_CORE_NAMESPACE="${AIRY_CORE_NAMESPACE}."
Expand All @@ -45,6 +48,18 @@ data:
echo "Using jaas authentication for connecting to Kafka"
fi

if [ -n "${KAFKA_SASL_CA}" ]; then
cat <<EOF > /opt/kafka/jaas.config
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=$KAFKA_SASL_USERNAME
sasl.password=$KAFKA_SASL_PASSWORD
sasl.ca.location=/opt/kafka/ca.pem
EOF
CONNECTION_OPTS+=(--command-config /opt/kafka/jaas.config)
echo "Using jaas authentication for connecting to Kafka"
fi

echo "Creating Kafka topics"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;



public class KafkaConsumerWrapper<K, V> {
Expand All @@ -22,6 +26,7 @@ public class KafkaConsumerWrapper<K, V> {
private KafkaConsumer<K, V> consumer;

private String jaasConfig;
private String kafkaKeyTrustSecret;

public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
Expand All @@ -33,13 +38,22 @@ public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public KafkaConsumerWrapper<K,V> withAuthJaas(String jaasConfig) {
public KafkaConsumerWrapper<K,V> withAuthJaas(String jaasConfig, String kafkaKeyTrustSecret) {
this.jaasConfig = jaasConfig;
if(jaasConfig != null) {
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasConfig);
}
if (kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;


public class KafkaStreamsWrapper {
private static final Logger log = AiryLoggerFactory.getLogger(KafkaStreamsWrapper.class);
private final HealthCheckRunner healthCheckRunnerThread;
private final String brokers;
private final String schemaRegistryUrl;
private String jaasConfig;
private String kafkaKeyTrustSecret;
private long commitIntervalInMs;
private long suppressIntervalInMs;
private int threadCount;
Expand Down Expand Up @@ -70,8 +75,9 @@ public KafkaStreamsWrapper(final String brokers, final String schemaRegistryUrl)
healthCheckRunnerThread = new HealthCheckRunner(testMode);
}

public KafkaStreamsWrapper withJaasConfig(String jaasConfig) {
public KafkaStreamsWrapper withJaasConfig(String jaasConfig, String kafkaKeyTrustSecret) {
this.jaasConfig = jaasConfig;
this.kafkaKeyTrustSecret = kafkaKeyTrustSecret;
return this;
}

Expand Down Expand Up @@ -227,6 +233,17 @@ public synchronized void start(final Topology topology, final String appId) thro
props.put("sasl.jaas.config", jaasConfig);
}

if (this.kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}


props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, this.maxRequestSize);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.fetchMaxBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.springframework.context.annotation.Scope;

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;

@Configuration
@PropertySource("classpath:kafka-core.properties")
Expand All @@ -21,7 +24,8 @@ public class KafkaCoreConfig {
@Lazy
@Scope("prototype")
public <K, V> KafkaProducer<K, V> kafkaProducer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl,
@Value("${AUTH_JAAS:#{null}}") final String jaasConfig) {
@Value("${AUTH_JAAS:#{null}}") final String jaasConfig,
@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) {
final Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
Expand All @@ -37,15 +41,26 @@ public <K, V> KafkaProducer<K, V> kafkaProducer(@Value("${kafka.brokers}") final
props.put("sasl.jaas.config", jaasConfig);
}

if (kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}

return new KafkaProducer<>(props);
}

@Bean
@Lazy
@Scope("prototype")
public <K, V> KafkaConsumerWrapper<K, V> kafkaConsumer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl,
@Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig) {
@Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig,
@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) {
return new KafkaConsumerWrapper<K, V>(brokers, schemaRegistryUrl)
.withAuthJaas(jaasConfig);
.withAuthJaas(jaasConfig, kafkaKeyTrustSecret);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class KafkaStreamsConfig {
@Value("${AUTH_JAAS:#{null}}")
private String jaasConfig;

@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}")
private String kafkaKeyTrustSecret;

@Value("${kafka.rpc-port:0}")
private int rpcPort;

Expand Down Expand Up @@ -68,7 +71,7 @@ public class KafkaStreamsConfig {
public KafkaStreamsWrapper airyKafkaStreams(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl) {
return new KafkaStreamsWrapper(brokers, schemaRegistryUrl)
.withCommitIntervalInMs(commitIntervalMs)
.withJaasConfig(jaasConfig)
.withJaasConfig(jaasConfig, kafkaKeyTrustSecret)
.withSuppressIntervalInMs(suppressIntervalMs)
.withThreadCount(streamsThreadCount)
.withAppServerHost(rpcHost)
Expand Down
Loading