Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Remove Streamlet configurations creation (#945)
Browse files Browse the repository at this point in the history
* Remove Streamlet configurations creation

* Bump the protocol version
  • Loading branch information
andreaTP authored Jan 7, 2021
1 parent 39c04f9 commit d5b1979
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 754 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cloudflow.operator.action

import java.nio.charset.StandardCharsets
import java.util.Collections
import scala.collection.immutable._
import scala.concurrent.duration.Duration
Expand All @@ -25,7 +26,7 @@ import scala.concurrent.Promise
import scala.jdk.CollectionConverters._

import akka.actor.ActorSystem
import com.typesafe.config.Config
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.CreateTopicsOptions
Expand All @@ -39,7 +40,7 @@ import skuber.json.format._

import cloudflow.blueprint.Blueprint
import cloudflow.blueprint.deployment._
import cloudflow.operator.event.ConfigInputChangeEvent
import cloudflow.operator.event.ConfigInput

/**
* Creates topic actions for managed topics.
Expand Down Expand Up @@ -128,12 +129,20 @@ object TopicActions {
.getOrElse(useClusterConfiguration(topic))
}

def getData(secret: Secret): String =
secret.data.get(ConfigInput.SecretDataKey).map(bytes => new String(bytes, StandardCharsets.UTF_8)).getOrElse("")

def getConfigFromSecret(secret: Secret): Config = {
val str = getData(secret)
ConfigFactory.parseString(str)
}

def createActionFromKafkaConfigSecret(secret: Secret,
newApp: CloudflowApplication.CR,
runners: Map[String, runner.Runner[_]],
labels: CloudflowLabels,
topic: TopicInfo) = {
val config = ConfigInputChangeEvent.getConfigFromSecret(secret)
val config = getConfigFromSecret(secret)
val topicInfo = TopicInfo(Topic(id = topic.id, cluster = topic.cluster, config = config))
createTopicOrError(newApp, runners, labels, topicInfo)
}
Expand All @@ -145,7 +154,7 @@ object TopicActions {
topic: TopicInfo) =
for {
secret <- secretOption
config = ConfigInputChangeEvent.getConfigFromSecret(secret)
config = getConfigFromSecret(secret)
kafkaConfig <- getKafkaConfig(config, topic)
topicWithKafkaConfig = TopicInfo(Topic(id = topic.id, config = kafkaConfig))
_ <- topicWithKafkaConfig.bootstrapServers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import skuber.rbac._
import skuber._
import cloudflow.blueprint.deployment._

import cloudflow.operator.event.ConfigInputChangeEvent
import cloudflow.operator.action._
import cloudflow.operator.event.ConfigInput

object Runner {
val ConfigMapMountPath = "/etc/cloudflow-runner"
Expand Down Expand Up @@ -240,7 +240,7 @@ trait Runner[T <: ObjectResource] {
): T

def getPodsConfig(secret: Secret): PodsConfig = {
val str = getData(secret, ConfigInputChangeEvent.PodsConfigDataKey)
val str = getData(secret, ConfigInput.PodsConfigDataKey)
PodsConfig
.fromConfig(ConfigFactory.parseString(str))
.recover {
Expand All @@ -255,7 +255,7 @@ trait Runner[T <: ObjectResource] {
}

def getRuntimeConfig(secret: Secret): Config = {
val str = getData(secret, ConfigInputChangeEvent.RuntimeConfigDataKey)
val str = getData(secret, ConfigInput.RuntimeConfigDataKey)
Try(ConfigFactory.parseString(str))
.recover {
case e =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cloudflow.operator
package event

object ConfigInput {
val SecretDataKey = "secret.conf"
val RuntimeConfigDataKey = "runtime-config.conf"
val PodsConfigDataKey = "pods-config.conf"
}
Loading

0 comments on commit d5b1979

Please sign in to comment.