diff --git a/zio-quickstart-kafka/build.sbt b/zio-quickstart-kafka/build.sbt index 9893b0d..0029e64 100644 --- a/zio-quickstart-kafka/build.sbt +++ b/zio-quickstart-kafka/build.sbt @@ -1,10 +1,6 @@ -val `zio-quickstart-kafka` = - project.settings( - stdSettings(), - enableZIO(enableStreaming = true) - ) +scalaVersion := "2.13.13" libraryDependencies ++= Seq( - "dev.zio" %% "zio-kafka" % "2.6.0", + "dev.zio" %% "zio-kafka" % "2.7.4", "dev.zio" %% "zio-json" % "0.6.2" ) diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala index 0a82bc0..72d0ab8 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala @@ -27,7 +27,8 @@ object KafkaSerde { val value: Serde[Any, Event] = Serde.string.inmapM[Any, Event](s => - ZIO.fromEither(s.fromJson[Event]) + ZIO + .fromEither(s.fromJson[Event]) .mapError(e => new RuntimeException(e)) )(r => ZIO.succeed(r.toJson)) } @@ -68,9 +69,8 @@ object JsonStreamingKafkaApp extends ZIOAppDefault { val c: ZStream[Consumer, Throwable, Nothing] = Consumer - .subscribeAnd(Subscription.topics(KAFKA_TOPIC)) - .plainStream(KafkaSerde.key, KafkaSerde.value) - .tap(e => Console.printLine(e.value)) + .plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string) + .tap(r => Console.printLine(r.value)) .map(_.offset) .aggregateAsync(Consumer.offsetBatches) .mapZIO(_.commit) diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala index e23d949..06681f6 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala @@ -11,7 +11,7 @@ import zio.kafka.serde._ */ object SimpleKafkaApp extends ZIOAppDefault { private val BOOSTRAP_SERVERS = List("localhost:29092") - private val KAFKA_TOPIC = "hello" + private val KAFKA_TOPIC = "hello" private def produce( topic: String, @@ -36,7 +36,7 @@ object SimpleKafkaApp extends ZIOAppDefault { subscription = Subscription.topics(topic), keyDeserializer = Serde.long, valueDeserializer = Serde.string - )((k, v) => Console.printLine((k, v)).orDie) + )(record => Console.printLine((record.key(), record.value())).orDie) private val producer: ZLayer[Any, Throwable, Producer] = ZLayer.scoped( diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala index cbf7202..e202002 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala @@ -31,20 +31,26 @@ object StreamingKafkaApp extends ZIOAppDefault { ZStream .repeatZIO(Clock.currentDateTime) .schedule(Schedule.spaced(1.second)) - .map(time => new ProducerRecord(KAFKA_TOPIC, time.getMinute, s"$time -- Hello, World!")) + .map { time => + new ProducerRecord( + KAFKA_TOPIC, + time.getMinute, + s"$time -- Hello, World!" + ) + } .via(Producer.produceAll(Serde.int, Serde.string)) .drain - + val c: ZStream[Consumer, Throwable, Nothing] = Consumer - .subscribeAnd(Subscription.topics(KAFKA_TOPIC)) - .plainStream(Serde.int, Serde.string) - .tap(e => Console.printLine(e.value)) + .plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string) + // do not use `tap` it in prod because it destroys the chunking structure and leads to lower performance + .tap(r => Console.printLine(r.value)) .map(_.offset) .aggregateAsync(Consumer.offsetBatches) .mapZIO(_.commit) .drain - + (p merge c).runDrain.provide(producer, consumer) }