Skip to content

Commit

Permalink
eval: add new subscription message (Netflix#1633)
Browse files Browse the repository at this point in the history
Updated subscription message that includes the expr type.
Done as a new message to keep backwards compability for
existing usage of subscription.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent 9af0ed4 commit 0ac5550
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonIgnore
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.DataVocabulary
import com.netflix.atlas.core.stacklang.Interpreter

/**
* Triple representing the id, data expression, and frequency for a given data
Expand All @@ -35,20 +31,4 @@ import com.netflix.atlas.core.stacklang.Interpreter
* @param step
* The step size used for this stream of data.
*/
case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) {

@JsonIgnore
lazy val expr: DataExpr = LwcDataExpr.parseExpr(expression)
}

object LwcDataExpr {

private val interpreter = Interpreter(DataVocabulary.allWords)

private def parseExpr(input: String): DataExpr = {
interpreter.execute(input).stack match {
case (expr: DataExpr) :: Nil => expr
case _ => throw new IllegalArgumentException(s"invalid expr: $input")
}
}
}
case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) {}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ object LwcMessages {

// LwcSubscription
// - expression
var metrics: List[LwcDataExpr] = Nil
// - metrics
// LwcSubscriptionV2
// - expression
// - exprType
// - subExprs
var subExprs: List[LwcDataExpr] = Nil

// LwcDatapoint
var timestamp: Long = -1L
Expand Down Expand Up @@ -97,7 +102,8 @@ object LwcMessages {
case "expression" => expression = nextString(parser)
case "exprType" => exprType = ExprType.valueOf(nextString(parser))
case "step" => step = nextLong(parser)
case "metrics" => metrics = parseDataExprs(parser)
case "metrics" => subExprs = parseDataExprs(parser)
case "subExprs" => subExprs = parseDataExprs(parser)

case "timestamp" => timestamp = nextLong(parser)
case "id" => id = nextString(parser)
Expand All @@ -117,13 +123,14 @@ object LwcMessages {
}

typeDesc match {
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, metrics)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, subExprs)
case "subscription-v2" => LwcSubscriptionV2(expression, exprType, subExprs)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
}
} finally {
parser.close()
Expand Down Expand Up @@ -180,6 +187,7 @@ object LwcMessages {
private val Diagnostic = 4
private val Heartbeat = 5
private val Event = 6
private val SubscriptionV2 = 7

/**
* Encode messages using Jackson's smile format into a ByteString.
Expand Down Expand Up @@ -215,6 +223,17 @@ object LwcMessages {
gen.writeNumber(m.step)
}
gen.writeEndArray()
case msg: LwcSubscriptionV2 =>
gen.writeNumber(SubscriptionV2)
gen.writeString(msg.expression)
gen.writeString(msg.exprType.name())
gen.writeStartArray()
msg.subExprs.foreach { s =>
gen.writeString(s.id)
gen.writeString(s.expression)
gen.writeNumber(s.step)
}
gen.writeEndArray()
case msg: LwcDatapoint =>
gen.writeNumber(Datapoint)
gen.writeNumber(msg.timestamp)
Expand Down Expand Up @@ -288,6 +307,18 @@ object LwcMessages {
)
}
builder += LwcSubscription(expression, dataExprs.result())
case SubscriptionV2 =>
val expression = parser.nextTextValue()
val exprType = ExprType.valueOf(parser.nextTextValue())
val subExprs = List.newBuilder[LwcDataExpr]
foreachItem(parser) {
subExprs += LwcDataExpr(
parser.getText,
parser.nextTextValue(),
parser.nextLongValue(-1L)
)
}
builder += LwcSubscriptionV2(expression, exprType, subExprs.result())
case Datapoint =>
val timestamp = parser.nextLongValue(-1L)
val id = parser.nextTextValue()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

/**
* Subscription message that is returned by the LWC service.
*
* @param expression
* Expression that was used for the initial subscription.
* @param exprType
* Indicates the type of expression for the subscription. This is typically determined
* based on the endpoint used on the URI.
* @param subExprs
* Data expressions that result from the root expression.
*/
case class LwcSubscriptionV2(expression: String, exprType: ExprType, subExprs: List[LwcDataExpr])
extends JsonSupport {
val `type`: String = "subscription-v2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.DataVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.FlowShape
import org.apache.pekko.stream.Inlet
Expand All @@ -25,7 +28,6 @@ import org.apache.pekko.stream.stage.InHandler
import org.apache.pekko.stream.stage.OutHandler
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.EventMessage
import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
Expand All @@ -39,6 +41,8 @@ import com.netflix.atlas.eval.model.LwcSubscription
private[stream] class LwcToAggrDatapoint(context: StreamContext)
extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] {

import LwcToAggrDatapoint.*

private val unknown = context.registry.counter("atlas.eval.unknownMessages")

private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in")
Expand All @@ -49,7 +53,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {

private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, LwcDataExpr]
private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, DatapointMetadata]

override def onPush(): Unit = {
val builder = List.newBuilder[AggrDatapoint]
Expand All @@ -71,15 +75,16 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
private def updateState(sub: LwcSubscription): Unit = {
sub.metrics.foreach { m =>
if (!state.contains(m.id)) {
state.put(m.id, m)
val expr = parseExpr(m.expression)
state.put(m.id, DatapointMetadata(m.expression, expr, m.step))
}
}
}

private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
state.get(dp.id) match {
case Some(sub) =>
val expr = sub.expr
val expr = sub.dataExpr
val step = sub.step
Some(AggrDatapoint(dp.timestamp, step, expr, "datapoint", dp.tags, dp.value))
case None =>
Expand All @@ -90,14 +95,14 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)

private def pushEvent(event: LwcEvent): Unit = {
state.get(event.id) match {
case Some(sub) => context.log(sub.expr, EventMessage(event.payload))
case Some(sub) => context.log(sub.dataExpr, EventMessage(event.payload))
case None => unknown.increment()
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
case Some(sub) => context.log(sub.dataExpr, diagMsg.message)
case None => unknown.increment()
}
}
Expand All @@ -118,3 +123,17 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}
}
}

private[stream] object LwcToAggrDatapoint {

case class DatapointMetadata(dataExprStr: String, dataExpr: DataExpr, step: Long)

private val interpreter = Interpreter(DataVocabulary.allWords)

private def parseExpr(input: String): DataExpr = {
interpreter.execute(input).stack match {
case (expr: DataExpr) :: Nil => expr
case _ => throw new IllegalArgumentException(s"invalid expr: $input")
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,46 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected)
}

test("subscription-v2 time series") {
val expr = "name,cpu,:eq,:avg"
val sum = "name,cpu,:eq,:sum"
val count = "name,cpu,:eq,:count"
val dataExprs = List(LwcDataExpr("a", sum, step), LwcDataExpr("b", count, step))
val expected = LwcSubscriptionV2(expr, ExprType.TIME_SERIES, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 events") {
val raw = "name,cpu,:eq"
val table = "name,cpu,:eq,(,name,value,),:table"
val expr = s"$raw,$table"
val dataExprs = List(LwcDataExpr("a", raw, 0L), LwcDataExpr("b", table, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.EVENTS, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 trace events") {
val q1 = "app,www,:eq,app,db,:eq,:child"
val q2 = "app,www,:eq,app,foo,:eq,:span-and"
val expr = s"$q1,$q2"
val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.TRACE_EVENTS, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 trace time series") {
val q1 = "app,www,:eq,app,db,:eq,:child"
val q2 = "app,www,:eq,app,foo,:eq,:span-and"
val expr = s"$q1,$q2"
val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.TRACE_TIME_SERIES, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("datapoint") {
val expected = LwcDatapoint(step, "a", Map("foo" -> "bar"), 42.0)
val actual = LwcMessages.parse(Json.encode(expected))
Expand Down Expand Up @@ -125,6 +165,21 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected.toList)
}

test("batch: subscription-v2") {
val expected = (0 until 10).map { i =>
LwcSubscriptionV2(
"name,cpu,:eq,:avg",
ExprType.TIME_SERIES,
List(
LwcDataExpr(s"$i", "name,cpu,:eq,:sum", i),
LwcDataExpr(s"$i", "name,cpu,:eq,:count", i)
)
)
}
val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected))
assertEquals(actual, expected.toList)
}

test("batch: datapoint") {
val expected = (0 until 10).map { i =>
LwcDatapoint(
Expand Down

0 comments on commit 0ac5550

Please sign in to comment.