From e785850c92544f58a9c7c10624046da73cc443bb Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Tue, 20 Apr 2021 17:26:11 +0200 Subject: [PATCH] Default InfluxDb reporter for standalone-app metrics --- build.sbt | 4 +- .../standalone/http/StandaloneHttpApp.scala | 11 ++- .../influxdb/InfluxDbHttpReporter.scala | 75 +++++++++++++++++++ .../influxdb/StandaloneInfluxDbReporter.scala | 28 +++++++ 4 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/InfluxDbHttpReporter.scala create mode 100644 engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/StandaloneInfluxDbReporter.scala diff --git a/build.sbt b/build.sbt index e5c080699a0..8bb1b6997ff 100644 --- a/build.sbt +++ b/build.sbt @@ -782,8 +782,10 @@ lazy val standaloneUtil = (project in engine("standalone/util")). name := "nussknacker-standalone-util", libraryDependencies ++= { Seq( - "io.dropwizard.metrics5" % "metrics-core" % dropWizardV, + "io.dropwizard.metrics5" % "metrics-influxdb" % dropWizardV, + "com.softwaremill.sttp.client" %% "core" % sttpV, + "com.softwaremill.sttp.client" %% "async-http-client-backend-future" % sttpV, //akka-http is only for StandaloneRequestResponseLogger "com.typesafe.akka" %% "akka-http" % akkaHttpV % "provided", "com.typesafe.akka" %% "akka-stream" % akkaV % "provided" diff --git a/engine/standalone/app/src/main/scala/pl/touk/nussknacker/engine/standalone/http/StandaloneHttpApp.scala b/engine/standalone/app/src/main/scala/pl/touk/nussknacker/engine/standalone/http/StandaloneHttpApp.scala index 9a5394bee78..51e329dd6f2 100644 --- a/engine/standalone/app/src/main/scala/pl/touk/nussknacker/engine/standalone/http/StandaloneHttpApp.scala +++ b/engine/standalone/app/src/main/scala/pl/touk/nussknacker/engine/standalone/http/StandaloneHttpApp.scala @@ -11,6 +11,7 @@ import io.dropwizard.metrics5.MetricRegistry import pl.touk.nussknacker.engine.standalone.deployment.DeploymentService import pl.touk.nussknacker.engine.standalone.utils.StandaloneContextPreparer import pl.touk.nussknacker.engine.standalone.utils.logging.StandaloneRequestResponseLogger +import pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb.StandaloneInfluxDbReporter import pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.{DropwizardMetricsProvider, StandaloneMetricsReporter} import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader @@ -46,8 +47,6 @@ object StandaloneHttpApp extends Directives with FailFastCirceSupport with LazyL port = processesPort ) - - } object StandaloneMetrics extends LazyLogging { @@ -55,8 +54,12 @@ object StandaloneMetrics extends LazyLogging { def prepareRegistry(config: Config): MetricRegistry = { val metricRegistry = new MetricRegistry val metricReporters = loadMetricsReporters() - metricReporters.foreach { reporter => - reporter.createAndRunReporter(metricRegistry, config) + if (metricReporters.nonEmpty) { + metricReporters.foreach { reporter => + reporter.createAndRunReporter(metricRegistry, config) + } + } else { + StandaloneInfluxDbReporter.createAndRunReporterIfConfigured(metricRegistry, config) } metricRegistry } diff --git a/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/InfluxDbHttpReporter.scala b/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/InfluxDbHttpReporter.scala new file mode 100644 index 00000000000..362de09ee10 --- /dev/null +++ b/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/InfluxDbHttpReporter.scala @@ -0,0 +1,75 @@ +package pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb + +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.metrics5.influxdb._ +import io.dropwizard.metrics5.{MetricName, MetricRegistry} +import sttp.client._ + +import java.lang +import java.nio.charset.StandardCharsets +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} + +object InfluxDbHttpReporter { + + def build(metricRegistry: MetricRegistry, conf: InfluxSenderConfig): InfluxDbReporter = + InfluxDbReporter.forRegistry(metricRegistry) + .prefixedWith( + conf.prefix.map(MetricName.build(_)).getOrElse(MetricName.empty()) + .tagged("host", conf.host) + .tagged("env", conf.environment) + .tagged("type", conf.`type`) + ).build(new InfluxDbHttpSender(conf)) +} + +class InfluxDbHttpSender(conf: InfluxSenderConfig) extends InfluxDbSender with LazyLogging { + private implicit val backend: SttpBackend[Try, Nothing, NothingT] = TryHttpURLConnectionBackend() + + private val buffer = ArrayBuffer[String]() + + logger.info(s"InfluxSender created with url: ${conf.req.uri}") + + override def connect(): Unit = {} + + override def send(measurement: lang.StringBuilder): Unit = buffer.append(measurement.toString) + + override def flush(): Unit = { + val data = buffer.mkString + logger.debug(s"Sending ${buffer.size} metrics for conf $conf") + buffer.clear() + val answer = conf.req.body(data).send() + + answer match { + case Success(res) if res.code.isSuccess => // nothing + case Success(res) => logger.warn(s"Failed to send data to influx: ${res.code.code}, ${res.body}") + case Failure(ex) => logger.warn(s"Failed to send data to influx: ${ex.getMessage}", ex) + } + } + + override def disconnect(): Unit = {} + + override def isConnected: Boolean = true + + override def close(): Unit = {} +} + + +case class InfluxSenderConfig(url: String, + database: String, + `type`: String, + host: String, + environment: String, + prefix: Option[String], + retentionPolicy: Option[String], + username: Option[String], + password: Option[String], + reporterPolling: Duration) { + + + private val params = ("db" -> database) :: username.map("u" -> _).toList ::: password.map("p" -> _).toList ::: retentionPolicy.map("rp" -> _).toList + + def req: RequestT[Identity, Either[String, String], Nothing] = basicRequest.post(uri"$url?$params") + .contentType("application/json", StandardCharsets.UTF_8.name()) + +} diff --git a/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/StandaloneInfluxDbReporter.scala b/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/StandaloneInfluxDbReporter.scala new file mode 100644 index 00000000000..82ad17b3795 --- /dev/null +++ b/engine/standalone/util/src/main/scala/pl/touk/nussknacker/engine/standalone/utils/metrics/dropwizard/influxdb/StandaloneInfluxDbReporter.scala @@ -0,0 +1,28 @@ +package pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb + +import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.metrics5.MetricRegistry +import io.dropwizard.metrics5.influxdb.InfluxDbReporter + +import java.util.concurrent.TimeUnit + +object StandaloneInfluxDbReporter extends LazyLogging { + + import net.ceedubs.ficus.Ficus._ + import net.ceedubs.ficus.readers.ArbitraryTypeReader._ + + def createAndRunReporterIfConfigured(metricRegistry: MetricRegistry, config: Config): Option[InfluxDbReporter] = { + config.getAs[InfluxSenderConfig]("metrics.influx").map { influxSenderConfig => + logger.info("Found Influxdb metrics reporter config, starting reporter") + val reporter = InfluxDbHttpReporter.build(metricRegistry, influxSenderConfig) + reporter.start(influxSenderConfig.reporterPolling.toSeconds, TimeUnit.SECONDS) + reporter + } orElse { + logger.info("Influxdb metrics reporter config not found") + None + } + } + +} +