diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/ConfigurationScopeLayering.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/ConfigurationScopeLayering.scala deleted file mode 100644 index 0887a14f8..000000000 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/ConfigurationScopeLayering.scala +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.operator.action - -import scala.jdk.CollectionConverters._ -import scala.util.Try - -import cloudflow.blueprint.deployment.StreamletDeployment -import cloudflow.operator.action.TopicActions -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory - -/** - * Implementation of https://cloudflow.io/docs/current/develop/cloudflow-configuration.html - */ -object ConfigurationScopeLayering { - final case class Configs(streamlet: Config, runtime: Config, pods: Config) - - def configs(streamletDeployment: StreamletDeployment, appConfig: Config, clusterSecretConfigs: Map[String, Config]): Configs = { - val streamletName = streamletDeployment.streamletName - val runtime = streamletDeployment.runtime - - val streamletConfig = mergeToStreamletConfig(runtime, streamletName, appConfig) - val podsConfig = extractPodsConfig(streamletConfig) - val runtimeConfig = extractRuntimeConfig(runtime, streamletConfig) - - val asPortMappings = moveTopicsConfigToPortMappings(streamletDeployment, streamletConfig, appConfig, clusterSecretConfigs) - - Configs(asPortMappings, runtimeConfig, podsConfig) - } - - // open for unit testing - private[operator] def mergeToStreamletConfig(runtime: String, streamletName: String, appConfig: Config): Config = { - val runtimeConfig = getGlobalRuntimeConfigAtStreamletPath(runtime, streamletName, appConfig) - val kubernetesConfig = getGlobalKubernetesConfigAtStreamletPath(runtime, streamletName, appConfig) - var streamletConfig = getMergedStreamletConfig(streamletName, appConfig, runtimeConfig, kubernetesConfig) - streamletConfig = moveConfigParameters(streamletConfig, streamletName) - streamletConfig = mergeConfigToRoot(streamletConfig, streamletName, "config", prefixWithConfigKey = false) - mergeConfigToRoot(streamletConfig, streamletName, "kubernetes", prefixWithConfigKey = true) - } - - private val TopicsConfigPath = "cloudflow.topics" - private def streamletConfigPath(streamletName: String) = s"cloudflow.streamlets.$streamletName" - private val KubernetesKey = "kubernetes" - private def streamletRuntimeConfigPath(streamletName: String) = s"cloudflow.streamlets.$streamletName.config" - private def globalRuntimeConfigPath(runtime: String) = s"cloudflow.runtimes.$runtime.config" - - private def streamletKubernetesConfigPath(streamletName: String) = s"cloudflow.streamlets.$streamletName.$KubernetesKey" - private def globalKubernetesConfigPath(runtime: String) = s"cloudflow.runtimes.$runtime.$KubernetesKey" - - private def getMergedStreamletConfig(streamletName: String, appConfig: Config, runtimeConfig: Config, kubernetesConfig: Config) = { - val path = streamletConfigPath(streamletName) - - Try(appConfig.getConfig(path)).toOption - .getOrElse(ConfigFactory.empty()) - .atPath(path) - .withFallback(runtimeConfig) - .withFallback(kubernetesConfig) - } - - private def getGlobalRuntimeConfigAtStreamletPath(runtime: String, streamletName: String, appConfig: Config) = - Try(appConfig.getConfig(globalRuntimeConfigPath(runtime))).toOption - .getOrElse(ConfigFactory.empty()) - .atPath(streamletRuntimeConfigPath(streamletName)) - - private def getGlobalKubernetesConfigAtStreamletPath(runtime: String, streamletName: String, appConfig: Config) = - Try(appConfig.getConfig(globalKubernetesConfigPath(runtime))).toOption - .getOrElse(ConfigFactory.empty()) - .atPath(streamletKubernetesConfigPath(streamletName)) - - private def moveConfigParameters(config: Config, streamletName: String): Config = { - val key = streamletConfigPath(streamletName) - val configParametersKey = "config-parameters" - val absoluteConfigParameterKey = s"$key.$configParametersKey" - val configParametersSection = Try(config.getConfig(absoluteConfigParameterKey)).toOption - - configParametersSection - .map { c => - val adjustedConfigParametersConfig = c.atPath(key) - config.withoutPath(absoluteConfigParameterKey).withFallback(adjustedConfigParametersConfig) - } - .getOrElse(config) - } - - private def mergeConfigToRoot(streamletConfig: Config, streamletName: String, configKey: String, prefixWithConfigKey: Boolean): Config = { - val streamletKey = streamletConfigPath(streamletName) - - val absoluteConfigKey = s"$streamletKey.$configKey" - val configSection = Try(streamletConfig.getConfig(absoluteConfigKey)).toOption - // removing section and move its contents in the root. - configSection - .map { c => - val configs = c - .root() - .entrySet() - .asScala - .map { entry => - val key = - if (prefixWithConfigKey) s"$configKey.${entry.getKey}" - else entry.getKey - entry.getValue.atPath(key) - } - .toVector - val mergedConfig = streamletConfig.withoutPath(absoluteConfigKey) - configs.foldLeft(mergedConfig) { (acc, el) => - acc.withFallback(el) - } - } - .getOrElse(streamletConfig) - } - - /* - * Moves cloudflow.topics. config to cloudflow.runner.streamlet.context.port_mappings..config. - * If no cloudflow.topics. exists then use the named Kafka cluster if one exists, otherwise default. - * The runner merges the secret on top of the configmap, which brings everything together. - */ - private[operator] def moveTopicsConfigToPortMappings(deployment: StreamletDeployment, - streamletConfig: Config, - appConfig: Config, - clusterSecretConfigs: Map[String, Config]): Config = { - val defaultClusterConfig = clusterSecretConfigs.get(TopicActions.DefaultConfigurationName) - // Adding bootstrap-servers key for backwards compatibility - val kafkaBootstrapServersCompat2010 = defaultClusterConfig.map(_.getString("bootstrap.servers")) - - val portMappingConfigs = deployment.portMappings.flatMap { - case (port, topic) => - Try { - val clusterSecretConfig = - topic.cluster - .flatMap(clusterName => clusterSecretConfigs.get(clusterName)) - .orElse(defaultClusterConfig) - .getOrElse(ConfigFactory.empty()) - - val portMappingConfig = - if (appConfig.hasPath(s"$TopicsConfigPath.${topic.id}")) - appConfig.getConfig(s"$TopicsConfigPath.${topic.id}") - else - appConfig - - val portMappingWithFallbackConfig = portMappingConfig - .withFallback(topic.config) - .withFallback(clusterSecretConfig) - - portMappingWithFallbackConfig - .atPath(s"cloudflow.runner.streamlet.context.port_mappings.$port.config") - // Need to retain the topic.id - .withFallback(ConfigFactory.parseString(s""" - cloudflow.runner.streamlet.context.port_mappings.$port.id = ${topic.id} - """)) - }.toOption - } - val conf = portMappingConfigs - .foldLeft(streamletConfig) { (acc, el) => - acc.withFallback(el) - } - kafkaBootstrapServersCompat2010 - .map { bs => - conf.withFallback(ConfigFactory.parseString(s"""cloudflow.kafka.bootstrap-servers="${bs}"""")) - } - .getOrElse(conf) - } - - private def extractPodsConfig(streamletConfig: Config) = - Try(streamletConfig.getConfig(KubernetesKey).atPath(KubernetesKey)).getOrElse(ConfigFactory.empty) - - private def extractRuntimeConfig(runtime: String, streamletConfig: Config) = - Try(streamletConfig.getConfig(runtime).atPath(runtime)).getOrElse(ConfigFactory.empty) -} diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/TopicActions.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/TopicActions.scala index b0bb266aa..d9c33d5a6 100644 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/TopicActions.scala +++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/TopicActions.scala @@ -16,6 +16,7 @@ package cloudflow.operator.action +import java.nio.charset.StandardCharsets import java.util.Collections import scala.collection.immutable._ import scala.concurrent.duration.Duration @@ -25,7 +26,7 @@ import scala.concurrent.Promise import scala.jdk.CollectionConverters._ import akka.actor.ActorSystem -import com.typesafe.config.Config +import com.typesafe.config.{ Config, ConfigFactory } import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.clients.admin.CreateTopicsOptions @@ -39,7 +40,7 @@ import skuber.json.format._ import cloudflow.blueprint.Blueprint import cloudflow.blueprint.deployment._ -import cloudflow.operator.event.ConfigInputChangeEvent +import cloudflow.operator.event.ConfigInput /** * Creates topic actions for managed topics. @@ -128,12 +129,20 @@ object TopicActions { .getOrElse(useClusterConfiguration(topic)) } + def getData(secret: Secret): String = + secret.data.get(ConfigInput.SecretDataKey).map(bytes => new String(bytes, StandardCharsets.UTF_8)).getOrElse("") + + def getConfigFromSecret(secret: Secret): Config = { + val str = getData(secret) + ConfigFactory.parseString(str) + } + def createActionFromKafkaConfigSecret(secret: Secret, newApp: CloudflowApplication.CR, runners: Map[String, runner.Runner[_]], labels: CloudflowLabels, topic: TopicInfo) = { - val config = ConfigInputChangeEvent.getConfigFromSecret(secret) + val config = getConfigFromSecret(secret) val topicInfo = TopicInfo(Topic(id = topic.id, cluster = topic.cluster, config = config)) createTopicOrError(newApp, runners, labels, topicInfo) } @@ -145,7 +154,7 @@ object TopicActions { topic: TopicInfo) = for { secret <- secretOption - config = ConfigInputChangeEvent.getConfigFromSecret(secret) + config = getConfigFromSecret(secret) kafkaConfig <- getKafkaConfig(config, topic) topicWithKafkaConfig = TopicInfo(Topic(id = topic.id, config = kafkaConfig)) _ <- topicWithKafkaConfig.bootstrapServers diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/runner/Runner.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/runner/Runner.scala index fa02999f5..07b15db63 100644 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/runner/Runner.scala +++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/runner/Runner.scala @@ -29,8 +29,8 @@ import skuber.rbac._ import skuber._ import cloudflow.blueprint.deployment._ -import cloudflow.operator.event.ConfigInputChangeEvent import cloudflow.operator.action._ +import cloudflow.operator.event.ConfigInput object Runner { val ConfigMapMountPath = "/etc/cloudflow-runner" @@ -240,7 +240,7 @@ trait Runner[T <: ObjectResource] { ): T def getPodsConfig(secret: Secret): PodsConfig = { - val str = getData(secret, ConfigInputChangeEvent.PodsConfigDataKey) + val str = getData(secret, ConfigInput.PodsConfigDataKey) PodsConfig .fromConfig(ConfigFactory.parseString(str)) .recover { @@ -255,7 +255,7 @@ trait Runner[T <: ObjectResource] { } def getRuntimeConfig(secret: Secret): Config = { - val str = getData(secret, ConfigInputChangeEvent.RuntimeConfigDataKey) + val str = getData(secret, ConfigInput.RuntimeConfigDataKey) Try(ConfigFactory.parseString(str)) .recover { case e => diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInput.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInput.scala new file mode 100644 index 000000000..8d18a654e --- /dev/null +++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInput.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloudflow.operator +package event + +object ConfigInput { + val SecretDataKey = "secret.conf" + val RuntimeConfigDataKey = "runtime-config.conf" + val PodsConfigDataKey = "pods-config.conf" +} diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInputChangeEvent.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInputChangeEvent.scala deleted file mode 100644 index f159adff2..000000000 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/event/ConfigInputChangeEvent.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.operator -package event - -import java.nio.charset.StandardCharsets - -import scala.util.Try -import com.typesafe.config._ - -import org.slf4j.LoggerFactory - -import skuber.{ ListResource, ObjectEditor, ObjectMeta, ObjectResource, Secret } -import skuber.api.client.{ EventType, WatchEvent } -import skuber.json.format.secretFmt - -import cloudflow.operator.action._ -import cloudflow.blueprint.deployment.StreamletDeployment - -/** - * Indicates that the configuration of the application has been changed by the user. - */ -case class ConfigInputChangeEvent(appId: String, namespace: String, watchEvent: WatchEvent[Secret]) extends AppChangeEvent[Secret] - -object ConfigInputChangeEvent extends Event { - val log = LoggerFactory.getLogger(this.getClass) - - def toConfigInputChangeEvent( - currentSecrets: Map[String, WatchEvent[Secret]], - watchEvent: WatchEvent[Secret] - ): (Map[String, WatchEvent[Secret]], List[ConfigInputChangeEvent]) = { - val secret = watchEvent._object - val metadata = secret.metadata - val secretName = secret.metadata.name - val namespace = secret.metadata.namespace - val absoluteName = s"$namespace.$secretName" - - def hasChanged(existingEvent: WatchEvent[Secret]) = - watchEvent._object.resourceVersion != existingEvent._object.resourceVersion && getData(existingEvent._object) != getData(secret) - - watchEvent._type match { - case EventType.DELETED => - (currentSecrets - absoluteName, List()) - case EventType.ADDED | EventType.MODIFIED => - if (currentSecrets.get(absoluteName).forall(hasChanged)) { - (for { - appId <- metadata.labels.get(CloudflowLabels.AppIdLabel) - configFormat <- metadata.labels.get(CloudflowLabels.ConfigFormat) if configFormat == CloudflowLabels.InputConfig - _ = log.info(s"[app: $appId application configuration changed ${changeInfo(watchEvent)}]") - } yield { - (currentSecrets + (absoluteName -> watchEvent), List(ConfigInputChangeEvent(appId, namespace, watchEvent))) - }).getOrElse((currentSecrets, List())) - - } else (currentSecrets, List()) - } - } - - def toActionList(mappedApp: Option[CloudflowApplication.CR], event: ConfigInputChangeEvent, podNamespace: String): Seq[Action] = - (mappedApp, event) match { - case (Some(app), configInputChangeEvent) => - val appConfig = getConfigFromSecret(configInputChangeEvent) - - val clusterNames = (for { - d <- app.spec.deployments - (_, topic) <- d.portMappings - cluster <- topic.cluster.toVector - } yield cluster) :+ TopicActions.DefaultConfigurationName - - val providedAction = Action.providedByLabel[Secret](TopicActions.KafkaClusterNameLabel, clusterNames, podNamespace) { - clusterSecrets => - val allNamedClusters = namedClusters(app.name, clusterNames, clusterSecrets) - val actions = app.spec.deployments.map { streamletDeployment => - val configs = ConfigurationScopeLayering.configs(streamletDeployment, appConfig, allNamedClusters) - - // create update action for output secret action which is mounted as config by runtime specific deployments - val configSecret = - createSecret( - streamletDeployment.secretName, - app, - streamletDeployment, - configs.streamlet, - configs.runtime, - configs.pods, - CloudflowLabels.StreamletDeploymentConfigFormat - ) - Action.createOrUpdate(configSecret, secretEditor) - } - - Action.composite(actions) - } - - List(providedAction) - case _ => Nil // app could not be found, do nothing. - } - - val SecretDataKey = "secret.conf" - val RuntimeConfigDataKey = "runtime-config.conf" - val PodsConfigDataKey = "pods-config.conf" - - /** log message for when a ConfigInputChangeEvent is identified as a configuration change event */ - def detected[T <: ObjectResource](event: ConfigInputChangeEvent) = - s"User created or modified configuration for application ${event.appId}." - - def getConfigFromSecret(secret: Secret): Config = { - val str = getData(secret) - ConfigFactory.parseString(str) - } - - def getData(secret: Secret): String = - secret.data.get(ConfigInputChangeEvent.SecretDataKey).map(bytes => new String(bytes, StandardCharsets.UTF_8)).getOrElse("") - - /** - * Look up all Kafka cluster names referenced in streamlet definitions with the named cluster secrets found in K8s. - * If a referenced cluster name does not have a corresponding secret K8s then log an error, but continue. - * Return all named cluster configs and the 'default' cluster config, if one is defined. - */ - def namedClusters(appName: String, clusterNames: Vector[String], clusterSecrets: ListResource[Secret]) = { - val namedClusters: Map[String, Config] = clusterNames.flatMap { name => - val secret = clusterSecrets.items.find(_.metadata.labels.get(TopicActions.KafkaClusterNameLabel).contains(name)) - secret match { - case Some(secret) => - val clusterConfigStr = getConfigFromSecret(secret) - Vector(name -> clusterConfigStr) - case None => - log.error( - s""" - |The referenced cluster configuration secret '$name' for app '$appName' does not exist. - |This should have been detected at deploy time. - |This will lead to a runtime exception when the streamlet runs. - """.stripMargin - ) - Nil - } - }.toMap - - val maybeDefaultCluster = clusterSecrets.items - .find(_.metadata.labels.get(TopicActions.KafkaClusterNameLabel).contains(TopicActions.DefaultConfigurationName)) - .map(s => TopicActions.DefaultConfigurationName -> getConfigFromSecret(s)) - .toMap - - namedClusters ++ maybeDefaultCluster - } - - def getConfigFromSecret(configInputChangeEvent: ConfigInputChangeEvent) = { - val secret: Secret = configInputChangeEvent.watchEvent._object - secret.data - .get(ConfigInputChangeEvent.SecretDataKey) - .flatMap { bytes => - val str = new String(bytes, StandardCharsets.UTF_8) - Try(ConfigFactory.parseString(str).resolve()).recover { - case cause => - log.error( - s"Detected input secret '${secret.metadata.name}' contains invalid configuration data, IGNORING configuration.", - cause - ) - ConfigFactory.empty() - }.toOption - } - .getOrElse { - log.error( - s"Detected input secret '${secret.metadata.name}' does not have data key '${ConfigInputChangeEvent.SecretDataKey}', IGNORING configuration." - ) - ConfigFactory.empty() - } - } - - def createSecret( - secretName: String, - app: CloudflowApplication.CR, - streamletDeployment: StreamletDeployment, - config: Config, - runtimeConfig: Config, - podsConfig: Config, - configFormat: String - ) = { - def render(config: Config): Array[Byte] = - config - .root() - .render(ConfigRenderOptions.concise()) - .getBytes(StandardCharsets.UTF_8) - - Secret( - metadata = ObjectMeta( - name = secretName, - namespace = app.metadata.namespace, - labels = - CloudflowLabels(app).baseLabels ++ Map( - CloudflowLabels.AppIdLabel -> app.spec.appId, - CloudflowLabels.StreamletNameLabel -> streamletDeployment.streamletName, - CloudflowLabels.ConfigFormat -> configFormat - ), - ownerReferences = CloudflowApplication.getOwnerReferences(app) - ), - data = Map( - ConfigInputChangeEvent.SecretDataKey -> render(config), - ConfigInputChangeEvent.RuntimeConfigDataKey -> render(runtimeConfig), - ConfigInputChangeEvent.PodsConfigDataKey -> render(podsConfig) - ) - ) - } - - def secretEditor = new ObjectEditor[Secret] { - def updateMetadata(obj: Secret, newMetadata: ObjectMeta): Secret = obj.copy(metadata = newMetadata) - } - -} diff --git a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/ConfigurationScopeLayeringSpec.scala b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/ConfigurationScopeLayeringSpec.scala deleted file mode 100644 index c18afe25e..000000000 --- a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/ConfigurationScopeLayeringSpec.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.operator.action - -import cloudflow.blueprint.deployment.StreamletDeployment -import cloudflow.blueprint.deployment.Topic -import org.scalatest.{ ConfigMap => _ } -import org.scalatest._ -import com.typesafe.config.ConfigFactory - -class ConfigurationScopeLayeringSpec - extends WordSpec - with MustMatchers - with GivenWhenThen - with EitherValues - with OptionValues - with Inspectors - with TestDeploymentContext { - - "ConfigurationScopeLayering" should { - "transform the config" in { - val appConfig = ConfigFactory.parseString(""" - cloudflow { - streamlets.logger { - config-parameters { - log-level = warning - foo = bar - } - config { - akka.loglevel = "DEBUG" - } - kubernetes { - pods { - pod { - containers { - cloudflow { - env = [ - { name = "JAVA_OPTS" - value = "-XX:MaxRAMPercentage=40.0" - } - ] - # limited settings that we want to support - resources { - requests { - memory = "1G" - } - } - } - } - } - } - } - } - runtimes.akka.config { - akka.loglevel = INFO - akka.kafka.producer.parallelism = 15000 - } - } - cloudflow.streamlets.logger.config-parameters.log-level="info" - cloudflow.streamlets.logger.config-parameters.msg-prefix="valid-logger" - """) - - val runtime = "akka" - val streamletName = "logger" - val loggerConfig = ConfigurationScopeLayering.mergeToStreamletConfig(runtime, streamletName, appConfig) - - loggerConfig.getString("cloudflow.streamlets.logger.log-level") mustBe "info" - loggerConfig.getString("cloudflow.streamlets.logger.foo") mustBe "bar" - loggerConfig.getString("cloudflow.streamlets.logger.msg-prefix") mustBe "valid-logger" - loggerConfig.getInt("akka.kafka.producer.parallelism") mustBe 15000 - loggerConfig.getString("akka.loglevel") mustBe "DEBUG" - loggerConfig.getMemorySize("kubernetes.pods.pod.containers.cloudflow.resources.requests.memory").toBytes mustBe 1024 * 1024 * 1024 - } - - "transform the Kafka config for a StreamletDeployment" in { - val deployment = StreamletDeployment( - name = "some-app-id", - runtime = "akka", - image = "docker-registry.foo.com/lightbend/call-record-pipeline:277-ceb9629", - streamletName = "akka-streamlet", - className = "cloudflow.operator.runner.AkkaRunner", - endpoint = None, - secretName = "akka-streamlet", - config = ConfigFactory.empty(), - portMappings = Map( - "maybe" -> Topic("maybe-valid"), - "sometimes" -> Topic("sometimes"), - "invalid" -> Topic("invalid-metrics"), - "valid" -> Topic("valid-metrics", cluster = Some("non-default-named-cluster")), - "in" -> Topic( - "metrics", - config = ConfigFactory.parseString(""" - |bootstrap.servers = "inline-config-kafka-bootstrap:9092" - |connection-config { - | connection.foo.bar = "inline-baz" - |} - |producer-config { - | producer.foo.bar = "inline-baz" - |} - |consumer-config { - | consumer.foo.bar = "inline-baz" - |} - |""".stripMargin) - ) - ), - volumeMounts = None, - replicas = None - ) - - val appConfig = ConfigFactory.parseString(""" - |cloudflow.topics { - | invalid-metrics { - | bootstrap.servers = "app-cluster:9092" - | connection-config { - | connection.foo.bar = "app-baz" - | } - | producer-config { - | producer.foo.bar = "app-baz" - | } - | consumer-config { - | consumer.foo.bar = "app-baz" - | } - | } - | sometimes { - | connection-config { - | connection2.foo.bar = "sometimes-baz" - | } - | producer-config { - | producer.foo.bar = "sometimes-baz" - | } - | consumer-config { - | consumer.foo.bar = "sometimes-baz" - | } - | } - |} - |""".stripMargin) - - val defaultClusterConfig = ConfigFactory.parseString(""" - |bootstrap.servers = "default-named-cluster:9092" - |connection-config { - | connection.foo.bar = "default-baz" - |} - |producer-config { - | producer.foo.bar = "default-baz" - |} - |consumer-config { - | consumer.foo.bar = "default-baz" - |} - """.stripMargin) - - val nonDefaultClusterConfig = ConfigFactory.parseString(""" - |bootstrap.servers = "non-default-named-cluster:9092" - |connection-config { - | connection.foo.bar = "non-default-baz" - |} - |producer-config { - | producer.foo.bar = "non-default-baz" - |} - |consumer-config { - | consumer.foo.bar = "non-default-baz" - |} - """.stripMargin) - - val clusterSecretConfigs = Map("default" -> defaultClusterConfig, "non-default-named-cluster" -> nonDefaultClusterConfig) - - val configs = ConfigurationScopeLayering.configs(deployment, appConfig, clusterSecretConfigs) - - // 'maybe' port uses 'default' named config - val maybeValidPort = "maybe" - val maybeValidConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$maybeValidPort.config") - maybeValidConfig.getString("bootstrap.servers") mustBe "default-named-cluster:9092" - maybeValidConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "default-baz" - maybeValidConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "default-baz" - maybeValidConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "default-baz" - configs.streamlet.getString("cloudflow.kafka.bootstrap-servers") mustBe "default-named-cluster:9092" - // 'sometimes' port uses global topic configuration from app config, and falls back 'default' named config for bootstrap.servers and other config - val sometimesPort = "sometimes" - val sometimesConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$sometimesPort.config") - sometimesConfig.getString("bootstrap.servers") mustBe "default-named-cluster:9092" - sometimesConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "default-baz" - sometimesConfig.getConfig("connection-config").getString("connection2.foo.bar") mustBe "sometimes-baz" - sometimesConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "sometimes-baz" - sometimesConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "sometimes-baz" - - // 'invalid' port uses global topic configuration from app config - val invalidPort = "invalid" - val invalidConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$invalidPort.config") - invalidConfig.getString("bootstrap.servers") mustBe "app-cluster:9092" - invalidConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "app-baz" - invalidConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "app-baz" - invalidConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "app-baz" - - // 'valid' port uses a user-defined 'non-default' named config - val validPort = "valid" - val validConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$validPort.config") - validConfig.getString("bootstrap.servers") mustBe "non-default-named-cluster:9092" - validConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "non-default-baz" - validConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "non-default-baz" - validConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "non-default-baz" - - // 'in' port uses inline config - val inPort = "in" - val inConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$inPort.config") - inConfig.getString("bootstrap.servers") mustBe "inline-config-kafka-bootstrap:9092" - inConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "inline-baz" - inConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "inline-baz" - inConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "inline-baz" - } - } -} diff --git a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/AkkaRunnerSpec.scala b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/AkkaRunnerSpec.scala index b5a74acf3..5a86b3509 100644 --- a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/AkkaRunnerSpec.scala +++ b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/AkkaRunnerSpec.scala @@ -85,7 +85,7 @@ class AkkaRunnerSpec extends WordSpecLike with OptionValues with MustMatchers wi configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | labels { @@ -110,7 +110,7 @@ class AkkaRunnerSpec extends WordSpecLike with OptionValues with MustMatchers wi configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | annotations { @@ -135,7 +135,7 @@ class AkkaRunnerSpec extends WordSpecLike with OptionValues with MustMatchers wi configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | containers.container { @@ -170,7 +170,7 @@ class AkkaRunnerSpec extends WordSpecLike with OptionValues with MustMatchers wi configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | volumes { diff --git a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/FlinkRunnerSpec.scala b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/FlinkRunnerSpec.scala index 7cdf2f85c..43aa5b72f 100644 --- a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/FlinkRunnerSpec.scala +++ b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/FlinkRunnerSpec.scala @@ -87,7 +87,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod.containers.container { | env = [ @@ -151,7 +151,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | labels: { @@ -187,7 +187,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | volumes { @@ -240,7 +240,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod.containers.container { | env = [ @@ -265,7 +265,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod.containers.container { | env = [ @@ -275,7 +275,7 @@ class FlinkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w | ] |} | """.stripMargin.getBytes(), - cloudflow.operator.event.ConfigInputChangeEvent.RuntimeConfigDataKey -> + cloudflow.operator.event.ConfigInput.RuntimeConfigDataKey -> """flink.env.java.opts = "-XX:-DisableExplicitGC"""".getBytes() ) ) diff --git a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/SparkRunnerSpec.scala b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/SparkRunnerSpec.scala index b4911f360..caf3e179c 100644 --- a/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/SparkRunnerSpec.scala +++ b/core/cloudflow-operator-actions/src/test/scala/cloudflow/operator/action/runner/SparkRunnerSpec.scala @@ -119,7 +119,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.driver { | labels: { @@ -154,7 +154,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.executor { | labels: { @@ -189,7 +189,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | labels: { @@ -224,7 +224,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | volumes { @@ -282,7 +282,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods { | pod { @@ -348,7 +348,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods { | driver { @@ -402,7 +402,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods.pod { | volumes { @@ -444,7 +444,7 @@ class SparkRunnerSpec extends WordSpecLike with OptionValues with MustMatchers w configSecret = Secret( metadata = ObjectMeta(), data = Map( - cloudflow.operator.event.ConfigInputChangeEvent.PodsConfigDataKey -> + cloudflow.operator.event.ConfigInput.PodsConfigDataKey -> """ |kubernetes.pods { | pod { diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala index 69a449eb0..e95228327 100644 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala +++ b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala @@ -58,7 +58,6 @@ object Main extends { ) Operator.handleAppEvents(client, runners, ctx.podName, ctx.podNamespace) Operator.handleConfigurationUpdates(client, runners, ctx.podName) - Operator.handleConfigurationInput(client, ctx.podNamespace) Operator.handleStatusUpdates(client, runners) } catch { case t: Throwable => diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala index 989d8e78e..68b8f0d00 100644 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala +++ b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala @@ -41,7 +41,7 @@ import cloudflow.operator.flow._ object Operator { lazy val log = LoggerFactory.getLogger("Operator") - val ProtocolVersion = "4" + val ProtocolVersion = "5" val ProtocolVersionKey = "protocol-version" val ProtocolVersionConfigMapName = "cloudflow-protocol-version" def ProtocolVersionConfigMap(ownerReferences: List[OwnerReference]) = ConfigMap( @@ -105,37 +105,6 @@ object Operator { ) } - def handleConfigurationInput( - client: KubernetesClient, - podNamespace: String - )(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) = { - val logAttributes = Attributes.logLevels(onElement = Attributes.LogLevels.Info) - val actionExecutor = new SkuberActionExecutor() - // only watch secrets that contain input config - val watchOptions = ListOptions( - labelSelector = Some( - LabelSelector( - LabelSelector.IsEqualRequirement(CloudflowLabels.ManagedBy, CloudflowLabels.ManagedByCloudflow), - LabelSelector.IsEqualRequirement(CloudflowLabels.ConfigFormat, CloudflowLabels.InputConfig) - ) - ), - resourceVersion = None - ) - // watch only Input secrets, transform the application input secret - // into Output secret create actions. - runStream( - watch[Secret](client, watchOptions) - .via(ConfigInputChangeEventFlow.fromWatchEvent()) - .log("config-input-change-event", ConfigInputChangeEvent.detected) - .via(mapToAppInSameNamespace[Secret, ConfigInputChangeEvent](client)) - .via(ConfigInputChangeEventFlow.toInputConfigUpdateAction(podNamespace)) - .via(executeActions(actionExecutor, logAttributes)) - .toMat(Sink.ignore)(Keep.right), - "The configuration input stream completed unexpectedly, terminating.", - "The configuration input stream failed, terminating." - ) - } - def handleConfigurationUpdates( client: KubernetesClient, runners: Map[String, Runner[_]], diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/flow/ConfigInputChangeEventFlow.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/flow/ConfigInputChangeEventFlow.scala deleted file mode 100644 index 67d45e3b6..000000000 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/flow/ConfigInputChangeEventFlow.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.operator -package flow - -import java.util.concurrent.atomic.AtomicReference -import akka.NotUsed -import akka.stream.scaladsl._ -import skuber._ -import skuber.api.client._ -import cloudflow.operator.action._ -import cloudflow.operator.event._ - -object ConfigInputChangeEventFlow { - import ConfigInputChangeEvent._ - - // keeps state of config input across stream restarts - val secretsRef = new AtomicReference(Map[String, WatchEvent[Secret]]()) - - /** - * Transforms [[skuber.api.client.WatchEvent]]s into [[ConfigInputChangeEvent]]s. - */ - def fromWatchEvent(): Flow[WatchEvent[Secret], ConfigInputChangeEvent, NotUsed] = - Flow[WatchEvent[Secret]] - .mapConcat { watchEvent => - val currentSecrets = secretsRef.get - val (updatedSecrets, events) = toConfigInputChangeEvent(currentSecrets, watchEvent) - secretsRef.set(updatedSecrets) - events - } - - def toInputConfigUpdateAction( - podNamespace: String - ): Flow[(Option[CloudflowApplication.CR], ConfigInputChangeEvent), Action, NotUsed] = - Flow[(Option[CloudflowApplication.CR], ConfigInputChangeEvent)] - .map { - case (mappedApp, event) => toActionList(mappedApp, event, podNamespace) - } - .mapConcat(_.toList) -}