Add health probe endpoint
istreeter committed Oct 19, 2023
1 parent 967b54a commit 08665ae
Showing 7 changed files with 39 additions and 15 deletions.
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
@@ -29,6 +29,10 @@
"tags": {
}
}
"healthProbe": {
"port": "8000"
"unhealthyLatency": "5 minutes"
}
}

"telemetry": {
@@ -13,6 +13,7 @@ import io.circe.generic.extras.semiauto._
import io.circe.generic.extras.Configuration
import io.circe.config.syntax._
import net.snowflake.ingest.utils.SnowflakeURL
+ import com.comcast.ip4s.Port

import scala.concurrent.duration.FiniteDuration
import scala.util.Try
@@ -82,9 +83,12 @@ object Config {

type Sentry = SentryM[Id]

+ case class HealthProbe(port: Port, unhealthyLatency: FiniteDuration)

case class Monitoring(
metrics: Metrics,
- sentry: Option[Sentry]
+ sentry: Option[Sentry],
+ healthProbe: HealthProbe
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
@@ -104,8 +108,12 @@
case SentryM(None, _) =>
None
}
- implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
- implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
+ implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
+ implicit val portDecoder = Decoder.decodeInt.emap { port =>
+   Port.fromInt(port).toRight("Invalid port")
+ }
+ implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
+ implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
deriveConfiguredDecoder[Config[Source, Sink]]
}

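As a side note on how the new decoders behave, here is a small self-contained sketch. It is not the project's Config module: the object name is made up, and plain `deriveDecoder` stands in for `deriveConfiguredDecoder` to keep it short. It decodes a `healthProbe` block shaped like the reference.conf default above: circe-config turns the HOCON duration string into a `FiniteDuration`, and the ip4s-backed decoder rejects values that are not valid ports.

import com.comcast.ip4s.Port
import com.typesafe.config.ConfigFactory
import io.circe.{Decoder, Json}
import io.circe.config.syntax._
import io.circe.generic.semiauto.deriveDecoder
import scala.concurrent.duration.FiniteDuration

object HealthProbeConfigSketch extends App {

  // Mirrors the case class and decoders added in Config above
  case class HealthProbe(port: Port, unhealthyLatency: FiniteDuration)

  implicit val portDecoder: Decoder[Port] =
    Decoder.decodeInt.emap(p => Port.fromInt(p).toRight("Invalid port"))

  implicit val healthProbeDecoder: Decoder[HealthProbe] = deriveDecoder

  private val conf = ConfigFactory.parseString(
    """healthProbe { port = 8000, unhealthyLatency = "5 minutes" }"""
  )

  // Prints Right(HealthProbe(...)) with port 8000 and a 5 minute threshold
  println(conf.getConfig("healthProbe").as[HealthProbe])

  // Prints Left(...): 70000 is outside the valid port range
  println(portDecoder.decodeJson(Json.fromInt(70000)))
}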
@@ -8,6 +8,7 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
+ import cats.Functor
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
@@ -17,7 +18,7 @@ import io.sentry.Sentry
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
- import com.snowplowanalytics.snowplow.loaders.runtime.AppInfo
+ import com.snowplowanalytics.snowplow.loaders.runtime.{AppInfo, HealthProbe}

case class Environment[F[_]](
appInfo: AppInfo,
@@ -35,21 +36,23 @@ object Environment {
def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
config: Config[SourceConfig, SinkConfig],
appInfo: AppInfo,
- source: SourceConfig => SourceAndAck[F],
- sink: SinkConfig => Resource[F, Sink[F]]
+ toSource: SourceConfig => F[SourceAndAck[F]],
+ toSink: SinkConfig => Resource[F, Sink[F]]
): Resource[F, Environment[F]] =
for {
_ <- enableSentry[F](appInfo, config.monitoring.sentry)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
- badSink <- sink(config.output.bad)
+ badSink <- toSink(config.output.bad)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
xa <- Resource.eval(SQLUtils.transactor[F](config.output.good))
_ <- Resource.eval(SQLUtils.createTable(config.output.good, xa))
tblManager = TableManager.fromTransactor(config.output.good, xa)
channelProvider <- ChannelProvider.make(config.output.good, config.batching)
+ sourceAndAck <- Resource.eval(toSource(config.input))
+ _ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck))
} yield Environment(
appInfo = appInfo,
- source = source(config.input),
+ source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tblManager = tblManager,
@@ -79,4 +82,12 @@ object Environment {
Resource.unit[F]
}

+ private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
+   source.processingLatency.map { latency =>
+     if (latency > config.unhealthyLatency)
+       HealthProbe.Unhealthy(show"Processing latency is $latency")
+     else
+       HealthProbe.Healthy
+   }

}
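For context, `HealthProbe.resource` comes from the snowplow streams/loaders runtime library rather than this diff, so its implementation is not shown here. The sketch below only illustrates the general shape of such a probe, assuming an http4s Ember server: it serves a single endpoint that answers 200 while the supplied status is Healthy and 503 otherwise. The object name and the `/health` path are assumptions for the sketch, not the library's API.

import cats.effect.{Async, Resource}
import cats.implicits._
import com.comcast.ip4s._
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.implicits._

object HealthProbeSketch {

  sealed trait Status
  case object Healthy extends Status
  final case class Unhealthy(reason: String) extends Status

  // Serves one endpoint for the lifetime of the Resource; each request
  // re-evaluates `currentStatus`, so the reported health stays current.
  def resource[F[_]: Async](port: Port, currentStatus: F[Status]): Resource[F, Unit] = {
    val dsl = new Http4sDsl[F] {}
    import dsl._

    val routes = HttpRoutes.of[F] { case GET -> Root / "health" =>
      currentStatus.flatMap {
        case Healthy           => Ok("OK")
        case Unhealthy(reason) => ServiceUnavailable(reason)
      }
    }

    EmberServerBuilder
      .default[F]
      .withHost(ipv4"0.0.0.0") // listen on all interfaces so orchestrators can reach the probe
      .withPort(port)
      .withHttpApp(routes.orNotFound)
      .build
      .void
  }
}

In the wiring above, `isHealthy(config.monitoring.healthProbe, sourceAndAck)` plays the role of `currentStatus`, so the endpoint flips to 503 once processing latency exceeds `unhealthyLatency`.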
@@ -26,7 +26,7 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder](
super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds)

type SinkProvider = SinkConfig => Resource[IO, Sink[IO]]
- type SourceProvider = SourceConfig => SourceAndAck[IO]
+ type SourceProvider = SourceConfig => IO[SourceAndAck[IO]]

def source: SourceProvider
def badSink: SinkProvider
@@ -28,7 +28,7 @@ object Run {

def fromCli[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder](
appInfo: AppInfo,
- toSource: SourceConfig => SourceAndAck[F],
+ toSource: SourceConfig => F[SourceAndAck[F]],
toBadSink: SinkConfig => Resource[F, Sink[F]]
): Opts[F[ExitCode]] = {
val configPathOpt = Opts.option[Path]("config", help = "path to config file")
@@ -37,7 +37,7 @@

private def fromConfigPaths[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder](
appInfo: AppInfo,
- toSource: SourceConfig => SourceAndAck[F],
+ toSource: SourceConfig => F[SourceAndAck[F]],
toBadSink: SinkConfig => Resource[F, Sink[F]],
pathToConfig: Path
): F[ExitCode] = {
@@ -55,7 +55,7 @@

private def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
appInfo: AppInfo,
- toSource: SourceConfig => SourceAndAck[F],
+ toSource: SourceConfig => F[SourceAndAck[F]],
toBadSink: SinkConfig => Resource[F, Sink[F]],
config: Config[SourceConfig, SinkConfig]
): F[ExitCode] =
@@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
import com.snowplowanalytics.snowplow.loaders.runtime.AppInfo

- import scala.concurrent.duration.DurationInt
+ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}

case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO])

@@ -91,6 +91,8 @@ object MockEnvironment {
state.update(_ :+ Checkpointed(chunk.toList))
}
.drain

+ def processingLatency: IO[FiniteDuration] = IO.pure(Duration.Zero)
}

private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
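The mock's `processingLatency` of `Duration.Zero` keeps the probe reporting Healthy during tests. The standalone sketch below is illustrative only and is not part of the test suite: it replays the same threshold comparison that `isHealthy` makes, showing how a latency above `unhealthyLatency` flips the status.

import cats.implicits._
import scala.concurrent.duration._

object LatencyCheckSketch extends App {

  sealed trait Status
  case object Healthy extends Status
  final case class Unhealthy(reason: String) extends Status

  // Same comparison as Environment.isHealthy, written as a pure function
  // so the threshold behaviour can be checked without any IO wiring.
  def status(unhealthyLatency: FiniteDuration, latency: FiniteDuration): Status =
    if (latency > unhealthyLatency) Unhealthy(show"Processing latency is $latency")
    else Healthy

  println(status(5.minutes, Duration.Zero)) // Healthy (the mock's case)
  println(status(5.minutes, 10.minutes))    // Unhealthy(Processing latency is 10 minutes)
}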
3 changes: 1 addition & 2 deletions project/Dependencies.scala
@@ -27,7 +27,7 @@ object Dependencies {
val awsSdk2 = "2.20.135"

// Snowplow
val streams = "0.1.0-M4"
val streams = "0.1.0-M5"

// tests
val specs2 = "4.20.0"
@@ -50,7 +50,6 @@
val jaxb = "javax.xml.bind" % "jaxb-api" % V.jaxb
val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2

- // snowplow: Note jackson-databind 2.14.x is incompatible with Spark
val streamsCore = "com.snowplowanalytics" %% "streams-core" % V.streams
val kinesis = "com.snowplowanalytics" %% "kinesis" % V.streams
val kafka = "com.snowplowanalytics" %% "kafka" % V.streams
