From 3c743621bcd421cdf684f377a3029fb3b9d2515b Mon Sep 17 00:00:00 2001 From: brharrington Date: Wed, 27 Mar 2024 21:41:27 -0500 Subject: [PATCH] eval: add new subscription message (#1633) Updated subscription message that includes the expr type. Done as a new message to keep backwards compability for existing usage of subscription. --- .../atlas/eval/model/LwcDataExpr.scala | 22 +------- .../atlas/eval/model/LwcMessages.scala | 49 ++++++++++++++--- .../atlas/eval/model/LwcSubscriptionV2.scala | 34 ++++++++++++ .../eval/stream/LwcToAggrDatapoint.scala | 31 +++++++++-- .../atlas/eval/model/LwcDataExprSuite.scala | 41 -------------- .../atlas/eval/model/LwcMessagesSuite.scala | 55 +++++++++++++++++++ 6 files changed, 155 insertions(+), 77 deletions(-) create mode 100644 atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcSubscriptionV2.scala delete mode 100644 atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcDataExprSuite.scala diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDataExpr.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDataExpr.scala index efcddb57f..c640fc9e8 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDataExpr.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDataExpr.scala @@ -16,10 +16,6 @@ package com.netflix.atlas.eval.model import com.fasterxml.jackson.annotation.JsonAlias -import com.fasterxml.jackson.annotation.JsonIgnore -import com.netflix.atlas.core.model.DataExpr -import com.netflix.atlas.core.model.DataVocabulary -import com.netflix.atlas.core.stacklang.Interpreter /** * Triple representing the id, data expression, and frequency for a given data @@ -35,20 +31,4 @@ import com.netflix.atlas.core.stacklang.Interpreter * @param step * The step size used for this stream of data. */ -case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) { - - @JsonIgnore - lazy val expr: DataExpr = LwcDataExpr.parseExpr(expression) -} - -object LwcDataExpr { - - private val interpreter = Interpreter(DataVocabulary.allWords) - - private def parseExpr(input: String): DataExpr = { - interpreter.execute(input).stack match { - case (expr: DataExpr) :: Nil => expr - case _ => throw new IllegalArgumentException(s"invalid expr: $input") - } - } -} +case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) {} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala index 72e7fca9e..e636ac7df 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala @@ -66,7 +66,12 @@ object LwcMessages { // LwcSubscription // - expression - var metrics: List[LwcDataExpr] = Nil + // - metrics + // LwcSubscriptionV2 + // - expression + // - exprType + // - subExprs + var subExprs: List[LwcDataExpr] = Nil // LwcDatapoint var timestamp: Long = -1L @@ -97,7 +102,8 @@ object LwcMessages { case "expression" => expression = nextString(parser) case "exprType" => exprType = ExprType.valueOf(nextString(parser)) case "step" => step = nextLong(parser) - case "metrics" => metrics = parseDataExprs(parser) + case "metrics" => subExprs = parseDataExprs(parser) + case "subExprs" => subExprs = parseDataExprs(parser) case "timestamp" => timestamp = nextLong(parser) case "id" => id = nextString(parser) @@ -117,13 +123,14 @@ object LwcMessages { } typeDesc match { - case "expression" => LwcExpression(expression, exprType, step) - case "subscription" => LwcSubscription(expression, metrics) - case "datapoint" => LwcDatapoint(timestamp, id, tags, value) - case "event" => LwcEvent(id, payload) - case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage) - case "heartbeat" => LwcHeartbeat(timestamp, step) - case _ => DiagnosticMessage(typeDesc, message, None) + case "expression" => LwcExpression(expression, exprType, step) + case "subscription" => LwcSubscription(expression, subExprs) + case "subscription-v2" => LwcSubscriptionV2(expression, exprType, subExprs) + case "datapoint" => LwcDatapoint(timestamp, id, tags, value) + case "event" => LwcEvent(id, payload) + case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage) + case "heartbeat" => LwcHeartbeat(timestamp, step) + case _ => DiagnosticMessage(typeDesc, message, None) } } finally { parser.close() @@ -180,6 +187,7 @@ object LwcMessages { private val Diagnostic = 4 private val Heartbeat = 5 private val Event = 6 + private val SubscriptionV2 = 7 /** * Encode messages using Jackson's smile format into a ByteString. @@ -215,6 +223,17 @@ object LwcMessages { gen.writeNumber(m.step) } gen.writeEndArray() + case msg: LwcSubscriptionV2 => + gen.writeNumber(SubscriptionV2) + gen.writeString(msg.expression) + gen.writeString(msg.exprType.name()) + gen.writeStartArray() + msg.subExprs.foreach { s => + gen.writeString(s.id) + gen.writeString(s.expression) + gen.writeNumber(s.step) + } + gen.writeEndArray() case msg: LwcDatapoint => gen.writeNumber(Datapoint) gen.writeNumber(msg.timestamp) @@ -288,6 +307,18 @@ object LwcMessages { ) } builder += LwcSubscription(expression, dataExprs.result()) + case SubscriptionV2 => + val expression = parser.nextTextValue() + val exprType = ExprType.valueOf(parser.nextTextValue()) + val subExprs = List.newBuilder[LwcDataExpr] + foreachItem(parser) { + subExprs += LwcDataExpr( + parser.getText, + parser.nextTextValue(), + parser.nextLongValue(-1L) + ) + } + builder += LwcSubscriptionV2(expression, exprType, subExprs.result()) case Datapoint => val timestamp = parser.nextLongValue(-1L) val id = parser.nextTextValue() diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcSubscriptionV2.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcSubscriptionV2.scala new file mode 100644 index 000000000..5dc8f8d23 --- /dev/null +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcSubscriptionV2.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.eval.model + +import com.netflix.atlas.json.JsonSupport + +/** + * Subscription message that is returned by the LWC service. + * + * @param expression + * Expression that was used for the initial subscription. + * @param exprType + * Indicates the type of expression for the subscription. This is typically determined + * based on the endpoint used on the URI. + * @param subExprs + * Data expressions that result from the root expression. + */ +case class LwcSubscriptionV2(expression: String, exprType: ExprType, subExprs: List[LwcDataExpr]) + extends JsonSupport { + val `type`: String = "subscription-v2" +} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala index 594aa5c17..57256ea38 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala @@ -15,6 +15,9 @@ */ package com.netflix.atlas.eval.stream +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.DataVocabulary +import com.netflix.atlas.core.stacklang.Interpreter import org.apache.pekko.stream.Attributes import org.apache.pekko.stream.FlowShape import org.apache.pekko.stream.Inlet @@ -25,7 +28,6 @@ import org.apache.pekko.stream.stage.InHandler import org.apache.pekko.stream.stage.OutHandler import com.netflix.atlas.eval.model.AggrDatapoint import com.netflix.atlas.eval.model.EventMessage -import com.netflix.atlas.eval.model.LwcDataExpr import com.netflix.atlas.eval.model.LwcDatapoint import com.netflix.atlas.eval.model.LwcDiagnosticMessage import com.netflix.atlas.eval.model.LwcEvent @@ -39,6 +41,8 @@ import com.netflix.atlas.eval.model.LwcSubscription private[stream] class LwcToAggrDatapoint(context: StreamContext) extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] { + import LwcToAggrDatapoint.* + private val unknown = context.registry.counter("atlas.eval.unknownMessages") private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in") @@ -49,7 +53,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { new GraphStageLogic(shape) with InHandler with OutHandler { - private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, LwcDataExpr] + private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, DatapointMetadata] override def onPush(): Unit = { val builder = List.newBuilder[AggrDatapoint] @@ -71,7 +75,8 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) private def updateState(sub: LwcSubscription): Unit = { sub.metrics.foreach { m => if (!state.contains(m.id)) { - state.put(m.id, m) + val expr = parseExpr(m.expression) + state.put(m.id, DatapointMetadata(m.expression, expr, m.step)) } } } @@ -79,7 +84,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = { state.get(dp.id) match { case Some(sub) => - val expr = sub.expr + val expr = sub.dataExpr val step = sub.step Some(AggrDatapoint(dp.timestamp, step, expr, "datapoint", dp.tags, dp.value)) case None => @@ -90,14 +95,14 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) private def pushEvent(event: LwcEvent): Unit = { state.get(event.id) match { - case Some(sub) => context.log(sub.expr, EventMessage(event.payload)) + case Some(sub) => context.log(sub.dataExpr, EventMessage(event.payload)) case None => unknown.increment() } } private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = { state.get(diagMsg.id) match { - case Some(sub) => context.log(sub.expr, diagMsg.message) + case Some(sub) => context.log(sub.dataExpr, diagMsg.message) case None => unknown.increment() } } @@ -118,3 +123,17 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) } } } + +private[stream] object LwcToAggrDatapoint { + + case class DatapointMetadata(dataExprStr: String, dataExpr: DataExpr, step: Long) + + private val interpreter = Interpreter(DataVocabulary.allWords) + + private def parseExpr(input: String): DataExpr = { + interpreter.execute(input).stack match { + case (expr: DataExpr) :: Nil => expr + case _ => throw new IllegalArgumentException(s"invalid expr: $input") + } + } +} diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcDataExprSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcDataExprSuite.scala deleted file mode 100644 index eb2f987e1..000000000 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcDataExprSuite.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014-2024 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.atlas.eval.model - -import com.netflix.atlas.core.model.ModelExtractors -import com.netflix.atlas.core.model.StyleExpr -import com.netflix.atlas.core.model.StyleVocabulary -import com.netflix.atlas.core.stacklang.Interpreter -import munit.FunSuite - -class LwcDataExprSuite extends FunSuite { - - private def styleExpr(str: String): StyleExpr = { - val interpreter = new Interpreter(StyleVocabulary.allWords) - interpreter.execute(str).stack match { - case ModelExtractors.PresentationType(v) :: Nil => v - case _ => throw new IllegalArgumentException(s"invalid expr: $str") - } - } - - test("group by equals") { - val exprStr = "statistic,max,:eq,name,foo,:eq,:and,:max,(,nf.asg,),:by" - val distExprStr = "name,foo,:eq,:dist-max,(,nf.asg,),:by" - val lwcExpr = LwcDataExpr("123", exprStr, 10L) - assertEquals(lwcExpr.expr, styleExpr(distExprStr).expr.dataExprs.head) - assertEquals(lwcExpr.expr.hashCode, styleExpr(exprStr).expr.hashCode) - } -} diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala index 5026352cf..657e9b748 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala @@ -52,6 +52,46 @@ class LwcMessagesSuite extends FunSuite { assertEquals(actual, expected) } + test("subscription-v2 time series") { + val expr = "name,cpu,:eq,:avg" + val sum = "name,cpu,:eq,:sum" + val count = "name,cpu,:eq,:count" + val dataExprs = List(LwcDataExpr("a", sum, step), LwcDataExpr("b", count, step)) + val expected = LwcSubscriptionV2(expr, ExprType.TIME_SERIES, dataExprs) + val actual = LwcMessages.parse(Json.encode(expected)) + assertEquals(actual, expected) + } + + test("subscription-v2 events") { + val raw = "name,cpu,:eq" + val table = "name,cpu,:eq,(,name,value,),:table" + val expr = s"$raw,$table" + val dataExprs = List(LwcDataExpr("a", raw, 0L), LwcDataExpr("b", table, 0L)) + val expected = LwcSubscriptionV2(expr, ExprType.EVENTS, dataExprs) + val actual = LwcMessages.parse(Json.encode(expected)) + assertEquals(actual, expected) + } + + test("subscription-v2 trace events") { + val q1 = "app,www,:eq,app,db,:eq,:child" + val q2 = "app,www,:eq,app,foo,:eq,:span-and" + val expr = s"$q1,$q2" + val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L)) + val expected = LwcSubscriptionV2(expr, ExprType.TRACE_EVENTS, dataExprs) + val actual = LwcMessages.parse(Json.encode(expected)) + assertEquals(actual, expected) + } + + test("subscription-v2 trace time series") { + val q1 = "app,www,:eq,app,db,:eq,:child" + val q2 = "app,www,:eq,app,foo,:eq,:span-and" + val expr = s"$q1,$q2" + val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L)) + val expected = LwcSubscriptionV2(expr, ExprType.TRACE_TIME_SERIES, dataExprs) + val actual = LwcMessages.parse(Json.encode(expected)) + assertEquals(actual, expected) + } + test("datapoint") { val expected = LwcDatapoint(step, "a", Map("foo" -> "bar"), 42.0) val actual = LwcMessages.parse(Json.encode(expected)) @@ -125,6 +165,21 @@ class LwcMessagesSuite extends FunSuite { assertEquals(actual, expected.toList) } + test("batch: subscription-v2") { + val expected = (0 until 10).map { i => + LwcSubscriptionV2( + "name,cpu,:eq,:avg", + ExprType.TIME_SERIES, + List( + LwcDataExpr(s"$i", "name,cpu,:eq,:sum", i), + LwcDataExpr(s"$i", "name,cpu,:eq,:count", i) + ) + ) + } + val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected)) + assertEquals(actual, expected.toList) + } + test("batch: datapoint") { val expected = (0 until 10).map { i => LwcDatapoint(