diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQL.java
similarity index 50%
rename from operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java
rename to operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQL.java
index d156d1bb0a..9cce3d350b 100644
--- a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java
+++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQL.java
@@ -2,24 +2,36 @@
 
 import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
 import io.fabric8.kubernetes.api.model.EnvVar;
-import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static io.apicurio.registry.operator.resource.ResourceFactory.APP_CONTAINER_NAME;
 import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
 import static io.apicurio.registry.operator.utils.Utils.isBlank;
 
-public class KafkaSql {
+public class KafkaSQL {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaSQL.class);
 
     public static String ENV_STORAGE_KIND = "APICURIO_STORAGE_KIND";
 
     public static String ENV_KAFKASQL_BOOTSTRAP_SERVERS = "APICURIO_KAFKASQL_BOOTSTRAP_SERVERS";
 
-    public static boolean configureKafkaSQL(ApicurioRegistry3 primary, Map<String, EnvVar> env) {
+    public static boolean configureKafkaSQL(ApicurioRegistry3 primary, Deployment deployment,
+            Map<String, EnvVar> env) {
         if (primary.getSpec().getApp().getKafkasql() != null
                 && !isBlank(primary.getSpec().getApp().getKafkasql().getBootstrapServers())) {
-            addEnvVar(env, new EnvVarBuilder().withName(ENV_STORAGE_KIND).withValue("kafkasql").build());
-            addEnvVar(env, new EnvVarBuilder().withName(ENV_KAFKASQL_BOOTSTRAP_SERVERS)
-                    .withValue(primary.getSpec().getApp().getKafkasql().getBootstrapServers()).build());
+
+            addEnvVar(env, ENV_STORAGE_KIND, "kafkasql");
+            addEnvVar(env, ENV_KAFKASQL_BOOTSTRAP_SERVERS,
+                    primary.getSpec().getApp().getKafkasql().getBootstrapServers());
+
+            if (KafkaSQLTLS.configureKafkaSQLTLS(primary, deployment, APP_CONTAINER_NAME, env)) {
+                log.info("KafkaSQL storage with TLS security configured.");
+            }
+
             return true;
         }
         return false;
diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQLTLS.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQLTLS.java
new file mode 100644
index 0000000000..b3f3e45702
--- /dev/null
+++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSQLTLS.java
@@ -0,0 +1,147 @@
+package io.apicurio.registry.operator.feat;
+
+import io.apicurio.registry.operator.OperatorException;
+import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
+import io.apicurio.registry.operator.api.v1.ApicurioRegistry3SpecKafkaSqlSecurity;
+import io.apicurio.registry.operator.api.v1.ApicurioRegistry3SpecKafkaSqlTLS;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+
+import java.util.Map;
+
+import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
+import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.getContainerFromDeployment;
+import static io.apicurio.registry.operator.utils.Utils.isBlank;
+
+public class KafkaSQLTLS {
+
+    public static final String ENV_KAFKASQL_SECURITY_PROTOCOL = "APICURIO_KAFKA_COMMON_SECURITY_PROTOCOL";
+
+    public static final String ENV_KAFKASQL_SSL_KEYSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_TYPE";
+    public static final String ENV_KAFKASQL_SSL_KEYSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_LOCATION";
+    public static final String ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_PASSWORD";
+
+    public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_TYPE";
+    public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_LOCATION";
+    public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_PASSWORD";
+
+    public static final String KEYSTORE_SECRET_VOLUME_NAME = "registry-kafkasql-tls-keystore";
+    public static final String TRUSTSTORE_SECRET_VOLUME_NAME = "registry-kafkasql-tls-truststore";
+
+    /**
+     * Plain KafkaSQL must be already configured.
+     */
+    public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment deployment,
+            String containerName, Map<String, EnvVar> env) {
+
+        if (primary.getSpec().getApp().getKafkasql() == null
+                || isBlank(primary.getSpec().getApp().getKafkasql().getBootstrapServers())) {
+            throw new OperatorException("Plain KafkaSQL must be already configured.");
+        }
+
+        if (primary.getSpec().getApp().getKafkasql().getSecurity() == null) {
+            primary.getSpec().getApp().getKafkasql().setSecurity(new ApicurioRegistry3SpecKafkaSqlSecurity());
+        }
+
+        if (primary.getSpec().getApp().getKafkasql().getSecurity().getTls() == null) {
+            primary.getSpec().getApp().getKafkasql().getSecurity()
+                    .setTls(new ApicurioRegistry3SpecKafkaSqlTLS());
+        }
+
+        if (!isBlank(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName())
+                && !isBlank(primary.getSpec().getApp().getKafkasql().getSecurity().getTls()
+                        .getTruststoreSecretName())) {
+
+            addEnvVar(env, ENV_KAFKASQL_SECURITY_PROTOCOL, "SSL");
+
+            // ===== Keystore
+
+            addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12");
+            addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_LOCATION,
+                    "/etc/" + KEYSTORE_SECRET_VOLUME_NAME + "/user.p12");
+            // spotless:off
+            // @formatter:off
+            addEnvVar(env, new EnvVarBuilder()
+                    .withName(ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD)
+                    .withNewValueFrom()
+                        .withNewSecretKeyRef()
+                            .withName(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName())
+                            .withKey("user.password")
+                        .endSecretKeyRef()
+                    .endValueFrom()
+                    .build()
+            );
+            // @formatter:on
+            // spotless:on
+
+            addSecretVolume(deployment,
+                    primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName(),
+                    KEYSTORE_SECRET_VOLUME_NAME);
+            addSecretVolumeMount(deployment, containerName, KEYSTORE_SECRET_VOLUME_NAME,
+                    "etc/" + KEYSTORE_SECRET_VOLUME_NAME);
+
+            // ===== Truststore
+
+            addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12");
+            addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION,
+                    "/etc/" + TRUSTSTORE_SECRET_VOLUME_NAME + "/ca.p12");
+            // spotless:off
+            // @formatter:off
+            addEnvVar(env, new EnvVarBuilder()
+                    .withName(ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD)
+                    .withNewValueFrom()
+                        .withNewSecretKeyRef()
+                            .withName(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getTruststoreSecretName())
+                            .withKey("ca.password")
+                        .endSecretKeyRef()
+                    .endValueFrom()
+                    .build()
+            );
+            // @formatter:on
+            // spotless:on
+
+            addSecretVolume(deployment,
+                    primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getTruststoreSecretName(),
+                    TRUSTSTORE_SECRET_VOLUME_NAME);
+            addSecretVolumeMount(deployment, containerName, TRUSTSTORE_SECRET_VOLUME_NAME,
+                    "etc/" + TRUSTSTORE_SECRET_VOLUME_NAME);
+
+            return true;
+        }
+        return false;
+    }
+
+    public static void addSecretVolume(Deployment deployment, String secretName, String volumeName) {
+        // spotless:off
+        // @formatter:off
+        deployment.getSpec().getTemplate().getSpec().getVolumes().add(
+                new VolumeBuilder()
+                        .withName(volumeName)
+                        .withNewSecret()
+                            .withSecretName(secretName)
+                        .endSecret()
+                        .build()
+        );
+        // @formatter:on
+        // spotless:on
+    }
+
+    public static void addSecretVolumeMount(Deployment deployment, String containerName, String volumeName,
+            String mountPath) {
+        var c = getContainerFromDeployment(deployment, containerName);
+        // spotless:off
+        // @formatter:off
+        c.getVolumeMounts().add(
+                new VolumeMountBuilder()
+                        .withName(volumeName)
+                        .withReadOnly(true)
+                        .withMountPath(mountPath)
+                        .build()
+        );
+        // @formatter:on
+        // spotless:on
+    }
+}
diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java b/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java
index d4d88c0529..3384358b87 100644
--- a/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java
+++ b/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java
@@ -2,7 +2,7 @@
 
 import io.apicurio.registry.operator.OperatorException;
 import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
-import io.apicurio.registry.operator.feat.KafkaSql;
+import io.apicurio.registry.operator.feat.KafkaSQL;
 import io.apicurio.registry.operator.feat.PostgresSql;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.EnvVar;
@@ -62,7 +62,7 @@ protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry3> context) {
@@ ... @@ public static void addEnvVar(Map<String, EnvVar> map, EnvVar envVar) {
         }
     }
 
+    public static void addEnvVar(Map<String, EnvVar> map, String name, String value) {
+        addEnvVar(map, new EnvVarBuilder().withName(name).withValue(value).build());
+    }
+
     /**
      * Get container with a given name from the given Deployment.
     *
diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLITTest.java
index 08d750e7fa..a00f5e547f 100644
--- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLITTest.java
+++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLITTest.java
@@ -33,20 +33,20 @@ public static void beforeAll() throws Exception {
 
     @Test
     void testKafkaSQLPlain() {
-        client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/ephemeral.kafka.yaml"))
-                .create();
-        final var clusterNAme = "my-cluster";
+        client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml"))
+                .createOrReplace();
+        final var clusterName = "example-cluster";
 
         await().ignoreExceptions().untilAsserted(() ->
        // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
-        assertThat(client.pods().inNamespace(namespace).withName(clusterNAme + "-kafka-0").get().getStatus()
+        assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
                 .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
                 .containsOnly("True"));
 
         // We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
-        var bootstrapServers = clusterNAme + "-kafka-bootstrap." + namespace + ".svc:9092";
+        var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9092";
 
-        var registry = deserialize("k8s/examples/kafkasql/plain/kafka-plain.apicurioregistry3.yaml",
+        var registry = deserialize("k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml",
                 ApicurioRegistry3.class);
         registry.getMetadata().setNamespace(namespace);
         registry.getSpec().getApp().getKafkasql().setBootstrapServers(bootstrapServers);
@@ -63,12 +63,11 @@ void testKafkaSQLPlain() {
                     .findFirst().get();
             assertThat(client.pods().inNamespace(namespace).withName(podName).getLog())
                     .contains("Using Kafka-SQL artifactStore");
-            return true;
         });
     }
 
-    private static void applyStrimziResources() throws IOException {
+    static void applyStrimziResources() throws IOException {
         try (BufferedInputStream in = new BufferedInputStream(
                 new URL("https://strimzi.io/install/latest").openStream())) {
             List<HasMetadata> resources = Serialization.unmarshal(in);
diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLTLSITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLTLSITTest.java
new file mode 100644
index 0000000000..a88bea7003
--- /dev/null
+++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSQLTLSITTest.java
@@ -0,0 +1,70 @@
+package io.apicurio.registry.operator.it;
+
+import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
+import io.fabric8.kubernetes.api.model.PodCondition;
+import io.quarkus.test.junit.QuarkusTest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.apicurio.registry.operator.it.KafkaSQLITTest.applyStrimziResources;
+import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@QuarkusTest
+public class KafkaSQLTLSITTest extends ITBase {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaSQLTLSITTest.class);
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        applyStrimziResources();
+    }
+
+    @Test
+    void testKafkaSQLTLS() {
+        client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/example-cluster.kafka.yaml"))
+                .create();
+        final var clusterName = "example-cluster";
+
+        await().ignoreExceptions().untilAsserted(() ->
+        // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
+        assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
+                .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
+                .containsOnly("True"));
+
+        client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml"))
+                .inNamespace(namespace).create();
+
+        final var userName = "apicurio";
+
+        await().untilAsserted(
+                () -> assertThat(client.secrets().inNamespace(namespace).withName(userName).get())
+                        .isNotNull());
+
+        // We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
+        var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093";
+
+        var registry = deserialize("k8s/examples/kafkasql/tls/kafkasql-tls.apicurioregistry3.yaml",
+                ApicurioRegistry3.class);
+        registry.getMetadata().setNamespace(namespace);
+        registry.getSpec().getApp().getKafkasql().setBootstrapServers(bootstrapServers);
+
+        client.resource(registry).create();
+
+        await().ignoreExceptions().until(() -> {
+            assertThat(client.apps().deployments().inNamespace(namespace)
+                    .withName(registry.getMetadata().getName() + "-app-deployment").get().getStatus()
+                    .getReadyReplicas().intValue()).isEqualTo(1);
+            var podName = client.pods().inNamespace(namespace).list().getItems().stream()
+                    .map(pod -> pod.getMetadata().getName())
+                    .filter(podN -> podN.startsWith(registry.getMetadata().getName() + "-app-deployment"))
+                    .findFirst().get();
+            assertThat(client.pods().inNamespace(namespace).withName(podName).getLog())
+                    .contains("Using Kafka-SQL artifactStore");
+            return true;
+        });
+    }
+}
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/ephemeral.kafka.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/ephemeral.kafka.yaml
deleted file mode 100644
index 2801bcc778..0000000000
--- a/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/ephemeral.kafka.yaml
+++ /dev/null
@@ -1,36 +0,0 @@
-apiVersion: kafka.strimzi.io/v1beta2
-kind: Kafka
-metadata:
-  name: my-cluster
-spec:
-  kafka:
-    version: 3.8.0
-    replicas: 1 # Reduced from 3 for faster deployment
-    listeners:
-      - name: plain
-        port: 9092
-        type: internal
-        tls: false
-      - name: tls
-        port: 9093
-        type: internal
-        tls: true
-    config:
-      # Uncomment if you want to use 3 replicas:
-      #
-      # offsets.topic.replication.factor: 3
-      # transaction.state.log.replication.factor: 3
-      # transaction.state.log.min.isr: 2
-      # default.replication.factor: 3
-      # min.insync.replicas: 2
-      offsets.topic.replication.factor: 1
-      inter.broker.protocol.version: "3.8"
-    storage:
-      type: ephemeral
-  zookeeper:
-    replicas: 1 # Reduced from 3 for faster deployment
-    storage:
-      type: ephemeral
-  entityOperator:
-    topicOperator: {}
-    userOperator: {}
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml
new file mode 100644
index 0000000000..1e331f9d50
--- /dev/null
+++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml
@@ -0,0 +1,25 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: example-cluster
+spec:
+  kafka:
+    version: 3.8.0
+    replicas: 1
+    listeners:
+      - name: plain
+        type: internal
+        port: 9092
+        tls: false
+    config:
+      inter.broker.protocol.version: "3.8"
+      offsets.topic.replication.factor: 1
+    storage:
+      type: ephemeral
+  zookeeper:
+    replicas: 1
+    storage:
+      type: ephemeral
+  entityOperator:
+    topicOperator: {}
+    userOperator: {}
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafka-plain.apicurioregistry3.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml
similarity index 63%
rename from operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafka-plain.apicurioregistry3.yaml
rename to operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml
index 70bbe00557..e43f0a0eeb 100644
--- a/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafka-plain.apicurioregistry3.yaml
+++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml
@@ -7,3 +7,8 @@ spec:
     kafkasql:
       bootstrapServers: "..svc:9092"
       # Try using Strimzi/Red Hat AMQ Streams Operator!
+    env:
+      - name: QUARKUS_LOG_LEVEL
+        value: debug
+      - name: QUARKUS_LOG_CATEGORY_IO_APICURIO_LEVEL
+        value: debug
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml
new file mode 100644
index 0000000000..0046594537
--- /dev/null
+++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml
@@ -0,0 +1,32 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaUser
+metadata:
+  name: apicurio
+  labels:
+    strimzi.io/cluster: example-cluster
+spec:
+  authentication:
+    type: tls
+  authorization:
+    type: simple
+    acls:
+      - resource:
+          type: topic
+          name: kafkasql-journal
+        operations:
+          - All
+      - resource:
+          type: topic
+          name: kafkasql-snapshots
+        operations:
+          - All
+      - resource:
+          type: topic
+          name: registry-events
+        operations:
+          - All
+      - resource:
+          type: group
+          name: '*'
+        operations:
+          - All
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/example-cluster.kafka.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/example-cluster.kafka.yaml
new file mode 100644
index 0000000000..d9459b8967
--- /dev/null
+++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/example-cluster.kafka.yaml
@@ -0,0 +1,29 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: example-cluster
+spec:
+  kafka:
+    version: 3.8.0
+    replicas: 1
+    listeners:
+      - name: tls
+        type: internal
+        port: 9093
+        tls: true
+        authentication:
+          type: tls
+    authorization:
+      type: simple
+    config:
+      inter.broker.protocol.version: "3.8"
+      offsets.topic.replication.factor: 1
+    storage:
+      type: ephemeral
+  zookeeper:
+    replicas: 1
+    storage:
+      type: ephemeral
+  entityOperator:
+    topicOperator: {}
+    userOperator: {}
diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/kafkasql-tls.apicurioregistry3.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/kafkasql-tls.apicurioregistry3.yaml
new file mode 100644
index 0000000000..1f16ae15ac
--- /dev/null
+++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/tls/kafkasql-tls.apicurioregistry3.yaml
@@ -0,0 +1,18 @@
+apiVersion: registry.apicur.io/v1
+kind: ApicurioRegistry3
+metadata:
+  name: kafkasql-tls
+spec:
+  app:
+    kafkasql:
+      bootstrapServers: "..svc:9093"
+      # Try using Strimzi/Red Hat AMQ Streams Operator!
+      security:
+        tls:
+          keystoreSecretName: apicurio
+          truststoreSecretName: example-cluster-cluster-ca-cert
+    env:
+      - name: QUARKUS_LOG_LEVEL
+        value: debug
+      - name: QUARKUS_LOG_CATEGORY_IO_APICURIO_LEVEL
+        value: debug
diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSql.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSql.java
index 759005ee3c..7925f602ab 100644
--- a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSql.java
+++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSql.java
@@ -8,7 +8,7 @@
 import lombok.*;
 
 @JsonInclude(Include.NON_NULL)
-@JsonPropertyOrder({ "bootstrapServers" })
+@JsonPropertyOrder({ "bootstrapServers", "security" })
 @JsonDeserialize(using = None.class)
 @NoArgsConstructor
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
@@ -26,4 +26,13 @@ public class ApicurioRegistry3SpecKafkaSql implements KubernetesResource {
             Configure Kafka bootstrap servers.""")
     @JsonSetter(nulls = Nulls.SKIP)
     private String bootstrapServers;
+
+    /**
+     * Provide the following configuration options if your Kafka cluster is secured using TLS or SCRAM.
+     */
+    @JsonProperty("security")
+    @JsonPropertyDescription("""
+            Provide the following configuration options if your Kafka cluster is secured using TLS or SCRAM.""")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private ApicurioRegistry3SpecKafkaSqlSecurity security;
 }
diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlSecurity.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlSecurity.java
new file mode 100644
index 0000000000..fbdcce2c45
--- /dev/null
+++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlSecurity.java
@@ -0,0 +1,29 @@
+package io.apicurio.registry.operator.api.v1;
+
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.databind.JsonDeserializer.None;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+import lombok.*;
+
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder({ "tls" })
+@JsonDeserialize(using = None.class)
+@NoArgsConstructor
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@Builder
+@Getter
+@Setter
+@ToString
+public class ApicurioRegistry3SpecKafkaSqlSecurity implements KubernetesResource {
+
+    /**
+     * Kafka is secured using TLS.
+     */
+    @JsonProperty("tls")
+    @JsonPropertyDescription("""
+            Kafka is secured using TLS.""")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private ApicurioRegistry3SpecKafkaSqlTLS tls;
+}
diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlTLS.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlTLS.java
new file mode 100644
index 0000000000..77523a7f2b
--- /dev/null
+++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/ApicurioRegistry3SpecKafkaSqlTLS.java
@@ -0,0 +1,40 @@
+package io.apicurio.registry.operator.api.v1;
+
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.databind.JsonDeserializer.None;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+import lombok.*;
+
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder({ "keystoreSecretName", "truststoreSecretName" })
+@JsonDeserialize(using = None.class)
+@NoArgsConstructor
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@Builder
+@Getter
+@Setter
+@ToString
+public class ApicurioRegistry3SpecKafkaSqlTLS implements KubernetesResource {
+
+    /**
+     * Name of a Secret that contains TLS keystore (in PKCS12 format) under the `user.p12` key, and keystore
+     * password under the `user.password` key.
+     */
+    @JsonProperty("keystoreSecretName")
+    @JsonPropertyDescription("""
+            Name of a Secret that contains TLS keystore (in PKCS12 format) under the `user.p12` key, and keystore password under the `user.password` key.""")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private String keystoreSecretName;
+
+    /**
+     * Name of a Secret that contains TLS truststore (in PKCS12 format) under the `ca.p12` key, and truststore
+     * password under the `ca.password` key.
+     */
+    @JsonProperty("truststoreSecretName")
+    @JsonPropertyDescription("""
+            Name of a Secret that contains TLS truststore (in PKCS12 format) under the `ca.p12` key, and truststore password under the `ca.password` key.""")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private String truststoreSecretName;
+}
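For illustration only, and not part of the patch: given the kafkasql-tls example CR above, and assuming the Strimzi-generated `apicurio` KafkaUser Secret and the `example-cluster-cluster-ca-cert` Secret exist in the namespace, KafkaSQLTLS.configureKafkaSQLTLS() would wire the app container roughly as sketched below. All names, keys, and paths are taken from the constants and test resources in this patch; the target container is selected by APP_CONTAINER_NAME, whose value is not shown in this diff.

# Sketch of the env/volume wiring added by KafkaSQLTLS (illustrative, derived from the code above)
env:
  - name: APICURIO_KAFKA_COMMON_SECURITY_PROTOCOL
    value: SSL
  - name: APICURIO_KAFKA_COMMON_SSL_KEYSTORE_TYPE
    value: PKCS12
  - name: APICURIO_KAFKA_COMMON_SSL_KEYSTORE_LOCATION
    value: /etc/registry-kafkasql-tls-keystore/user.p12
  - name: APICURIO_KAFKA_COMMON_SSL_KEYSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: apicurio                              # keystoreSecretName
        key: user.password
  - name: APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_TYPE
    value: PKCS12
  - name: APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_LOCATION
    value: /etc/registry-kafkasql-tls-truststore/ca.p12
  - name: APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: example-cluster-cluster-ca-cert       # truststoreSecretName
        key: ca.password
volumeMounts:
  - name: registry-kafkasql-tls-keystore
    mountPath: etc/registry-kafkasql-tls-keystore   # "etc/" + volume name, as built by addSecretVolumeMount
    readOnly: true
  - name: registry-kafkasql-tls-truststore
    mountPath: etc/registry-kafkasql-tls-truststore
    readOnly: true
volumes:
  - name: registry-kafkasql-tls-keystore
    secret:
      secretName: apicurio
  - name: registry-kafkasql-tls-truststore
    secret:
      secretName: example-cluster-cluster-ca-cert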