eval: sum aggregation for gauges (#1534)
Adds a new datapoint aggregator implementation to handle the
sum/count of gauge values that come from an aggregation service
where duplicates can occur. The aggregation service adds an
`atlas.aggr` tag whose value is a key used for grouping and
deduplicating the relevant values. Values within a group are
combined with max aggregation so that the result is consistent
with the behavior of a single aggregation service instance.

Backport of #1533.
brharrington authored Apr 3, 2023
1 parent 46076a7 · commit 19bd286
Showing 2 changed files with 102 additions and 5 deletions.
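
Before the diff, a minimal standalone sketch of the max-then-sum deduplication described in the commit message. Everything in it is illustrative and hypothetical: the object name GaugeDedupSketch, the helper dedupSum, and the sample values are invented for this note, and plain Scala collections stand in for the Aggregator machinery that the actual change below plugs into. Max within a key keeps re-delivered copies of the same value from inflating the sum, matching the single-instance behavior the commit message describes.

// Illustrative sketch only; not part of the commit. Duplicate gauge reports that
// share an `atlas.aggr` key collapse via max, and the per-key maxima are summed.
object GaugeDedupSketch {

  def dedupSum(reports: Seq[(String, Double)]): Double = {
    reports
      .groupMapReduce(_._1)(_._2)((a, b) => math.max(a, b)) // max per atlas.aggr key
      .values
      .sum                                                  // then sum across keys
  }

  def main(args: Array[String]): Unit = {
    // Duplicate reports share an `atlas.aggr` key ("0" or "1" here); max dedups within
    // a key, and the per-key maxima sum to 8.0 + 9.0 = 17.0.
    val reports = Seq("0" -> 2.0, "0" -> 8.0, "1" -> 3.0, "1" -> 9.0)
    println(dedupSum(reports)) // 17.0
  }
}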
@@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Datapoint
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.Math
import com.netflix.atlas.core.util.RefDoubleHashMap
import com.netflix.spectator.api.Counter
import com.netflix.spectator.api.Registry

@@ -74,6 +75,8 @@ case class AggrDatapoint(

object AggrDatapoint {

  private val aggrTagKey = "atlas.aggr"

  /**
    * Creates a dummy datapoint passed along when a heartbeat message is received from the
    * lwcapi server. These are used to ensure regular messages are flowing into the time
@@ -172,20 +175,58 @@ object AggrDatapoint {
    def datapoint: AggrDatapoint = init.copy(value = value)
  }

  /**
    * Aggregator for the sum or count when used with gauges. In cases where data is going
    * to the aggregator service, there can be duplicates of the gauge values. To get a
    * correct sum as if it was from a single aggregator service instance, this will compute
    * the max for a given `atlas.aggr` key and then sum the final results.
    */
  private class GaugeSumAggregator(
    init: AggrDatapoint,
    op: (Double, Double) => Double,
    settings: AggregatorSettings
  ) extends Aggregator(settings) {

    private val maxValues = new RefDoubleHashMap[String]
    numIntermediateDatapoints = 1
    aggregate(init)

    override def aggregate(datapoint: AggrDatapoint): Aggregator = {
      if (!checkLimits) {
        val aggrKey = datapoint.tags.getOrElse(aggrTagKey, "unknown")
        maxValues.max(aggrKey, datapoint.value)
        numInputDatapoints += 1
      }
      this
    }

    override def datapoints: List[AggrDatapoint] = List(datapoint)

    def datapoint: AggrDatapoint = {
      val tags = init.tags - aggrTagKey
      var sum = 0.0
      maxValues.foreach { (_, v) => sum = op(sum, v) }
      init.copy(tags = tags, value = sum)
    }
  }

  /**
    * Group the datapoints by the tags and maintain a simple aggregator per distinct tag
    * set.
    */
  private class GroupByAggregator(settings: AggregatorSettings) extends Aggregator(settings) {

    private val aggregators =
-      scala.collection.mutable.AnyRefMap.empty[Map[String, String], SimpleAggregator]
+      scala.collection.mutable.AnyRefMap.empty[Map[String, String], Aggregator]

-    private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = {
+    private def newAggregator(datapoint: AggrDatapoint): Aggregator = {
      datapoint.expr match {
        case GroupBy(af: DataExpr.Sum, _) if datapoint.tags.contains(aggrTagKey) =>
          new GaugeSumAggregator(datapoint, aggrOp(af), settings)
        case GroupBy(af: DataExpr.Count, _) if datapoint.tags.contains(aggrTagKey) =>
          new GaugeSumAggregator(datapoint, aggrOp(af), settings)
        case GroupBy(af: AggregateFunction, _) =>
-          val aggregator = new SimpleAggregator(datapoint, aggrOp(af), settings)
-          aggregator
+          new SimpleAggregator(datapoint, aggrOp(af), settings)
        case _ =>
          throw new IllegalArgumentException("datapoint is not for a grouped expression")
      }
@@ -206,7 +247,7 @@ object AggrDatapoint {
    }

    override def datapoints: List[AggrDatapoint] = {
-      aggregators.values.map(_.datapoint).toList
+      aggregators.values.flatMap(_.datapoints).toList
    }
  }

@@ -235,6 +276,10 @@ object AggrDatapoint {
    */
  def newAggregator(datapoint: AggrDatapoint, settings: AggregatorSettings): Aggregator = {
    datapoint.expr match {
      case af: DataExpr.Sum if datapoint.tags.contains(aggrTagKey) =>
        new GaugeSumAggregator(datapoint, aggrOp(af), settings)
      case af: DataExpr.Count if datapoint.tags.contains(aggrTagKey) =>
        new GaugeSumAggregator(datapoint, aggrOp(af), settings)
      case af: AggregateFunction =>
        new SimpleAggregator(datapoint, aggrOp(af), settings)
      case _: GroupBy =>
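
That covers the evaluation-side changes; the remaining hunks are in the test suite (AggrDatapointSuite). One note on the GroupByAggregator edits above: the per-group map widens its value type from SimpleAggregator to the Aggregator base type so that a group keyed by `atlas.aggr`-tagged data can hold a GaugeSumAggregator, and the results are flatMapped because the base API exposes a list of datapoints rather than the single datapoint SimpleAggregator exposes. The sketch below is a hypothetical, simplified rendering of that per-group dispatch; all names (Agg, SumAgg, MaxThenSumAgg, aggFor) are invented and only the shape mirrors the diff.

// Hypothetical rendering of the per-group dispatch; not the Atlas API.
import scala.collection.mutable

object GroupByDispatchSketch {

  sealed trait Agg {
    def add(aggrKey: Option[String], value: Double): Unit
    def datapoints: List[Double]
  }

  // Stand-in for SimpleAggregator: a plain running sum.
  final class SumAgg extends Agg {
    private var sum = 0.0
    def add(aggrKey: Option[String], value: Double): Unit = sum += value
    def datapoints: List[Double] = List(sum)
  }

  // Stand-in for GaugeSumAggregator: max per atlas.aggr key, summed at the end.
  final class MaxThenSumAgg extends Agg {
    private val maxByKey = mutable.Map.empty[String, Double]

    def add(aggrKey: Option[String], value: Double): Unit = {
      val k = aggrKey.getOrElse("unknown")
      maxByKey.update(k, math.max(value, maxByKey.getOrElse(k, Double.NegativeInfinity)))
    }

    def datapoints: List[Double] = List(maxByKey.values.sum)
  }

  def main(args: Array[String]): Unit = {
    // One aggregator per distinct group; the flavor depends on whether the data carries
    // an atlas.aggr key, mirroring the widened map value type in GroupByAggregator.
    val groups = mutable.Map.empty[String, Agg]

    def aggFor(group: String, hasAggrKey: Boolean): Agg =
      groups.getOrElseUpdate(group, if (hasAggrKey) new MaxThenSumAgg else new SumAgg)

    aggFor("node=i-0", hasAggrKey = true).add(Some("0"), 8.0)
    aggFor("node=i-0", hasAggrKey = true).add(Some("0"), 2.0) // duplicate, max wins
    aggFor("node=i-1", hasAggrKey = true).add(Some("1"), 9.0)

    groups.toSeq.sortBy(_._1).foreach { case (g, a) =>
      println(s"$g -> ${a.datapoints}") // node=i-0 -> List(8.0), node=i-1 -> List(9.0)
    }
  }
}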
@@ -40,6 +40,18 @@ class AggrDatapointSuite extends FunSuite {
    }
  }

  private def createGaugeDatapoints(expr: DataExpr, t: Long, nodes: Int): List[AggrDatapoint] = {
    (0 until nodes).toList.map { i =>
      val k = i % 2
      val node = f"i-$k%08d"
      val tags = Map("name" -> "cpu", "atlas.aggr" -> k.toString)
      if (!expr.isInstanceOf[DataExpr.AggregateFunction])
        AggrDatapoint(t, step, expr, i.toString, tags + ("node" -> node), i)
      else
        AggrDatapoint(t, step, expr, i.toString, tags, i)
    }
  }

test("aggregate empty") {
assertEquals(
AggrDatapoint.aggregate(Nil, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)),
@@ -82,6 +94,32 @@
    assertEquals(result.head.value, 45.0)
  }

test("aggregate gauges sum") {
val expr = DataExpr.Sum(Query.True)
val dataset = createGaugeDatapoints(expr, 0, 10)
val aggregator =
AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
val result = aggregator.get.datapoints

assertEquals(result.size, 1)
assertEquals(result.head.timestamp, 0L)
assertEquals(result.head.tags, Map("name" -> "cpu"))
assertEquals(result.head.value, 17.0)
}

test("aggregate gauges count") {
val expr = DataExpr.Count(Query.True)
val dataset = createGaugeDatapoints(expr, 0, 10)
val aggregator =
AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
val result = aggregator.get.datapoints

assertEquals(result.size, 1)
assertEquals(result.head.timestamp, 0L)
assertEquals(result.head.tags, Map("name" -> "cpu"))
assertEquals(result.head.value, 17.0)
}

test("aggregate group by") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
val dataset = createDatapoints(expr, 0, 10)
@@ -96,6 +134,20 @@
    }
  }

test("aggregate gauges group by") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
val dataset = createGaugeDatapoints(expr, 0, 10)
val aggregator =
AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
val result = aggregator.get.datapoints

assertEquals(result.size, 2)
result.foreach { d =>
val v = d.tags("node").substring(2).toInt
assertEquals(d.value, if (v % 2 == 0) 8.0 else 9.0)
}
}

test("aggregate group by exceeds input data points") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
val dataset = createDatapoints(expr, 0, 10)
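
For reference, the expected values in the gauge tests above follow directly from createGaugeDatapoints: values 0 through 9 are tagged with `atlas.aggr` key i % 2, so the even values dedup to a max of 8.0 and the odd values to 9.0. The standalone snippet below is a hypothetical cross-check of that arithmetic, not part of the suite; the object name GaugeTestExpectations is invented.

// Hypothetical cross-check of the gauge test expectations; not part of the suite.
object GaugeTestExpectations {

  def main(args: Array[String]): Unit = {
    val values = (0 until 10).map(_.toDouble)
    // Dedup by atlas.aggr key (i % 2): the even values max out at 8.0, the odd values at 9.0.
    val maxPerKey = values.groupMapReduce(v => v.toInt % 2)(identity)((a, b) => math.max(a, b))
    // The plain sum and count tests collapse everything to one series: 8.0 + 9.0 = 17.0.
    println(maxPerKey.values.sum) // 17.0
    // The group-by test derives the node tag from the same i % 2, so each grouped series
    // keeps its per-key max, matching the expected 8.0 and 9.0.
    maxPerKey.toSeq.sortBy(_._1).foreach { case (k, v) => println(f"i-$k%08d -> $v") }
  }
}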
