diff --git a/aliases.sbt b/aliases.sbt
new file mode 100644
index 00000000..bebf98f5
--- /dev/null
+++ b/aliases.sbt
@@ -0,0 +1,10 @@
+lazy val avroBuild      = "project avro; test; schema"
+lazy val schedulerBuild = "project scheduler; test; dockerComposeTest"
+
+addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check")
+addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll")
+addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck")
+addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt")
+
+addCommandAlias("ciBuild", s"checkFmt; checkFix; $avroBuild; $schedulerBuild;")
+addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults")
diff --git a/build.sbt b/build.sbt
index 8160bd6b..25870382 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,3 @@
-import Aliases._
 import Release._
 import DockerPublish._
 
@@ -8,9 +7,12 @@ ThisBuild / semanticdbVersion := scalafixSemanticdb
 ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0"
 
 Global / onChangedBuildSource := ReloadOnSourceChanges
+Global / excludeLintKeys ++= Set(testCasesJar, composeContainerPauseBeforeTestSeconds)
 
 Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF")
 
+lazy val IntegrationTest = config("it") extend Test
+
 val commonSettings = Seq(
   organization := "com.sky",
   scalaVersion := "2.13.10"
 )
@@ -24,15 +26,25 @@ val compilerSettings = Seq(
   }
 )
 
+lazy val integrationTestSettings =
+  Defaults.itSettings ++ inConfig(IntegrationTest)(scalafixConfigSettings(IntegrationTest)) ++ Seq(
+    testCasesPackageTask := (IntegrationTest / sbt.Keys.packageBin).value,
+    testCasesJar := (IntegrationTest / packageBin / artifactPath).value.getAbsolutePath,
+    dockerImageCreationTask := (Docker / publishLocal).value,
+    composeContainerPauseBeforeTestSeconds := 45
+  )
+
 val buildInfoSettings = Seq(
   buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion),
   buildInfoPackage := "com.sky"
 )
 
 lazy val scheduler = (project in file("scheduler"))
-  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin)
+  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, UniversalDeployPlugin, JavaAgent, DockerPlugin, DockerComposePlugin)
   .settings(commonSettings)
   .settings(compilerSettings)
+  .settings(integrationTestSettings)
+  .configs(IntegrationTest)
   .settings(
     libraryDependencies ++= Dependencies.all,
     addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
@@ -58,7 +70,6 @@ lazy val avro = (project in file("avro"))
 lazy val root = (project in file("."))
   .withId("kafka-message-scheduler")
   .settings(commonSettings)
-  .settings(defineCommandAliases)
   .settings(dockerImageCreationTask := (scheduler / Docker / publishLocal).value)
   .aggregate(scheduler, avro)
   .enablePlugins(DockerComposePlugin)
diff --git a/project/Aliases.scala b/project/Aliases.scala
deleted file mode 100644
index 160ce5a8..00000000
--- a/project/Aliases.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-import sbt._
-
-object Aliases {
-
-  lazy val defineCommandAliases =
-    addCommandAlias("ciBuild", "checkFmt; checkFix; test; schema") ++
-      addCommandAlias("ciRelease", "clean; schema; project scheduler; release with-defaults") ++
-      addCommandAlias("checkFix", "scalafixAll --check OrganizeImports; scalafixAll --check") ++
-      addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll") ++
-      addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck") ++
-      addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt")
-
-}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 13beb464..9d4c894a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -8,8 +8,8 @@ object Dependencies {
     val stream        = "com.typesafe.akka" %% "akka-stream"         % version
     val streamKafka   = "com.typesafe.akka" %% "akka-stream-kafka"   % "3.0.1"
     val slf4j         = "com.typesafe.akka" %% "akka-slf4j"          % version
-    val testKit       = "com.typesafe.akka" %% "akka-testkit"        % version % Test
-    val streamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % version % Test
+    val testKit       = "com.typesafe.akka" %% "akka-testkit"        % version % "test,it"
+    val streamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % version % "test,it"
     val base = Seq(actor, stream, streamKafka, slf4j)
     val test = Seq(testKit, streamTestKit)
   }
@@ -17,16 +17,23 @@ object Dependencies {
   object Cats {
     private val version = "2.7.0"
     val core      = "org.typelevel"    %% "cats-core"      % version
-    val testKit   = "org.typelevel"    %% "cats-testkit"   % version % Test
-    val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % Test
+    val testKit   = "org.typelevel"    %% "cats-testkit"   % version % "test,it"
+    val scalatest = "com.ironcorelabs" %% "cats-scalatest" % "3.1.1" % "test,it"
     val base = Seq(core)
     val test = Seq(testKit, scalatest)
   }
 
+  object DockerJava {
+    private val version = "3.2.14"
+    val core       = "com.github.docker-java" % "docker-java"                       % version % "test,it"
+    val httpClient = "com.github.docker-java" % "docker-java-transport-httpclient5" % version % "test,it"
+    val test = Seq(core, httpClient)
+  }
+
   object Kafka {
     private val version = "3.1.0"
     val kafkaClients = "org.apache.kafka"  % "kafka-clients" % version
-    val kafka        = "org.apache.kafka" %% "kafka"         % version % Test
+    val kafka        = "org.apache.kafka" %% "kafka"         % version % "test,it"
     val base = Seq(kafkaClients)
     val test = Seq(kafka)
   }
@@ -50,7 +57,7 @@ object Dependencies {
     private val version = "0.9.28"
     val refined    = "eu.timepit" %% "refined"            % version
     val pureconfig = "eu.timepit" %% "refined-pureconfig" % version
-    val scalaCheck = "eu.timepit" %% "refined-scalacheck" % version % Test
+    val scalaCheck = "eu.timepit" %% "refined-scalacheck" % version % "test,it"
     val base = Seq(refined, pureconfig)
     val test = Seq(scalaCheck)
   }
@@ -64,13 +71,13 @@ object Dependencies {
   val logbackClassic = "ch.qos.logback"       % "logback-classic"          % "1.4.5" % Runtime
   val logbackEncoder = "net.logstash.logback" % "logstash-logback-encoder" % "7.3"   % Runtime
 
-  val embeddedKafka        = "io.github.embeddedkafka" %% "embedded-kafka"              % "3.4.0"    % Test
-  val mockito              = "org.mockito"              % "mockito-core"                % "5.1.1"    % Test
-  val randomDataGenerator  = "com.danielasfregola"     %% "random-data-generator"       % "2.9"      % Test
-  val scalaCheck           = "org.scalacheck"          %% "scalacheck"                  % "1.17.0"   % Test
-  val scalaCheckDatetime   = "com.47deg"               %% "scalacheck-toolbox-datetime" % "0.7.0"    % Test
-  val scalaTest            = "org.scalatest"           %% "scalatest"                   % "3.2.15"   % Test
-  val scalaTestPlusMockito = "org.scalatestplus"       %% "mockito-3-12"                % "3.2.10.0" % Test
+  val embeddedKafka        = "io.github.embeddedkafka" %% "embedded-kafka"              % "3.4.0"    % "test,it"
+  val mockito              = "org.mockito"              % "mockito-core"                % "5.1.1"    % "test,it"
+  val randomDataGenerator  = "com.danielasfregola"     %% "random-data-generator"       % "2.9"      % "test,it"
+  val scalaCheck           = "org.scalacheck"          %% "scalacheck"                  % "1.17.0"   % "test,it"
+  val scalaCheckDatetime   = "com.47deg"               %% "scalacheck-toolbox-datetime" % "0.7.0"    % "test,it"
+  val scalaTest            = "org.scalatest"           %% "scalatest"                   % "3.2.15"   % "test,it"
+  val scalaTestPlusMockito = "org.scalatestplus"       %% "mockito-3-12"                % "3.2.10.0" % "test,it"
 
   val core: Seq[ModuleID] = Akka.base ++ Cats.base ++ Kafka.base ++ Kamon.all ++ PureConfig.all ++ Refined.base ++ Seq(
     avro4s,
@@ -83,7 +90,7 @@ object Dependencies {
     logbackClassic,
     logbackEncoder
   )
-  val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ Kafka.test ++ Refined.test ++ Seq(
+  val test: Seq[ModuleID] = Akka.test ++ Cats.test ++ DockerJava.test ++ Kafka.test ++ Refined.test ++ Seq(
     embeddedKafka,
     mockito,
     randomDataGenerator,
diff --git a/scheduler/docker/docker-compose.yml b/scheduler/docker/docker-compose.yml
new file mode 100644
index 00000000..f1be6126
--- /dev/null
+++ b/scheduler/docker/docker-compose.yml
@@ -0,0 +1,47 @@
+version: '3.9'
+
+services:
+
+  kms:
+    image: skyuk/kafka-message-scheduler:latest
+    depends_on:
+      - kafka
+      - zookeeper
+    environment:
+      KAFKA_BROKERS: kafka:9092
+      JAVA_TOOL_OPTIONS:
+        -Dscheduler.reader.schedule-topics.0=scheduleTopic
+        -Dscheduler.reader.schedule-topics.1=extraScheduleTopic
+    ports:
+      - "9095:9095"
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.0.1
+    environment:
+      ZOOKEEPER_CLIENT_PORT: '2181'
+
+  kafka:
+    image: confluentinc/cp-kafka:7.0.1
+    ports:
+      - "9093:9093"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,OUTSIDE://localhost:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+
+#  akhq:
+#    image: tchiotludo/akhq:0.23.0
+#    depends_on:
+#      - kafka
+#    ports:
+#      - "8080:8080"
+#    environment:
+#      AKHQ_CONFIGURATION: |
+#        akhq:
+#          connections:
+#            kafka:
+#              properties:
+#                bootstrap.servers: "kafka:9092"
diff --git a/scheduler/src/it/scala/base/DockerBase.scala b/scheduler/src/it/scala/base/DockerBase.scala
new file mode 100644
index 00000000..adec78ec
--- /dev/null
+++ b/scheduler/src/it/scala/base/DockerBase.scala
@@ -0,0 +1,23 @@
+package base
+
+import com.github.dockerjava.api.DockerClient
+import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientImpl}
+import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
+
+import scala.compat.java8.DurationConverters._
+import scala.concurrent.duration._
+
+trait DockerBase {
+
+  val dockerConfig: DefaultDockerClientConfig = DefaultDockerClientConfig.createDefaultConfigBuilder.build
+  val httpClient: ApacheDockerHttpClient = new ApacheDockerHttpClient.Builder()
+    .dockerHost(dockerConfig.getDockerHost)
+    .sslConfig(dockerConfig.getSSLConfig)
+    .maxConnections(100)
+    .connectionTimeout(30.seconds.toJava)
+    .responseTimeout(45.seconds.toJava)
+    .build()
+
+  val dockerClient: DockerClient = DockerClientImpl.getInstance(dockerConfig, httpClient)
+
+}
diff --git a/scheduler/src/it/scala/base/IntegrationBase.scala b/scheduler/src/it/scala/base/IntegrationBase.scala
new file mode 100644
index 00000000..90b1ad50
--- /dev/null
+++ b/scheduler/src/it/scala/base/IntegrationBase.scala
@@ -0,0 +1,32 @@
+package base
+
+import com.danielasfregola.randomdatagenerator.RandomDataGenerator
+import com.sky.kms.base.KafkaIntSpecBase
+import io.github.embeddedkafka.EmbeddedKafkaConfig
+import org.scalatest._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
+import org.scalatest.featurespec.FixtureAnyFeatureSpec
+import org.scalatest.matchers.should.Matchers
+
+import scala.concurrent.duration._
+
+abstract class IntegrationBase
+    extends FixtureAnyFeatureSpec
+    with fixture.ConfigMapFixture
+    with BeforeAndAfterEach
+    with Matchers
+    with RandomDataGenerator
+    with ScalaFutures
+    with Eventually
+    with KafkaIntSpecBase
+    with DockerBase {
+
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(kafkaConsumerTimeout, 200.millis)
+
+  override implicit lazy val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9093)
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    seekToEnd()
+  }
+}
diff --git a/scheduler/src/it/scala/integration/SchedulerFeature.scala b/scheduler/src/it/scala/integration/SchedulerFeature.scala
new file mode 100644
index 00000000..3fabd589
--- /dev/null
+++ b/scheduler/src/it/scala/integration/SchedulerFeature.scala
@@ -0,0 +1,145 @@
+package integration
+
+import java.time.OffsetDateTime
+import java.util.UUID
+
+import base.IntegrationBase
+import com.sky.kms.domain.{ScheduleEvent, ScheduleId}
+import com.sky.kms.kafka.Topic
+import com.sky.kms.utils.TestDataUtils._
+import io.github.embeddedkafka.Codecs.{nullDeserializer, nullSerializer, stringDeserializer, stringSerializer}
+import org.scalatest.ConfigMap
+
+import scala.concurrent.duration._
+
+class SchedulerFeature extends IntegrationBase {
+
+  Feature("Scheduler stream") {
+    Scenario("schedule a message to be sent to Kafka and delete it after it has been emitted") {
+      new TestContext(_) {
+        val schedules: List[(ScheduleId, ScheduleEvent)] =
+          createSchedules(2, forTopics = allTopics)
+
+        publish(schedules)
+          .foreach(assertMessagesWrittenFrom(_, schedules))
+
+        assertTombstoned(schedules)
+      }
+    }
+
+    Scenario("schedule a delete message if the value of the scheduled message is empty") {
+      new TestContext(_) {
+        val (scheduleId, scheduleEvent) = createSchedule(scheduleTopic)
+        val schedule                    = scheduleEvent.toSchedule.copy(value = None)
+
+        publishToKafka(scheduleTopic.value, scheduleId, schedule.toAvro)
+
+        val cr = consumeFirstFrom[String](schedule.topic)
+
+        cr.key should contain theSameElementsInOrderAs schedule.key
+        cr.value shouldBe null
+        cr.timestamp shouldBe schedule.timeInMillis +- tolerance.toMillis
+      }
+    }
+
+    Scenario("continue processing when Kafka becomes available") {
+      new TestContext(_) {
+        val (beforeId, beforeSchedule) = createSchedule(scheduleTopic)
+        val (afterId, afterSchedule)   = createSchedule(scheduleTopic)
+
+        publishToKafka(scheduleTopic.value, beforeId, beforeSchedule.toSchedule.toAvro)
+
+        consumeFirstFrom[Array[Byte]](beforeSchedule.outputTopic)
+          .key() should contain theSameElementsInOrderAs beforeSchedule.key
+
+        publishToKafka(scheduleTopic.value, afterId, afterSchedule.secondsFromNow(30).toSchedule.toAvro)
+
+        restartContainer("kafka")
+
+        eventually {
+          consumeFirstFrom[Array[Byte]](afterSchedule.outputTopic)
+            .key() should contain theSameElementsInOrderAs afterSchedule.key
+        }
+      }
+    }
+
+    Scenario("not schedule messages that have been deleted but not compacted on startup") {
+      new TestContext(_) {
+        // Increase schedule time to allow the container to restart
+        val schedules @ (id, scheduleToDelete) :: otherSchedules = createSchedules(4, List(scheduleTopic), 45)
+
+        val publishedSchedules = publish(schedules)
+
+        delete(id, scheduleToDelete)
+
+        restartContainer("kms")
+
+        publishedSchedules
+          .foreach(assertMessagesWrittenFrom(_, otherSchedules))
+
+      }
+    }
+
+  }
+
+  private abstract class TestContext(cm: ConfigMap) {
+    val tolerance: FiniteDuration = 2000.milliseconds
+
+    def restartContainer(containerName: String): Unit =
+      dockerClient.restartContainerCmd(cm(s"$containerName:containerId").toString).exec()
+
+    def createSchedule(forTopic: Topic, seconds: Long = 4): (ScheduleId, ScheduleEvent) = {
+      val key = UUID.randomUUID().toString
+      key -> random[ScheduleEvent]
+        .copy(inputTopic = forTopic.value, outputTopic = "output-" + UUID.randomUUID().toString, key = key.getBytes)
+        .secondsFromNow(seconds)
+    }
+
+    def createSchedules(
+        numSchedules: Int,
+        forTopics: List[Topic],
+        seconds: Long = 4
+    ): List[(ScheduleId, ScheduleEvent)] =
+      LazyList
+        .continually(forTopics.to(LazyList))
+        .flatten
+        .take(numSchedules)
+        .toList
+        .map(createSchedule(_, seconds))
+
+    def publish(scheduleEvents: List[(ScheduleId, ScheduleEvent)]): List[OffsetDateTime] = scheduleEvents.map {
+      case (id, scheduleEvent) =>
+        val schedule = scheduleEvent.toSchedule
+        publishToKafka(scheduleEvent.inputTopic, id, schedule.toAvro)
+        schedule.time
+    }
+
+    def delete(id: ScheduleId, scheduleEvent: ScheduleEvent): Unit =
+      publishToKafka(scheduleEvent.inputTopic, id, null.asInstanceOf[String])
+
+    def assertMessagesWrittenFrom(time: OffsetDateTime, schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
+      schedules.foreach { case (_, schedule) =>
+        val cr = consumeFirstFrom[Array[Byte]](schedule.outputTopic)
+
+        cr.key should contain theSameElementsInOrderAs schedule.key
+
+        schedule.value match {
+          case Some(value) => cr.value should contain theSameElementsAs value
+          case None        => cr.value shouldBe null
+        }
+
+        cr.timestamp shouldBe time.toInstant.toEpochMilli +- tolerance.toMillis
+        cr.headers().toArray.map(h => h.key() -> h.value().toList) should contain theSameElementsAs
+          schedule.headers.map { case (k, v) =>
+            (k, v.toList)
+          }
+      }
+
+    def assertTombstoned(schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
+      schedules.groupBy(_._2.inputTopic).foreach { case (topic, schedulesByInputTopic) =>
+        val tombstones = consumeSomeFrom[String](topic, schedulesByInputTopic.size * 2).filter(_.value == null)
+        tombstones.size shouldBe schedulesByInputTopic.size
+        tombstones.map(_.key) shouldBe schedulesByInputTopic.map(_._1).distinct
+      }
+  }
+}
diff --git a/scheduler/src/it/scala/utils/ScheduleEventNoHeaders.scala b/scheduler/src/it/scala/utils/ScheduleEventNoHeaders.scala
new file mode 100644
index 00000000..c5818598
--- /dev/null
+++ b/scheduler/src/it/scala/utils/ScheduleEventNoHeaders.scala
@@ -0,0 +1,11 @@
+package utils
+
+import scala.concurrent.duration.FiniteDuration
+
+case class ScheduleEventNoHeaders(
+    delay: FiniteDuration,
+    inputTopic: String,
+    outputTopic: String,
+    key: Array[Byte],
+    value: Option[Array[Byte]]
+)
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala b/scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerResiliencySpec.scala
similarity index 99%
rename from scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala
rename to scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerResiliencySpec.scala
index 516cbb81..4b1d703f 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerResiliencySpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerResiliencySpec.scala
@@ -1,4 +1,4 @@
-package com.sky.kms.e2e
+package com.sky.kms.acceptance
 
 import java.util.UUID
 
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala b/scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerSchemaEvolutionSpec.scala
similarity index 98%
rename from scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala
rename to scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerSchemaEvolutionSpec.scala
index 05f04b51..9d18daa7 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerSchemaEvolutionSpec.scala
+++ b/scheduler/src/test/scala/com/sky/kms/acceptance/SchedulerSchemaEvolutionSpec.scala
@@ -1,4 +1,4 @@
-package com.sky.kms.e2e
+package com.sky.kms.acceptance
 
 import cats.syntax.option._
 import com.danielasfregola.randomdatagenerator.RandomDataGenerator
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/package.scala b/scheduler/src/test/scala/com/sky/kms/acceptance/package.scala
similarity index 96%
rename from scheduler/src/test/scala/com/sky/kms/e2e/package.scala
rename to scheduler/src/test/scala/com/sky/kms/acceptance/package.scala
index 39127528..bb2fc937 100644
--- a/scheduler/src/test/scala/com/sky/kms/e2e/package.scala
+++ b/scheduler/src/test/scala/com/sky/kms/acceptance/package.scala
@@ -4,7 +4,7 @@ import akka.actor.CoordinatedShutdown.UnknownReason
 import akka.actor.{ActorSystem, CoordinatedShutdown}
 import com.sky.kms.config.{AppConfig, SchedulerConfig}
 
-package object e2e {
+package object acceptance {
 
   def withSchedulerApp[T](
       scenario: => T
diff --git a/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala b/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
index 1752af9a..737873b5 100644
--- a/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
+++ b/scheduler/src/test/scala/com/sky/kms/base/KafkaIntSpecBase.scala
@@ -6,14 +6,14 @@ import eu.timepit.refined.auto._
 import io.github.embeddedkafka.Codecs.{nullDeserializer, stringDeserializer}
 import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.Deserializer
-import org.scalatest.wordspec.AnyWordSpec
 
 import scala.compat.java8.DurationConverters._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 
-trait KafkaIntSpecBase extends AnyWordSpec with EmbeddedKafka {
+trait KafkaIntSpecBase extends EmbeddedKafka {
 
   implicit lazy val kafkaConfig: EmbeddedKafkaConfig =
     EmbeddedKafkaConfig(
@@ -21,8 +21,10 @@ trait KafkaIntSpecBase extends AnyWordSpec with EmbeddedKafka {
       zooKeeperPort = randomPort()
     )
 
-  val scheduleTopic: Topic = "scheduleTopic"
-  val extraScheduleTopic: Topic = "extraScheduleTopic"
+  val scheduleTopic: Topic      = "scheduleTopic"
+  val extraScheduleTopic: Topic = "extraScheduleTopic"
+  val allTopics: List[Topic]    = List(scheduleTopic, extraScheduleTopic)
+
   def kafkaConsumerTimeout: FiniteDuration = 60.seconds
 
   private def subscribeAndPoll[K, V](topic: String): KafkaConsumer[K, V] => Iterator[ConsumerRecord[K, V]] = { cr =>
@@ -39,4 +41,18 @@ trait KafkaIntSpecBase extends AnyWordSpec with EmbeddedKafka {
     withConsumer { cr: KafkaConsumer[String, T] =>
       subscribeAndPoll(topic)(cr).toList.take(numMsgs)
     }
+
+  def seekToEnd(): Unit =
+    withConsumer[String, String, Unit] { consumer =>
+      val tps = for {
+        topic <- allTopics.map(_.value)
+        pi    <- consumer.partitionsFor(topic).asScala
+      } yield new TopicPartition(pi.topic, pi.partition)
+
+      consumer.assign(tps.asJava)
+      consumer.seekToEnd(tps.asJava)
+      tps.foreach(consumer.position)
+      consumer.commitSync()
+      consumer.unsubscribe()
+    }
 }
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala
deleted file mode 100644
index 1825ba5a..00000000
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerDeleteIntSpec.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.sky.kms.e2e
-
-import com.sky.kms.base.SchedulerIntSpecBase
-import com.sky.kms.domain._
-import com.sky.kms.utils.TestDataUtils._
-import eu.timepit.refined.auto._
-import io.github.embeddedkafka.Codecs.{nullSerializer => arrayByteSerializer, stringDeserializer, stringSerializer}
-
-class SchedulerDeleteIntSpec extends SchedulerIntSpecBase {
-
-  "Scheduler stream" should {
-    "schedule a delete message if the value of the scheduled message is empty" in withRunningKafka {
-      withSchedulerApp {
-        val scheduleId = random[String]
-        val schedule   = random[ScheduleEvent].copy(value = None).secondsFromNow(4).toSchedule
-
-        publishToKafka(scheduleTopic, scheduleId, schedule.toAvro)
-
-        val cr = consumeFirstFrom[String](schedule.topic)
-
-        cr.key should contain theSameElementsInOrderAs schedule.key
-        cr.value shouldBe null
-        cr.timestamp shouldBe schedule.timeInMillis +- tolerance.toMillis
-      }
-    }
-  }
-}
diff --git a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
deleted file mode 100644
index 91caa995..00000000
--- a/scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.sky.kms.e2e
-
-import java.time.OffsetDateTime
-
-import com.sky.kms.base.SchedulerIntSpecBase
-import com.sky.kms.domain._
-import com.sky.kms.utils.TestDataUtils._
-import eu.timepit.refined.auto._
-import io.github.embeddedkafka.Codecs._
-
-class SchedulerIntSpec extends SchedulerIntSpecBase {
-
-  "Scheduler stream" should {
-    "schedule a message to be sent to Kafka and delete it after it has been emitted" in new TestContext {
-      withRunningKafka {
-        withSchedulerApp {
-          val schedules = createSchedules(2, forTopics = List(scheduleTopic, extraScheduleTopic))
-
-          publish(schedules)
-            .foreach(assertMessagesWrittenFrom(_, schedules))
-
-          assertTombstoned(schedules)
-        }
-      }
-    }
-  }
-
-  private class TestContext {
-    def createSchedules(numSchedules: Int, forTopics: List[String]): List[(ScheduleId, ScheduleEvent)] =
-      random[(ScheduleId, ScheduleEvent)](numSchedules).toList
-        .zip(LazyList.continually(forTopics.to(LazyList)).flatten.take(numSchedules).toList)
-        .map { case ((id, schedule), topic) =>
-          id -> schedule.copy(inputTopic = topic).secondsFromNow(4)
-        }
-
-    def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map { case (id, scheduleEvent) =>
-      val schedule = scheduleEvent.toSchedule
-      publishToKafka(scheduleEvent.inputTopic, id, schedule.toAvro)
-      schedule.time
-    }
-
-    def assertMessagesWrittenFrom(time: OffsetDateTime, schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
-      schedules.foreach { case (_, schedule) =>
-        val cr = consumeFirstFrom[Array[Byte]](schedule.outputTopic)
-
-        cr.key should contain theSameElementsInOrderAs schedule.key
-
-        schedule.value match {
-          case Some(value) => cr.value should contain theSameElementsAs value
-          case None        => cr.value shouldBe null
-        }
-
-        cr.timestamp shouldBe time.toInstant.toEpochMilli +- tolerance.toMillis
-        cr.headers().toArray.map(h => h.key() -> h.value().toList) should contain theSameElementsAs
-          schedule.headers.map { case (k, v) =>
-            (k, v.toList)
-          }
-      }
-
-    def assertTombstoned(schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
-      schedules.groupBy(_._2.inputTopic).foreach { case (topic, schedulesByInputTopic) =>
-        val tombstones = consumeSomeFrom[String](topic, schedulesByInputTopic.size * 2).filter(_.value == null)
-        tombstones.size shouldBe schedulesByInputTopic.size
-        tombstones.map(_.key) shouldBe schedulesByInputTopic.map(_._1).distinct
-      }
-  }
-
-}
diff --git a/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala b/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala
deleted file mode 100644
index 3ac0ef06..00000000
--- a/scheduler/src/test/scala/com/sky/kms/integration/ScheduleReaderIntSpec.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.sky.kms.integration
-
-import java.util.UUID
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestActor, TestProbe}
-import cats.syntax.functor._
-import com.sky.kms.actors.SchedulingActor._
-import com.sky.kms.base.SchedulerIntSpecBase
-import com.sky.kms.config._
-import com.sky.kms.domain.{ScheduleEvent, ScheduleId}
-import com.sky.kms.streams.ScheduleReader
-import com.sky.kms.utils.TestActorSystem
-import com.sky.kms.utils.TestDataUtils._
-import eu.timepit.refined.auto._
-import io.github.embeddedkafka.Codecs.{nullSerializer => arrayByteSerializer, stringSerializer}
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-class ScheduleReaderIntSpec extends SchedulerIntSpecBase {
-
-  override implicit lazy val system: ActorSystem =
-    TestActorSystem(kafkaConfig.kafkaPort, akkaExpectDuration = 20.seconds)
-
-  val numSchedules = 3
-
-  "stream" should {
-    "continue processing when Kafka becomes available" in withRunningScheduleReader { probe =>
-      withRunningKafka {
-        probe.expectMsg(StreamStarted)
-        probe.expectMsg(Initialised)
-        scheduleShouldFlow(probe)
-      }
-      // Wait 5 seconds. Embedded Kafka causes issues if you restart too quickly on the same ports.
-      Thread.sleep(5000)
-      withRunningKafka {
-        scheduleShouldFlow(probe)
-      }
-    }
-
-    "not schedule messages that have been deleted but not compacted on startup" in withRunningKafka {
-      val schedules @ firstSchedule :: _ = List.fill(numSchedules)(generateSchedule)
-      writeSchedulesToKafka(schedules: _*)
-      deleteSchedulesInKafka(firstSchedule)
-
-      withRunningScheduleReader { probe =>
-        probe.expectMsg(StreamStarted)
-        val receivedScheduleIds = List.fill(schedules.size)(probe.expectMsgType[CreateOrUpdate].scheduleId)
-
-        receivedScheduleIds should contain theSameElementsAs schedules.map(_._1)
-        probe.expectMsgType[Cancel].scheduleId shouldBe firstSchedule._1
-        probe.expectMsg(Initialised)
-      }
-    }
-  }
-
-  private def generateSchedule: (ScheduleId, ScheduleEvent) = UUID.randomUUID().toString -> random[ScheduleEvent]
-
-  private def withRunningScheduleReader[T](scenario: TestProbe => T): T = {
-    val probe = {
-      val p = TestProbe()
-      p.setAutoPilot((sender, msg) =>
-        msg match {
-          case _ =>
-            sender ! Ack
-            TestActor.KeepRunning
-        }
-      )
-      p
-    }
-
-    val controlF = ScheduleReader
-      .configure(probe.ref)
-      .apply(AppConfig(conf))
-      .stream
-      .run()
-
-    try
-      scenario(probe)
-    finally
-      Await.ready(controlF.flatMap(_.shutdown())(system.dispatcher), 5.seconds)
-  }
-
-  private def writeSchedulesToKafka(schedules: (ScheduleId, ScheduleEvent)*): Unit =
-    publishToKafka(
-      scheduleTopic,
-      schedules.map { case (scheduleId, scheduleEvent) =>
-        (scheduleId, scheduleEvent.toSchedule.toAvro)
-      }
-    )
-
-  private def scheduleShouldFlow(probe: TestProbe): SchedulingMessage = {
-    writeSchedulesToKafka(generateSchedule)
-    probe.expectMsgType[CreateOrUpdate]
-  }
-
-  private def deleteSchedulesInKafka(schedules: (ScheduleId, ScheduleEvent)*): Unit =
-    publishToKafka(scheduleTopic, schedules.map(_.map(_ => null.asInstanceOf[Array[Byte]])))
-
-}