Skip to content

Commit

Permalink
Deploying to kafka: initial changes, a separate service
Browse files Browse the repository at this point in the history
Signed-off-by: Laszlo Bende <[email protected]>
  • Loading branch information
mw-lb committed Dec 1, 2023
1 parent 056fcbc commit 9a6a0e6
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.mwam.kafkakewl.common.http.HttpServer
import com.mwam.kafkakewl.common.persistence.{KafkaPersistentStore, PersistentStore}
import com.mwam.kafkakewl.common.telemetry.GlobalTracer
import com.mwam.kafkakewl.deploy.endpoints.*
import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService
import com.mwam.kafkakewl.deploy.services.{TopologyDeploymentsService, TopologyDeploymentsToKafkaService}
import com.mwam.kafkakewl.domain.config.KafkaClientConfig
import sttp.tapir.server.metrics.zio.ZioMetrics
import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions}
Expand Down Expand Up @@ -71,6 +71,7 @@ object Main extends ZIOAppDefault {
MetricsConfig.live,
KafkaPersistentStore.live,
TopologyDeploymentsService.live,
TopologyDeploymentsToKafkaService.live,
DeploymentsEndpoints.live,
DeploymentsServerEndpoints.live,
Endpoints.live,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import zio.*

class TopologyDeploymentsService private (
private val persistentStore: PersistentStore,
private val topologyDeploymentsToKafkaService: TopologyDeploymentsToKafkaService,
private val mutex: Semaphore,
private val topologyDeploymentsRef: Ref[TopologyDeployments]
) {
def deploy(deployments: Deployments): IO[PostDeploymentsFailure, DeploymentsSuccess] =
mutex.withPermit {
for {
_ <- ZIO.logInfo(s"deploying $deployments")

// TODO authorization

// Validation before deployment
Expand All @@ -29,20 +31,29 @@ class TopologyDeploymentsService private (
.toZIOParallelErrors
.mapError(DeploymentsFailure.validation)

// TODO performing the kafka deployment itself, returns the new TopologyDeployments
// TODO persisting the new TopologyDeployments
// TODO publishing the change-log messages
// TODO update the in-memory state
// Performing the actual deployment to kafka (dryRun is respected here)
success <- topologyDeploymentsToKafkaService
.deploy(deployments)
.logError("deploying TopologyDeployments")
.mapError(DeploymentsFailure.deployment)

// Just same fake topology deployments for now
// Creating the new TopologyDeployments from the statuses
topologyDeployments = deployments.deploy
.map(t => (t.id, TopologyDeployment(t.id, TopologyDeploymentStatus(), Some(t))))
.map(t => (t.id, TopologyDeployment(t.id, success.statuses(t.id), Some(t))))
.toMap ++ deployments.delete
.map(tid => (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None)))
.map(tid => (tid, TopologyDeployment(tid, success.statuses(tid), None)))
.toMap

_ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence)
_ <- topologyDeploymentsRef.update { _ ++ topologyDeployments -- deployments.delete }
_ <- (for {
// Persisting the new TopologyDeployments
_ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence)

// Updating the in-memory state if everything succeeded so far
_ <- topologyDeploymentsRef.update {
_ ++ topologyDeployments -- deployments.delete
}
} yield ()).unless(deployments.options.dryRun) // Not making any actual changes if dryRun = true

_ <- ZIO.logInfo(s"finished deploying $deployments")
} yield DeploymentsSuccess(
topologyDeployments
Expand All @@ -62,14 +73,15 @@ class TopologyDeploymentsService private (
}

object TopologyDeploymentsService {
def live: ZLayer[PersistentStore, Nothing, TopologyDeploymentsService] =
def live: ZLayer[PersistentStore & TopologyDeploymentsToKafkaService, Nothing, TopologyDeploymentsService] =
ZLayer.fromZIO {
for {
persistentStore <- ZIO.service[PersistentStore]
topologyDeploymentsToKafkaService <- ZIO.service[TopologyDeploymentsToKafkaService]
// TODO what happens if we fail here?
topologyDeployments <- persistentStore.loadLatest().orDie
mutex <- Semaphore.make(permits = 1)
topologyDeploymentsRef <- Ref.make(topologyDeployments)
} yield TopologyDeploymentsService(persistentStore, mutex, topologyDeploymentsRef)
} yield TopologyDeploymentsService(persistentStore, topologyDeploymentsToKafkaService, mutex, topologyDeploymentsRef)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-FileCopyrightText: 2023 Marshall Wace <[email protected]>
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.mwam.kafkakewl.deploy.services

import com.mwam.kafkakewl.domain.{Deployments, DeploymentsSuccess, TopologyDeploymentStatus}
import zio.*

class TopologyDeploymentsToKafkaService {
def deploy(deployments: Deployments): Task[DeploymentsSuccess] = {
ZIO.succeed {
// For now we just return empty statuses for all deploy/delete topologies
DeploymentsSuccess(
deployments.deploy.map(topology => (topology.id, TopologyDeploymentStatus())).toMap ++
deployments.delete.map(tid => (tid, TopologyDeploymentStatus())).toMap
)
}
}
}

object TopologyDeploymentsToKafkaService {
def live: ZLayer[Any, Nothing, TopologyDeploymentsToKafkaService] =
// TODO proper dependencies, etc...
ZLayer.succeed {
TopologyDeploymentsToKafkaService()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package com.mwam.kafkakewl.domain
import zio.NonEmptyChunk

final case class DeploymentOptions(
dryRun: Boolean = true,
// TODO make allowing unsafe operations more granular if needed
allowUnsafe: Boolean = false
)
Expand Down

0 comments on commit 9a6a0e6

Please sign in to comment.