Skip to content

Commit

Permalink
Default InfluxDb reporter for standalone-app metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Apr 20, 2021
1 parent b6d3032 commit e785850
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 5 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -782,8 +782,10 @@ lazy val standaloneUtil = (project in engine("standalone/util")).
name := "nussknacker-standalone-util",
libraryDependencies ++= {
Seq(

"io.dropwizard.metrics5" % "metrics-core" % dropWizardV,
"io.dropwizard.metrics5" % "metrics-influxdb" % dropWizardV,
"com.softwaremill.sttp.client" %% "core" % sttpV,
"com.softwaremill.sttp.client" %% "async-http-client-backend-future" % sttpV,
//akka-http is only for StandaloneRequestResponseLogger
"com.typesafe.akka" %% "akka-http" % akkaHttpV % "provided",
"com.typesafe.akka" %% "akka-stream" % akkaV % "provided"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.dropwizard.metrics5.MetricRegistry
import pl.touk.nussknacker.engine.standalone.deployment.DeploymentService
import pl.touk.nussknacker.engine.standalone.utils.StandaloneContextPreparer
import pl.touk.nussknacker.engine.standalone.utils.logging.StandaloneRequestResponseLogger
import pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb.StandaloneInfluxDbReporter
import pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.{DropwizardMetricsProvider, StandaloneMetricsReporter}
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader

Expand Down Expand Up @@ -46,17 +47,19 @@ object StandaloneHttpApp extends Directives with FailFastCirceSupport with LazyL
port = processesPort
)



}

object StandaloneMetrics extends LazyLogging {

def prepareRegistry(config: Config): MetricRegistry = {
val metricRegistry = new MetricRegistry
val metricReporters = loadMetricsReporters()
metricReporters.foreach { reporter =>
reporter.createAndRunReporter(metricRegistry, config)
if (metricReporters.nonEmpty) {
metricReporters.foreach { reporter =>
reporter.createAndRunReporter(metricRegistry, config)
}
} else {
StandaloneInfluxDbReporter.createAndRunReporterIfConfigured(metricRegistry, config)
}
metricRegistry
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb

import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.metrics5.influxdb._
import io.dropwizard.metrics5.{MetricName, MetricRegistry}
import sttp.client._

import java.lang
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object InfluxDbHttpReporter {

def build(metricRegistry: MetricRegistry, conf: InfluxSenderConfig): InfluxDbReporter =
InfluxDbReporter.forRegistry(metricRegistry)
.prefixedWith(
conf.prefix.map(MetricName.build(_)).getOrElse(MetricName.empty())
.tagged("host", conf.host)
.tagged("env", conf.environment)
.tagged("type", conf.`type`)
).build(new InfluxDbHttpSender(conf))
}

class InfluxDbHttpSender(conf: InfluxSenderConfig) extends InfluxDbSender with LazyLogging {
private implicit val backend: SttpBackend[Try, Nothing, NothingT] = TryHttpURLConnectionBackend()

private val buffer = ArrayBuffer[String]()

logger.info(s"InfluxSender created with url: ${conf.req.uri}")

override def connect(): Unit = {}

override def send(measurement: lang.StringBuilder): Unit = buffer.append(measurement.toString)

override def flush(): Unit = {
val data = buffer.mkString
logger.debug(s"Sending ${buffer.size} metrics for conf $conf")
buffer.clear()
val answer = conf.req.body(data).send()

answer match {
case Success(res) if res.code.isSuccess => // nothing
case Success(res) => logger.warn(s"Failed to send data to influx: ${res.code.code}, ${res.body}")
case Failure(ex) => logger.warn(s"Failed to send data to influx: ${ex.getMessage}", ex)
}
}

override def disconnect(): Unit = {}

override def isConnected: Boolean = true

override def close(): Unit = {}
}


case class InfluxSenderConfig(url: String,
database: String,
`type`: String,
host: String,
environment: String,
prefix: Option[String],
retentionPolicy: Option[String],
username: Option[String],
password: Option[String],
reporterPolling: Duration) {


private val params = ("db" -> database) :: username.map("u" -> _).toList ::: password.map("p" -> _).toList ::: retentionPolicy.map("rp" -> _).toList

def req: RequestT[Identity, Either[String, String], Nothing] = basicRequest.post(uri"$url?$params")
.contentType("application/json", StandardCharsets.UTF_8.name())

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pl.touk.nussknacker.engine.standalone.utils.metrics.dropwizard.influxdb

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.metrics5.MetricRegistry
import io.dropwizard.metrics5.influxdb.InfluxDbReporter

import java.util.concurrent.TimeUnit

object StandaloneInfluxDbReporter extends LazyLogging {

import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._

def createAndRunReporterIfConfigured(metricRegistry: MetricRegistry, config: Config): Option[InfluxDbReporter] = {
config.getAs[InfluxSenderConfig]("metrics.influx").map { influxSenderConfig =>
logger.info("Found Influxdb metrics reporter config, starting reporter")
val reporter = InfluxDbHttpReporter.build(metricRegistry, influxSenderConfig)
reporter.start(influxSenderConfig.reporterPolling.toSeconds, TimeUnit.SECONDS)
reporter
} orElse {
logger.info("Influxdb metrics reporter config not found")
None
}
}

}

0 comments on commit e785850

Please sign in to comment.