Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: support presentation metadata for stream #1618

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private[stream] abstract class EvaluatorImpl(

// Extract data expressions to reuse for creating time groups
val exprs = sources.sources.asScala
.flatMap(ds => interpreter.eval(Uri(ds.uri)))
.flatMap(ds => interpreter.eval(Uri(ds.uri)).exprs)
.flatMap(_.expr.dataExprs)
.toList
.distinct
Expand Down Expand Up @@ -506,7 +506,7 @@ private[stream] abstract class EvaluatorImpl(

private def toExprSet(dss: DataSources, interpreter: ExprInterpreter): Set[LwcExpression] = {
dss.sources.asScala.flatMap { dataSource =>
interpreter.eval(Uri(dataSource.uri)).map { expr =>
interpreter.eval(Uri(dataSource.uri)).exprs.map { expr =>
LwcExpression(expr.toString, ExprType.TIME_SERIES, dataSource.step.toMillis)
}
}.toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@ import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.stacklang.Interpreter
import com.netflix.atlas.eval.graph.GraphConfig
import com.netflix.atlas.eval.graph.Grapher
import com.netflix.atlas.eval.graph.SimpleLegends
import com.netflix.atlas.eval.stream.Evaluator.DataSource
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.util.HostRewriter
import com.typesafe.config.Config

import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {

private val interpreter = Interpreter(new CustomVocabulary(config).allWords)

private val grapher = Grapher(config)

private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite"))

// Use simple legends for expressions
Expand All @@ -48,15 +54,13 @@ private[stream] class ExprInterpreter(config: Config) {
if (simpleLegendsEnabled) SimpleLegends.generate(exprs) else exprs
}

def eval(uri: Uri): List[StyleExpr] = {
val expr = uri.query().get("q").getOrElse {
throw new IllegalArgumentException(s"missing required URI parameter `q`: $uri")
}
def eval(uri: Uri): GraphConfig = {
val graphCfg = grapher.toGraphConfig(uri)

// Check that data expressions are supported. The streaming path doesn't support
// time shifts, filters, and integral. The filters and integral are excluded because
// they can be confusing as the time window for evaluation is not bounded.
val results = eval(expr).flatMap(_.perOffset)
val results = graphCfg.exprs.flatMap(_.perOffset)
results.foreach { result =>
// Use rewrite as a helper for searching the expression for invalid operations
result.expr.rewrite {
Expand All @@ -72,7 +76,8 @@ private[stream] class ExprInterpreter(config: Config) {

// Perform host rewrites based on the Atlas hostname
val host = uri.authority.host.toString()
hostRewriter.rewrite(host, results)
val rewritten = hostRewriter.rewrite(host, results)
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

private def invalidOperator(expr: Expr): Unit = {
Expand All @@ -90,7 +95,7 @@ private[stream] class ExprInterpreter(config: Config) {
import scala.jdk.CollectionConverters.*
ds.sources.asScala.toList
.flatMap { s =>
val exprs = eval(Uri(s.uri)).flatMap(_.expr.dataExprs).distinct
val exprs = eval(Uri(s.uri)).exprs.flatMap(_.expr.dataExprs).distinct
exprs.map(_ -> s)
}
.groupBy(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
extends GraphStage[FlowShape[AnyRef, Source[MessageEnvelope, NotUsed]]]
with StrictLogging {

import FinalExprEval.*

private val in = Inlet[AnyRef]("FinalExprEval.in")
private val out = Outlet[Source[MessageEnvelope, NotUsed]]("FinalExprEval.out")

Expand All @@ -70,7 +72,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)

// Each expression matched with a list of data source ids that should receive
// the data for it
private var recipients = List.empty[(StyleExpr, List[String])]
private var recipients = List.empty[(StyleExpr, List[ExprInfo])]

// Track the set of DataExprs per DataSource
private var dataSourceIdToDataExprs = Map.empty[String, Set[DataExpr]]
Expand Down Expand Up @@ -100,11 +102,21 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
recipients = sources
.flatMap { s =>
try {
val exprs = exprInterpreter.eval(Uri(s.uri))
val graphCfg = exprInterpreter.eval(Uri(s.uri))
val exprs = graphCfg.exprs
// Reuse the previous evaluated expression if available. States for the stateful
// expressions are maintained in an IdentityHashMap so if the instances change
// the state will be reset.
exprs.map(e => previous.getOrElse(e, e) -> s.id)
exprs.map { e =>
val paletteName =
if (graphCfg.flags.presentationMetadataEnabled) {
val axis = e.axis.getOrElse(0)
Some(graphCfg.flags.axisPalette(graphCfg.settings, axis))
} else {
None
}
previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName)
}
} catch {
case e: Exception =>
errors += new MessageEnvelope(s.id, error(s.uri, "invalid expression", e))
Expand All @@ -121,9 +133,9 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
)
// Fold to mutable map to avoid creating new Map on every update
.foldLeft(mutable.Map.empty[String, Set[DataExpr]]) {
case (map, (id, dataExprs)) =>
map += map.get(id).fold(id -> dataExprs) { vs =>
id -> (dataExprs ++ vs)
case (map, (info, dataExprs)) =>
map += map.get(info.id).fold(info.id -> dataExprs) { vs =>
info.id -> (dataExprs ++ vs)
}
}
.toMap
Expand Down Expand Up @@ -185,7 +197,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)

// Generate the time series and diagnostic output
val output = recipients.flatMap {
case (styleExpr, ids) =>
case (styleExpr, infos) =>
val ids = infos.map(_.id)
// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
Expand All @@ -194,16 +207,20 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data
val msgs = data.map { t =>
TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t)))
}

// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))

ids.flatMap { id =>
msgs.map { msg =>
new MessageEnvelope(id, msg)
// Create time series messages
infos.flatMap { info =>
data.map { t =>
val ts = TimeSeriesMessage(
styleExpr,
context,
t.withLabel(styleExpr.legend(t)),
info.palette
)
new MessageEnvelope(info.id, ts)
}
}
} catch {
Expand Down Expand Up @@ -242,3 +259,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
}
}
}

object FinalExprEval {

case class ExprInfo(id: String, palette: Option[String])
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[stream] class StreamContext(

// Check that expression is parseable and perform basic static analysis of DataExprs to
// weed out expensive queries up front
val results = interpreter.eval(uri)
val results = interpreter.eval(uri).exprs
results.foreach(_.expr.dataExprs.foreach(validateDataExpr))

// Check that there is a backend available for it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object SyntheticDataSource {

def apply(interpreter: ExprInterpreter, uri: Uri): Source[ByteString, Future[IOResult]] = {
val settings = getSettings(uri)
val exprs = interpreter.eval(uri)
val exprs = interpreter.eval(uri).exprs
val promise = Promise[IOResult]()
Source(exprs)
.flatMapMerge(Int.MaxValue, expr => source(settings, expr))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class EvaluatorSuite extends FunSuite {
assertEquals(t, "error")
assertEquals(
msg,
"IllegalArgumentException: missing required URI parameter `q`: resource:///gc-pause.dat/api/v1/graph"
"IllegalArgumentException: missing required parameter 'q'"
)
case v =>
throw new MatchError(v)
Expand Down Expand Up @@ -313,7 +313,7 @@ class EvaluatorSuite extends FunSuite {
val uri = "http://test/api/v1/graph"
val ds1 = Evaluator.DataSources.of(ds("one", uri))
val msg =
"IllegalArgumentException: missing required URI parameter `q`: http://test/api/v1/graph"
"IllegalArgumentException: missing required parameter 'q'"
testError(ds1, msg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.chart.model.LineStyle
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
Expand All @@ -36,6 +37,7 @@ import com.netflix.spectator.api.DefaultRegistry
import com.typesafe.config.ConfigFactory
import munit.FunSuite

import java.awt.Color
import scala.concurrent.Await
import scala.concurrent.duration.Duration

Expand Down Expand Up @@ -735,4 +737,21 @@ class FinalExprEvalSuite extends FunSuite {
assertEquals(v, 0.0)
}
}

test("presentation metadata, explicit color") {
val expr = DataExpr.Sum(Query.Equal("name", "rps"))
val tags = Map("name" -> "rps")
val input = List(
sources(ds("a", s"http://atlas/graph?q=$expr,f00,:color,:stack&hints=presentation-metadata")),
group(1, AggrDatapoint(0, step, expr, "i-1", tags, 42.0))
)

val output = run(input)

val timeseries = output.filter(isTimeSeries)
assertEquals(timeseries.size, 1)
val ts = timeseries.head.message().asInstanceOf[TimeSeriesMessage]
assertEquals(ts.styleMetadata.get.color, Color.RED)
assertEquals(ts.styleMetadata.get.lineStyle, LineStyle.STACK)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,32 @@ object TestContext {
| simple-legends-enabled = false
|}
|
|atlas.eval.graph {
| step = 60s
| start-time = e-3h
| end-time = now
| timezone = US/Pacific
| width = 700
| height = 300
| theme = "light"
|
| light {
| palette {
| primary = "armytage"
| offset = "bw"
| }
| named-colors = {
| }
| }
|
| max-datapoints = 1440
| png-metadata-enabled = false
| browser-agent-pattern = "mozilla|msie|gecko|chrome|opera|webkit"
| simple-legends-enabled = false
| engines = []
| vocabulary = "default"
|}
|
|atlas.eval.host-rewrite {
| pattern = "$^"
| key = ""
Expand Down
Loading