From f4e9f5e2638daf63a97f1ea1889143e4e52d5a10 Mon Sep 17 00:00:00 2001
From: brharrington
Date: Sun, 9 Jun 2024 22:14:25 -0500
Subject: [PATCH] lwc-events: support percentile grouping (#1666)

The `:percentiles` operator internally will group by `percentile`.
Updates the datapoint converter to map the value to the appropriate
bucket dimension so that percentile approximations can be used. For
timers, the raw value type should be a `Duration` to allow for
accurate unit conversion.
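As an illustration (a sketch, not part of this change), the bucket
dimension for a timer value is derived along these lines, using the
same Spectator helper the converter relies on:

    import com.netflix.atlas.core.util.Strings
    import com.netflix.spectator.api.histogram.PercentileBuckets

    import java.time.Duration
    import java.util.Locale

    // timers index the duration in nanoseconds with a "T" prefix;
    // distribution summaries index the raw amount with a "D" prefix
    val nanos = Duration.ofMillis(42).toNanos
    val bucket = PercentileBuckets.indexOf(nanos)
    val hex = Integer.toString(bucket, 16).toUpperCase(Locale.US)
    val tag = s"T${Strings.zeroPad(hex, 4)}" // value of the percentile tag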
---
 .../atlas/lwc/events/DatapointConverter.scala | 88 ++++++++++++++++---
 .../lwc/events/DatapointConverterSuite.scala  | 53 ++++++++++-
 2 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
index 0dfad1915..6f65764c0 100644
--- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
@@ -17,11 +17,15 @@ package com.netflix.atlas.lwc.events
 
 import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.Query
+import com.netflix.atlas.core.model.TagKey
+import com.netflix.atlas.core.util.Strings
 import com.netflix.spectator.api.Clock
+import com.netflix.spectator.api.histogram.PercentileBuckets
 import com.netflix.spectator.impl.AtomicDouble
 import com.netflix.spectator.impl.StepDouble
 
 import java.time.Duration
+import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
 /**
@@ -29,8 +33,13 @@ import java.util.concurrent.ConcurrentHashMap
  */
 private[events] trait DatapointConverter {
 
+  /** Update the data point with an event. */
   def update(event: LwcEvent): Unit
 
+  /** Update with a specific value that is already extracted from an event. */
+  def update(value: Double): Unit
+
+  /** Flush the data for a given timestamp. */
   def flush(timestamp: Long): Unit
 }
 
@@ -44,7 +53,7 @@ private[events] object DatapointConverter {
     consumer: (String, LwcEvent) => Unit
   ): DatapointConverter = {
     val tags = Query.tags(expr.query)
-    val mapper = createValueMapper(tags, expr.finalGrouping)
+    val mapper = createValueMapper(tags)
     val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer)
     toConverter(expr, params)
   }
@@ -64,7 +73,7 @@ private[events] object DatapointConverter {
    * Extract value and map as needed based on the type. Uses statistic and grouping to
    * coerce events so they structurally work like spectator composite types.
    */
-  def createValueMapper(tags: Map[String, String], grouping: List[String]): LwcEvent => Double = {
+  private def createValueMapper(tags: Map[String, String]): LwcEvent => Double = {
     tags.get("value") match {
       case Some(k) =>
         tags.get("statistic") match {
@@ -121,7 +130,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(0.0, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite && value >= 0.0) {
         buffer.getCurrent.addAndGet(value)
       }
@@ -146,6 +158,10 @@ private[events] object DatapointConverter {
       buffer.getCurrent.addAndGet(1.0)
     }
 
+    override def update(value: Double): Unit = {
+      buffer.getCurrent.addAndGet(1.0)
+    }
+
     override def flush(timestamp: Long): Unit = {
       val value = buffer.poll(timestamp)
       if (value.isFinite) {
@@ -162,7 +178,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(Double.NaN, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite) {
         buffer.getCurrent.max(value)
       }
@@ -184,7 +203,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(Double.NaN, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite) {
         min(buffer.getCurrent, value)
       }
@@ -218,19 +240,65 @@ private[events] object DatapointConverter {
     private val groups =
      new ConcurrentHashMap[Map[String, String], DatapointConverter]()
 
+    private val isPercentile = by.keys.contains(TagKey.percentile)
+
     override def update(event: LwcEvent): Unit = {
-      // Ignore events that are missing dimensions used in the grouping
-      val values = by.keys.map(event.tagValue).filterNot(_ == null)
-      if (by.keys.size == values.size) {
+      if (isPercentile) {
+        val rawValue = getRawValue(event)
+        val pctTag = toPercentileTag(rawValue)
+        val tagValues = by.keys
+          .map {
+            case TagKey.percentile => pctTag
+            case k                 => event.tagValue(k)
+          }
+          .filterNot(_ == null)
+        update(1.0, tagValues, event)
+      } else {
+        val tagValues = by.keys.map(event.tagValue).filterNot(_ == null)
         val value = params.valueMapper(event)
+        update(value, tagValues, event)
+      }
+    }
+
+    override def update(value: Double): Unit = {
+      // Since there are no values for the group by tags, this is a no-op
+    }
+
+    private def update(value: Double, tagValues: List[String], event: LwcEvent): Unit = {
+      // Ignore events that are missing dimensions used in the grouping
+      if (by.keys.size == tagValues.size) {
         if (value.isFinite) {
-          val tags = by.keys.zip(values).toMap
+          val tags = by.keys.zip(tagValues).toMap
           val converter = groups.computeIfAbsent(tags, groupConverter)
-          converter.update(event)
+          converter.update(value)
        }
      }
    }
 
+    private def getRawValue(event: LwcEvent): Any = {
+      params.tags.get("value") match {
+        case Some(k) => event.extractValue(k)
+        case None    => event.value
+      }
+    }
+
+    private def toPercentileTag(value: Any): String = {
+      value match {
+        case d: Duration => toPercentileHex("T", d.toNanos)
+        case v           => toPercentileHex("D", toDouble(v, 1.0).longValue)
+      }
+    }
+
+    private def toPercentileHex(prefix: String, value: Long): String = {
+      if (value <= 0) {
+        null
+      } else {
+        val b = PercentileBuckets.indexOf(value)
+        val hex = Integer.toString(b, 16).toUpperCase(Locale.US)
+        s"$prefix${Strings.zeroPad(hex, 4)}"
+      }
+    }
+
     private def groupConverter(tags: Map[String, String]): DatapointConverter = {
       val ps = params.copy(consumer = (id, event) => groupConsumer(tags, id, event))
       toConverter(by.af, ps)
diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
index d1fe842ea..22b484eb5 100644
--- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
+++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
@@ -219,8 +219,39 @@ class DatapointConverterSuite extends FunSuite {
     assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 0.0))
   }
 
-  test("dist - sum of totalTime") {
-    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalTime")))
+  test("timer - percentile") {
+    val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "latency")), List("percentile"))
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("latency" -> Duration.ofMillis(i)))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 4)
+    assertEquals(results.head.value, 0.2, 1e-12)
+  }
+
+  test("dist - sum of totalAmount") {
+    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalAmount")))
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("responseSize" -> i))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 1)
+    assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 2.0))
+  }
+
+  test("dist - sum of total either") {
+    val stat = Query.In("statistic", List("totalTime", "totalAmount"))
+    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat))
     val events = List.newBuilder[LwcEvent]
     val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
     (0 until 5).foreach { i =>
@@ -279,6 +310,24 @@ class DatapointConverterSuite extends FunSuite {
     assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 4.0))
   }
 
+  test("dist - percentile") {
+    val expr = DataExpr.GroupBy(
+      DataExpr.Sum(Query.Equal("value", "responseSize")),
+      List("percentile")
+    )
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("responseSize" -> i))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 4)
+    assertEquals(results.head.value, 0.2, 1e-12)
+  }
+
   test("groupBy - sum") {
     val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app"))
     val events = List.newBuilder[LwcEvent]
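Note: in the two percentile tests above, the sample with value 0 does
not map to a bucket (`toPercentileHex` returns null for values <= 0),
so only four of the five events produce a group, and each bucket holds
a count of one event. The expected 0.2 is that count normalized to a
per-second rate over the suite's 5 second step, the same step that
makes the totalAmount sum 0+1+2+3+4 = 10 flush as 2.0. Downstream, the
per-bucket counts can be fed back into the same Spectator helper to
estimate percentiles; a minimal sketch, assuming the counts were
accumulated from the `percentile` dimension of the emitted data points:

    import com.netflix.spectator.api.histogram.PercentileBuckets

    import java.time.Duration

    // one slot per bucket; increment the slot whose index is encoded
    // in hex (after the "T"/"D" prefix) in each data point's tag
    val counts = new Array[Long](PercentileBuckets.length())
    counts(PercentileBuckets.indexOf(Duration.ofMillis(3).toNanos)) += 1

    // estimate the 90th percentile, in nanoseconds for timer buckets
    val p90 = PercentileBuckets.percentile(counts, 90.0)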