diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala index 14fbe95..5a4c8de 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala @@ -10,7 +10,7 @@ import com.mwam.kafkakewl.common.http.HttpServer import com.mwam.kafkakewl.common.persistence.{KafkaPersistentStore, PersistentStore} import com.mwam.kafkakewl.common.telemetry.GlobalTracer import com.mwam.kafkakewl.deploy.endpoints.* -import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService +import com.mwam.kafkakewl.deploy.services.{TopologyDeploymentsService, TopologyDeploymentsToKafkaService} import com.mwam.kafkakewl.domain.config.KafkaClientConfig import sttp.tapir.server.metrics.zio.ZioMetrics import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions} @@ -71,6 +71,7 @@ object Main extends ZIOAppDefault { MetricsConfig.live, KafkaPersistentStore.live, TopologyDeploymentsService.live, + TopologyDeploymentsToKafkaService.live, DeploymentsEndpoints.live, DeploymentsServerEndpoints.live, Endpoints.live, diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala index bcd6bfc..fd02fa2 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsService.scala @@ -13,6 +13,7 @@ import zio.* class TopologyDeploymentsService private ( private val persistentStore: PersistentStore, + private val topologyDeploymentsToKafkaService: TopologyDeploymentsToKafkaService, private val mutex: Semaphore, private val topologyDeploymentsRef: Ref[TopologyDeployments] ) { @@ -20,6 +21,7 @@ class TopologyDeploymentsService private ( mutex.withPermit { for { _ <- ZIO.logInfo(s"deploying $deployments") + // TODO authorization // Validation before deployment @@ -29,20 +31,29 @@ class TopologyDeploymentsService private ( .toZIOParallelErrors .mapError(DeploymentsFailure.validation) - // TODO performing the kafka deployment itself, returns the new TopologyDeployments - // TODO persisting the new TopologyDeployments - // TODO publishing the change-log messages - // TODO update the in-memory state + // Performing the actual deployment to kafka (dryRun is respected here) + success <- topologyDeploymentsToKafkaService + .deploy(deployments) + .logError("deploying TopologyDeployments") + .mapError(DeploymentsFailure.deployment) - // Just same fake topology deployments for now + // Creating the new TopologyDeployments from the statuses topologyDeployments = deployments.deploy - .map(t => (t.id, TopologyDeployment(t.id, TopologyDeploymentStatus(), Some(t)))) + .map(t => (t.id, TopologyDeployment(t.id, success.statuses(t.id), Some(t)))) .toMap ++ deployments.delete - .map(tid => (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None))) + .map(tid => (tid, TopologyDeployment(tid, success.statuses(tid), None))) .toMap - _ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence) - _ <- topologyDeploymentsRef.update { _ ++ topologyDeployments -- deployments.delete } + _ <- (for { + // Persisting the new TopologyDeployments + _ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence) + + // Updating the in-memory state if everything succeeded so far + _ <- topologyDeploymentsRef.update { + _ ++ topologyDeployments -- deployments.delete + } + } yield ()).unless(deployments.options.dryRun) // Not making any actual changes if dryRun = true + _ <- ZIO.logInfo(s"finished deploying $deployments") } yield DeploymentsSuccess( topologyDeployments @@ -62,14 +73,15 @@ class TopologyDeploymentsService private ( } object TopologyDeploymentsService { - def live: ZLayer[PersistentStore, Nothing, TopologyDeploymentsService] = + def live: ZLayer[PersistentStore & TopologyDeploymentsToKafkaService, Nothing, TopologyDeploymentsService] = ZLayer.fromZIO { for { persistentStore <- ZIO.service[PersistentStore] + topologyDeploymentsToKafkaService <- ZIO.service[TopologyDeploymentsToKafkaService] // TODO what happens if we fail here? topologyDeployments <- persistentStore.loadLatest().orDie mutex <- Semaphore.make(permits = 1) topologyDeploymentsRef <- Ref.make(topologyDeployments) - } yield TopologyDeploymentsService(persistentStore, mutex, topologyDeploymentsRef) + } yield TopologyDeploymentsService(persistentStore, topologyDeploymentsToKafkaService, mutex, topologyDeploymentsRef) } } diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsToKafkaService.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsToKafkaService.scala new file mode 100644 index 0000000..d3c1c37 --- /dev/null +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/services/TopologyDeploymentsToKafkaService.scala @@ -0,0 +1,30 @@ +/* + * SPDX-FileCopyrightText: 2023 Marshall Wace + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.mwam.kafkakewl.deploy.services + +import com.mwam.kafkakewl.domain.{Deployments, DeploymentsSuccess, TopologyDeploymentStatus} +import zio.* + +class TopologyDeploymentsToKafkaService { + def deploy(deployments: Deployments): Task[DeploymentsSuccess] = { + ZIO.succeed { + // For now we just return empty statuses for all deploy/delete topologies + DeploymentsSuccess( + deployments.deploy.map(topology => (topology.id, TopologyDeploymentStatus())).toMap ++ + deployments.delete.map(tid => (tid, TopologyDeploymentStatus())).toMap + ) + } + } +} + +object TopologyDeploymentsToKafkaService { + def live: ZLayer[Any, Nothing, TopologyDeploymentsToKafkaService] = + // TODO proper dependencies, etc... + ZLayer.succeed { + TopologyDeploymentsToKafkaService() + } +} diff --git a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala index 7c2a63a..182c3c0 100644 --- a/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala +++ b/kafkakewl-domain/src/main/scala/com/mwam/kafkakewl/domain/Deployments.scala @@ -9,6 +9,7 @@ package com.mwam.kafkakewl.domain import zio.NonEmptyChunk final case class DeploymentOptions( + dryRun: Boolean = true, // TODO make allowing unsafe operations more granular if needed allowUnsafe: Boolean = false )