From 561f3c088a008cec447c736609e59330328b12ac Mon Sep 17 00:00:00 2001 From: shawkins Date: Tue, 1 Jun 2021 13:36:22 -0400 Subject: [PATCH] marking crd fields as required generally this guards against unexpected npes due to a fabric8 bug, we can't mark the specs as requires, so they will always be there based upon the api switching kafka.authentication.enabled and kafka.external.certificate.enabled to be based upon the cr, rather than the config --- api/pom.xml | 4 ++ .../resources/v1alpha1/ManagedKafka.java | 16 +++++ .../resources/v1alpha1/ManagedKafkaAgent.java | 16 +++++ .../v1alpha1/ManagedKafkaAgentSpec.java | 3 + .../ManagedKafkaAuthenticationOAuth.java | 3 + .../v1alpha1/ManagedKafkaEndpoint.java | 3 + .../resources/v1alpha1/ManagedKafkaSpec.java | 15 ++++- .../v1alpha1/ObservabilityConfiguration.java | 5 +- .../resources/v1alpha1/TlsKeyPair.java | 4 ++ .../operator/resources/v1alpha1/Versions.java | 4 ++ .../bf2/operator/operands/AdminServer.java | 19 +++--- .../bf2/operator/operands/KafkaCluster.java | 13 +---- .../secrets/SecuritySecretManager.java | 58 +++++++++---------- .../src/main/resources/application.properties | 3 - .../operator/operands/AdminServerTest.java | 1 + .../src/test/resources/expected/strimzi.yml | 4 -- .../org/bf2/sync/ManagedKafkaAgentSync.java | 5 +- 17 files changed, 114 insertions(+), 62 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index cc68a15a4..7e1eed75f 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -23,6 +23,10 @@ org.projectlombok lombok + + jakarta.validation + jakarta.validation-api + org.junit.jupiter diff --git a/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafka.java b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafka.java index 1c4bb12b1..44dbbea65 100644 --- a/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafka.java +++ b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafka.java @@ -32,6 +32,22 @@ public class ManagedKafka extends CustomResource corsAllowList; - @ConfigProperty(name = "kafka.external.certificate.enabled", defaultValue = "false") - boolean isKafkaExternalCertificateEnabled; - OpenShiftClient openShiftClient; @Inject @@ -172,7 +169,7 @@ protected Service serviceFrom(ManagedKafka managedKafka, Service current) { .editOrNewSpec() .withClusterIP(null) // to prevent 422 errors .withSelector(getSelectorLabels(adminServerName)) - .withPorts(getServicePorts()) + .withPorts(getServicePorts(managedKafka)) .endSpec() .build(); @@ -192,7 +189,7 @@ protected Route routeFrom(ManagedKafka managedKafka, Route current) { final IntOrString targetPort; final TLSConfig tlsConfig; - if (isKafkaExternalCertificateEnabled) { + if (SecuritySecretManager.isKafkaExternalCertificateEnabled(managedKafka)) { targetPort = HTTPS_PORT_TARGET; tlsConfig = new TLSConfigBuilder().withTermination("passthrough").build(); } else { @@ -231,7 +228,7 @@ protected List getContainers(ManagedKafka managedKafka) { .withName("admin-server") .withImage(adminApiImage) .withEnv(getEnvVar(managedKafka)) - .withPorts(getContainerPorts()) + .withPorts(getContainerPorts(managedKafka)) .withResources(getResources()) .withReadinessProbe(getProbe()) .withLivenessProbe(getProbe()) @@ -298,7 +295,7 @@ private List getEnvVar(ManagedKafka managedKafka) { .withValue(managedKafka.getMetadata().getName() + "-kafka-bootstrap:9095") .build()); - if (isKafkaExternalCertificateEnabled) { + if (SecuritySecretManager.isKafkaExternalCertificateEnabled(managedKafka)) { envVars.add(new EnvVarBuilder() .withName("KAFKA_ADMIN_TLS_CERT") .withNewValueFrom() @@ -324,11 +321,11 @@ private List getEnvVar(ManagedKafka managedKafka) { return envVars; } - private List getContainerPorts() { + private List getContainerPorts(ManagedKafka managedKafka) { final String apiPortName; final int apiContainerPort; - if (isKafkaExternalCertificateEnabled) { + if (SecuritySecretManager.isKafkaExternalCertificateEnabled(managedKafka)) { apiPortName = HTTPS_PORT_NAME; apiContainerPort = HTTPS_PORT; } else { @@ -346,12 +343,12 @@ private List getContainerPorts() { .build()); } - private List getServicePorts() { + private List getServicePorts(ManagedKafka managedKafka) { final String apiPortName; final int apiPort; final IntOrString apiTargetPort; - if (isKafkaExternalCertificateEnabled) { + if (SecuritySecretManager.isKafkaExternalCertificateEnabled(managedKafka)) { apiPortName = HTTPS_PORT_NAME; apiPort = HTTPS_PORT; apiTargetPort = HTTPS_PORT_TARGET; diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java index f67906544..92f66d2d6 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java @@ -59,7 +59,6 @@ import org.bf2.operator.resources.v1alpha1.ManagedKafkaAuthenticationOAuth; import org.bf2.operator.secrets.ImagePullSecretManager; import org.bf2.operator.secrets.SecuritySecretManager; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import javax.enterprise.context.ApplicationScoped; @@ -118,12 +117,6 @@ public class KafkaCluster extends AbstractKafkaCluster { @Inject protected SecuritySecretManager secretManager; - @ConfigProperty(name = "kafka.authentication.enabled", defaultValue = "false") - protected boolean isKafkaAuthenticationEnabled; - - @ConfigProperty(name = "kafka.external.certificate.enabled", defaultValue = "false") - protected boolean isKafkaExternalCertificateEnabled; - @Inject protected ImagePullSecretManager imagePullSecretManager; @@ -263,7 +256,7 @@ protected GenericSecretSource getSsoClientGenericSecretSource(ManagedKafka manag } protected CertSecretSource getSsoTlsCertSecretSource(ManagedKafka managedKafka) { - if (!isKafkaAuthenticationEnabled || managedKafka.getSpec().getOauth().getTlsTrustedCertificate() == null) { + if (!SecuritySecretManager.isKafkaAuthenticationEnabled(managedKafka) || managedKafka.getSpec().getOauth().getTlsTrustedCertificate() == null) { return null; } return new CertSecretSourceBuilder() @@ -273,7 +266,7 @@ protected CertSecretSource getSsoTlsCertSecretSource(ManagedKafka managedKafka) } protected CertAndKeySecretSource getTlsCertAndKeySecretSource(ManagedKafka managedKafka) { - if (!isKafkaExternalCertificateEnabled) { + if (!SecuritySecretManager.isKafkaExternalCertificateEnabled(managedKafka)) { return null; } return new CertAndKeySecretSourceBuilder() @@ -448,7 +441,7 @@ private ArrayOrObjectKafkaListeners getKafkaListeners(ManagedKafka managedKafka) KafkaListenerAuthentication plainOverOauthAuthenticationListener = null; KafkaListenerAuthentication oauthAuthenticationListener = null; - if (isKafkaAuthenticationEnabled) { + if (SecuritySecretManager.isKafkaAuthenticationEnabled(managedKafka)) { ManagedKafkaAuthenticationOAuth managedKafkaAuthenticationOAuth = managedKafka.getSpec().getOauth(); CertSecretSource ssoTlsCertSecretSource = getSsoTlsCertSecretSource(managedKafka); diff --git a/operator/src/main/java/org/bf2/operator/secrets/SecuritySecretManager.java b/operator/src/main/java/org/bf2/operator/secrets/SecuritySecretManager.java index e3cdcbc28..f0fc0ec09 100644 --- a/operator/src/main/java/org/bf2/operator/secrets/SecuritySecretManager.java +++ b/operator/src/main/java/org/bf2/operator/secrets/SecuritySecretManager.java @@ -7,7 +7,6 @@ import org.bf2.common.OperandUtils; import org.bf2.operator.InformerManager; import org.bf2.operator.resources.v1alpha1.ManagedKafka; -import org.eclipse.microprofile.config.inject.ConfigProperty; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; @@ -26,11 +25,13 @@ public class SecuritySecretManager { @Inject private InformerManager informerManager; - @ConfigProperty(name = "kafka.authentication.enabled", defaultValue = "false") - private boolean isKafkaAuthenticationEnabled; + public static boolean isKafkaAuthenticationEnabled(ManagedKafka managedKafka) { + return managedKafka.getSpec().getOauth() != null; + } - @ConfigProperty(name = "kafka.external.certificate.enabled", defaultValue = "false") - private boolean isKafkaExternalCertificateEnabled; + public static boolean isKafkaExternalCertificateEnabled(ManagedKafka managedKafka) { + return managedKafka.getSpec().getEndpoint().getTls() != null; + } public static String kafkaTlsSecretName(ManagedKafka managedKafka) { return managedKafka.getMetadata().getName() + "-tls-secret"; @@ -51,11 +52,11 @@ public static String kafkaClusterNamespace(ManagedKafka managedKafka) { public boolean isDeleted(ManagedKafka managedKafka) { boolean isDeleted = true; - if (isKafkaExternalCertificateEnabled) { + if (isKafkaExternalCertificateEnabled(managedKafka)) { isDeleted = cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka)) == null; } - if (isKafkaAuthenticationEnabled) { + if (isKafkaAuthenticationEnabled(managedKafka)) { isDeleted = isDeleted && cachedSecret(managedKafka, ssoClientSecretName(managedKafka)) == null && cachedSecret(managedKafka, ssoTlsSecretName(managedKafka)) == null; } @@ -64,48 +65,43 @@ public boolean isDeleted(ManagedKafka managedKafka) { } public void createOrUpdate(ManagedKafka managedKafka) { - if (isKafkaExternalCertificateEnabled) { - Secret currentKafkaTlsSecret = cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka)); + Secret currentKafkaTlsSecret = cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka)); + if (isKafkaExternalCertificateEnabled(managedKafka)) { Secret kafkaTlsSecret = kafkaTlsSecretFrom(managedKafka, currentKafkaTlsSecret); createOrUpdate(kafkaTlsSecret); + } else if (currentKafkaTlsSecret != null) { + secretResource(managedKafka, kafkaTlsSecretName(managedKafka)).delete(); } - if (isKafkaAuthenticationEnabled) { - Secret currentSsoClientSecret = cachedSecret(managedKafka, ssoClientSecretName(managedKafka)); + Secret currentSsoClientSecret = cachedSecret(managedKafka, ssoClientSecretName(managedKafka)); + Secret currentSsoTlsSecret = cachedSecret(managedKafka, ssoTlsSecretName(managedKafka)); + + if (isKafkaAuthenticationEnabled(managedKafka)) { Secret ssoClientSecret = ssoClientSecretFrom(managedKafka, currentSsoClientSecret); createOrUpdate(ssoClientSecret); if (managedKafka.getSpec().getOauth().getTlsTrustedCertificate() != null) { - Secret currentSsoTlsSecret = cachedSecret(managedKafka, ssoTlsSecretName(managedKafka)); Secret ssoTlsSecret = ssoTlsSecretFrom(managedKafka, currentSsoTlsSecret); createOrUpdate(ssoTlsSecret); - } else { - deleteOldTlsTrustedCertificateSecret(managedKafka); + } else if (currentSsoTlsSecret != null) { + secretResource(managedKafka, ssoTlsSecretName(managedKafka)).delete(); + } + } else { + if (currentSsoClientSecret != null) { + secretResource(managedKafka, ssoClientSecretName(managedKafka)).delete(); + } + if (currentSsoTlsSecret != null) { + secretResource(managedKafka, ssoTlsSecretName(managedKafka)).delete(); } - } - } - - /** - * Delete "not used" Secret containing TLS trusted certificates for OAuth server - * NOTE: - * If TLS trusted certificates are signed by a public CA (i.e. Let's Encrypt), passing them - * is not needed in the ManagedKafka resource, so for already running Kafka instances we can - * delete the corresponding Secret hosting them. - * - * @param managedKafka - */ - private void deleteOldTlsTrustedCertificateSecret(ManagedKafka managedKafka) { - if (cachedSecret(managedKafka, ssoTlsSecretName(managedKafka)) != null) { - secretResource(managedKafka, ssoTlsSecretName(managedKafka)).delete(); } } public void delete(ManagedKafka managedKafka) { - if (isKafkaExternalCertificateEnabled) { + if (isKafkaExternalCertificateEnabled(managedKafka)) { secretResource(managedKafka, kafkaTlsSecretName(managedKafka)).delete(); } - if (isKafkaAuthenticationEnabled) { + if (isKafkaAuthenticationEnabled(managedKafka)) { secretResource(managedKafka, ssoClientSecretName(managedKafka)).delete(); if (managedKafka.getSpec().getOauth().getTlsTrustedCertificate() != null) { secretResource(managedKafka, ssoTlsSecretName(managedKafka)).delete(); diff --git a/operator/src/main/resources/application.properties b/operator/src/main/resources/application.properties index 18c348f1a..ae4823ed3 100644 --- a/operator/src/main/resources/application.properties +++ b/operator/src/main/resources/application.properties @@ -1,8 +1,5 @@ agent.status.interval=60s -kafka.authentication.enabled=true -kafka.external.certificate.enabled=true - # can be removed after https://github.com/fabric8io/kubernetes-client/pull/2993 quarkus.log.category."io.fabric8.kubernetes.client.informers.cache.ReflectorWatcher".level=WARNING quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %x %s%e%n diff --git a/operator/src/test/java/org/bf2/operator/operands/AdminServerTest.java b/operator/src/test/java/org/bf2/operator/operands/AdminServerTest.java index 671d4b50d..b44ccc599 100644 --- a/operator/src/test/java/org/bf2/operator/operands/AdminServerTest.java +++ b/operator/src/test/java/org/bf2/operator/operands/AdminServerTest.java @@ -36,6 +36,7 @@ void createAdminServerDeployment() { .build()) .withSpec( new ManagedKafkaSpecBuilder() + .withNewEndpoint().endEndpoint() .withNewVersions() .withKafka("2.6.0") .endVersions() diff --git a/operator/src/test/resources/expected/strimzi.yml b/operator/src/test/resources/expected/strimzi.yml index bf517d209..f2485e2e9 100644 --- a/operator/src/test/resources/expected/strimzi.yml +++ b/operator/src/test/resources/expected/strimzi.yml @@ -47,10 +47,6 @@ spec: enableOauthBearer: true type: "oauth" configuration: - brokerCertChainAndKey: - secretName: "test-mk-tls-secret" - certificate: "tls.crt" - key: "tls.key" bootstrap: host: "xxx.yyy.zzz" brokers: diff --git a/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java b/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java index 365c57458..bba69e341 100644 --- a/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java +++ b/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java @@ -13,6 +13,8 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; +import java.util.Objects; + @ApplicationScoped public class ManagedKafkaAgentSync { @@ -37,6 +39,7 @@ void loop() { return; } ManagedKafkaAgent managedKafkaAgent = controlPlane.getManagedKafkaAgent(); + Objects.requireNonNull(managedKafkaAgent); createOrUpdateManagedKafkaAgent(managedKafkaAgent); } @@ -52,7 +55,7 @@ private void createOrUpdateManagedKafkaAgent(ManagedKafkaAgent remoteAgent) { remoteAgent.getMetadata().setName(AgentResourceClient.RESOURCE_NAME); this.agentClient.create(remoteAgent); log.infof("ManagedKafkaAgent CR created"); - } else if (remoteAgent.getSpec() != null && !remoteAgent.getSpec().equals(resource.getSpec())) { + } else if (!remoteAgent.getSpec().equals(resource.getSpec())) { this.agentClient.edit(this.agentClient.getNamespace(), AgentResourceClient.RESOURCE_NAME, mka -> { mka.setSpec(remoteAgent.getSpec()); return mka;