diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index ca7f90eef..76f11da3f 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -311,7 +311,7 @@ private[stream] abstract class EvaluatorImpl( // Extract data expressions to reuse for creating time groups val exprs = sources.sources.asScala - .flatMap(ds => interpreter.eval(Uri(ds.uri))) + .flatMap(ds => interpreter.eval(Uri(ds.uri)).exprs) .flatMap(_.expr.dataExprs) .toList .distinct @@ -506,7 +506,7 @@ private[stream] abstract class EvaluatorImpl( private def toExprSet(dss: DataSources, interpreter: ExprInterpreter): Set[LwcExpression] = { dss.sources.asScala.flatMap { dataSource => - interpreter.eval(Uri(dataSource.uri)).map { expr => + interpreter.eval(Uri(dataSource.uri)).exprs.map { expr => LwcExpression(expr.toString, ExprType.TIME_SERIES, dataSource.step.toMillis) } }.toSet diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala index 46bac73e7..25e1bbc3f 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala @@ -24,16 +24,22 @@ import com.netflix.atlas.core.model.ModelExtractors import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.stacklang.Interpreter +import com.netflix.atlas.eval.graph.GraphConfig +import com.netflix.atlas.eval.graph.Grapher import com.netflix.atlas.eval.graph.SimpleLegends import com.netflix.atlas.eval.stream.Evaluator.DataSource import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.util.HostRewriter import com.typesafe.config.Config +import scala.util.Success + private[stream] class ExprInterpreter(config: Config) { private val interpreter = Interpreter(new CustomVocabulary(config).allWords) + private val grapher = Grapher(config) + private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite")) // Use simple legends for expressions @@ -48,15 +54,13 @@ private[stream] class ExprInterpreter(config: Config) { if (simpleLegendsEnabled) SimpleLegends.generate(exprs) else exprs } - def eval(uri: Uri): List[StyleExpr] = { - val expr = uri.query().get("q").getOrElse { - throw new IllegalArgumentException(s"missing required URI parameter `q`: $uri") - } + def eval(uri: Uri): GraphConfig = { + val graphCfg = grapher.toGraphConfig(uri) // Check that data expressions are supported. The streaming path doesn't support // time shifts, filters, and integral. The filters and integral are excluded because // they can be confusing as the time window for evaluation is not bounded. - val results = eval(expr).flatMap(_.perOffset) + val results = graphCfg.exprs.flatMap(_.perOffset) results.foreach { result => // Use rewrite as a helper for searching the expression for invalid operations result.expr.rewrite { @@ -72,7 +76,8 @@ private[stream] class ExprInterpreter(config: Config) { // Perform host rewrites based on the Atlas hostname val host = uri.authority.host.toString() - hostRewriter.rewrite(host, results) + val rewritten = hostRewriter.rewrite(host, results) + graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten)) } private def invalidOperator(expr: Expr): Unit = { @@ -90,7 +95,7 @@ private[stream] class ExprInterpreter(config: Config) { import scala.jdk.CollectionConverters.* ds.sources.asScala.toList .flatMap { s => - val exprs = eval(Uri(s.uri)).flatMap(_.expr.dataExprs).distinct + val exprs = eval(Uri(s.uri)).exprs.flatMap(_.expr.dataExprs).distinct exprs.map(_ -> s) } .groupBy(_._1) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index aa7780211..2797c5e22 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -52,6 +52,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) extends GraphStage[FlowShape[AnyRef, Source[MessageEnvelope, NotUsed]]] with StrictLogging { + import FinalExprEval.* + private val in = Inlet[AnyRef]("FinalExprEval.in") private val out = Outlet[Source[MessageEnvelope, NotUsed]]("FinalExprEval.out") @@ -70,7 +72,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) // Each expression matched with a list of data source ids that should receive // the data for it - private var recipients = List.empty[(StyleExpr, List[String])] + private var recipients = List.empty[(StyleExpr, List[ExprInfo])] // Track the set of DataExprs per DataSource private var dataSourceIdToDataExprs = Map.empty[String, Set[DataExpr]] @@ -100,11 +102,21 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) recipients = sources .flatMap { s => try { - val exprs = exprInterpreter.eval(Uri(s.uri)) + val graphCfg = exprInterpreter.eval(Uri(s.uri)) + val exprs = graphCfg.exprs // Reuse the previous evaluated expression if available. States for the stateful // expressions are maintained in an IdentityHashMap so if the instances change // the state will be reset. - exprs.map(e => previous.getOrElse(e, e) -> s.id) + exprs.map { e => + val paletteName = + if (graphCfg.flags.presentationMetadataEnabled) { + val axis = e.axis.getOrElse(0) + Some(graphCfg.flags.axisPalette(graphCfg.settings, axis)) + } else { + None + } + previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName) + } } catch { case e: Exception => errors += new MessageEnvelope(s.id, error(s.uri, "invalid expression", e)) @@ -121,9 +133,9 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) ) // Fold to mutable map to avoid creating new Map on every update .foldLeft(mutable.Map.empty[String, Set[DataExpr]]) { - case (map, (id, dataExprs)) => - map += map.get(id).fold(id -> dataExprs) { vs => - id -> (dataExprs ++ vs) + case (map, (info, dataExprs)) => + map += map.get(info.id).fold(info.id -> dataExprs) { vs => + info.id -> (dataExprs ++ vs) } } .toMap @@ -185,7 +197,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) // Generate the time series and diagnostic output val output = recipients.flatMap { - case (styleExpr, ids) => + case (styleExpr, infos) => + val ids = infos.map(_.id) // Use an identity map for the state to ensure that multiple equivalent stateful // expressions, e.g. derivative(a) + derivative(a), will have isolated state. val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any]) @@ -194,16 +207,20 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) val result = styleExpr.expr.eval(context, dataExprToDatapoints) states(styleExpr) = result.state val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data - val msgs = data.map { t => - TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t))) - } // Collect final data size per DataSource ids.foreach(rateCollector.incrementOutput(_, data.size)) - ids.flatMap { id => - msgs.map { msg => - new MessageEnvelope(id, msg) + // Create time series messages + infos.flatMap { info => + data.map { t => + val ts = TimeSeriesMessage( + styleExpr, + context, + t.withLabel(styleExpr.legend(t)), + info.palette + ) + new MessageEnvelope(info.id, ts) } } } catch { @@ -242,3 +259,8 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) } } } + +object FinalExprEval { + + case class ExprInfo(id: String, palette: Option[String]) +} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala index fcaf27847..c7a0348ee 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala @@ -152,7 +152,7 @@ private[stream] class StreamContext( // Check that expression is parseable and perform basic static analysis of DataExprs to // weed out expensive queries up front - val results = interpreter.eval(uri) + val results = interpreter.eval(uri).exprs results.foreach(_.expr.dataExprs.foreach(validateDataExpr)) // Check that there is a backend available for it diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala index dc672b35b..38db1aae3 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala @@ -57,7 +57,7 @@ object SyntheticDataSource { def apply(interpreter: ExprInterpreter, uri: Uri): Source[ByteString, Future[IOResult]] = { val settings = getSettings(uri) - val exprs = interpreter.eval(uri) + val exprs = interpreter.eval(uri).exprs val promise = Promise[IOResult]() Source(exprs) .flatMapMerge(Int.MaxValue, expr => source(settings, expr)) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index 128ce42a0..817a84d0e 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -151,7 +151,7 @@ class EvaluatorSuite extends FunSuite { assertEquals(t, "error") assertEquals( msg, - "IllegalArgumentException: missing required URI parameter `q`: resource:///gc-pause.dat/api/v1/graph" + "IllegalArgumentException: missing required parameter 'q'" ) case v => throw new MatchError(v) @@ -313,7 +313,7 @@ class EvaluatorSuite extends FunSuite { val uri = "http://test/api/v1/graph" val ds1 = Evaluator.DataSources.of(ds("one", uri)) val msg = - "IllegalArgumentException: missing required URI parameter `q`: http://test/api/v1/graph" + "IllegalArgumentException: missing required parameter 'q'" testError(ds1, msg) } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 219fbf839..9879d76f5 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -15,6 +15,7 @@ */ package com.netflix.atlas.eval.stream +import com.netflix.atlas.chart.model.LineStyle import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.Sink import org.apache.pekko.stream.scaladsl.Source @@ -36,6 +37,7 @@ import com.netflix.spectator.api.DefaultRegistry import com.typesafe.config.ConfigFactory import munit.FunSuite +import java.awt.Color import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -735,4 +737,21 @@ class FinalExprEvalSuite extends FunSuite { assertEquals(v, 0.0) } } + + test("presentation metadata, explicit color") { + val expr = DataExpr.Sum(Query.Equal("name", "rps")) + val tags = Map("name" -> "rps") + val input = List( + sources(ds("a", s"http://atlas/graph?q=$expr,f00,:color,:stack&hints=presentation-metadata")), + group(1, AggrDatapoint(0, step, expr, "i-1", tags, 42.0)) + ) + + val output = run(input) + + val timeseries = output.filter(isTimeSeries) + assertEquals(timeseries.size, 1) + val ts = timeseries.head.message().asInstanceOf[TimeSeriesMessage] + assertEquals(ts.styleMetadata.get.color, Color.RED) + assertEquals(ts.styleMetadata.get.lineStyle, LineStyle.STACK) + } } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala index d821715d1..267d1966d 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala @@ -65,6 +65,32 @@ object TestContext { | simple-legends-enabled = false |} | + |atlas.eval.graph { + | step = 60s + | start-time = e-3h + | end-time = now + | timezone = US/Pacific + | width = 700 + | height = 300 + | theme = "light" + | + | light { + | palette { + | primary = "armytage" + | offset = "bw" + | } + | named-colors = { + | } + | } + | + | max-datapoints = 1440 + | png-metadata-enabled = false + | browser-agent-pattern = "mozilla|msie|gecko|chrome|opera|webkit" + | simple-legends-enabled = false + | engines = [] + | vocabulary = "default" + |} + | |atlas.eval.host-rewrite { | pattern = "$^" | key = ""