From 7e09ac40a2cc8cbbb8f1e34340607b20edc4e76a Mon Sep 17 00:00:00 2001 From: brharrington Date: Tue, 19 Sep 2023 15:45:58 -0500 Subject: [PATCH] eval: preserve state with multi-level grouping (#1579) When executing an expression in a streaming context, stateful operators used as input to a second level grouping were not preserving the state. --- .../netflix/atlas/core/model/MathExpr.scala | 2 +- .../eval/stream/FinalExprEvalSuite.scala | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala index f5dbba4e8..eb754614b 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala @@ -798,7 +798,7 @@ object MathExpr { List(TimeSeries(tags, k, t.data)) } - ResultSet(this, newData, context.state) + ResultSet(this, newData, inner.state) } } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 97726e37f..221edfb03 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -461,6 +461,54 @@ class FinalExprEvalSuite extends FunSuite { }) } + test("multi-level group by with stateful operation") { + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "rps")), List("node", "app")) + val input = List( + sources(ds("a", s"http://atlas/graph?q=$expr,2,:rolling-max,:max,(,app,),:by")), + group( + 0, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 1.0) + ), + group( + 1, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0) + ), + group( + 2, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 84.0) + ), + group( + 3, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0) + ), + group( + 4, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 21.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0) + ), + group( + 5, + AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 21.0), + AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0) + ) + ) + + val output = run(input) + + val expected = Array(42.0, 42.0, 84.0, 84.0, 42.0, 21.0) + val timeseries = output.filter(isTimeSeries) + assertEquals(timeseries.size, 6) + timeseries.zip(expected).foreach { + case (envelope, expectedValue) => + val ts = envelope.message.asInstanceOf[TimeSeriesMessage] + checkValue(ts, expectedValue) + } + } + // https://github.com/Netflix/atlas/issues/762 test(":legend is honored") { val expr = DataExpr.Sum(Query.Equal("name", "rps"))