From 4f447e9531ead0e6c01f64f1511fc2f18d55ce5e Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 11 Jul 2024 13:18:26 +0200 Subject: [PATCH 1/9] Abstract Java Kafka client metrics via Producer/ConsumerMetrics --- README.md | 18 ++++++ .../skafka/consumer/ConsumerMetricsOf.scala | 52 ++++++++++++++++ .../skafka/producer/ProducerMetricsOf.scala | 60 +++++++++++++++++++ .../skafka/consumer/ConsumerMetrics.scala | 46 ++++++++++++-- .../skafka/consumer/ConsumerOf.scala | 25 +++++--- .../skafka/producer/ProducerMetrics.scala | 36 ++++++++++- .../skafka/producer/ProducerOf.scala | 27 ++++++--- version.sbt | 2 +- 8 files changed, 244 insertions(+), 22 deletions(-) create mode 100644 modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala create mode 100644 modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala diff --git a/README.md b/README.md index 2b35664e..3bc45108 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,24 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => } ``` +## Java client metrics example + +The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well. +```scala +import ConsumerMetricsOf.* + +val config: ConsumerConfig = ??? +val registry: CollectorRegistry = ??? // Prometheus client +val metrics0: ConsumerMetrics[IO] = ??? +val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics("my-app", registry) +ConsumerOf + .apply1(metrics1.some) + .apply(config) + .use { consumer => + ??? + } +``` + ## Setup ```scala diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala new file mode 100644 index 00000000..88a68199 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -0,0 +1,52 @@ +package com.evolutiongaming.skafka.consumer + +import cats.effect.{Resource, Sync} +import cats.syntax.all.* +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import io.prometheus.client.CollectorRegistry + +import scala.concurrent.duration.FiniteDuration + +object ConsumerMetricsOf { + + def withJavaClientMetrics[F[_]: Sync: ToTry]( + source: ConsumerMetrics[F], + prefix: String, + prometheus: CollectorRegistry + ): ConsumerMetrics[F] = + new ConsumerMetrics[F] { + override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = + source.call(name, topic, latency, success) + + override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = + source.poll(topic, bytes, records, age) + + override def count(name: String, topic: Topic): F[Unit] = source.count(name, topic) + + override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = + source.rebalance(name, topicPartition) + + override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) + + override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, prefix.some) + Resource.make { + Sync[F].delay { prometheus.register(collector) } + } { _ => + Sync[F].delay { prometheus.unregister(collector) } + } + } + + } + + implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { + + def exposeJavaClientMetrics( + prefix: String, + prometheus: CollectorRegistry + )(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + + } +} diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala new file mode 100644 index 00000000..d71dd852 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -0,0 +1,60 @@ +package com.evolutiongaming.skafka.producer + +import cats.effect.{Resource, Sync} +import cats.syntax.all.* +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import io.prometheus.client.CollectorRegistry + +import scala.concurrent.duration.FiniteDuration + +object ProducerMetricsOf { + + def withJavaClientMetrics[F[_]: Sync: ToTry]( + source: ProducerMetrics[F], + prefix: String, + prometheus: CollectorRegistry + ): ProducerMetrics[F] = + new ProducerMetrics[F] { + override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) + + override def beginTransaction: F[Unit] = source.beginTransaction + + override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = source.sendOffsetsToTransaction(latency) + + override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency) + + override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency) + + override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = source.send(topic, latency, bytes) + + override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency) + + override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency) + + override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency) + + override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) + + override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](producer.clientMetrics, prefix.some) + Resource.make { + Sync[F].delay { prometheus.register(collector) } + } { _ => + Sync[F].delay { prometheus.unregister(collector) } + } + } + + } + + implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { + + def exposeJavaClientMetrics( + prefix: String, + prometheus: CollectorRegistry + )(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + + } + +} diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index 6d0f8340..a2d38504 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.skafka.consumer -import cats.effect.Resource +import cats.effect.{MonadCancel, Resource} import cats.implicits._ import cats.{Applicative, Monad, ~>} import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} @@ -8,6 +8,7 @@ import com.evolutiongaming.smetrics.MetricsHelper._ import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles} import scala.concurrent.duration.FiniteDuration +import scala.annotation.nowarn trait ConsumerMetrics[F[_]] { @@ -20,6 +21,9 @@ trait ConsumerMetrics[F[_]] { def rebalance(name: String, topicPartition: TopicPartition): F[Unit] def topics(latency: FiniteDuration): F[Unit] + + private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = + Resource.unit[F] } object ConsumerMetrics { @@ -104,11 +108,11 @@ object ConsumerMetrics { bytesSummary <- bytesSummary rebalancesCounter <- rebalancesCounter topicsLatency <- topicsLatency - ageSummary <- registry.summary( - name = s"${ prefix }_poll_age", - help = "Poll records age, time since record.timestamp", + ageSummary <- registry.summary( + name = s"${prefix}_poll_age", + help = "Poll records age, time since record.timestamp", quantiles = Quantiles.Default, - labels = LabelNames("client", "topic") + labels = LabelNames("client", "topic") ) } yield { (clientId: ClientId) => new ConsumerMetrics[F] { @@ -162,6 +166,7 @@ object ConsumerMetrics { implicit class ConsumerMetricsOps[F[_]](val self: ConsumerMetrics[F]) extends AnyVal { + @deprecated("Use mapK(f, g) instead", "16.2.0") def mapK[G[_]](f: F ~> G): ConsumerMetrics[G] = { new MapK with ConsumerMetrics[G] { @@ -186,5 +191,36 @@ object ConsumerMetrics { } } } + + def mapK[G[_]]( + fg: F ~> G, + gf: G ~> F + )(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ConsumerMetrics[G] = { + new MapK with ConsumerMetrics[G] { + + def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = { + fg(self.call(name, topic, latency, success)) + } + + def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = { + fg(self.poll(topic, bytes, records, age)) + } + + def count(name: String, topic: Topic) = { + fg(self.count(name, topic)) + } + + def rebalance(name: String, topicPartition: TopicPartition) = { + fg(self.rebalance(name, topicPartition)) + } + + def topics(latency: FiniteDuration) = { + fg(self.topics(latency)) + } + + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) + } + } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index cd721f83..c9a97662 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -31,20 +31,31 @@ object ConsumerOf { def apply[K, V](config: ConsumerConfig)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]) = { Consumer .of[F, K, V](config) - .map { consumer => - metrics.fold { consumer } { consumer.withMetrics1[Throwable] } + .flatMap { consumer => + metrics match { + + case None => + Resource.pure[F, Consumer[F, K, V]](consumer) + + case Some(metrics) => + for { + _ <- metrics.exposeJavaMetrics[K, V](consumer) + } yield { + consumer.withMetrics1[Throwable](metrics) + } + } } } } } /** The sole purpose of this method is to support binary compatibility with an intermediate - * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` - * and `apply2` using `MeasureDuration` from `cats-helper`. - * This should not be used and should be removed in a reasonable amount of time. - */ + * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` + * and `apply2` using `MeasureDuration` from `cats-helper`. + * This should not be used and should be removed in a reasonable amount of time. + */ @deprecated("Use `apply1`", since = "16.0.3") - def apply2[F[_] : Async : ToTry : ToFuture : MeasureDuration]( + def apply2[F[_]: Async: ToTry: ToFuture: MeasureDuration]( metrics: Option[ConsumerMetrics[F]] = None ): ConsumerOf[F] = apply1(metrics) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 8accb7c0..8a1610e0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.skafka.producer -import cats.effect.Resource +import cats.effect.{MonadCancel, Resource} import cats.implicits._ import cats.{Applicative, Monad, ~>} import com.evolutiongaming.skafka.{ClientId, Topic} @@ -8,6 +8,7 @@ import com.evolutiongaming.smetrics.MetricsHelper._ import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantile, Quantiles} import scala.concurrent.duration.FiniteDuration +import scala.annotation.nowarn trait ProducerMetrics[F[_]] { @@ -30,6 +31,8 @@ trait ProducerMetrics[F[_]] { def partitions(topic: Topic, latency: FiniteDuration): F[Unit] def flush(latency: FiniteDuration): F[Unit] + + private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -177,6 +180,7 @@ object ProducerMetrics { implicit class ProducerMetricsOps[F[_]](val self: ProducerMetrics[F]) extends AnyVal { + @deprecated("Use mapK(f, g) instead", "16.2.0") def mapK[G[_]](f: F ~> G): ProducerMetrics[G] = new ProducerMetrics[G] { def initTransactions(latency: FiniteDuration) = f(self.initTransactions(latency)) @@ -199,5 +203,35 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = f(self.flush(latency)) } + + def mapK[G[_]]( + fg: F ~> G, + gf: G ~> F + )(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ProducerMetrics[G] = + new ProducerMetrics[G] { + + def initTransactions(latency: FiniteDuration) = fg(self.initTransactions(latency)) + + def beginTransaction = fg(self.beginTransaction) + + def sendOffsetsToTransaction(latency: FiniteDuration) = fg(self.sendOffsetsToTransaction(latency)) + + def commitTransaction(latency: FiniteDuration) = fg(self.commitTransaction(latency)) + + def abortTransaction(latency: FiniteDuration) = fg(self.abortTransaction(latency)) + + def send(topic: Topic, latency: FiniteDuration, bytes: Int) = fg(self.send(topic, latency, bytes)) + + def block(topic: Topic, latency: FiniteDuration) = fg(self.block(topic, latency)) + + def failure(topic: Topic, latency: FiniteDuration) = fg(self.failure(topic, latency)) + + def partitions(topic: Topic, latency: FiniteDuration) = fg(self.partitions(topic, latency)) + + def flush(latency: FiniteDuration) = fg(self.flush(latency)) + + override def exposeJavaMetrics(producer: Producer[G]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) + } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index 9dc0fa21..e431616c 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -26,19 +26,30 @@ object ProducerOf { def apply(config: ProducerConfig) = { for { producer <- Producer.of[F](config = config) - } yield { - metrics.fold(producer)(producer.withMetrics[Throwable]) - } + producer <- metrics match { + + case None => + Resource.pure[F, Producer[F]](producer) + + case Some(metrics) => + for { + _ <- metrics.exposeJavaMetrics(producer) + } yield { + producer.withMetrics[Throwable](metrics) + } + + } + } yield producer } } /** The sole purpose of this method is to support binary compatibility with an intermediate - * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` - * and `apply2` using `MeasureDuration` from `cats-helper`. - * This should not be used and should be removed in a reasonable amount of time. - */ + * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` + * and `apply2` using `MeasureDuration` from `cats-helper`. + * This should not be used and should be removed in a reasonable amount of time. + */ @deprecated("Use `apply1`", since = "16.0.3") - def apply2[F[_] : MeasureDuration : ToTry : Async]( + def apply2[F[_]: MeasureDuration: ToTry: Async]( metrics: Option[ProducerMetrics[F]] = None ): ProducerOf[F] = apply1(metrics) diff --git a/version.sbt b/version.sbt index dfc364c5..ec7da3a9 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.1.3-SNAPSHOT" +ThisBuild / version := "16.2.0" From d8fe291ee689d56147cc112ec0a6f4a57d484cb1 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 11 Jul 2024 16:07:58 +0200 Subject: [PATCH 2/9] Calculate metrics prefix based on client ID --- README.md | 2 +- .../skafka/consumer/ConsumerMetricsOf.scala | 14 ++++++++------ .../skafka/producer/ProducerMetricsOf.scala | 11 +++++------ .../skafka/consumer/ConsumerMetrics.scala | 10 ++++++---- .../skafka/consumer/ConsumerOf.scala | 2 +- .../skafka/producer/ProducerMetrics.scala | 9 ++++++--- .../skafka/producer/ProducerOf.scala | 2 +- 7 files changed, 28 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 3bc45108..971bee08 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ import ConsumerMetricsOf.* val config: ConsumerConfig = ??? val registry: CollectorRegistry = ??? // Prometheus client val metrics0: ConsumerMetrics[IO] = ??? -val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics("my-app", registry) +val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics(identity, registry) ConsumerOf .apply1(metrics1.some) .apply(config) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 88a68199..859b4481 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -1,9 +1,8 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} -import cats.syntax.all.* import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector import io.prometheus.client.CollectorRegistry @@ -13,7 +12,7 @@ object ConsumerMetricsOf { def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ConsumerMetrics[F], - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry ): ConsumerMetrics[F] = new ConsumerMetrics[F] { @@ -30,8 +29,11 @@ object ConsumerMetricsOf { override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) - override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, prefix.some) + override def exposeJavaMetrics[K, V]( + consumer: Consumer[F, K, V], + clientId: Option[ClientId] + ): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix)) Resource.make { Sync[F].delay { prometheus.register(collector) } } { _ => @@ -44,7 +46,7 @@ object ConsumerMetricsOf { implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { def exposeJavaClientMetrics( - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry )(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index d71dd852..9553186a 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -1,9 +1,8 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} -import cats.syntax.all.* import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.{ClientId, Topic} import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector import io.prometheus.client.CollectorRegistry @@ -13,7 +12,7 @@ object ProducerMetricsOf { def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ProducerMetrics[F], - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry ): ProducerMetrics[F] = new ProducerMetrics[F] { @@ -37,8 +36,8 @@ object ProducerMetricsOf { override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](producer.clientMetrics, prefix.some) + override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix)) Resource.make { Sync[F].delay { prometheus.register(collector) } } { _ => @@ -51,7 +50,7 @@ object ProducerMetricsOf { implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { def exposeJavaClientMetrics( - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry )(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index a2d38504..3c30488f 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -22,8 +22,10 @@ trait ConsumerMetrics[F[_]] { def topics(latency: FiniteDuration): F[Unit] - private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = - Resource.unit[F] + private[consumer] def exposeJavaMetrics[K, V]( + @nowarn consumer: Consumer[F, K, V], + @nowarn clientId: Option[ClientId], + ): Resource[F, Unit] = Resource.unit[F] } object ConsumerMetrics { @@ -218,8 +220,8 @@ object ConsumerMetrics { fg(self.topics(latency)) } - override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = - self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index c9a97662..1a6911a4 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -39,7 +39,7 @@ object ConsumerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics[K, V](consumer) + _ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId) } yield { consumer.withMetrics1[Throwable](metrics) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 8a1610e0..6911337a 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -32,7 +32,10 @@ trait ProducerMetrics[F[_]] { def flush(latency: FiniteDuration): F[Unit] - private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] + private[producer] def exposeJavaMetrics( + @nowarn producer: Producer[F], + @nowarn clientId: Option[ClientId], + ): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -230,8 +233,8 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = fg(self.flush(latency)) - override def exposeJavaMetrics(producer: Producer[G]) = - self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) + override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index e431616c..de5b4547 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -33,7 +33,7 @@ object ProducerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics(producer) + _ <- metrics.exposeJavaMetrics(producer, config.common.clientId) } yield { producer.withMetrics[Throwable](metrics) } From 9655d6d45a3ee92f4e0e9ce49e089154b1ac3be8 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 11 Jul 2024 16:08:20 +0200 Subject: [PATCH 3/9] Update modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mareks Rampāns <8796159+mr-git@users.noreply.github.com> --- .../evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 859b4481..30aa892f 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -27,7 +27,8 @@ object ConsumerMetricsOf { override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = source.rebalance(name, topicPartition) - override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) + override def topics(latency: FiniteDuration): F[Unit] = + source.topics(latency) override def exposeJavaMetrics[K, V]( consumer: Consumer[F, K, V], From 49467c0600a5c722d10b6c187ab41bf5f68396da Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 11 Jul 2024 16:08:28 +0200 Subject: [PATCH 4/9] Update modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mareks Rampāns <8796159+mr-git@users.noreply.github.com> --- .../evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 30aa892f..bc5cf9e2 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -22,7 +22,8 @@ object ConsumerMetricsOf { override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = source.poll(topic, bytes, records, age) - override def count(name: String, topic: Topic): F[Unit] = source.count(name, topic) + override def count(name: String, topic: Topic): F[Unit] = + source.count(name, topic) override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = source.rebalance(name, topicPartition) From 1da4c596d0733134183496f7798fa3aadc4a9219 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 11 Jul 2024 16:21:41 +0200 Subject: [PATCH 5/9] Add docs, silent mima --- README.md | 5 +++++ build.sbt | 4 +++- .../skafka/consumer/ConsumerMetricsOf.scala | 15 +++++++++++++++ .../skafka/producer/ProducerMetricsOf.scala | 15 +++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 971bee08..c693f776 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,11 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => ## Java client metrics example The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well. + +> :warning: Creating consumer/producer will register new set of metrics in Prometheus registry +> thus metric names must not clash. For that you have to provide function of type `ClientId => String`, +> that takes client ID and returns metric prefix. The prefix expected to be unique! + ```scala import ConsumerMetricsOf.* diff --git a/build.sbt b/build.sbt index 70b26b14..99112428 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,9 @@ lazy val commonSettings = Seq( ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"), ProblemFilters.exclude[DirectMissingMethodProblem]( "com.evolutiongaming.skafka.Converters#MapJOps.asScalaMap$extension" - ) + ), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.consumer.ConsumerMetrics#ConsumerMetricsOps.mapK$extension"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.producer.ProducerMetrics#ProducerMetricsOps.mapK$extension"), ) ) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index bc5cf9e2..91b5c709 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -10,6 +10,14 @@ import scala.concurrent.duration.FiniteDuration object ConsumerMetricsOf { + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param source original [[ConsumerMetrics]] + * @param prefix function that provides _unique_ prefix for each client + * @param prometheus instance of Prometheus registry + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ConsumerMetrics[F], prefix: ClientId => String, @@ -47,6 +55,13 @@ object ConsumerMetricsOf { implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param prefix function that provides _unique_ prefix for each client + * @param prometheus instance of Prometheus registry + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ def exposeJavaClientMetrics( prefix: ClientId => String, prometheus: CollectorRegistry diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index 9553186a..2e7fd62f 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -10,6 +10,14 @@ import scala.concurrent.duration.FiniteDuration object ProducerMetricsOf { + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param source original [[ProducerMetrics]] + * @param prefix function that provides _unique_ prefix for each client + * @param prometheus instance of Prometheus registry + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ProducerMetrics[F], prefix: ClientId => String, @@ -49,6 +57,13 @@ object ProducerMetricsOf { implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param prefix function that provides _unique_ prefix for each client + * @param prometheus instance of Prometheus registry + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ def exposeJavaClientMetrics( prefix: ClientId => String, prometheus: CollectorRegistry From eac1fdd89d13dcebda2afe05e9523a13ca96a671 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Fri, 12 Jul 2024 16:12:04 +0200 Subject: [PATCH 6/9] Re-implement java kafka metrics collection using registry, kudos to Z1kkurat --- .../skafka/consumer/ConsumerMetricsOf.scala | 34 ++++---- .../skafka/metrics/KafkaMetricsRegistry.scala | 80 +++++++++++++++++++ .../skafka/producer/ProducerMetricsOf.scala | 33 ++++---- .../skafka/consumer/ConsumerMetrics.scala | 10 +-- .../skafka/consumer/ConsumerOf.scala | 2 +- .../skafka/producer/ProducerMetrics.scala | 9 +-- .../skafka/producer/ProducerOf.scala | 2 +- 7 files changed, 118 insertions(+), 52 deletions(-) create mode 100644 modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 91b5c709..e0709fb5 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} -import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration @@ -14,16 +14,18 @@ object ConsumerMetricsOf { * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ConsumerMetrics]] - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ConsumerMetrics[F], - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - ): ConsumerMetrics[F] = - new ConsumerMetrics[F] { + ): Resource[F, ConsumerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ConsumerMetrics[F] { override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = source.call(name, topic, latency, success) @@ -39,21 +41,12 @@ object ConsumerMetricsOf { override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) - override def exposeJavaMetrics[K, V]( - consumer: Consumer[F, K, V], - clientId: Option[ClientId] - ): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix)) - Resource.make { - Sync[F].delay { prometheus.register(collector) } - } { _ => - Sync[F].delay { prometheus.unregister(collector) } - } - } + override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = + registry.register(consumer.clientMetrics) } - implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { + implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { /** * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. @@ -63,9 +56,10 @@ object ConsumerMetricsOf { * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - )(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] = + withJavaClientMetrics(source, prefix, prometheus) } } diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala new file mode 100644 index 00000000..0d55dc49 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala @@ -0,0 +1,80 @@ +package com.evolutiongaming.skafka.metrics + +import cats.syntax.all._ +import cats.effect.syntax.resource._ +import cats.effect.{Resource, Ref, Sync} +import cats.effect.std.UUIDGen +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.ClientMetric +import io.prometheus.client.CollectorRegistry + +import java.util.UUID + +/** + * Allows reporting metrics of multiple Kafka clients inside a single VM. + * + * Example: + * {{{ + * val prometheus: CollectorRegistry = ??? + * val consumerOf: ConsumerOf[F] = ??? + * val producerOf: ProducerOf[F] = ??? + * + * for { + * registry <- KafkaMetricsRegistry.of(prometheus) + * + * consumer <- consumerOf(consumerConfig) + * _ <- registry.register(consumer.clientMetrics) + * + * producer <- producerOf(producerConfig) + * _ <- registry.register(producer.clientMetrics) + * } yield () + * }}} + */ +trait KafkaMetricsRegistry[F[_]] { + + /** + * Register a function to obtain a list of client metrics. + * Normally, you would pass + * [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or + * [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]] + */ + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] +} + +object KafkaMetricsRegistry { + + def of[F[_]: Sync: ToTry: UUIDGen]( + prometheus: CollectorRegistry, + prefix: Option[String] = None, + ): Resource[F, KafkaMetricsRegistry[F]] = + for { + sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource + + metrics = sources.get.flatMap { sources => + sources.toSeq.flatTraverse { + case (uuid, metrics) => + metrics.map { metrics => + metrics.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) } + } + } + } + + collector = new KafkaMetricsCollector[F](metrics, prefix) + allocate = Sync[F].delay { prometheus.register(collector) } + release = Sync[F].delay { prometheus.unregister(collector) } + + _ <- Resource.make(allocate)(_ => release) + } yield new KafkaMetricsRegistry[F] { + + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] = + for { + uuid <- UUIDGen[F].randomUUID.toResource + + allocate = sources.update { sources => sources.updated(uuid, metrics) } + release = sources.update { sources => sources - uuid } + + _ <- Resource.make(allocate)(_ => release) + } yield {} + } + +} diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index 2e7fd62f..6efba510 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{ClientId, Topic} -import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration @@ -14,16 +14,18 @@ object ProducerMetricsOf { * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ProducerMetrics]] - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ProducerMetrics[F], - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - ): ProducerMetrics[F] = - new ProducerMetrics[F] { + ): Resource[F, ProducerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ProducerMetrics[F] { override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) override def beginTransaction: F[Unit] = source.beginTransaction @@ -44,30 +46,25 @@ object ProducerMetricsOf { override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix)) - Resource.make { - Sync[F].delay { prometheus.register(collector) } - } { _ => - Sync[F].delay { prometheus.unregister(collector) } - } - } + override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = + registry.register(producer.clientMetrics) } - implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { + implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal { /** * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - )(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] = + withJavaClientMetrics(source, prefix, prometheus) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index 3c30488f..a2d38504 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -22,10 +22,8 @@ trait ConsumerMetrics[F[_]] { def topics(latency: FiniteDuration): F[Unit] - private[consumer] def exposeJavaMetrics[K, V]( - @nowarn consumer: Consumer[F, K, V], - @nowarn clientId: Option[ClientId], - ): Resource[F, Unit] = Resource.unit[F] + private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = + Resource.unit[F] } object ConsumerMetrics { @@ -220,8 +218,8 @@ object ConsumerMetrics { fg(self.topics(latency)) } - override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) = - self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg) + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index 1a6911a4..c9a97662 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -39,7 +39,7 @@ object ConsumerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId) + _ <- metrics.exposeJavaMetrics[K, V](consumer) } yield { consumer.withMetrics1[Throwable](metrics) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 6911337a..8a1610e0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -32,10 +32,7 @@ trait ProducerMetrics[F[_]] { def flush(latency: FiniteDuration): F[Unit] - private[producer] def exposeJavaMetrics( - @nowarn producer: Producer[F], - @nowarn clientId: Option[ClientId], - ): Resource[F, Unit] = Resource.unit[F] + private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -233,8 +230,8 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = fg(self.flush(latency)) - override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) = - self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg) + override def exposeJavaMetrics(producer: Producer[G]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index de5b4547..e431616c 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -33,7 +33,7 @@ object ProducerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics(producer, config.common.clientId) + _ <- metrics.exposeJavaMetrics(producer) } yield { producer.withMetrics[Throwable](metrics) } From 2c8439144ce14a5db62849fd7365bfa95ac180e7 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Fri, 12 Jul 2024 16:18:21 +0200 Subject: [PATCH 7/9] Fix readme --- README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index c693f776..23fdb832 100644 --- a/README.md +++ b/README.md @@ -79,23 +79,23 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well. -> :warning: Creating consumer/producer will register new set of metrics in Prometheus registry -> thus metric names must not clash. For that you have to provide function of type `ClientId => String`, -> that takes client ID and returns metric prefix. The prefix expected to be unique! +> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` registers new Prometheus collector +> under the hood. Please use unique prefixes for each collector to avoid duplicated metrics +> in Prometheus (i.e. runtime exception on registration). Prefix can be set as parameter in: +> `ConsumerMetricsOf.withJavaClientMetrics(registry, Some("the_prefix"))` ```scala import ConsumerMetricsOf.* val config: ConsumerConfig = ??? val registry: CollectorRegistry = ??? // Prometheus client -val metrics0: ConsumerMetrics[IO] = ??? -val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics(identity, registry) -ConsumerOf - .apply1(metrics1.some) - .apply(config) - .use { consumer => - ??? - } +val metrics: ConsumerMetrics[IO] = ??? + +for { + metrics <- metrics.exposeJavaClientMetrics(registry) + consumerOf = ConsumerOf.apply1(metrics1.some) + consumer <- consumerOf(config) +} yield ??? ``` ## Setup From 2a454d529f43ef2c990fc26bd4017b3d7834e6c6 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Fri, 12 Jul 2024 16:27:24 +0200 Subject: [PATCH 8/9] make compatible with scala 2.12 --- .../skafka/metrics/KafkaMetricsRegistry.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala index 0d55dc49..d93e4862 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala @@ -50,14 +50,19 @@ object KafkaMetricsRegistry { for { sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource - metrics = sources.get.flatMap { sources => - sources.toSeq.flatTraverse { - case (uuid, metrics) => - metrics.map { metrics => - metrics.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) } + metrics = sources + .get + .flatMap { sources => + sources + .toList + .flatTraverse { + case (uuid, metrics) => + metrics.map { metrics => + metrics.toList.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) } + } } } - } + .widen[Seq[ClientMetric[F]]] collector = new KafkaMetricsCollector[F](metrics, prefix) allocate = Sync[F].delay { prometheus.register(collector) } From 3b4796192166a4e6c10c30aca7d53f6e4043aae3 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Mon, 15 Jul 2024 12:16:29 +0200 Subject: [PATCH 9/9] minor refactoring + fix docs --- README.md | 12 ++++++------ .../skafka/consumer/ConsumerMetricsOf.scala | 15 ++++++++------- .../skafka/producer/ProducerMetricsOf.scala | 13 +++++++------ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 23fdb832..d6dbefcf 100644 --- a/README.md +++ b/README.md @@ -79,20 +79,20 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well. -> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` registers new Prometheus collector -> under the hood. Please use unique prefixes for each collector to avoid duplicated metrics -> in Prometheus (i.e. runtime exception on registration). Prefix can be set as parameter in: -> `ConsumerMetricsOf.withJavaClientMetrics(registry, Some("the_prefix"))` +> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` (or its alternative `metrics.exposeJavaClientMetrics`) +> registers new Prometheus collector under the hood. Please use unique prefixes for each collector +> to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration). +> Prefix can be set as parameter in: `ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))` ```scala import ConsumerMetricsOf.* val config: ConsumerConfig = ??? -val registry: CollectorRegistry = ??? // Prometheus client +val prometheus: CollectorRegistry = ??? val metrics: ConsumerMetrics[IO] = ??? for { - metrics <- metrics.exposeJavaClientMetrics(registry) + metrics <- metrics.exposeJavaClientMetrics(prometheus) consumerOf = ConsumerOf.apply1(metrics1.some) consumer <- consumerOf(config) } yield ??? diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index e0709fb5..ef02d20b 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -1,6 +1,7 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} +import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry import com.evolutiongaming.skafka.{Topic, TopicPartition} import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry @@ -14,14 +15,14 @@ object ConsumerMetricsOf { * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ConsumerMetrics]] - * @param prefix metric name prefix * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ - def withJavaClientMetrics[F[_]: Sync: ToTry]( + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( source: ConsumerMetrics[F], + prometheus: CollectorRegistry, prefix: Option[String], - prometheus: CollectorRegistry ): Resource[F, ConsumerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) @@ -51,15 +52,15 @@ object ConsumerMetricsOf { /** * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * - * @param prefix function that provides _unique_ prefix for each client * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( - prefix: Option[String], - prometheus: CollectorRegistry + prometheus: CollectorRegistry, + prefix: Option[String] = None, )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] = - withJavaClientMetrics(source, prefix, prometheus) + withJavaClientMetrics(source, prometheus, prefix) } } diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index 6efba510..ee3ce3a8 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -1,6 +1,7 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} +import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry import com.evolutiongaming.skafka.Topic import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry @@ -14,14 +15,14 @@ object ProducerMetricsOf { * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ProducerMetrics]] - * @param prefix metric name prefix * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ - def withJavaClientMetrics[F[_]: Sync: ToTry]( + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( source: ProducerMetrics[F], + prometheus: CollectorRegistry, prefix: Option[String], - prometheus: CollectorRegistry ): Resource[F, ProducerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) @@ -56,15 +57,15 @@ object ProducerMetricsOf { /** * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * - * @param prefix metric name prefix * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( + prometheus: CollectorRegistry, prefix: Option[String], - prometheus: CollectorRegistry )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] = - withJavaClientMetrics(source, prefix, prometheus) + withJavaClientMetrics(source, prometheus, prefix) }