Skip to content

Commit

Permalink
add event sample operator (#1727)
Browse files Browse the repository at this point in the history
This operator allows events to be sampled based on a set
of tag keys. It works similar to an analytics group by,
but will capture some projection fields for a sample event
to provide more context to the consumer.

This change also adds a max groups limit for the LWC events
client. The limit is needed to help avoid excessive memory
usage if the sampling keys have a high cardinality.
  • Loading branch information
brharrington authored Nov 20, 2024
1 parent 36ebeaf commit abcc280
Show file tree
Hide file tree
Showing 22 changed files with 555 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,29 @@ object EventExpr {
Interpreter.append(builder, query, columns, ":table")
}
}

/**
* Expression that specifies how to map an event to a simple row with the specified columns.
*
* @param query
* Query to determine if an event should be matched.
* @param sampleBy
* The set of tag values to extract for purposes of the sampling groups. A value will be
* sent for each distinct sample group.
* @param projectionKeys
* Set of columns to export into a row.
*/
case class Sample(query: Query, sampleBy: List[String], projectionKeys: List[String])
extends EventExpr {

require(sampleBy.nonEmpty, "sampleBy cannot be empty")

override def append(builder: java.lang.StringBuilder): Unit = {
Interpreter.append(builder, query, sampleBy, projectionKeys, ":sample")
}

def dataExpr: DataExpr = {
DataExpr.GroupBy(DataExpr.Sum(query), sampleBy)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object EventVocabulary extends Vocabulary {

val dependsOn: List[Vocabulary] = List(QueryVocabulary)

override def words: List[Word] = List(TableWord)
override def words: List[Word] = List(SampleWord, TableWord)

case object TableWord extends SimpleWord {

Expand All @@ -50,4 +50,31 @@ object EventVocabulary extends Vocabulary {

override def examples: List[String] = List("level,ERROR,:eq,(,message,)")
}

case object SampleWord extends SimpleWord {

import ModelExtractors.*

override def name: String = "sample"

override protected def matcher: PartialFunction[List[Any], Boolean] = {
case StringListType(_) :: StringListType(_) :: (_: Query) :: _ => true
}

override protected def executor: PartialFunction[List[Any], List[Any]] = {
case StringListType(pks) :: StringListType(by) :: (q: Query) :: stack =>
EventExpr.Sample(q, by, pks) :: stack
}

override def signature: String = "q:Query sampleBy:List projectionKeys:List -- EventExpr"

override def summary: String =
"""
|Find matching events and sample based on a set of keys. The output will be a count
|for the step interval along with some sample data for that group based on the projection
|keys.
|""".stripMargin

override def examples: List[String] = List("level,ERROR,:eq,(,fingerprint,),(,message,)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ class EventVocabularySuite extends FunSuite {
assertEquals(table.columns, List("a", "b"))
assertEquals(table.query, Query.Equal("name", "sps"))
}

test("sample, empty set of sampleBy") {
val e = intercept[IllegalArgumentException] {
parse("name,sps,:eq,(,),(,message,),:sample")
}
assert(e.getMessage.contains("sampleBy cannot be empty"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ import com.netflix.spectator.api.Registry
* Tags associated with the datapoint.
* @param value
* Value for the datapoint.
* @param samples
* Optional set of event samples associated with the message. Typically used when
* mapping events into a count with a few sample messages.
*/
case class AggrDatapoint(
timestamp: Long,
step: Long,
expr: DataExpr,
source: String,
tags: Map[String, String],
value: Double
value: Double,
samples: List[List[Any]] = Nil
) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.core.JsonGenerator
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport

/**
Expand All @@ -31,9 +32,17 @@ import com.netflix.atlas.json.JsonSupport
* Tags associated with the datapoint.
* @param value
* Value for the datapoint.
* @param samples
* Optional set of event samples associated with the message. Typically used when
* mapping events into a count with a few sample messages.
*/
case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String], value: Double)
extends JsonSupport {
case class LwcDatapoint(
timestamp: Long,
id: String,
tags: Map[String, String],
value: Double,
samples: List[List[Any]] = Nil
) extends JsonSupport {

val `type`: String = "datapoint"

Expand All @@ -48,6 +57,10 @@ case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String],
tags.foreachEntry(gen.writeStringField)
gen.writeEndObject()
gen.writeNumberField("value", value)
if (samples.nonEmpty) {
gen.writeFieldName("samples")
Json.encode(gen, samples)
}
gen.writeEndObject()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ object LwcMessages {
var id: String = null
var tags: Map[String, String] = Map.empty
var value: Double = Double.NaN
var samples: List[List[Any]] = Nil

// LwcEvent
var payload: JsonNode = NullNode.instance
Expand Down Expand Up @@ -110,6 +111,7 @@ object LwcMessages {
case "id" => id = nextString(parser)
case "tags" => tags = parseTags(parser)
case "value" => value = nextDouble(parser)
case "samples" => samples = parseSamples(parser)

case "payload" => payload = nextTree(parser)

Expand All @@ -127,7 +129,7 @@ object LwcMessages {
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, subExprs)
case "subscription-v2" => LwcSubscriptionV2(expression, exprType, subExprs)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value, samples)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
Expand Down Expand Up @@ -162,6 +164,20 @@ object LwcMessages {
builder.result()
}

private def parseSamples(parser: JsonParser): List[List[Any]] = {
val samples = List.newBuilder[List[Any]]
foreachItem(parser) {
val row = List.newBuilder[Any]
var t = parser.nextToken()
while (t != null && t != JsonToken.END_ARRAY) {
row += parser.readValueAsTree[JsonNode]()
t = parser.nextToken()
}
samples += row.result()
}
samples.result()
}

private def parseDiagnosticMessage(parser: JsonParser): DiagnosticMessage = {
var typeDesc: String = null
var message: String = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.netflix.atlas.chart.model.LineStyle
import com.netflix.atlas.chart.model.Palette
import com.netflix.atlas.core.model.*
import com.netflix.atlas.core.util.Strings
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport

import java.awt.Color
Expand Down Expand Up @@ -57,6 +58,9 @@ import java.time.Duration
* Data for the time series.
* @param styleMetadata
* Metadata for presentation details related to how to render the line.
* @param samples
* Optional set of event samples associated with the message. Typically used when
* mapping events into a count with a few sample messages.
*/
case class TimeSeriesMessage(
id: String,
Expand All @@ -68,7 +72,8 @@ case class TimeSeriesMessage(
label: String,
tags: Map[String, String],
data: ChunkData,
styleMetadata: Option[LineStyleMetadata]
styleMetadata: Option[LineStyleMetadata],
samples: List[List[Any]]
) extends JsonSupport {

override def hasCustomEncoding: Boolean = true
Expand Down Expand Up @@ -96,6 +101,10 @@ case class TimeSeriesMessage(
gen.writeNumberField("step", step)
gen.writeFieldName("data")
data.encode(gen)
if (samples.nonEmpty) {
gen.writeFieldName("samples")
Json.encode(gen, samples)
}
gen.writeEndObject()
}

Expand Down Expand Up @@ -150,7 +159,8 @@ object TimeSeriesMessage {
ts.label,
outputTags,
ArrayData(data.data),
palette.map(p => createStyleMetadata(expr, ts.label, p))
palette.map(p => createStyleMetadata(expr, ts.label, p)),
Nil
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ class ExprInterpreter(config: Config) {
exprType -> exprs.distinct
}

/** Parse sampled event expression URI. */
def parseSampleExpr(uri: Uri): List[EventExpr.Sample] = {
determineExprType(uri) match {
case ExprType.EVENTS => evalEvents(uri).collect { case s: EventExpr.Sample => s }
case _ => Nil
}
}

def dataExprs(uri: Uri): List[String] = {
val exprs = determineExprType(uri) match {
case ExprType.TIME_SERIES => eval(uri).exprs.flatMap(_.expr.dataExprs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EvalContext
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TaggedItem
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.IdentityMap
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.ChunkData
import com.netflix.atlas.eval.model.TimeGroup
Expand Down Expand Up @@ -76,6 +78,9 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
// sources message that arrives and should be consistent for the life of this stage
private var step = -1L

// Sampled event expressions
private var sampledEventRecipients = Map.empty[DataExpr, List[ExprInfo]]

// Each expression matched with a list of data source ids that should receive
// the data for it
private var recipients = List.empty[(StyleExpr, List[ExprInfo])]
Expand All @@ -101,6 +106,16 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
// Get set of expressions before we update the list
val previous = recipients.map(t => t._1 -> t._1).toMap

// Compute set of sampled event expressions
sampledEventRecipients = sources
.flatMap { s =>
exprInterpreter.parseSampleExpr(Uri(s.uri)).map { expr =>
expr.dataExpr -> ExprInfo(s.id, None)
}
}
.groupBy(_._1)
.map(t => t._1 -> t._2.map(_._2))

// Error messages for invalid expressions
val errors = List.newBuilder[MessageEnvelope]

Expand Down Expand Up @@ -185,6 +200,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
val timestamp = group.timestamp
val groupedDatapoints = group.dataExprValues

// Messages for sampled events that look similar to time series
val sampledEventMessages = groupedDatapoints.flatMap {
case (expr, vs) =>
sampledEventRecipients
.get(expr)
.fold(List.empty[MessageEnvelope]) { infos =>
val ts = vs.values.map(toTimeSeriesMessage)
ts.flatMap { msg =>
infos.map { info =>
new MessageEnvelope(info.id, msg)
}
}
}
}.toList

// Data for each time series data expression or no-data line if there is no data for
// the interval
val dataExprToDatapoints = noData ++ groupedDatapoints.map {
case (k, vs) =>
k -> vs.values.map(_.toTimeSeries)
Expand All @@ -194,12 +226,12 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
val rateCollector = new EvalDataRateCollector(timestamp, step)
dataSourceIdToDataExprs.foreach {
case (id, dataExprSet) =>
dataExprSet.foreach(dataExpr => {
dataExprSet.foreach { dataExpr =>
group.dataExprValues.get(dataExpr).foreach { info =>
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
})
}
}

// Generate the time series and diagnostic output
Expand Down Expand Up @@ -249,7 +281,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
case (id, rate) => new MessageEnvelope(id, rate)
}.toList

output ++ rateMessages
sampledEventMessages ++ output ++ rateMessages
}

private def hasFiniteValue(value: AnyRef): Boolean = {
Expand All @@ -266,6 +298,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
}
}

private def toTimeSeriesMessage(value: AggrDatapoint): TimeSeriesMessage = {
val id = TaggedItem.computeId(value.tags + ("atlas.query" -> value.source)).toString
TimeSeriesMessage(
id,
value.source,
value.expr.finalGrouping,
value.timestamp,
value.timestamp + step,
step,
TimeSeries.toLabel(value.tags),
value.tags,
ArrayData(Array(value.value)),
None,
value.samples
)
}

private def handleSingleGroup(g: TimeGroup): Unit = {
push(out, Source(handleData(g)))
}
Expand Down
Loading

0 comments on commit abcc280

Please sign in to comment.