diff --git a/README.md b/README.md index 2b35664e..d6dbefcf 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,29 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => } ``` +## Java client metrics example + +The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well. + +> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` (or its alternative `metrics.exposeJavaClientMetrics`) +> registers new Prometheus collector under the hood. Please use unique prefixes for each collector +> to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration). +> Prefix can be set as parameter in: `ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))` + +```scala +import ConsumerMetricsOf.* + +val config: ConsumerConfig = ??? +val prometheus: CollectorRegistry = ??? +val metrics: ConsumerMetrics[IO] = ??? + +for { + metrics <- metrics.exposeJavaClientMetrics(prometheus) + consumerOf = ConsumerOf.apply1(metrics1.some) + consumer <- consumerOf(config) +} yield ??? +``` + ## Setup ```scala diff --git a/build.sbt b/build.sbt index 70b26b14..99112428 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,9 @@ lazy val commonSettings = Seq( ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"), ProblemFilters.exclude[DirectMissingMethodProblem]( "com.evolutiongaming.skafka.Converters#MapJOps.asScalaMap$extension" - ) + ), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.consumer.ConsumerMetrics#ConsumerMetricsOps.mapK$extension"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.producer.ProducerMetrics#ProducerMetricsOps.mapK$extension"), ) ) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala new file mode 100644 index 00000000..ef02d20b --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -0,0 +1,66 @@ +package com.evolutiongaming.skafka.consumer + +import cats.effect.{Resource, Sync} +import cats.effect.std.UUIDGen +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry +import io.prometheus.client.CollectorRegistry + +import scala.concurrent.duration.FiniteDuration + +object ConsumerMetricsOf { + + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param source original [[ConsumerMetrics]] + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + source: ConsumerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ConsumerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ConsumerMetrics[F] { + override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = + source.call(name, topic, latency, success) + + override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = + source.poll(topic, bytes, records, age) + + override def count(name: String, topic: Topic): F[Unit] = + source.count(name, topic) + + override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = + source.rebalance(name, topicPartition) + + override def topics(latency: FiniteDuration): F[Unit] = + source.topics(latency) + + override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = + registry.register(consumer.clientMetrics) + + } + + implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { + + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ + def exposeJavaClientMetrics( + prometheus: CollectorRegistry, + prefix: Option[String] = None, + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] = + withJavaClientMetrics(source, prometheus, prefix) + + } +} diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala new file mode 100644 index 00000000..d93e4862 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala @@ -0,0 +1,85 @@ +package com.evolutiongaming.skafka.metrics + +import cats.syntax.all._ +import cats.effect.syntax.resource._ +import cats.effect.{Resource, Ref, Sync} +import cats.effect.std.UUIDGen +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.ClientMetric +import io.prometheus.client.CollectorRegistry + +import java.util.UUID + +/** + * Allows reporting metrics of multiple Kafka clients inside a single VM. + * + * Example: + * {{{ + * val prometheus: CollectorRegistry = ??? + * val consumerOf: ConsumerOf[F] = ??? + * val producerOf: ProducerOf[F] = ??? + * + * for { + * registry <- KafkaMetricsRegistry.of(prometheus) + * + * consumer <- consumerOf(consumerConfig) + * _ <- registry.register(consumer.clientMetrics) + * + * producer <- producerOf(producerConfig) + * _ <- registry.register(producer.clientMetrics) + * } yield () + * }}} + */ +trait KafkaMetricsRegistry[F[_]] { + + /** + * Register a function to obtain a list of client metrics. + * Normally, you would pass + * [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or + * [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]] + */ + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] +} + +object KafkaMetricsRegistry { + + def of[F[_]: Sync: ToTry: UUIDGen]( + prometheus: CollectorRegistry, + prefix: Option[String] = None, + ): Resource[F, KafkaMetricsRegistry[F]] = + for { + sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource + + metrics = sources + .get + .flatMap { sources => + sources + .toList + .flatTraverse { + case (uuid, metrics) => + metrics.map { metrics => + metrics.toList.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) } + } + } + } + .widen[Seq[ClientMetric[F]]] + + collector = new KafkaMetricsCollector[F](metrics, prefix) + allocate = Sync[F].delay { prometheus.register(collector) } + release = Sync[F].delay { prometheus.unregister(collector) } + + _ <- Resource.make(allocate)(_ => release) + } yield new KafkaMetricsRegistry[F] { + + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] = + for { + uuid <- UUIDGen[F].randomUUID.toResource + + allocate = sources.update { sources => sources.updated(uuid, metrics) } + release = sources.update { sources => sources - uuid } + + _ <- Resource.make(allocate)(_ => release) + } yield {} + } + +} diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala new file mode 100644 index 00000000..ee3ce3a8 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -0,0 +1,72 @@ +package com.evolutiongaming.skafka.producer + +import cats.effect.{Resource, Sync} +import cats.effect.std.UUIDGen +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry +import io.prometheus.client.CollectorRegistry + +import scala.concurrent.duration.FiniteDuration + +object ProducerMetricsOf { + + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param source original [[ProducerMetrics]] + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + source: ProducerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ProducerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ProducerMetrics[F] { + override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) + + override def beginTransaction: F[Unit] = source.beginTransaction + + override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = source.sendOffsetsToTransaction(latency) + + override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency) + + override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency) + + override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = source.send(topic, latency, bytes) + + override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency) + + override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency) + + override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency) + + override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) + + override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = + registry.register(producer.clientMetrics) + + } + + implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal { + + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ + def exposeJavaClientMetrics( + prometheus: CollectorRegistry, + prefix: Option[String], + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] = + withJavaClientMetrics(source, prometheus, prefix) + + } + +} diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index 6d0f8340..a2d38504 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.skafka.consumer -import cats.effect.Resource +import cats.effect.{MonadCancel, Resource} import cats.implicits._ import cats.{Applicative, Monad, ~>} import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} @@ -8,6 +8,7 @@ import com.evolutiongaming.smetrics.MetricsHelper._ import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles} import scala.concurrent.duration.FiniteDuration +import scala.annotation.nowarn trait ConsumerMetrics[F[_]] { @@ -20,6 +21,9 @@ trait ConsumerMetrics[F[_]] { def rebalance(name: String, topicPartition: TopicPartition): F[Unit] def topics(latency: FiniteDuration): F[Unit] + + private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = + Resource.unit[F] } object ConsumerMetrics { @@ -104,11 +108,11 @@ object ConsumerMetrics { bytesSummary <- bytesSummary rebalancesCounter <- rebalancesCounter topicsLatency <- topicsLatency - ageSummary <- registry.summary( - name = s"${ prefix }_poll_age", - help = "Poll records age, time since record.timestamp", + ageSummary <- registry.summary( + name = s"${prefix}_poll_age", + help = "Poll records age, time since record.timestamp", quantiles = Quantiles.Default, - labels = LabelNames("client", "topic") + labels = LabelNames("client", "topic") ) } yield { (clientId: ClientId) => new ConsumerMetrics[F] { @@ -162,6 +166,7 @@ object ConsumerMetrics { implicit class ConsumerMetricsOps[F[_]](val self: ConsumerMetrics[F]) extends AnyVal { + @deprecated("Use mapK(f, g) instead", "16.2.0") def mapK[G[_]](f: F ~> G): ConsumerMetrics[G] = { new MapK with ConsumerMetrics[G] { @@ -186,5 +191,36 @@ object ConsumerMetrics { } } } + + def mapK[G[_]]( + fg: F ~> G, + gf: G ~> F + )(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ConsumerMetrics[G] = { + new MapK with ConsumerMetrics[G] { + + def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = { + fg(self.call(name, topic, latency, success)) + } + + def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = { + fg(self.poll(topic, bytes, records, age)) + } + + def count(name: String, topic: Topic) = { + fg(self.count(name, topic)) + } + + def rebalance(name: String, topicPartition: TopicPartition) = { + fg(self.rebalance(name, topicPartition)) + } + + def topics(latency: FiniteDuration) = { + fg(self.topics(latency)) + } + + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) + } + } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index cd721f83..c9a97662 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -31,20 +31,31 @@ object ConsumerOf { def apply[K, V](config: ConsumerConfig)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]) = { Consumer .of[F, K, V](config) - .map { consumer => - metrics.fold { consumer } { consumer.withMetrics1[Throwable] } + .flatMap { consumer => + metrics match { + + case None => + Resource.pure[F, Consumer[F, K, V]](consumer) + + case Some(metrics) => + for { + _ <- metrics.exposeJavaMetrics[K, V](consumer) + } yield { + consumer.withMetrics1[Throwable](metrics) + } + } } } } } /** The sole purpose of this method is to support binary compatibility with an intermediate - * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` - * and `apply2` using `MeasureDuration` from `cats-helper`. - * This should not be used and should be removed in a reasonable amount of time. - */ + * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` + * and `apply2` using `MeasureDuration` from `cats-helper`. + * This should not be used and should be removed in a reasonable amount of time. + */ @deprecated("Use `apply1`", since = "16.0.3") - def apply2[F[_] : Async : ToTry : ToFuture : MeasureDuration]( + def apply2[F[_]: Async: ToTry: ToFuture: MeasureDuration]( metrics: Option[ConsumerMetrics[F]] = None ): ConsumerOf[F] = apply1(metrics) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 8accb7c0..8a1610e0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.skafka.producer -import cats.effect.Resource +import cats.effect.{MonadCancel, Resource} import cats.implicits._ import cats.{Applicative, Monad, ~>} import com.evolutiongaming.skafka.{ClientId, Topic} @@ -8,6 +8,7 @@ import com.evolutiongaming.smetrics.MetricsHelper._ import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantile, Quantiles} import scala.concurrent.duration.FiniteDuration +import scala.annotation.nowarn trait ProducerMetrics[F[_]] { @@ -30,6 +31,8 @@ trait ProducerMetrics[F[_]] { def partitions(topic: Topic, latency: FiniteDuration): F[Unit] def flush(latency: FiniteDuration): F[Unit] + + private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -177,6 +180,7 @@ object ProducerMetrics { implicit class ProducerMetricsOps[F[_]](val self: ProducerMetrics[F]) extends AnyVal { + @deprecated("Use mapK(f, g) instead", "16.2.0") def mapK[G[_]](f: F ~> G): ProducerMetrics[G] = new ProducerMetrics[G] { def initTransactions(latency: FiniteDuration) = f(self.initTransactions(latency)) @@ -199,5 +203,35 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = f(self.flush(latency)) } + + def mapK[G[_]]( + fg: F ~> G, + gf: G ~> F + )(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ProducerMetrics[G] = + new ProducerMetrics[G] { + + def initTransactions(latency: FiniteDuration) = fg(self.initTransactions(latency)) + + def beginTransaction = fg(self.beginTransaction) + + def sendOffsetsToTransaction(latency: FiniteDuration) = fg(self.sendOffsetsToTransaction(latency)) + + def commitTransaction(latency: FiniteDuration) = fg(self.commitTransaction(latency)) + + def abortTransaction(latency: FiniteDuration) = fg(self.abortTransaction(latency)) + + def send(topic: Topic, latency: FiniteDuration, bytes: Int) = fg(self.send(topic, latency, bytes)) + + def block(topic: Topic, latency: FiniteDuration) = fg(self.block(topic, latency)) + + def failure(topic: Topic, latency: FiniteDuration) = fg(self.failure(topic, latency)) + + def partitions(topic: Topic, latency: FiniteDuration) = fg(self.partitions(topic, latency)) + + def flush(latency: FiniteDuration) = fg(self.flush(latency)) + + override def exposeJavaMetrics(producer: Producer[G]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) + } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index 9dc0fa21..e431616c 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -26,19 +26,30 @@ object ProducerOf { def apply(config: ProducerConfig) = { for { producer <- Producer.of[F](config = config) - } yield { - metrics.fold(producer)(producer.withMetrics[Throwable]) - } + producer <- metrics match { + + case None => + Resource.pure[F, Producer[F]](producer) + + case Some(metrics) => + for { + _ <- metrics.exposeJavaMetrics(producer) + } yield { + producer.withMetrics[Throwable](metrics) + } + + } + } yield producer } } /** The sole purpose of this method is to support binary compatibility with an intermediate - * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` - * and `apply2` using `MeasureDuration` from `cats-helper`. - * This should not be used and should be removed in a reasonable amount of time. - */ + * version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics` + * and `apply2` using `MeasureDuration` from `cats-helper`. + * This should not be used and should be removed in a reasonable amount of time. + */ @deprecated("Use `apply1`", since = "16.0.3") - def apply2[F[_] : MeasureDuration : ToTry : Async]( + def apply2[F[_]: MeasureDuration: ToTry: Async]( metrics: Option[ProducerMetrics[F]] = None ): ProducerOf[F] = apply1(metrics) diff --git a/version.sbt b/version.sbt index dfc364c5..ec7da3a9 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.1.3-SNAPSHOT" +ThisBuild / version := "16.2.0"