Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update embedded-kafka from 3.4.1 to 3.6.1 #275

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object Dependencies {
}

object Kafka {
private val version = "3.1.0"
private val version = "3.6.1"
val kafkaClients = "org.apache.kafka" % "kafka-clients" % version
val kafka = "org.apache.kafka" %% "kafka" % version % Test
val base = Seq(kafkaClients)
Expand Down Expand Up @@ -64,7 +64,7 @@ object Dependencies {
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.5.1" % Runtime
val logbackEncoder = "net.logstash.logback" % "logstash-logback-encoder" % "7.4" % Runtime

val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.4.1" % Test
val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "3.6.1" % Test
val mockito = "org.mockito" % "mockito-core" % "5.10.0" % Test
val randomDataGenerator = "com.danielasfregola" %% "random-data-generator" % "2.9" % Test
val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.17.0" % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ trait KafkaIntSpecBase extends AnyWordSpec with EmbeddedKafka {

val scheduleTopic: Topic = "scheduleTopic"
val extraScheduleTopic: Topic = "extraScheduleTopic"
def kafkaConsumerTimeout: FiniteDuration = 60.seconds
def kafkaConsumerTimeout: FiniteDuration = 120.seconds

private def subscribeAndPoll[K, V](topic: String): KafkaConsumer[K, V] => Iterator[ConsumerRecord[K, V]] = { cr =>
cr.subscribe(List(topic).asJavaCollection)
cr.poll(kafkaConsumerTimeout.toJava).iterator().asScala
}

def consumeFirstFrom[T : Deserializer](topic: String): ConsumerRecord[Array[Byte], T] =
def consumeFirstFrom[T : Deserializer](topic: String): ConsumerRecord[Array[Byte], T] = {
val c = implicitly[EmbeddedKafkaConfig]
println(s">>> ConsumeFirstFrom: $topic || Config: $c")
withConsumer { cr: KafkaConsumer[Array[Byte], T] =>
println(s">>> withConsumer: ${cr.listTopics()} || Subscribe and Pool from: $topic")
subscribeAndPoll(topic)(cr).next()
}
}

def consumeSomeFrom[T : Deserializer](topic: String, numMsgs: Int): List[ConsumerRecord[String, T]] =
withConsumer { cr: KafkaConsumer[String, T] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import scala.concurrent.duration.*
abstract class SchedulerIntSpecBase extends AkkaKafkaSpecBase {
implicit val conf: SchedulerConfig = TestConfig(NonEmptyList.of(scheduleTopic, extraScheduleTopic))
val tolerance: FiniteDuration = 1300.milliseconds
override implicit val patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(120.seconds, 1.second)
}
43 changes: 36 additions & 7 deletions scheduler/src/test/scala/com/sky/kms/e2e/SchedulerIntSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.sky.kms.domain.*
import com.sky.kms.utils.TestDataUtils.*
import eu.timepit.refined.auto.*
import io.github.embeddedkafka.Codecs.*
import io.github.embeddedkafka.EmbeddedKafkaConfig

class SchedulerIntSpec extends SchedulerIntSpecBase {

Expand All @@ -25,19 +26,38 @@ class SchedulerIntSpec extends SchedulerIntSpecBase {
}

"schedule a past message to be sent to Kafka immediately and delete it after it has been emitted" in new TestContext {
println("-" * 90)
withRunningKafka {
withSchedulerApp {
val schedule =
createSchedules(1, forTopics = List(scheduleTopic, extraScheduleTopic), fromNow = -100000L)

publish(schedule)
.foreach(_ => assertMessagesWrittenFrom(OffsetDateTime.now(), schedule))
createSchedules(1, forTopics = List(scheduleTopic), fromNow = -100000L)

val outputTopicName = schedule.map(_._2.outputTopic).toSet.headOption.getOrElse("Not Topic Name Found")
println(s">>> Create output topic before publishing: $outputTopicName")
createCustomTopic(outputTopicName)

println(s">>> Kafka config: $kafkaConfig")
println("<BLANK>")
println(s">>> Created schedule: $schedule")
println("<BLANK>")

val published = publish(schedule)
println(s">>> Published: $published")
println("<BLANK>")

published.foreach { _ =>
val now = OffsetDateTime.now()
assertMessagesWrittenFrom(OffsetDateTime.now(), schedule)
println(s">>> assertMessagesWrittenFrom... now: $now || schedule: $schedule")
println("<BLANK>")
}

println(s">>> About to assertTombstoned. Schedule: $schedule")
println("<BLANK>")
assertTombstoned(schedule)
}
}
}(kafkaConfig)
}

}

private class TestContext {
Expand All @@ -54,14 +74,22 @@ class SchedulerIntSpec extends SchedulerIntSpecBase {

def publish: List[(ScheduleId, ScheduleEvent)] => List[OffsetDateTime] = _.map { case (id, scheduleEvent) =>
val schedule = scheduleEvent.toSchedule
val c = implicitly[EmbeddedKafkaConfig]
println(s">>> Publish. scheduleEvent.inputTopic: ${scheduleEvent.inputTopic} || id: $id || config: $c")
println("<BLANK>")
publishToKafka(scheduleEvent.inputTopic, id, schedule.toAvro)
schedule.time
}

def assertMessagesWrittenFrom(time: OffsetDateTime, schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
def assertMessagesWrittenFrom(time: OffsetDateTime, schedules: List[(ScheduleId, ScheduleEvent)]): Unit = {
println(s">>> Schedules: $schedules")
println("<BLANK>")
schedules.foreach { case (_, schedule) =>
println(s">>> Schedule Output Topic: ${schedule.outputTopic}")
val cr = consumeFirstFrom[Array[Byte]](schedule.outputTopic)

println(s">>> Consumer Record: $cr")
println("<BLANK>")
cr.key should contain theSameElementsInOrderAs schedule.key

schedule.value match {
Expand All @@ -75,6 +103,7 @@ class SchedulerIntSpec extends SchedulerIntSpecBase {
(k, v.toList)
}
}
}

def assertTombstoned(schedules: List[(ScheduleId, ScheduleEvent)]): Unit =
schedules.groupBy(_._2.inputTopic).foreach { case (topic, schedulesByInputTopic) =>
Expand Down
5 changes: 4 additions & 1 deletion scheduler/src/test/scala/com/sky/kms/e2e/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ package object e2e {
def withSchedulerApp[T](
scenario: => T
)(implicit conf: SchedulerConfig, system: ActorSystem): T =
withRunningScheduler(SchedulerApp.configure apply AppConfig(conf))(_ => scenario)
withRunningScheduler(SchedulerApp.configure apply AppConfig(conf)) { _ =>
println(s">>> Scheduler Conf: ${AppConfig(conf)}")
scenario
}

def withRunningScheduler[T](
schedulerApp: SchedulerApp
Expand Down