From ef400a3b8b615f1eefe0b7ea5ff89908aea2b39a Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Thu, 28 Mar 2024 20:47:03 -0500 Subject: [PATCH 1/2] eval: preseve presentation meta for trace graph Map back to normal graph URI and generate the config to support consistent presentation metadata behavior. --- .../atlas/eval/stream/ExprInterpreter.scala | 21 +++++++ .../atlas/eval/stream/FinalExprEval.scala | 32 +++++------ .../eval/stream/FinalExprEvalSuite.scala | 56 +++++++++++++++++++ 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala index 7cd27f423..38eba61f9 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala @@ -56,6 +56,7 @@ class ExprInterpreter(config: Config) { private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite")) + /** Evaluate a normal Atlas Graph URI and produce and graph config. */ def eval(uri: Uri): GraphConfig = { val graphCfg = grapher.toGraphConfig(uri) @@ -71,6 +72,26 @@ class ExprInterpreter(config: Config) { graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten)) } + /** + * Evaluate a time series URI. This could be an normal Atlas Graph URI or a trace + * time series URI. If a non-time series URI is passed in the result will be None. + */ + def evalTimeSeries(uri: Uri): Option[GraphConfig] = { + determineExprType(uri) match { + case ExprType.TIME_SERIES => Some(eval(uri)) + case ExprType.TRACE_TIME_SERIES => Some(eval(toGraphUri(uri))) + case _ => None + } + } + + /** Convert a trace time series URI to a normal graph URI for generating the config. */ + private def toGraphUri(uri: Uri): Uri = { + val ts = evalTraceTimeSeries(uri) + val newExpr = ts.map(_.expr).mkString(",") + val newQuery = uri.query().filterNot(_._1 == "q").prepended("q" -> newExpr) + uri.withQuery(Uri.Query(newQuery*)) + } + /** * Check that data expressions are supported. The streaming path doesn't support * time shifts, filters, and integral. The filters and integral are excluded because diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index ecfb13573..f47d1bdb1 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -102,25 +102,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) // Compute the new set of expressions recipients = sources - .filter { s => - exprInterpreter.determineExprType(Uri(s.uri)).isTimeSeriesType - } .flatMap { s => try { - val graphCfg = exprInterpreter.eval(Uri(s.uri)) - val exprs = graphCfg.exprs - // Reuse the previous evaluated expression if available. States for the stateful - // expressions are maintained in an IdentityHashMap so if the instances change - // the state will be reset. - exprs.map { e => - val paletteName = - if (graphCfg.flags.presentationMetadataEnabled) { - val axis = e.axis.getOrElse(0) - Some(graphCfg.flags.axisPalette(graphCfg.settings, axis)) - } else { - None - } - previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName) + exprInterpreter.evalTimeSeries(Uri(s.uri)).toList.flatMap { graphCfg => + val exprs = graphCfg.exprs + // Reuse the previous evaluated expression if available. States for the stateful + // expressions are maintained in an IdentityHashMap so if the instances change + // the state will be reset. + exprs.map { e => + val paletteName = + if (graphCfg.flags.presentationMetadataEnabled) { + val axis = e.axis.getOrElse(0) + Some(graphCfg.flags.axisPalette(graphCfg.settings, axis)) + } else { + None + } + previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName) + } } } catch { case e: Exception => diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index a9ede5a63..359bdcfbc 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -762,4 +762,60 @@ class FinalExprEvalSuite extends FunSuite { assertEquals(ts.styleMetadata.get.color, Color.RED) assertEquals(ts.styleMetadata.get.lineStyle, LineStyle.STACK) } + + test("trace time series: aggregate with single datapoint per group") { + val expr = DataExpr.Sum(Query.Equal("name", "rps")) + val tags = Map("name" -> "rps") + val input = List( + sources(ds("a", s"http://atlas/traces/graph?q=app,foo,:eq,$expr,:span-time-series")), + group(0), + group(1, AggrDatapoint(0, step, expr, "i-1", tags, 42.0)), + group(2, AggrDatapoint(0, step, expr, "i-1", tags, 43.0)), + group(3, AggrDatapoint(0, step, expr, "i-1", tags, 44.0)) + ) + + val output = run(input) + + val timeseries = output.filter(isTimeSeries) + assertEquals(timeseries.size, 4) + val expectedTimeseries = List(Double.NaN, 42.0, 43.0, 44.0) + timeseries.zip(expectedTimeseries).foreach { + case (env, expectedValue) => + assertEquals(env.id, "a") + val ts = env.message.asInstanceOf[TimeSeriesMessage] + checkValue(ts, expectedValue) + } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.id == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) + } } From 26df9ea6f626431f55b9db3aa1b7ab0845bcd56d Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Thu, 28 Mar 2024 20:51:34 -0500 Subject: [PATCH 2/2] fix test --- .../scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index 5dc45cd0e..07b6b1bbb 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -789,6 +789,6 @@ class EvaluatorSuite extends FunSuite { "synthetic://test/traces/graph?q=name,cpu,:eq,nf.app,foo,:eq,:and&step=1ms&numStepIntervals=5" val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq) val result = Await.result(future, scala.concurrent.duration.Duration.Inf) - assertEquals(result.size, 5) + assertEquals(result.size, 10) } }