Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hazelcast storage driver #118

Open
wants to merge 3 commits into
base: series/2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ val scala213 = "2.13.11"
val scala3 = "3.3.0"
val allScala = Seq(scala213, scala3)

val zioVersion = "2.0.21"
val zioGrpcVersion = "0.6.0"
val zioK8sVersion = "2.1.1"
val zioCacheVersion = "0.2.3"
val zioCatsInteropVersion = "23.1.0.0"
val sttpVersion = "3.9.3"
val calibanVersion = "2.5.3"
val redis4catsVersion = "1.5.2"
val redissonVersion = "3.27.1"
val scalaKryoVersion = "1.0.2"
val testContainersVersion = "0.41.3"
val zioVersion = "2.0.21"
val zioGrpcVersion = "0.6.0"
val zioK8sVersion = "2.1.1"
val zioCacheVersion = "0.2.3"
val zioCatsInteropVersion = "23.1.0.0"
val sttpVersion = "3.9.3"
val calibanVersion = "2.5.3"
val redis4catsVersion = "1.5.2"
val redissonVersion = "3.27.1"
val hazelcastClientVersion = "5.0.1"
val scalaKryoVersion = "1.0.2"
val testContainersVersion = "0.41.3"

inThisBuild(
List(
Expand Down Expand Up @@ -55,6 +56,7 @@ lazy val root = project
healthK8s,
storageRedis,
storageRedisson,
storageHazelcast,
serializationKryo,
grpcProtocol,
examples
Expand Down Expand Up @@ -141,6 +143,19 @@ lazy val storageRedisson = project
)
)

lazy val storageHazelcast = project
.in(file("storage-hazelcast"))
.settings(name := "shardcake-storage-hazelcast")
.settings(commonSettings)
.dependsOn(core)
.settings(
libraryDependencies ++=
Seq(
"com.hazelcast" % "hazelcast" % hazelcastClientVersion,
"ch.qos.logback" % "logback-classic" % "1.4.7"
)
)

lazy val serializationKryo = project
.in(file("serialization-kryo"))
.settings(name := "shardcake-serialization-kryo")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.devsisters.shardcake
package com.devsisters.shardcake

import com.devsisters.shardcake.ShardManager._
import com.devsisters.shardcake.interfaces._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.devsisters.shardcake

/**
* The configuration for the Hazelcast storage implementation.
* @param assignmentsKey the key to store shard assignments
* @param podsKey the key to store registered pods
*/
case class HazelcastConfig(assignmentsKey: String, podsKey: String)

object HazelcastConfig {
val default: HazelcastConfig = HazelcastConfig(assignmentsKey = "shard_assignments", podsKey = "pods")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.topic.{ Message, MessageListener }
import zio.stream.ZStream
import zio.{ Queue, Task, Unsafe, ZIO, ZLayer }

import scala.jdk.CollectionConverters._

object StorageHazelcast {

/**
* A layer that returns a Storage implementation using Hazelcast
*/
val live: ZLayer[HazelcastInstance with HazelcastConfig, Nothing, Storage] =
ZLayer {
for {
config <- ZIO.service[HazelcastConfig]
hazelcastInstance <- ZIO.service[HazelcastInstance]
assignmentsMap = hazelcastInstance.getMap[String, String](config.assignmentsKey)
podsMap = hazelcastInstance.getMap[String, String](config.podsKey)
assignmentsTopic = hazelcastInstance.getTopic[String](config.assignmentsKey)
} yield new Storage {
def getAssignments: Task[Map[ShardId, Option[PodAddress]]] =
ZIO
.attemptBlocking(assignmentsMap.entrySet())
.map(
_.asScala.toList
.flatMap(entry =>
entry.getKey.toIntOption.map(
_ -> (if (entry.getValue.isEmpty) None
else PodAddress(entry.getValue))
)
)
.toMap
)

def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] =
ZIO.attemptBlocking(assignmentsMap.putAll(assignments.map { case (k, v) =>
Yomanz marked this conversation as resolved.
Show resolved Hide resolved
k.toString -> v.fold("")(_.toString)
}.asJava)) *>
ZIO.attemptBlocking(assignmentsTopic.publish("ping")).unit

def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] =
ZStream.unwrap {
for {
queue <- Queue.unbounded[String]
runtime <- ZIO.runtime[Any]
_ <- ZIO.attemptBlocking(
assignmentsTopic.addMessageListener(
new MessageListener[String] {
def onMessage(msg: Message[String]): Unit =
Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg.getMessageObject)))
}
)
)
} yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments)
}

def getPods: Task[Map[PodAddress, Pod]] =
ZIO
.attemptBlocking(podsMap.entrySet())
.map(
_.asScala
.flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue)))
.toMap
)
def savePods(pods: Map[PodAddress, Pod]): Task[Unit] =
ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import com.dimafeng.testcontainers.GenericContainer
import com.hazelcast.client.HazelcastClient
import com.hazelcast.client.config.{ ClientConfig, ClientNetworkConfig }
import com.hazelcast.core.HazelcastInstance
import zio.Clock.ClockLive
import zio._
import zio.stream.ZStream
import zio.test.TestAspect.sequential
import zio.test._

object StorageHazelcastSpec extends ZIOSpecDefault {
val container: ZLayer[Any, Nothing, GenericContainer] =
ZLayer.scoped {
ZIO.acquireRelease {
ZIO.attemptBlocking {
val container = new GenericContainer(dockerImage = "hazelcast/hazelcast:5.1.2", exposedPorts = Seq(5701))
container.start()
container
}.orDie
}(container => ZIO.attemptBlocking(container.stop()).orDie)
}

val hazelcast: ZLayer[GenericContainer, Throwable, HazelcastInstance] =
ZLayer {
for {
container <- ZIO.service[GenericContainer]
uri = s"${container.host}:${container.mappedPort(container.exposedPorts.head)}"
hazelConfig = new ClientConfig
hazelNetConfig = new ClientNetworkConfig
_ = hazelNetConfig.addAddress(uri)
_ = hazelConfig.setNetworkConfig(hazelNetConfig)
} yield HazelcastClient.newHazelcastClient(hazelConfig)
}

def spec: Spec[TestEnvironment with Scope, Any] =
suite("StorageHazelcastSpec")(
test("save and get pods") {
val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0"))
.map(p => p.address -> p)
.toMap
for {
_ <- ZIO.serviceWithZIO[Storage](_.savePods(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getPods)
} yield assertTrue(expected == actual)
},
test("save and get assignments") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getAssignments)
} yield assertTrue(expected == actual)
},
test("assignments stream") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
p <- Promise.make[Nothing, Map[Int, Option[PodAddress]]]
_ <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork
_ <- ClockLive.sleep(1 second)
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- p.await
} yield assertTrue(expected == actual)
}
).provideLayerShared(
container >>> hazelcast ++ ZLayer.succeed(HazelcastConfig.default) >>> StorageHazelcast.live
) @@ sequential
}