Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lwc-events: support percentile grouping #1666

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,29 @@ package com.netflix.atlas.lwc.events

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TagKey
import com.netflix.atlas.core.util.Strings
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.histogram.PercentileBuckets
import com.netflix.spectator.impl.AtomicDouble
import com.netflix.spectator.impl.StepDouble

import java.time.Duration
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

/**
* Helper to convert a sequence of events into a data point.
*/
private[events] trait DatapointConverter {

/** Update the data point with an event. */
def update(event: LwcEvent): Unit

/** Update with a specific value that is already extracted from an event. */
def update(value: Double): Unit

/** Flush the data for a given timestamp. */
def flush(timestamp: Long): Unit
}

Expand All @@ -44,7 +53,7 @@ private[events] object DatapointConverter {
consumer: (String, LwcEvent) => Unit
): DatapointConverter = {
val tags = Query.tags(expr.query)
val mapper = createValueMapper(tags, expr.finalGrouping)
val mapper = createValueMapper(tags)
val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer)
toConverter(expr, params)
}
Expand All @@ -64,7 +73,7 @@ private[events] object DatapointConverter {
* Extract value and map as needed based on the type. Uses statistic and grouping to
* coerce events so they structurally work like spectator composite types.
*/
def createValueMapper(tags: Map[String, String], grouping: List[String]): LwcEvent => Double = {
private def createValueMapper(tags: Map[String, String]): LwcEvent => Double = {
tags.get("value") match {
case Some(k) =>
tags.get("statistic") match {
Expand Down Expand Up @@ -121,7 +130,10 @@ private[events] object DatapointConverter {
private val buffer = new StepDouble(0.0, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
val value = params.valueMapper(event)
update(params.valueMapper(event))
}

override def update(value: Double): Unit = {
if (value.isFinite && value >= 0.0) {
buffer.getCurrent.addAndGet(value)
}
Expand All @@ -146,6 +158,10 @@ private[events] object DatapointConverter {
buffer.getCurrent.addAndGet(1.0)
}

override def update(value: Double): Unit = {
buffer.getCurrent.addAndGet(1.0)
}

override def flush(timestamp: Long): Unit = {
val value = buffer.poll(timestamp)
if (value.isFinite) {
Expand All @@ -162,7 +178,10 @@ private[events] object DatapointConverter {
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
val value = params.valueMapper(event)
update(params.valueMapper(event))
}

override def update(value: Double): Unit = {
if (value.isFinite) {
buffer.getCurrent.max(value)
}
Expand All @@ -184,7 +203,10 @@ private[events] object DatapointConverter {
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
val value = params.valueMapper(event)
update(params.valueMapper(event))
}

override def update(value: Double): Unit = {
if (value.isFinite) {
min(buffer.getCurrent, value)
}
Expand Down Expand Up @@ -218,19 +240,65 @@ private[events] object DatapointConverter {

private val groups = new ConcurrentHashMap[Map[String, String], DatapointConverter]()

private val isPercentile = by.keys.contains(TagKey.percentile)

override def update(event: LwcEvent): Unit = {
// Ignore events that are missing dimensions used in the grouping
val values = by.keys.map(event.tagValue).filterNot(_ == null)
if (by.keys.size == values.size) {
if (isPercentile) {
val rawValue = getRawValue(event)
val pctTag = toPercentileTag(rawValue)
val tagValues = by.keys
.map {
case TagKey.percentile => pctTag
case k => event.tagValue(k)
}
.filterNot(_ == null)
update(1.0, tagValues, event)
} else {
val tagValues = by.keys.map(event.tagValue).filterNot(_ == null)
val value = params.valueMapper(event)
update(value, tagValues, event)
}
}

override def update(value: Double): Unit = {
// Since there are no values for the group by tags, this is a no-op
}

private def update(value: Double, tagValues: List[String], event: LwcEvent): Unit = {
// Ignore events that are missing dimensions used in the grouping
if (by.keys.size == tagValues.size) {
if (value.isFinite) {
val tags = by.keys.zip(values).toMap
val tags = by.keys.zip(tagValues).toMap
val converter = groups.computeIfAbsent(tags, groupConverter)
converter.update(event)
converter.update(value)
}
}
}

private def getRawValue(event: LwcEvent): Any = {
params.tags.get("value") match {
case Some(k) => event.extractValue(k)
case None => event.value
}
}

private def toPercentileTag(value: Any): String = {
value match {
case d: Duration => toPercentileHex("T", d.toNanos)
case v => toPercentileHex("D", toDouble(v, 1.0).longValue)
}
}

private def toPercentileHex(prefix: String, value: Long): String = {
if (value <= 0) {
null
} else {
val b = PercentileBuckets.indexOf(value)
val hex = Integer.toString(b, 16).toUpperCase(Locale.US)
s"$prefix${Strings.zeroPad(hex, 4)}"
}
}

private def groupConverter(tags: Map[String, String]): DatapointConverter = {
val ps = params.copy(consumer = (id, event) => groupConsumer(tags, id, event))
toConverter(by.af, ps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,39 @@ class DatapointConverterSuite extends FunSuite {
assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 0.0))
}

test("dist - sum of totalTime") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalTime")))
test("timer - percentile") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "latency")), List("percentile"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> Duration.ofMillis(i)))
converter.update(event)
}
clock.setWallTime(step + 1)
converter.flush(clock.wallTime())
val results = events.result()
assertEquals(results.size, 4)
assertEquals(results.head.value, 0.2, 1e-12)
}

test("dist - sum of totalAmount") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalAmount")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
}
clock.setWallTime(step + 1)
converter.flush(clock.wallTime())
val results = events.result()
assertEquals(results.size, 1)
assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 2.0))
}

test("dist - sum of total either") {
val stat = Query.In("statistic", List("totalTime", "totalAmount"))
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
Expand Down Expand Up @@ -279,6 +310,24 @@ class DatapointConverterSuite extends FunSuite {
assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 4.0))
}

test("dist - percentile") {
val expr = DataExpr.GroupBy(
DataExpr.Sum(Query.Equal("value", "responseSize")),
List("percentile")
)
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
}
clock.setWallTime(step + 1)
converter.flush(clock.wallTime())
val results = events.result()
assertEquals(results.size, 4)
assertEquals(results.head.value, 0.2, 1e-12)
}

test("groupBy - sum") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app"))
val events = List.newBuilder[LwcEvent]
Expand Down
Loading