Skip to content

Commit

Permalink
Merge pull request #46 from GrigoriiBerezin/update_kafka_quickstart
Browse files Browse the repository at this point in the history
Update zio-kafka version for quick start
  • Loading branch information
khajavi authored Apr 23, 2024
2 parents 7edd19a + 91909de commit 3bbb14d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
8 changes: 2 additions & 6 deletions zio-quickstart-kafka/build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
val `zio-quickstart-kafka` =
project.settings(
stdSettings(),
enableZIO(enableStreaming = true)
)
scalaVersion := "2.13.13"

libraryDependencies ++= Seq(
"dev.zio" %% "zio-kafka" % "2.6.0",
"dev.zio" %% "zio-kafka" % "2.7.4",
"dev.zio" %% "zio-json" % "0.6.2"
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ object KafkaSerde {

val value: Serde[Any, Event] =
Serde.string.inmapM[Any, Event](s =>
ZIO.fromEither(s.fromJson[Event])
ZIO
.fromEither(s.fromJson[Event])
.mapError(e => new RuntimeException(e))
)(r => ZIO.succeed(r.toJson))
}
Expand Down Expand Up @@ -68,9 +69,8 @@ object JsonStreamingKafkaApp extends ZIOAppDefault {

val c: ZStream[Consumer, Throwable, Nothing] =
Consumer
.subscribeAnd(Subscription.topics(KAFKA_TOPIC))
.plainStream(KafkaSerde.key, KafkaSerde.value)
.tap(e => Console.printLine(e.value))
.plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string)
.tap(r => Console.printLine(r.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.kafka.serde._
*/
object SimpleKafkaApp extends ZIOAppDefault {
private val BOOSTRAP_SERVERS = List("localhost:29092")
private val KAFKA_TOPIC = "hello"
private val KAFKA_TOPIC = "hello"

private def produce(
topic: String,
Expand All @@ -36,7 +36,7 @@ object SimpleKafkaApp extends ZIOAppDefault {
subscription = Subscription.topics(topic),
keyDeserializer = Serde.long,
valueDeserializer = Serde.string
)((k, v) => Console.printLine((k, v)).orDie)
)(record => Console.printLine((record.key(), record.value())).orDie)

private val producer: ZLayer[Any, Throwable, Producer] =
ZLayer.scoped(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ object StreamingKafkaApp extends ZIOAppDefault {
ZStream
.repeatZIO(Clock.currentDateTime)
.schedule(Schedule.spaced(1.second))
.map(time => new ProducerRecord(KAFKA_TOPIC, time.getMinute, s"$time -- Hello, World!"))
.map { time =>
new ProducerRecord(
KAFKA_TOPIC,
time.getMinute,
s"$time -- Hello, World!"
)
}
.via(Producer.produceAll(Serde.int, Serde.string))
.drain

val c: ZStream[Consumer, Throwable, Nothing] =
Consumer
.subscribeAnd(Subscription.topics(KAFKA_TOPIC))
.plainStream(Serde.int, Serde.string)
.tap(e => Console.printLine(e.value))
.plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string)
// do not use `tap` it in prod because it destroys the chunking structure and leads to lower performance
.tap(r => Console.printLine(r.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.drain

(p merge c).runDrain.provide(producer, consumer)
}

Expand Down

0 comments on commit 3bbb14d

Please sign in to comment.