Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: add event message type #1629

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport

/**
* Message type use for events to forward to a consumer.
*/
case class EventMessage(payload: JsonNode) extends JsonSupport {

override def encode(gen: JsonGenerator): Unit = {
gen.writeStartObject()
gen.writeStringField("type", "event")
gen.writeFieldName("payload")
Json.encode(gen, payload)
gen.writeEndObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.databind.JsonNode
import com.netflix.atlas.json.JsonSupport

/**
* Raw event data to pass through to the consumer.
*
* @param id
* Identifies the expression that resulted in this datapoint being generated.
* @param payload
* Raw event payload.
*/
case class LwcEvent(id: String, payload: JsonNode) extends JsonSupport {
val `type`: String = "event"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.netflix.atlas.eval.model
import org.apache.pekko.util.ByteString
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import com.netflix.atlas.core.util.SmallHashMap
import com.netflix.atlas.core.util.SortedTagMap
import com.netflix.atlas.json.Json
Expand All @@ -32,6 +34,9 @@ import java.io.ByteArrayOutputStream
*/
object LwcMessages {

// For reading arbitrary json structures for events
private val mapper = Json.newMapper

/**
* Parse the message string into an internal model object based on the type.
*/
Expand Down Expand Up @@ -69,6 +74,9 @@ object LwcMessages {
var tags: Map[String, String] = Map.empty
var value: Double = Double.NaN

// LwcEvent
var payload: JsonNode = NullNode.instance

// LwcDiagnosticMessage
// - id
// - message: DiagnosticMessage
Expand Down Expand Up @@ -96,6 +104,8 @@ object LwcMessages {
case "tags" => tags = parseTags(parser)
case "value" => value = nextDouble(parser)

case "payload" => payload = nextTree(parser)

case "message" =>
val t = parser.nextToken()
if (t == JsonToken.VALUE_STRING)
Expand All @@ -110,6 +120,7 @@ object LwcMessages {
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, metrics)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
Expand All @@ -119,6 +130,11 @@ object LwcMessages {
}
}

private def nextTree(parser: JsonParser): JsonNode = {
parser.nextToken()
mapper.readTree[JsonNode](parser)
}

private[model] def parseDataExprs(parser: JsonParser): List[LwcDataExpr] = {
val builder = List.newBuilder[LwcDataExpr]
foreachItem(parser) {
Expand Down Expand Up @@ -163,6 +179,7 @@ object LwcMessages {
private val LwcDiagnostic = 3
private val Diagnostic = 4
private val Heartbeat = 5
private val Event = 6

/**
* Encode messages using Jackson's smile format into a ByteString.
Expand Down Expand Up @@ -224,8 +241,12 @@ object LwcMessages {
gen.writeNumber(Heartbeat)
gen.writeNumber(msg.timestamp)
gen.writeNumber(msg.step)
case _ =>
throw new MatchError("foo")
case msg: LwcEvent =>
gen.writeNumber(Event)
gen.writeString(msg.id)
mapper.writeTree(gen, msg.payload)
case msg =>
throw new MatchError(s"$msg")
}
gen.writeEndArray()
} finally {
Expand Down Expand Up @@ -286,6 +307,9 @@ object LwcMessages {
val timestamp = parser.nextLongValue(-1L)
val step = parser.nextLongValue(-1L)
builder += LwcHeartbeat(timestamp, step)
case Event =>
val id = parser.nextTextValue()
builder += LwcEvent(id, nextTree(parser))
case v =>
throw new MatchError(s"invalid type id: $v")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import org.apache.pekko.stream.stage.GraphStageLogic
import org.apache.pekko.stream.stage.InHandler
import org.apache.pekko.stream.stage.OutHandler
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.EventMessage
import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcSubscription

Expand Down Expand Up @@ -54,6 +56,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
grab(in).foreach {
case sb: LwcSubscription => updateState(sb)
case dp: LwcDatapoint => builder ++= pushDatapoint(dp)
case ev: LwcEvent => pushEvent(ev)
case dg: LwcDiagnosticMessage => pushDiagnosticMessage(dg)
case hb: LwcHeartbeat => builder += pushHeartbeat(hb)
case _ =>
Expand Down Expand Up @@ -85,6 +88,13 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}
}

private def pushEvent(event: LwcEvent): Unit = {
state.get(event.id) match {
case Some(sub) => context.log(sub.expr, EventMessage(event.payload))
case None => unknown.increment()
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.util.Streams
import com.netflix.atlas.eval.stream.Evaluator.DataSource
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.AccessLogger
import com.netflix.atlas.pekko.DiagnosticMessage
import com.netflix.atlas.pekko.StreamOps
Expand Down Expand Up @@ -233,7 +234,7 @@ private[stream] class StreamContext(
/**
* Send a diagnostic message to all data sources that use a particular data expression.
*/
def log(expr: DataExpr, msg: DiagnosticMessage): Unit = {
def log(expr: DataExpr, msg: JsonSupport): Unit = {
dataExprMap.get(expr).foreach { ds =>
ds.foreach(s => dsLogger(s, msg))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.databind.JsonNode
import org.apache.pekko.util.ByteString
import com.netflix.atlas.core.util.Streams
import com.netflix.atlas.json.Json
Expand Down Expand Up @@ -63,6 +64,13 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected)
}

test("event") {
val payload = Json.decode[JsonNode]("""{"foo":"bar"}""")
val expected = LwcEvent("123", payload)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("diagnostic message") {
val expected = DiagnosticMessage.error("something bad happened")
val actual = LwcMessages.parse(Json.encode(expected))
Expand Down Expand Up @@ -130,6 +138,16 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected.toList)
}

test("batch: event") {
val expected = (0 until 10).map { i =>
val tags = Map("name" -> "cpu", "node" -> s"i-$i")
val payload = Json.decode[JsonNode](Json.encode(tags))
LwcEvent(s"$i", payload)
}
val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected))
assertEquals(actual, expected.toList)
}

test("batch: lwc diagnostic") {
val expected = (0 until 10).map { i =>
LwcDiagnosticMessage(s"$i", DiagnosticMessage.error("foo"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.netflix.atlas.chart.util.SrcPath
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
import com.netflix.atlas.eval.model.LwcExpression
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcMessages
Expand Down Expand Up @@ -117,6 +118,7 @@ class EvaluatorSuite extends FunSuite {
case m: LwcExpression => Option(m.`type`)
case m: LwcSubscription => Option(m.`type`)
case m: LwcDatapoint => Option(m.`type`)
case m: LwcEvent => Option(m.`type`)
case m: LwcDiagnosticMessage => Option(m.`type`)
case m: LwcHeartbeat => Option(m.`type`)
case _ => None
Expand Down
Loading