From fc5797ee39b1bccd66923014f195f6e0f7502aa4 Mon Sep 17 00:00:00 2001 From: brharrington Date: Fri, 29 Mar 2024 12:16:41 -0500 Subject: [PATCH] eval: fix test case for trace time series (#1642) Update test case to have the correct expression for a trace time series subscription response. --- .../eval/stream/LwcToAggrDatapoint.scala | 26 ++++++++++++++----- .../eval/stream/LwcToAggrDatapointSuite.scala | 2 +- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala index 07d8d6fac..25a86ab73 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala @@ -17,6 +17,8 @@ package com.netflix.atlas.eval.stream import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.DataVocabulary +import com.netflix.atlas.core.model.ModelExtractors +import com.netflix.atlas.core.model.TraceVocabulary import com.netflix.atlas.core.stacklang.Interpreter import org.apache.pekko.stream.Attributes import org.apache.pekko.stream.FlowShape @@ -35,6 +37,7 @@ import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcSubscription import com.netflix.atlas.eval.model.LwcSubscriptionV2 import com.netflix.atlas.eval.model.DatapointsTuple +import com.netflix.atlas.eval.model.ExprType /** * Process the SSE output from an LWC service and convert it into a stream of @@ -90,7 +93,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) private def updateStateV2(sub: LwcSubscriptionV2): Unit = { sub.subExprs.foreach { s => if (sub.exprType.isTimeSeriesType && !tsState.contains(s.id)) { - val expr = parseExpr(s.expression) + val expr = parseExpr(s.expression, sub.exprType) tsState.put(s.id, DatapointMetadata(s.expression, expr, s.step)) } if (sub.exprType.isEventType && !eventState.contains(s.id)) { @@ -153,12 +156,23 @@ private[stream] object LwcToAggrDatapoint { case class DatapointMetadata(dataExprStr: String, dataExpr: DataExpr, step: Long) - private val interpreter = Interpreter(DataVocabulary.allWords) + private val dataInterpreter = Interpreter(DataVocabulary.allWords) + private val traceInterpreter = Interpreter(TraceVocabulary.allWords) - private def parseExpr(input: String): DataExpr = { - interpreter.execute(input).stack match { - case (expr: DataExpr) :: Nil => expr - case _ => throw new IllegalArgumentException(s"invalid expr: $input") + private def parseExpr(input: String, exprType: ExprType = ExprType.TIME_SERIES): DataExpr = { + exprType match { + case ExprType.TIME_SERIES => + dataInterpreter.execute(input).stack match { + case (expr: DataExpr) :: Nil => expr + case _ => throw new IllegalArgumentException(s"invalid expr: $input") + } + case ExprType.TRACE_TIME_SERIES => + traceInterpreter.execute(input).stack match { + case ModelExtractors.TraceTimeSeriesType(tq) :: Nil => tq.expr.expr.dataExprs.head + case _ => throw new IllegalArgumentException(s"invalid expr: $input") + } + case _ => + throw new IllegalArgumentException(s"unsupported expression type: $exprType") } } } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala index bcf79f774..85abaa418 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala @@ -114,7 +114,7 @@ class LwcToAggrDatapointSuite extends FunSuite { val styleExpr = "name,cpu,:eq,:avg" val tsExpr = s"app,foo,:eq,$styleExpr,:span-time-series" def subExpr(n: String, e: String): String = { - s"""{"id":"$n","expression":"$e","frequency":$step}""" + s"""{"id":"$n","expression":"app,foo,:eq,$e,:span-time-series","frequency":$step}""" } val expr1 = subExpr("sum", "name,cpu,:eq,:sum") val expr2 = subExpr("count", "name,cpu,:eq,:count")