From abcc28062c8383c4326191c452ce127da0c13e0b Mon Sep 17 00:00:00 2001
From: brharrington
Date: Wed, 20 Nov 2024 11:34:00 -0600
Subject: [PATCH] add event sample operator (#1727)

This operator allows events to be sampled based on a set of tag keys. It
works similarly to an analytics group by, but captures some projection
fields from a sample event to provide more context to the consumer.

This change also adds a max groups limit for the LWC events client. The
limit is needed to help avoid excessive memory usage if the sampling keys
have high cardinality.
---
 .../netflix/atlas/core/model/EventExpr.scala  |  25 ++
 .../atlas/core/model/EventVocabulary.scala    |  29 ++-
 .../core/model/EventVocabularySuite.scala     |   7 +
 .../atlas/eval/model/AggrDatapoint.scala      |   6 +-
 .../atlas/eval/model/LwcDatapoint.scala       |  17 ++-
 .../atlas/eval/model/LwcMessages.scala        |  18 ++-
 .../atlas/eval/model/TimeSeriesMessage.scala  |  14 +-
 .../atlas/eval/stream/ExprInterpreter.scala   |   8 ++
 .../atlas/eval/stream/FinalExprEval.scala     |  55 +++++++-
 .../eval/stream/LwcToAggrDatapoint.scala      |  38 +++++-
 .../eval/stream/SyntheticDataSource.scala     |  44 ++++++-
 .../atlas/eval/model/AggrDatapointSuite.scala |  32 +++++
 .../atlas/eval/model/LwcMessagesSuite.scala   |  26 ++++
 .../eval/model/TimeSeriesMessageSuite.scala   |   3 +-
 .../atlas/eval/stream/EvaluatorSuite.scala    |  24 ++++
 .../src/main/resources/reference.conf         |   4 +
 .../lwc/events/AbstractLwcEventClient.scala   |  28 +++-
 .../atlas/lwc/events/DatapointConverter.scala |  48 +++++--
 .../atlas/lwc/events/DatapointEvent.scala     |   5 +-
 .../lwc/events/DatapointConverterSuite.scala  | 121 +++++++++++++++---
 .../lwc/events/LwcEventClientSuite.scala      |  22 +++-
 .../atlas/lwcapi/SubscribeApiSuite.scala      |  38 ++++++
 22 files changed, 555 insertions(+), 57 deletions(-)

diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventExpr.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventExpr.scala
index 852b87858..22264c411 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventExpr.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventExpr.scala
@@ -55,4 +55,29 @@ object EventExpr {
       Interpreter.append(builder, query, columns, ":table")
     }
   }
+
+  /**
+    * Expression that samples matching events based on a set of tag keys. It emits a count
+    * for each distinct sample group along with a row of the specified projection columns
+    * taken from a sampled event.
+    *
+    * @param query
+    *   Query to determine if an event should be matched.
+    * @param sampleBy
+    *   The set of tag keys whose values define the sampling groups. A value will be
+    *   sent for each distinct sample group.
+    * @param projectionKeys
+    *   Set of columns to export into a row.
+ */ + case class Sample(query: Query, sampleBy: List[String], projectionKeys: List[String]) + extends EventExpr { + + require(sampleBy.nonEmpty, "sampleBy cannot be empty") + + override def append(builder: java.lang.StringBuilder): Unit = { + Interpreter.append(builder, query, sampleBy, projectionKeys, ":sample") + } + + def dataExpr: DataExpr = { + DataExpr.GroupBy(DataExpr.Sum(query), sampleBy) + } + } } diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventVocabulary.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventVocabulary.scala index a180b8253..5f3b6205c 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventVocabulary.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/EventVocabulary.scala @@ -25,7 +25,7 @@ object EventVocabulary extends Vocabulary { val dependsOn: List[Vocabulary] = List(QueryVocabulary) - override def words: List[Word] = List(TableWord) + override def words: List[Word] = List(SampleWord, TableWord) case object TableWord extends SimpleWord { @@ -50,4 +50,31 @@ object EventVocabulary extends Vocabulary { override def examples: List[String] = List("level,ERROR,:eq,(,message,)") } + + case object SampleWord extends SimpleWord { + + import ModelExtractors.* + + override def name: String = "sample" + + override protected def matcher: PartialFunction[List[Any], Boolean] = { + case StringListType(_) :: StringListType(_) :: (_: Query) :: _ => true + } + + override protected def executor: PartialFunction[List[Any], List[Any]] = { + case StringListType(pks) :: StringListType(by) :: (q: Query) :: stack => + EventExpr.Sample(q, by, pks) :: stack + } + + override def signature: String = "q:Query sampleBy:List projectionKeys:List -- EventExpr" + + override def summary: String = + """ + |Find matching events and sample based on a set of keys. The output will be a count + |for the step interval along with some sample data for that group based on the projection + |keys. + |""".stripMargin + + override def examples: List[String] = List("level,ERROR,:eq,(,fingerprint,),(,message,)") + } } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/EventVocabularySuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/EventVocabularySuite.scala index 4d31ea8d7..6f400f83e 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/EventVocabularySuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/EventVocabularySuite.scala @@ -43,4 +43,11 @@ class EventVocabularySuite extends FunSuite { assertEquals(table.columns, List("a", "b")) assertEquals(table.query, Query.Equal("name", "sps")) } + + test("sample, empty set of sampleBy") { + val e = intercept[IllegalArgumentException] { + parse("name,sps,:eq,(,),(,message,),:sample") + } + assert(e.getMessage.contains("sampleBy cannot be empty")) + } } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala index a007ef088..c879c0771 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala @@ -49,6 +49,9 @@ import com.netflix.spectator.api.Registry * Tags associated with the datapoint. * @param value * Value for the datapoint. + * @param samples + * Optional set of event samples associated with the message. Typically used when + * mapping events into a count with a few sample messages. 
*/ case class AggrDatapoint( timestamp: Long, @@ -56,7 +59,8 @@ case class AggrDatapoint( expr: DataExpr, source: String, tags: Map[String, String], - value: Double + value: Double, + samples: List[List[Any]] = Nil ) { /** diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDatapoint.scala index ed043eb73..fc35972e0 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcDatapoint.scala @@ -16,6 +16,7 @@ package com.netflix.atlas.eval.model import com.fasterxml.jackson.core.JsonGenerator +import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport /** @@ -31,9 +32,17 @@ import com.netflix.atlas.json.JsonSupport * Tags associated with the datapoint. * @param value * Value for the datapoint. + * @param samples + * Optional set of event samples associated with the message. Typically used when + * mapping events into a count with a few sample messages. */ -case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String], value: Double) - extends JsonSupport { +case class LwcDatapoint( + timestamp: Long, + id: String, + tags: Map[String, String], + value: Double, + samples: List[List[Any]] = Nil +) extends JsonSupport { val `type`: String = "datapoint" @@ -48,6 +57,10 @@ case class LwcDatapoint(timestamp: Long, id: String, tags: Map[String, String], tags.foreachEntry(gen.writeStringField) gen.writeEndObject() gen.writeNumberField("value", value) + if (samples.nonEmpty) { + gen.writeFieldName("samples") + Json.encode(gen, samples) + } gen.writeEndObject() } } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala index 4ad63ceb2..36f86da78 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala @@ -79,6 +79,7 @@ object LwcMessages { var id: String = null var tags: Map[String, String] = Map.empty var value: Double = Double.NaN + var samples: List[List[Any]] = Nil // LwcEvent var payload: JsonNode = NullNode.instance @@ -110,6 +111,7 @@ object LwcMessages { case "id" => id = nextString(parser) case "tags" => tags = parseTags(parser) case "value" => value = nextDouble(parser) + case "samples" => samples = parseSamples(parser) case "payload" => payload = nextTree(parser) @@ -127,7 +129,7 @@ object LwcMessages { case "expression" => LwcExpression(expression, exprType, step) case "subscription" => LwcSubscription(expression, subExprs) case "subscription-v2" => LwcSubscriptionV2(expression, exprType, subExprs) - case "datapoint" => LwcDatapoint(timestamp, id, tags, value) + case "datapoint" => LwcDatapoint(timestamp, id, tags, value, samples) case "event" => LwcEvent(id, payload) case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage) case "heartbeat" => LwcHeartbeat(timestamp, step) @@ -162,6 +164,20 @@ object LwcMessages { builder.result() } + private def parseSamples(parser: JsonParser): List[List[Any]] = { + val samples = List.newBuilder[List[Any]] + foreachItem(parser) { + val row = List.newBuilder[Any] + var t = parser.nextToken() + while (t != null && t != JsonToken.END_ARRAY) { + row += parser.readValueAsTree[JsonNode]() + t = parser.nextToken() + } + samples += row.result() + } + samples.result() + } + private def parseDiagnosticMessage(parser: JsonParser): 
DiagnosticMessage = { var typeDesc: String = null var message: String = null diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeSeriesMessage.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeSeriesMessage.scala index dfc31f23f..b2e514e55 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeSeriesMessage.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeSeriesMessage.scala @@ -20,6 +20,7 @@ import com.netflix.atlas.chart.model.LineStyle import com.netflix.atlas.chart.model.Palette import com.netflix.atlas.core.model.* import com.netflix.atlas.core.util.Strings +import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport import java.awt.Color @@ -57,6 +58,9 @@ import java.time.Duration * Data for the time series. * @param styleMetadata * Metadata for presentation details related to how to render the line. + * @param samples + * Optional set of event samples associated with the message. Typically used when + * mapping events into a count with a few sample messages. */ case class TimeSeriesMessage( id: String, @@ -68,7 +72,8 @@ case class TimeSeriesMessage( label: String, tags: Map[String, String], data: ChunkData, - styleMetadata: Option[LineStyleMetadata] + styleMetadata: Option[LineStyleMetadata], + samples: List[List[Any]] ) extends JsonSupport { override def hasCustomEncoding: Boolean = true @@ -96,6 +101,10 @@ case class TimeSeriesMessage( gen.writeNumberField("step", step) gen.writeFieldName("data") data.encode(gen) + if (samples.nonEmpty) { + gen.writeFieldName("samples") + Json.encode(gen, samples) + } gen.writeEndObject() } @@ -150,7 +159,8 @@ object TimeSeriesMessage { ts.label, outputTags, ArrayData(data.data), - palette.map(p => createStyleMetadata(expr, ts.label, p)) + palette.map(p => createStyleMetadata(expr, ts.label, p)), + Nil ) } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala index 38eba61f9..359c39bd7 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala @@ -221,6 +221,14 @@ class ExprInterpreter(config: Config) { exprType -> exprs.distinct } + /** Parse sampled event expression URI. 
*/ + def parseSampleExpr(uri: Uri): List[EventExpr.Sample] = { + determineExprType(uri) match { + case ExprType.EVENTS => evalEvents(uri).collect { case s: EventExpr.Sample => s } + case _ => Nil + } + } + def dataExprs(uri: Uri): List[String] = { val exprs = determineExprType(uri) match { case ExprType.TIME_SERIES => eval(uri).exprs.flatMap(_.expr.dataExprs) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index d717af369..5c098072e 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -30,8 +30,10 @@ import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.EvalContext import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.core.model.StyleExpr +import com.netflix.atlas.core.model.TaggedItem import com.netflix.atlas.core.model.TimeSeries import com.netflix.atlas.core.util.IdentityMap +import com.netflix.atlas.eval.model.AggrDatapoint import com.netflix.atlas.eval.model.ArrayData import com.netflix.atlas.eval.model.ChunkData import com.netflix.atlas.eval.model.TimeGroup @@ -76,6 +78,9 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa // sources message that arrives and should be consistent for the life of this stage private var step = -1L + // Sampled event expressions + private var sampledEventRecipients = Map.empty[DataExpr, List[ExprInfo]] + // Each expression matched with a list of data source ids that should receive // the data for it private var recipients = List.empty[(StyleExpr, List[ExprInfo])] @@ -101,6 +106,16 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa // Get set of expressions before we update the list val previous = recipients.map(t => t._1 -> t._1).toMap + // Compute set of sampled event expressions + sampledEventRecipients = sources + .flatMap { s => + exprInterpreter.parseSampleExpr(Uri(s.uri)).map { expr => + expr.dataExpr -> ExprInfo(s.id, None) + } + } + .groupBy(_._1) + .map(t => t._1 -> t._2.map(_._2)) + // Error messages for invalid expressions val errors = List.newBuilder[MessageEnvelope] @@ -185,6 +200,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa val timestamp = group.timestamp val groupedDatapoints = group.dataExprValues + // Messages for sampled events that look similar to time series + val sampledEventMessages = groupedDatapoints.flatMap { + case (expr, vs) => + sampledEventRecipients + .get(expr) + .fold(List.empty[MessageEnvelope]) { infos => + val ts = vs.values.map(toTimeSeriesMessage) + ts.flatMap { msg => + infos.map { info => + new MessageEnvelope(info.id, msg) + } + } + } + }.toList + + // Data for each time series data expression or no-data line if there is no data for + // the interval val dataExprToDatapoints = noData ++ groupedDatapoints.map { case (k, vs) => k -> vs.values.map(_.toTimeSeries) @@ -194,12 +226,12 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa val rateCollector = new EvalDataRateCollector(timestamp, step) dataSourceIdToDataExprs.foreach { case (id, dataExprSet) => - dataExprSet.foreach(dataExpr => { + dataExprSet.foreach { dataExpr => group.dataExprValues.get(dataExpr).foreach { info => rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints) rateCollector.incrementIntermediate(id, dataExpr, 
info.values.size)
          }
-        })
+        }
     }
 
     // Generate the time series and diagnostic output
@@ -249,7 +281,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
       case (id, rate) => new MessageEnvelope(id, rate)
     }.toList
 
-    output ++ rateMessages
+    sampledEventMessages ++ output ++ rateMessages
   }
 
   private def hasFiniteValue(value: AnyRef): Boolean = {
@@ -266,6 +298,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDa
     }
   }
 
+  private def toTimeSeriesMessage(value: AggrDatapoint): TimeSeriesMessage = {
+    val id = TaggedItem.computeId(value.tags + ("atlas.query" -> value.source)).toString
+    TimeSeriesMessage(
+      id,
+      value.source,
+      value.expr.finalGrouping,
+      value.timestamp,
+      value.timestamp + step,
+      step,
+      TimeSeries.toLabel(value.tags),
+      value.tags,
+      ArrayData(Array(value.value)),
+      None,
+      value.samples
+    )
+  }
+
   private def handleSingleGroup(g: TimeGroup): Unit = {
     push(out, Source(handleData(g)))
   }

diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
index 25a86ab73..29fe24595 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
@@ -17,6 +17,8 @@ package com.netflix.atlas.eval.stream
 
 import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.DataVocabulary
+import com.netflix.atlas.core.model.EventExpr
+import com.netflix.atlas.core.model.EventVocabulary
 import com.netflix.atlas.core.model.ModelExtractors
 import com.netflix.atlas.core.model.TraceVocabulary
 import com.netflix.atlas.core.stacklang.Interpreter
@@ -38,6 +40,7 @@ import com.netflix.atlas.eval.model.LwcSubscription
 import com.netflix.atlas.eval.model.LwcSubscriptionV2
 import com.netflix.atlas.eval.model.DatapointsTuple
 import com.netflix.atlas.eval.model.ExprType
+import com.netflix.atlas.eval.model.LwcDataExpr
 
 /**
  * Process the SSE output from an LWC service and convert it into a stream of
@@ -92,22 +95,39 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
   private def updateStateV2(sub: LwcSubscriptionV2): Unit = {
     sub.subExprs.foreach { s =>
-      if (sub.exprType.isTimeSeriesType && !tsState.contains(s.id)) {
-        val expr = parseExpr(s.expression, sub.exprType)
-        tsState.put(s.id, DatapointMetadata(s.expression, expr, s.step))
-      }
-      if (sub.exprType.isEventType && !eventState.contains(s.id)) {
+      if (isTimeSeries(sub, s)) {
+        if (!tsState.contains(s.id)) {
+          val expr = parseExpr(s.expression, sub.exprType)
+          tsState.put(s.id, DatapointMetadata(s.expression, expr, s.step))
+        }
+      } else if (sub.exprType.isEventType && !eventState.contains(s.id)) {
         eventState.put(s.id, s.expression)
       }
     }
   }
 
+  private def isTimeSeries(sub: LwcSubscriptionV2, s: LwcDataExpr): Boolean = {
+    // The sample type behaves similarly to a time series for most processing, but maintains
+    // some sample events during aggregation.
+ sub.exprType.isTimeSeriesType || s.expression.contains(",:sample") + } + private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = { tsState.get(dp.id) match { case Some(sub) => val expr = sub.dataExpr val step = sub.step - Some(AggrDatapoint(dp.timestamp, step, expr, "datapoint", dp.tags, dp.value)) + Some( + AggrDatapoint( + dp.timestamp, + step, + expr, + sub.dataExprStr, + dp.tags, + dp.value, + dp.samples + ) + ) case None => unknown.increment() None @@ -158,6 +178,7 @@ private[stream] object LwcToAggrDatapoint { private val dataInterpreter = Interpreter(DataVocabulary.allWords) private val traceInterpreter = Interpreter(TraceVocabulary.allWords) + private val eventInterpreter = Interpreter(EventVocabulary.allWords) private def parseExpr(input: String, exprType: ExprType = ExprType.TIME_SERIES): DataExpr = { exprType match { @@ -171,6 +192,11 @@ private[stream] object LwcToAggrDatapoint { case ModelExtractors.TraceTimeSeriesType(tq) :: Nil => tq.expr.expr.dataExprs.head case _ => throw new IllegalArgumentException(s"invalid expr: $input") } + case ExprType.EVENTS => + eventInterpreter.execute(input).stack match { + case ModelExtractors.EventExprType(expr: EventExpr.Sample) :: Nil => expr.dataExpr + case _ => throw new IllegalArgumentException(s"invalid expr: $input") + } case _ => throw new IllegalArgumentException(s"unsupported expression type: $exprType") } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala index 3c05b5689..cb4692788 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala @@ -156,22 +156,47 @@ object SyntheticDataSource { } private def source(settings: Settings, expr: EventExpr): Source[ByteString, NotUsed] = { - val id = computeId(ExprType.EVENTS, expr, 0L) - val dataExpr = LwcDataExpr(id, expr.toString, 0L) + val id = computeId(ExprType.EVENTS, expr, settings.step) + val dataExpr = LwcDataExpr(id, expr.toString, settings.step) val subMessage = LwcSubscriptionV2(expr.toString, ExprType.EVENTS, List(dataExpr)) + val start = System.currentTimeMillis() / settings.step * settings.step val exprSource = Source(0 until settings.numStepIntervals) .throttle(1, FiniteDuration(settings.step, TimeUnit.MILLISECONDS)) .flatMapConcat { i => Source(0 until settings.inputDataSize) - .map { j => + .flatMap { j => + val tags = Query.tags(expr.query) val data = Map( - "tags" -> Query.tags(expr.query), + "tags" -> tags, "i" -> i, "j" -> j ) - val json = Json.decode[JsonNode](Json.encode(data)) - LwcEvent(id, json) + expr match { + case EventExpr.Raw(_) => + val json = Json.decode[JsonNode](Json.encode(data)) + Source.single(LwcEvent(id, json)) + case EventExpr.Table(_, cs) => + val columns = cs.map(c => data.getOrElse(c, null)) + val json = Json.decode[JsonNode](Json.encode(columns)) + Source.single(LwcEvent(id, json)) + case EventExpr.Sample(_, by, cs) => + val groupKey = groupByKey(data, by) + if (groupKey == null) { + Source.empty + } else { + val timestamp = i * settings.step + start + val columns = cs.map(c => data.getOrElse(c, null)) + val datapoint = LwcDatapoint( + timestamp, + id, + tags ++ by.map(k => k -> data(k).toString), + 1.0, + List(columns) + ) + Source.single(datapoint) + } + } } } @@ -181,6 +206,13 @@ object SyntheticDataSource { .map(msg => ByteString(Json.encode(msg))) } + private def groupByKey(data: 
Map[String, Any], by: List[String]): String = { + if (by.forall(data.contains)) + by.map(k => data.getOrElse(k, null)).mkString(",") + else + null + } + private def computeId(exprType: ExprType, expr: Expr, step: Long): String = { val key = s"$exprType:$expr:$step" java.lang.Long.toString(new Hash64().updateString(key).compute(), 16) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala index bb77c8ed6..34d876bec 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala @@ -185,4 +185,36 @@ class AggrDatapointSuite extends FunSuite { assert(aggregator.get.limitExceeded) } + + test("aggregate with samples") { + val expr = DataExpr.Sum(Query.True) + val dataset = createDatapoints(expr, 0, 10).map { dp => + dp.copy(samples = List(List(dp.value))) + } + val aggregator = + AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)) + val result = aggregator.get.datapoints + + assertEquals(result.size, 1) + assertEquals(result.head.timestamp, 0L) + assertEquals(result.head.tags, Map("name" -> "cpu")) + assertEquals(result.head.value, 45.0) + assertEquals(result.head.samples, List(List(0.0))) + } + + test("aggregate group by with samples") { + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("name", "node")) + val dataset = createDatapoints(expr, 0, 10).map { dp => + dp.copy(samples = List(List(dp.value))) + } + val aggregator = + AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)) + val result = aggregator.get.datapoints.sortWith(_.tags("node") > _.tags("node")) + + assertEquals(result.size, 10) + assertEquals(result.head.timestamp, 0L) + assertEquals(result.head.tags, Map("name" -> "cpu", "node" -> "i-00000009")) + assertEquals(result.head.value, 9.0) + assertEquals(result.head.samples, List(List(9.0))) + } } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala index 657e9b748..7df03abe2 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala @@ -104,6 +104,32 @@ class LwcMessagesSuite extends FunSuite { assertEquals(actual, expected) } + private def checkSamples(samples: List[List[Any]]): Unit = { + val tags = Map("foo" -> "bar") + val input = LwcDatapoint(step, "a", tags, 42.0, samples) + val actual = LwcMessages.parse(Json.encode(input)) + val expected = input.copy(samples = Json.decode[List[List[JsonNode]]](Json.encode(samples))) + assertEquals(actual, expected) + } + + test("datapoint, with samples empty") { + checkSamples(Nil) + } + + test("datapoint, with samples empty rows") { + checkSamples(List(Nil, Nil, Nil)) + } + + test("datapoint, with samples") { + val tags = Map("foo" -> "bar") + checkSamples(List(List("a", tags))) + } + + test("datapoint, with samples uneven") { + val tags = Map("foo" -> "bar") + checkSamples(List(List("a", tags), Nil, List("b", tags))) + } + test("event") { val payload = Json.decode[JsonNode]("""{"foo":"bar"}""") val expected = LwcEvent("123", payload) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/TimeSeriesMessageSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/TimeSeriesMessageSuite.scala 
index b2a5c6e68..55cbf6905 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/TimeSeriesMessageSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/TimeSeriesMessageSuite.scala @@ -29,7 +29,8 @@ class TimeSeriesMessageSuite extends FunSuite { label = "test", tags = Map("name" -> "sps", "cluster" -> "www"), data = ArrayData(Array(42.0)), - None + None, + Nil ) test("json encoding with empty group by") { diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index 24b857a01..b300a8118 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -927,6 +927,30 @@ class EvaluatorSuite extends FunSuite { assertEquals(result.size, 5000) } + test("publisher, events table") { + val evaluator = new Evaluator(config, registry, system) + val uri = + "synthetic://test/events?q=name,cpu,:eq,nf.app,foo,:eq,:and,(,j,),:table&step=1ms&numStepIntervals=5" + val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq) + val result = Await.result(future, scala.concurrent.duration.Duration.Inf) + assertEquals(result.size, 5000) + } + + test("publisher, events sample") { + val evaluator = new Evaluator(config, registry, system) + val uri = + "synthetic://test/events?q=name,cpu,:eq,nf.app,foo,:eq,:and,(,i,),(,j,),:sample&step=1ms&numStepIntervals=5" + val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq) + val result = Await.result(future, scala.concurrent.duration.Duration.Inf) + assertEquals(result.size, 5) + result.foreach { + case tsm: TimeSeriesMessage => + assert(tsm.samples.nonEmpty) + case msg => + fail(s"unexpected message: $msg") + } + } + test("publisher, trace time series") { val evaluator = new Evaluator(config, registry, system) diff --git a/atlas-lwc-events/src/main/resources/reference.conf b/atlas-lwc-events/src/main/resources/reference.conf index a74ccff6c..0742b5271 100644 --- a/atlas-lwc-events/src/main/resources/reference.conf +++ b/atlas-lwc-events/src/main/resources/reference.conf @@ -22,4 +22,8 @@ atlas.lwc.events { // Max payload size for events data. Default max payload size for server is 8MiB. payload-size = 7MiB + + // Maximum size for a group by. Used to avoid OOM for group by with a high cardinality dimension. + // If exceeded new groups will be dropped. 
+ max-groups = 10000 } \ No newline at end of file diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala index 8738e7816..3d6875197 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala @@ -60,16 +60,40 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { val handler = expr match { case EventExpr.Raw(_) => EventHandler(sub, e => List(e)) case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs))) + case expr: EventExpr.Sample => + val converter = DatapointConverter( + sub.id, + expr.dataExpr, + clock, + sub.step, + Some(event => expr.projectionKeys.map(event.extractValue)), + submit + ) + EventHandler( + sub, + event => { + converter.update(event) + Nil + }, + Some(converter) + ) } index.add(q, handler) subHandlers.put(sub, q -> handler) + if (handler.converter.isDefined) + flushableHandlers += handler + } + diff.unchanged.events.foreach { sub => + val handlerMeta = subHandlers.get(sub) + if (handlerMeta != null && handlerMeta._2.converter.isDefined) + flushableHandlers += handlerMeta._2 } diff.removed.events.foreach(removeSubscription) // Analytics based on events diff.added.timeSeries.foreach { sub => val expr = ExprUtils.parseDataExpr(sub.expression) - val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit) + val converter = DatapointConverter(sub.id, expr, clock, sub.step, None, submit) val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query)) val handler = EventHandler( sub, @@ -99,7 +123,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { diff.added.traceTimeSeries.foreach { sub => val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression) val dataExpr = tq.expr.expr.dataExprs.head - val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, submit) + val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, None, submit) val q = ExprUtils.toSpectatorQuery(removeValueClause(dataExpr.query)) val handler = EventHandler( sub, diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala index 02bf0cefc..483c56d8f 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala @@ -19,6 +19,7 @@ import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.model.TagKey import com.netflix.atlas.core.util.Strings +import com.netflix.iep.config.ConfigManager import com.netflix.spectator.api.Clock import com.netflix.spectator.api.histogram.PercentileBuckets import com.netflix.spectator.impl.StepDouble @@ -47,16 +48,23 @@ private[events] trait DatapointConverter { private[events] object DatapointConverter { + /** + * Limit on the number of groups for a group by expression. If exceeded new groups + * will be dropped. 
+ */ + private val MaxGroupBySize: Int = ConfigManager.get().getInt("atlas.lwc.events.max-groups") + def apply( id: String, expr: DataExpr, clock: Clock, step: Long, + sampleMapper: Option[LwcEvent => List[Any]], consumer: (String, LwcEvent) => Unit ): DatapointConverter = { val tags = Query.tags(expr.query) val mapper = createValueMapper(tags) - val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer) + val params = Params(id, Query.tags(expr.query), clock, step, mapper, sampleMapper, consumer) toConverter(expr, params) } @@ -140,6 +148,7 @@ private[events] object DatapointConverter { clock: Clock, step: Long, valueMapper: LwcEvent => Double, + sampleMapper: Option[LwcEvent => List[Any]], consumer: (String, LwcEvent) => Unit ) @@ -148,8 +157,14 @@ private[events] object DatapointConverter { private val buffer = new StepDouble(Double.NaN, params.clock, params.step) + private val sampleMapper: LwcEvent => List[Any] = params.sampleMapper.orNull + @volatile private var sample: List[Any] = Nil + override def update(event: LwcEvent): Unit = { update(params.valueMapper(event)) + if (sampleMapper != null && sample.isEmpty) { + sample = sampleMapper(event) + } } override def update(value: Double): Unit = { @@ -161,8 +176,13 @@ private[events] object DatapointConverter { override def flush(timestamp: Long): Unit = { val value = buffer.pollAsRate(timestamp) if (value.isFinite) { + var s = List.empty[List[Any]] + if (sampleMapper != null) { + s = List(sample) + sample = Nil + } val ts = timestamp / params.step * params.step - val event = DatapointEvent(params.id, params.tags, ts, value) + val event = DatapointEvent(params.id, params.tags, ts, value, s) params.consumer(params.id, event) } } @@ -264,6 +284,8 @@ private[events] object DatapointConverter { private val isPercentile = by.keys.contains(TagKey.percentile) + @volatile private var maxGroupsExceeded: Boolean = false + override def update(event: LwcEvent): Unit = { if (isPercentile) { val rawValue = getRawValue(event) @@ -274,11 +296,10 @@ private[events] object DatapointConverter { case k => event.tagValue(k) } .filterNot(_ == null) - update(1.0, tagValues) + getConverter(tagValues).foreach(_.update(1.0)) } else { val tagValues = by.keys.map(event.tagValue).filterNot(_ == null) - val value = params.valueMapper(event) - update(value, tagValues) + getConverter(tagValues).foreach(_.update(event)) } } @@ -286,14 +307,18 @@ private[events] object DatapointConverter { // Since there are no values for the group by tags, this is a no-op } - private def update(value: Double, tagValues: List[String]): Unit = { + private def getConverter(tagValues: List[String]): Option[DatapointConverter] = { // Ignore events that are missing dimensions used in the grouping if (by.keys.size == tagValues.size) { - if (value.isFinite) { - val tags = by.keys.zip(tagValues).toMap - val converter = groups.computeIfAbsent(tags, groupConverter) - converter.update(value) - } + val tags = by.keys.zip(tagValues).toMap + // If the max has been exceeded, allow existing groups to get updates, but do not + // create new ones. 
+ if (maxGroupsExceeded) + Option(groups.get(tags)) + else + Some(groups.computeIfAbsent(tags, groupConverter)) + } else { + None } } @@ -341,6 +366,7 @@ private[events] object DatapointConverter { if (converter.hasNoData) it.remove() } + maxGroupsExceeded = groups.size() >= MaxGroupBySize } override def hasNoData: Boolean = groups.isEmpty diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointEvent.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointEvent.scala index e30065737..32a761687 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointEvent.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointEvent.scala @@ -29,12 +29,15 @@ import com.netflix.atlas.json.Json * Timestamp for the data point. * @param value * Value for the data point. + * @param samples + * Optional set of event samples associated with the message. */ case class DatapointEvent( id: String, tags: Map[String, String], timestamp: Long, - override val value: Double + override val value: Double, + samples: List[List[Any]] = Nil ) extends LwcEvent { override def rawEvent: Any = this diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala index 0e29061da..80ac15b6f 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala @@ -54,7 +54,7 @@ class DatapointConverterSuite extends FunSuite { test("counter - sum") { val expr = DataExpr.Sum(Query.True) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("value" -> i)) converter.update(event) @@ -66,10 +66,32 @@ class DatapointConverterSuite extends FunSuite { assertEquals(results.head, DatapointEvent("id", Map.empty, step, 1.0)) } + test("counter - sum with sample") { + val expr = DataExpr.Sum(Query.True) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter( + "id", + expr, + clock, + step, + Some(e => List(e.extractValue("value"))), + (_, e) => events.addOne(e) + ) + (0 until 5).foreach { i => + val event = LwcEvent(Map("value" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Map.empty, step, 1.0, List(List(0)))) + } + test("counter - sum custom value") { val expr = DataExpr.Sum(Query.Equal("value", "responseSize")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -84,7 +106,7 @@ class DatapointConverterSuite extends FunSuite { test("counter - count") { val expr = DataExpr.Count(Query.Equal("value", "responseSize")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, 
(_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -99,7 +121,7 @@ class DatapointConverterSuite extends FunSuite { test("counter - max custom value") { val expr = DataExpr.Max(Query.Equal("value", "responseSize")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -114,7 +136,7 @@ class DatapointConverterSuite extends FunSuite { test("counter - max negative value") { val expr = DataExpr.Max(Query.Equal("value", "responseSize")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> -(i + 10))) converter.update(event) @@ -129,7 +151,7 @@ class DatapointConverterSuite extends FunSuite { test("counter - min negative value") { val expr = DataExpr.Min(Query.Equal("value", "responseSize")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> -i)) converter.update(event) @@ -148,7 +170,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - sum of totalTime") { val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalTime"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("latency" -> i)) converter.update(event) @@ -163,7 +185,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - sum of count") { val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("count"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("latency" -> i)) converter.update(event) @@ -178,7 +200,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - sum of totalOfSquares") { val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalOfSquares"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("latency" -> i)) converter.update(event) @@ -193,7 +215,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - dist-max") { val expr = DataExpr.Max(Query.Equal("value", "latency").and(stat("max"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = 
LwcEvent(Map("latency" -> i)) converter.update(event) @@ -208,7 +230,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - dist-min") { val expr = DataExpr.Min(Query.Equal("value", "latency").and(stat("max"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("latency" -> i)) converter.update(event) @@ -223,7 +245,7 @@ class DatapointConverterSuite extends FunSuite { test("timer - percentile") { val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "latency")), List("percentile")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("latency" -> Duration.ofMillis(i))) converter.update(event) @@ -238,7 +260,7 @@ class DatapointConverterSuite extends FunSuite { test("dist - sum of totalAmount") { val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalAmount"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -254,7 +276,7 @@ class DatapointConverterSuite extends FunSuite { val stat = Query.In("statistic", List("totalTime", "totalAmount")) val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat)) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -269,7 +291,7 @@ class DatapointConverterSuite extends FunSuite { test("dist - sum of count") { val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("count"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -284,7 +306,7 @@ class DatapointConverterSuite extends FunSuite { test("dist - sum of totalOfSquares") { val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalOfSquares"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -299,7 +321,7 @@ class DatapointConverterSuite extends FunSuite { test("dist - dist-max") { val expr = DataExpr.Max(Query.Equal("value", "responseSize").and(stat("max"))) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 
5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -317,7 +339,7 @@ class DatapointConverterSuite extends FunSuite { List("percentile") ) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => val event = LwcEvent(Map("responseSize" -> i)) converter.update(event) @@ -332,7 +354,7 @@ class DatapointConverterSuite extends FunSuite { test("groupBy - sum") { val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app")) val events = List.newBuilder[LwcEvent] - val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) (0 until 5).foreach { i => converter.update(LwcEvent(Map("responseSize" -> i, "app" -> "a"))) converter.update(LwcEvent(Map("responseSize" -> i * 2, "app" -> "b"))) @@ -352,4 +374,65 @@ class DatapointConverterSuite extends FunSuite { } } } + + test("groupBy - sum limit exceeded") { + val expr = + DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("responseSize")) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e)) + + (0 until 10_000).foreach { i => + converter.update(LwcEvent(Map("responseSize" -> i.toString, "app" -> "a"))) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + var results = events.result() + assertEquals(results.size, 10_000) + events.clear() + + (0 until 100_000).foreach { i => + converter.update(LwcEvent(Map("responseSize" -> i.toString, "app" -> "a"))) + } + clock.setWallTime(step * 2 + 1) + converter.flush(clock.wallTime()) + results = events.result() + assertEquals(results.size, 10_000) + } + + test("groupBy - sum with sample") { + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app")) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter( + "id", + expr, + clock, + step, + Some(e => List(e.extractValue("app"))), + (_, e) => events.addOne(e) + ) + (0 until 5).foreach { i => + converter.update(LwcEvent(Map("responseSize" -> i, "app" -> "a"))) + converter.update(LwcEvent(Map("responseSize" -> i * 2, "app" -> "b"))) + converter.update(LwcEvent(Map("responseSize" -> i * 3, "app" -> "c"))) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 3) + results.foreach { event => + val dp = event.asInstanceOf[DatapointEvent] + assert(dp.tags.contains("app")) + dp.tags("app") match { + case "a" => + assertEqualsDouble(dp.value, 2.0, 1e-12) + assertEquals(dp.samples, List(List("a"))) + case "b" => + assertEqualsDouble(dp.value, 4.0, 1e-12) + assertEquals(dp.samples, List(List("b"))) + case "c" => + assertEqualsDouble(dp.value, 6.0, 1e-12) + assertEquals(dp.samples, List(List("c"))) + } + } + } } diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala index 7ad372000..14ad3f44c 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala @@ -51,7 +51,7 @@ class 
LwcEventClientSuite extends FunSuite {
       )
     )
     val output = List.newBuilder[String]
-    val client = LwcEventClient(subs, output.addOne)
+    val client = LwcEventClient(subs, output.addOne, clock)
     client.process(sampleLwcEvent)
     assertEquals(
       List("""data: {"id":"2","event":{"tags":{"app":"www","node":"i-123"},"duration":42}}"""),
       output.result()
     )
   }
 
+  test("sampled pass-through") {
+    val subs = Subscriptions.fromTypedList(
+      List(
+        Subscription("1", step, "app,foo,:eq,(,app,),(,node,),:sample", Subscriptions.Events),
+        Subscription("2", step, "app,www,:eq,(,app,),(,node,),:sample", Subscriptions.Events)
+      )
+    )
+    val output = List.newBuilder[String]
+    val client = LwcEventClient(subs, output.addOne, clock)
+    client.process(sampleLwcEvent)
+    clock.setWallTime(step)
+    client.process(LwcEvent.HeartbeatLwcEvent(step))
+    assertEquals(
+      List(
+        """data: {"id":"2","event":{"id":"2","tags":{"app":"www"},"timestamp":5000,"value":0.2,"samples":[["i-123"]]}}"""
+      ),
+      output.result()
+    )
+  }
+
   test("analytics, basic aggregate") {
     val subs = Subscriptions.fromTypedList(
       List(

diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
index 7c056622a..e74b63717 100644
--- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
+++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
@@ -129,4 +129,42 @@ class SubscribeApiSuite extends MUnitRouteSuite {
       }
     }
   }
+
+  test("subscribe websocket event sample") {
+    val client = WSProbe()
+    WS("/api/v2/subscribe/222", client.flow) ~> routes ~> check {
+      assert(isWebSocketUpgrade)
+
+      // Send list of expressions to subscribe to
+      val exprs =
+        List(LwcExpression("name,disk,:eq,(,nf.app,),(,device,),:sample", ExprType.EVENTS, 5000L))
+      client.sendMessage(LwcMessages.encodeBatch(exprs))
+
+      // Look for the subscription message for the sample expression
+      var subscriptions = List.empty[LwcSubscriptionV2]
+      while (subscriptions.size < 1) {
+        parseBatch(client.expectMessage()).foreach {
+          case _: DiagnosticMessage   =>
+          case sub: LwcSubscriptionV2 => subscriptions = sub :: subscriptions
+          case h: LwcHeartbeat        => assertEquals(h.step, 5000L)
+          case v                      => throw new MatchError(v)
+        }
+      }
+
+      // Verify the subscription is in the manager, push a message to the queue, and check
+      // that it is received by the client
+      assertEquals(subscriptions.flatMap(_.subExprs).size, 1)
+      subscriptions.flatMap(_.subExprs).foreach { m =>
+        assertEquals(m.step, 5000L)
+        val tags = Map("name" -> "disk")
+        val json = Json.decode[JsonNode](Json.encode(tags))
+        val event = LwcEvent(m.id, json)
+        val handlers = sm.handlersForSubscription(m.id)
+        assertEquals(handlers.size, 1)
+        handlers.head.offer(Seq(event))
+
+        assertEquals(parseBatch(client.expectMessage()), List(event))
+      }
+    }
+  }
 }
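
To see the new word end to end, here is a minimal, illustrative sketch (not part of the
patch) that parses a :sample expression with the event vocabulary, the same way
LwcToAggrDatapoint does above, and inspects the resulting EventExpr.Sample along with the
analytics expression it maps to. The object name SampleOperatorExample is hypothetical;
the expression comes from the :sample docs in this change.

import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.EventVocabulary
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.stacklang.Interpreter

object SampleOperatorExample {

  // Interpreter over the event vocabulary, which now includes the :sample word
  private val interpreter = Interpreter(EventVocabulary.allWords)

  def main(args: Array[String]): Unit = {
    // Count ERROR events for each distinct fingerprint, capturing the message
    // column from a sampled event in each group
    val input = "level,ERROR,:eq,(,fingerprint,),(,message,),:sample"
    interpreter.execute(input).stack match {
      case ModelExtractors.EventExprType(s: EventExpr.Sample) :: Nil =>
        println(s.query)          // level,ERROR,:eq
        println(s.sampleBy)       // List(fingerprint)
        println(s.projectionKeys) // List(message)
        // Aggregation used for the counts: a sum grouped by the sampleBy keys,
        // i.e. level,ERROR,:eq,:sum,(,fingerprint,),:by
        println(s.dataExpr)
      case other =>
        throw new IllegalArgumentException(s"invalid expr: $other")
    }
  }
}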