lwc-events: support percentile grouping (#1666)
The `:percentiles` operator internally groups by `percentile`. Updates the datapoint converter to map the value to the appropriate bucket dimension so that percentile approximations can be computed from the grouped counts. For timers, the raw value type should be a `Duration` to allow for accurate unit conversion.
brharrington authored Jun 10, 2024
1 parent 420d37f commit f4e9f5e
Showing 2 changed files with 129 additions and 12 deletions.
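For background on the mapping this commit adds: the bucket dimension follows the Spectator `PercentileBuckets` scheme. A raw value is mapped to a fixed bucket index, and the index is encoded as a four-digit uppercase hex tag value with a `T` prefix for timers (bucketed on nanoseconds) or `D` for distribution summaries. A minimal sketch of the encoding; the object and method names here are illustrative, not part of the commit:

import java.time.Duration

import com.netflix.spectator.api.histogram.PercentileBuckets

// Illustrative only: encode a raw value as a percentile bucket tag the way
// the converter does below.
object PercentileTagExample {

  // indexOf returns the fixed bucket the value falls into; the index is
  // rendered as four uppercase hex digits, e.g. T004A.
  def bucketTag(prefix: String, value: Long): String = {
    val idx = PercentileBuckets.indexOf(value)
    f"$prefix$idx%04X"
  }

  def main(args: Array[String]): Unit = {
    val latency = Duration.ofMillis(42)
    // Timers are bucketed on the duration in nanoseconds, so the tag is
    // unit-accurate regardless of how the event expressed the latency.
    println(bucketTag("T", latency.toNanos))
  }
}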
@@ -17,20 +17,29 @@ package com.netflix.atlas.lwc.events
 
 import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.Query
+import com.netflix.atlas.core.model.TagKey
+import com.netflix.atlas.core.util.Strings
 import com.netflix.spectator.api.Clock
+import com.netflix.spectator.api.histogram.PercentileBuckets
 import com.netflix.spectator.impl.AtomicDouble
 import com.netflix.spectator.impl.StepDouble
 
+import java.time.Duration
+import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
 /**
   * Helper to convert a sequence of events into a data point.
   */
 private[events] trait DatapointConverter {
 
   /** Update the data point with an event. */
   def update(event: LwcEvent): Unit
 
+  /** Update with a specific value that is already extracted from an event. */
+  def update(value: Double): Unit
+
   /** Flush the data for a given timestamp. */
   def flush(timestamp: Long): Unit
 }
 
@@ -44,7 +53,7 @@ private[events] object DatapointConverter {
     consumer: (String, LwcEvent) => Unit
   ): DatapointConverter = {
     val tags = Query.tags(expr.query)
-    val mapper = createValueMapper(tags, expr.finalGrouping)
+    val mapper = createValueMapper(tags)
     val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer)
     toConverter(expr, params)
   }
@@ -64,7 +73,7 @@ private[events] object DatapointConverter {
    * Extract value and map as needed based on the type. Uses statistic and grouping to
    * coerce events so they structurally work like spectator composite types.
    */
-  def createValueMapper(tags: Map[String, String], grouping: List[String]): LwcEvent => Double = {
+  private def createValueMapper(tags: Map[String, String]): LwcEvent => Double = {
    tags.get("value") match {
      case Some(k) =>
        tags.get("statistic") match {
@@ -121,7 +130,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(0.0, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite && value >= 0.0) {
         buffer.getCurrent.addAndGet(value)
       }
@@ -146,6 +158,10 @@ private[events] object DatapointConverter {
       buffer.getCurrent.addAndGet(1.0)
     }
 
+    override def update(value: Double): Unit = {
+      buffer.getCurrent.addAndGet(1.0)
+    }
+
     override def flush(timestamp: Long): Unit = {
       val value = buffer.poll(timestamp)
       if (value.isFinite) {
@@ -162,7 +178,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(Double.NaN, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite) {
         buffer.getCurrent.max(value)
       }
@@ -184,7 +203,10 @@ private[events] object DatapointConverter {
     private val buffer = new StepDouble(Double.NaN, params.clock, params.step)
 
     override def update(event: LwcEvent): Unit = {
-      val value = params.valueMapper(event)
+      update(params.valueMapper(event))
+    }
+
+    override def update(value: Double): Unit = {
       if (value.isFinite) {
         min(buffer.getCurrent, value)
       }
@@ -218,19 +240,65 @@ private[events] object DatapointConverter {
 
     private val groups = new ConcurrentHashMap[Map[String, String], DatapointConverter]()
 
+    private val isPercentile = by.keys.contains(TagKey.percentile)
+
     override def update(event: LwcEvent): Unit = {
-      // Ignore events that are missing dimensions used in the grouping
-      val values = by.keys.map(event.tagValue).filterNot(_ == null)
-      if (by.keys.size == values.size) {
+      if (isPercentile) {
+        val rawValue = getRawValue(event)
+        val pctTag = toPercentileTag(rawValue)
+        val tagValues = by.keys
+          .map {
+            case TagKey.percentile => pctTag
+            case k                 => event.tagValue(k)
+          }
+          .filterNot(_ == null)
+        update(1.0, tagValues, event)
+      } else {
+        val tagValues = by.keys.map(event.tagValue).filterNot(_ == null)
         val value = params.valueMapper(event)
+        update(value, tagValues, event)
+      }
+    }
+
+    override def update(value: Double): Unit = {
+      // Since there are no values for the group by tags, this is a no-op
+    }
+
+    private def update(value: Double, tagValues: List[String], event: LwcEvent): Unit = {
+      // Ignore events that are missing dimensions used in the grouping
+      if (by.keys.size == tagValues.size) {
         if (value.isFinite) {
-          val tags = by.keys.zip(values).toMap
+          val tags = by.keys.zip(tagValues).toMap
           val converter = groups.computeIfAbsent(tags, groupConverter)
-          converter.update(event)
+          converter.update(value)
         }
       }
     }
 
+    private def getRawValue(event: LwcEvent): Any = {
+      params.tags.get("value") match {
+        case Some(k) => event.extractValue(k)
+        case None    => event.value
+      }
+    }
+
+    private def toPercentileTag(value: Any): String = {
+      value match {
+        case d: Duration => toPercentileHex("T", d.toNanos)
+        case v           => toPercentileHex("D", toDouble(v, 1.0).longValue)
+      }
+    }
+
+    private def toPercentileHex(prefix: String, value: Long): String = {
+      if (value <= 0) {
+        null
+      } else {
+        val b = PercentileBuckets.indexOf(value)
+        val hex = Integer.toString(b, 16).toUpperCase(Locale.US)
+        s"$prefix${Strings.zeroPad(hex, 4)}"
+      }
+    }
+
     private def groupConverter(tags: Map[String, String]): DatapointConverter = {
      val ps = params.copy(consumer = (id, event) => groupConsumer(tags, id, event))
      toConverter(by.af, ps)
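Emitting one counter per bucket tag is what makes percentile approximation possible downstream: the per-bucket counts can be fed back into Spectator's `PercentileBuckets`. A rough sketch of that consumption step, which is not part of this change; the counts and values are illustrative:

import java.time.Duration

import com.netflix.spectator.api.histogram.PercentileBuckets

object PercentileEstimateExample {

  def main(args: Array[String]): Unit = {
    // One slot per bucket; the hex portion of a tag such as T004A is the
    // index into this array.
    val counts = new Array[Long](PercentileBuckets.length())
    counts(PercentileBuckets.indexOf(Duration.ofMillis(5).toNanos)) = 80L
    counts(PercentileBuckets.indexOf(Duration.ofMillis(50).toNanos)) = 20L

    // Estimate the 90th percentile from the bucket counts (in nanoseconds
    // for timer buckets), then convert to milliseconds for display.
    val p90 = PercentileBuckets.percentile(counts, 90.0)
    println(s"p90 is roughly ${p90 / 1e6} ms")
  }
}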
@@ -219,8 +219,39 @@ class DatapointConverterSuite extends FunSuite {
     assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 0.0))
   }
 
-  test("dist - sum of totalTime") {
-    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalTime")))
+  test("timer - percentile") {
+    val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "latency")), List("percentile"))
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("latency" -> Duration.ofMillis(i)))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 4)
+    assertEquals(results.head.value, 0.2, 1e-12)
+  }
+
+  test("dist - sum of totalAmount") {
+    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalAmount")))
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("responseSize" -> i))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 1)
+    assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 2.0))
+  }
+
+  test("dist - sum of total either") {
+    val stat = Query.In("statistic", List("totalTime", "totalAmount"))
+    val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat))
     val events = List.newBuilder[LwcEvent]
     val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
     (0 until 5).foreach { i =>
@@ -279,6 +310,24 @@ class DatapointConverterSuite extends FunSuite {
     assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 4.0))
   }
 
+  test("dist - percentile") {
+    val expr = DataExpr.GroupBy(
+      DataExpr.Sum(Query.Equal("value", "responseSize")),
+      List("percentile")
+    )
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
+    (0 until 5).foreach { i =>
+      val event = LwcEvent(Map("responseSize" -> i))
+      converter.update(event)
+    }
+    clock.setWallTime(step + 1)
+    converter.flush(clock.wallTime())
+    val results = events.result()
+    assertEquals(results.size, 4)
+    assertEquals(results.head.value, 0.2, 1e-12)
+  }
+
   test("groupBy - sum") {
     val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app"))
     val events = List.newBuilder[LwcEvent]
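Reading the expected values in the percentile tests: the five events carry values 0 through 4, and a non-positive value has no bucket (its tag is null), so the zero-valued event is dropped and four groups remain, giving `results.size == 4`. Each surviving bucket then sees one event per step, and the converters report per-second rates; the suite's step appears to be 5 seconds (in the totalAmount test the sum 0 + 1 + 2 + 3 + 4 = 10 over one step yields 2.0), so each bucket reports 1 / 5 = 0.2.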
