Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
[D2IQ-68540] enable custom TLS (#465)
Browse files Browse the repository at this point in the history
* add option to providde custom certificates

* customise keystore and truststore paths

* add secrets when kerberos is disabled

* Add docs for custom TLS

* make it possible to use default dcos certificate for TLS encryption
  • Loading branch information
rishabh96b authored Jun 16, 2020
1 parent 79c2785 commit d6de33b
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 44 deletions.
120 changes: 120 additions & 0 deletions docs/CustomTLS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Use custom TLS certificate for Kafka

## Using Custom TLS settings Kafka package

To use the custom TLS certs for kafka service, we need to add the following options in the configuration of the package:

```
"service": {
"name" : "kafka",
"transport_encryption": {
"enabled": true,
"key_store": "${SERVICE_NAME}/keystore",
"key_store_password_file": "${SERVICE_NAME}/keystorepass",
"trust_store": "${SERVICE_NAME}/truststore",
"trust_store_password_file": "${SERVICE_NAME}/truststorepass"
"allow_plaintext": false
}
}
```

> Note: `transport_encryption.enabled:true` means that custom transport encryption is enabled. In future releases, we will separate the custom tls and default tls feature.
## Example with self-signed certificate

Generate CA-cert and CA-private-key, called `ca-cert` and `ca-key` respectively

```
openssl req -new -newkey rsa:4096 -days 365 -x509 -subj "/C=US/ST=CA/L=SF/O=Mesosphere/OU=Mesosphere/CN=kafka" -keyout ca-key -out ca-cert -nodes
```

Generate a keystore, called `broker.keystore`
```
keytool -genkey -keyalg RSA -keystore broker.keystore -validity 365 -storepass changeit -keypass changeit -dname "CN=kafka" -storetype JKS
```

Generate Certificate Signing Request (CSR) called `cert-file`
```
keytool -keystore broker.keystore -certreq -file cert-file -storepass changeit -keypass changeit
```

Sign the Generated certificate
```
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:changeit
```

Generate a truststore with ca-cert
```
keytool -keystore broker.truststore -alias CARoot -import -file ca-cert -storepass changeit -keypass changeit -noprompt
```

Generate a truststore called `broker.truststore` with ca-cert
```
keytool -keystore broker.truststore -alias CARoot -importcert -file ca-cert -storepass changeit -keypass changeit -noprompt
```

Generate a truststore with self-signed cert
```
keytool -keystore broker.truststore -alias CertSigned -importcert -file cert-signed -storepass changeit -keypass changeit -noprompt
```

Attach the dcos cluster using
```
dcos cluster setup {CLUSTER_URL}
```

Create Service account and its secret to use the TLS feature
```
dcos security org service-accounts keypair ${SERVICE_NAME}-private-key.pem ${SERVICE_NAME}-public-key.pem
dcos security org service-accounts create -p ${SERVICE_NAME}-public-key.pem -d "desc" "${SERVICE_NAME}"
dcos security secrets create-sa-secret --strict ${SERVICE_NAME}-private-key.pem "${SERVICE_NAME}" "${SERVICE_NAME}-secret"
```

Assign permissions to the user

```
dcos security org groups add_user superusers ${SERVICE_NAME}
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:task:user:nobody create
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:framework:role:${SERVICE_NAME}-role create
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:reservation:role:${SERVICE_NAME}-role create
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:volume:role:${SERVICE_NAME}-role create
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:reservation:principal:${SERVICE_NAME} delete
dcos security org users grant ${SERVICE_NAME}dcos:mesos:master:volume:principal:${SERVICE_NAME} delete
dcos security org users grant ${SERVICE_NAME}dcos:secrets:default:/${SERVICE_NAME}/\* full
dcos security org users grant ${SERVICE_NAME}dcos:secrets:list:default:/${SERVICE_NAME} read
```

Now we are ready to install a Kafka cluster with custom transport encryption enabled.

Create a file named `dcos-kafka-options-customtls.json` with following configuration

```
cat <<EOF >>dcos-kafka-options-customtls.json
{
"service": {
"name": "${SERVICE_NAME}",
"service_account": "${SERVICE_NAME}",
"service_account_secret": "${SERVICE_NAME}-secret",
"security": {
"transport_encryption": {
"enabled": true,
"allow_plaintext": false,
"key_store": "${SERVICE_NAME}/keystore",
"key_store_password_file": "${SERVICE_NAME}/keystorepass",
"trust_store": "${SERVICE_NAME}/truststore",
"trust_store_password_file": "${SERVICE_NAME}/truststorepass"
}
}
}
}
EOF
```

Install the kafka service
```
dcos package install kafka --options=dcos-kafka-options-customtls.json --yes
```

The custom transport encryption settings can be verified from the `server.properties` file of brokers.
14 changes: 14 additions & 0 deletions frameworks/kafka/src/main/dist/server.properties.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,33 @@ broker.rack={{ZONE}}

{{#SECURITY_TRANSPORT_ENCRYPTION_ENABLED}}
############################# TLS Settings #############################

# Use default TLS credentials if custom TLS is not enabled
{{^SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
ssl.keystore.location={{MESOS_SANDBOX}}/broker.keystore
ssl.keystore.password=notsecure
ssl.key.password=notsecure

ssl.truststore.location={{MESOS_SANDBOX}}/broker.truststore
ssl.truststore.password=notsecure
{{/SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}

ssl.enabled.protocols=TLSv1.2

{{#SECURITY_TRANSPORT_ENCRYPTION_CIPHERS}}
ssl.cipher.suites={{SECURITY_TRANSPORT_ENCRYPTION_CIPHERS}}
{{/SECURITY_TRANSPORT_ENCRYPTION_CIPHERS}}

# Use TLS certificate provided by user when custom TLS is enabled
{{#SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
ssl.keystore.location={{MESOS_SANDBOX}}/customtls/broker.keystore
ssl.keystore.password={{CUSTOM_TLS_KEY_STORE_KEY}}
ssl.key.password={{CUSTOM_TLS_KEY_STORE_KEY}}

ssl.truststore.location={{MESOS_SANDBOX}}/customtls/broker.truststore
ssl.truststore.password={{CUSTOM_TLS_TRUST_STORE_KEY}}
{{/SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}

# If Kerberos is NOT enabled, then SSL authentication can be turned on.
{{^SECURITY_KERBEROS_ENABLED}}
{{#SECURITY_SSL_AUTHENTICATION_ENABLED}}
Expand Down
44 changes: 44 additions & 0 deletions frameworks/kafka/src/main/dist/svc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ pods:
keytab:
secret: {{SECURITY_KERBEROS_KEYTAB_SECRET}}
file: kafka.keytab
{{#TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
tlsTrustStore:
secret: {{CUSTOM_TLS_TRUST_STORE}}
file: broker.truststore
tlsTrustStorePass:
secret: {{CUSTOM_TLS_TRUST_STORE_PASSWORD}}
file: trust_store_pass
tlsKeyStore:
secret: {{CUSTOM_TLS_KEY_STORE}}
file: broker.keystore
tlsKeyStorePass:
secret: {{CUSTOM_TLS_KEY_STORE_PASSWORD}}
file: key_store_pass
{{/TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
{{#TASKCFG_ALL_SECURE_JMX_ENABLED}}
jmxPasswordFile:
secret: {{SECURE_JMX_PWD_FILE}}
Expand Down Expand Up @@ -79,6 +93,21 @@ pods:
file: jmx/key_store_pass
{{/TASKCFG_ALL_SECURITY_KERBEROS_ENABLED}}
{{/TASKCFG_ALL_SECURE_JMX_ENABLED}}
{{#TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
secrets:
tlsTrustStore:
secret: {{CUSTOM_TLS_TRUST_STORE}}
file: customtls/broker.truststore
tlsTrustStorePass:
secret: {{CUSTOM_TLS_TRUST_STORE_PASSWORD}}
file: customtls/trust_store_pass
tlsKeyStore:
secret: {{CUSTOM_TLS_KEY_STORE}}
file: customtls/broker.keystore
tlsKeyStorePass:
secret: {{CUSTOM_TLS_KEY_STORE_PASSWORD}}
file: customtls/key_store_pass
{{/TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
tasks:
broker:
cpus: {{BROKER_CPUS}}
Expand Down Expand Up @@ -175,6 +204,21 @@ pods:
mkdir jmx_properties
(umask u=rw,g=,o= && cp $MESOS_SANDBOX/jmx/key_file $MESOS_SANDBOX/jmx_properties/key_file)
{{/TASKCFG_ALL_SECURE_JMX_ENABLED}}
{{#TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}
export KEY_STORE_PATH=$MESOS_SANDBOX/customtls/broker.keystore
if [ -f "$MESOS_SANDBOX/customtls/broker.keystore" ]; then
# load user provided trust store
echo "Found broker.keystore" >&2
export CUSTOM_TLS_KEY_STORE_PATH=$MESOS_SANDBOX/customtls/broker.truststore
export CUSTOM_TLS_KEY_STORE_KEY=`cat $MESOS_SANDBOX/customtls/trust_store_pass`
fi
if [ -f "$MESOS_SANDBOX/customtls/broker.truststore" ]; then
# load user provided trust store
echo "Found broker.truststore" >&2
export CUSTOM_TLS_TRUST_STORE_PATH=$MESOS_SANDBOX/customtls/broker.truststore
export CUSTOM_TLS_TRUST_STORE_KEY=`cat $MESOS_SANDBOX/customtls/trust_store_pass`
fi
{{/TASKCFG_ALL_SECURITY_CUSTOM_TRANSPORT_ENCRYPTION_ENABLED}}

# setup-helper determines the correct listeners and security.inter.broker.protocol.
# it relies on the task IP being stored in MESOS_CONTAINER_IP
Expand Down
5 changes: 4 additions & 1 deletion frameworks/kafka/tests/test_ssl_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def test_authn_client_can_read_and_write(
"service_account": service_account["name"],
"service_account_secret": service_account["secret"],
"security": {
"transport_encryption": {"enabled": True},
"transport_encryption": {
"enabled": True,
"ciphers": "TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
},
"ssl_authentication": {"enabled": True},
},
}
Expand Down
13 changes: 8 additions & 5 deletions frameworks/kafka/tests/test_ssl_kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ def kafka_server(kerberos, service_account):
"realm": kerberos.get_realm(),
"keytab_secret": kerberos.get_keytab_path(),
},
"transport_encryption": {"enabled": True},
"transport_encryption": {
"enabled": True,
"ciphers": "TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
},
},
}
}
Expand All @@ -94,7 +97,9 @@ def kafka_server(kerberos, service_account):
@pytest.fixture(scope="module", autouse=True)
def kafka_client(kerberos):
try:
kafka_client = client.KafkaClient("kafka-client", config.PACKAGE_NAME, config.SERVICE_NAME, kerberos)
kafka_client = client.KafkaClient(
"kafka-client", config.PACKAGE_NAME, config.SERVICE_NAME, kerberos
)
kafka_client.install()

# TODO: This flag should be set correctly.
Expand Down Expand Up @@ -123,9 +128,7 @@ def test_client_can_read_and_write(kafka_client: client.KafkaClient, kafka_serve
kafka_client.connect()

user = "client"
write_success, read_successes, _ = kafka_client.can_write_and_read(
user, topic_name
)
write_success, read_successes, _ = kafka_client.can_write_and_read(user, topic_name)

assert write_success, "Write failed (user={})".format(user)
assert read_successes, (
Expand Down
37 changes: 14 additions & 23 deletions frameworks/kafka/tests/test_ssl_kerberos_authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def kerberos(configure_security):
@pytest.fixture(scope="module", autouse=True)
def kafka_client(kerberos):
try:
kafka_client = client.KafkaClient("kafka-client", config.PACKAGE_NAME, config.SERVICE_NAME, kerberos)
kafka_client = client.KafkaClient(
"kafka-client", config.PACKAGE_NAME, config.SERVICE_NAME, kerberos
)
kafka_client.install()

# TODO: This flag should be set correctly.
Expand Down Expand Up @@ -91,7 +93,10 @@ def test_authz_acls_required(
"realm": kerberos.get_realm(),
"keytab_secret": kerberos.get_keytab_path(),
},
"transport_encryption": {"enabled": True},
"transport_encryption": {
"enabled": True,
"ciphers": "TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
},
"authorization": {"enabled": True, "super_users": "User:{}".format("super")},
},
}
Expand Down Expand Up @@ -121,9 +126,7 @@ def test_authz_acls_required(
# Since no ACLs are specified, only the super user can read and write
for user in ["super"]:
log.info("Checking write / read permissions for user=%s", user)
write_success, read_successes, _ = kafka_client.can_write_and_read(
user, topic_name
)
write_success, read_successes, _ = kafka_client.can_write_and_read(user, topic_name)
assert write_success, "Write failed (user={})".format(user)
assert read_successes, (
"Read failed (user={}): "
Expand All @@ -133,9 +136,7 @@ def test_authz_acls_required(

for user in ["authorized", "unauthorized"]:
log.info("Checking lack of write / read permissions for user=%s", user)
write_success, _, read_messages = kafka_client.can_write_and_read(
user, topic_name
)
write_success, _, read_messages = kafka_client.can_write_and_read(user, topic_name)
assert not write_success, "Write not expected to succeed (user={})".format(user)
assert auth.is_not_authorized(read_messages), "Unauthorized expected (user={}".format(
user
Expand All @@ -147,9 +148,7 @@ def test_authz_acls_required(
# After adding ACLs the authorized user and super user should still have access to the topic.
for user in ["authorized", "super"]:
log.info("Checking write / read permissions for user=%s", user)
write_success, read_successes, _ = kafka_client.can_write_and_read(
user, topic_name
)
write_success, read_successes, _ = kafka_client.can_write_and_read(user, topic_name)
assert write_success, "Write failed (user={})".format(user)
assert read_successes, (
"Read failed (user={}): "
Expand All @@ -159,9 +158,7 @@ def test_authz_acls_required(

for user in ["unauthorized"]:
log.info("Checking lack of write / read permissions for user=%s", user)
write_success, _, read_messages = kafka_client.can_write_and_read(
user, topic_name
)
write_success, _, read_messages = kafka_client.can_write_and_read(user, topic_name)
assert not write_success, "Write not expected to succeed (user={})".format(user)
assert auth.is_not_authorized(read_messages), "Unauthorized expected (user={}".format(
user
Expand Down Expand Up @@ -226,9 +223,7 @@ def test_authz_acls_not_required(
# Since no ACLs are specified, all users can read and write.
for user in ["authorized", "unauthorized", "super"]:
log.info("Checking write / read permissions for user=%s", user)
write_success, read_successes, _ = kafka_client.can_write_and_read(
user, topic_name
)
write_success, read_successes, _ = kafka_client.can_write_and_read(user, topic_name)
assert write_success, "Write failed (user={})".format(user)
assert read_successes, (
"Read failed (user={}): "
Expand All @@ -242,9 +237,7 @@ def test_authz_acls_not_required(
# After adding ACLs the authorized user and super user should still have access to the topic.
for user in ["authorized", "super"]:
log.info("Checking write / read permissions for user=%s", user)
write_success, read_successes, _ = kafka_client.can_write_and_read(
user, topic_name
)
write_success, read_successes, _ = kafka_client.can_write_and_read(user, topic_name)
assert write_success, "Write failed (user={})".format(user)
assert read_successes, (
"Read failed (user={}): "
Expand All @@ -254,9 +247,7 @@ def test_authz_acls_not_required(

for user in ["unauthorized"]:
log.info("Checking lack of write / read permissions for user=%s", user)
write_success, _, read_messages = kafka_client.can_write_and_read(
user, topic_name
)
write_success, _, read_messages = kafka_client.can_write_and_read(user, topic_name)
assert not write_success, "Write not expected to succeed (user={})".format(user)
assert auth.is_not_authorized(read_messages), "Unauthorized expected (user={}".format(
user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ def kafka_server(kerberos, service_account):
"realm": kerberos.get_realm(),
"keytab_secret": kerberos.get_keytab_path(),
},
"transport_encryption": {"enabled": True},
"transport_encryption": {
"enabled": True,
"ciphers": "TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
},
},
}
}
Expand Down
Loading

0 comments on commit d6de33b

Please sign in to comment.