diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala index c774897d3..c889d6143 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala @@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Datapoint import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.model.TimeSeries import com.netflix.atlas.core.util.Math +import com.netflix.atlas.core.util.RefDoubleHashMap import com.netflix.spectator.api.Counter import com.netflix.spectator.api.Registry @@ -74,6 +75,8 @@ case class AggrDatapoint( object AggrDatapoint { + private val aggrTagKey = "atlas.aggr" + /** * Creates a dummy datapoint passed along when a heartbeat message is received from the * lwcapi server. These are used to ensure regular messages are flowing into the time @@ -172,6 +175,41 @@ object AggrDatapoint { def datapoint: AggrDatapoint = init.copy(value = value) } + /** + * Aggregator for the sum or count when used with gauges. In cases where data is going + * to the aggregator service, there can be duplicates of the gauge values. To get a + * correct sum as if it was from a single aggregator service instance, this will compute + * the max for a given `atlas.aggr` key and then sum the final results. + */ + private class GaugeSumAggregator( + init: AggrDatapoint, + op: (Double, Double) => Double, + settings: AggregatorSettings + ) extends Aggregator(settings) { + + private val maxValues = new RefDoubleHashMap[String] + numIntermediateDatapoints = 1 + aggregate(init) + + override def aggregate(datapoint: AggrDatapoint): Aggregator = { + if (!checkLimits) { + val aggrKey = datapoint.tags.getOrElse(aggrTagKey, "unknown") + maxValues.max(aggrKey, datapoint.value) + numInputDatapoints += 1 + } + this + } + + override def datapoints: List[AggrDatapoint] = List(datapoint) + + def datapoint: AggrDatapoint = { + val tags = init.tags - aggrTagKey + var sum = 0.0 + maxValues.foreach { (_, v) => sum = op(sum, v) } + init.copy(tags = tags, value = sum) + } + } + /** * Group the datapoints by the tags and maintain a simple aggregator per distinct tag * set. @@ -179,13 +217,16 @@ object AggrDatapoint { private class GroupByAggregator(settings: AggregatorSettings) extends Aggregator(settings) { private val aggregators = - scala.collection.mutable.AnyRefMap.empty[Map[String, String], SimpleAggregator] + scala.collection.mutable.AnyRefMap.empty[Map[String, String], Aggregator] - private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = { + private def newAggregator(datapoint: AggrDatapoint): Aggregator = { datapoint.expr match { + case GroupBy(af: DataExpr.Sum, _) if datapoint.tags.contains(aggrTagKey) => + new GaugeSumAggregator(datapoint, aggrOp(af), settings) + case GroupBy(af: DataExpr.Count, _) if datapoint.tags.contains(aggrTagKey) => + new GaugeSumAggregator(datapoint, aggrOp(af), settings) case GroupBy(af: AggregateFunction, _) => - val aggregator = new SimpleAggregator(datapoint, aggrOp(af), settings) - aggregator + new SimpleAggregator(datapoint, aggrOp(af), settings) case _ => throw new IllegalArgumentException("datapoint is not for a grouped expression") } @@ -206,7 +247,7 @@ object AggrDatapoint { } override def datapoints: List[AggrDatapoint] = { - aggregators.values.map(_.datapoint).toList + aggregators.values.flatMap(_.datapoints).toList } } @@ -235,6 +276,10 @@ object AggrDatapoint { */ def newAggregator(datapoint: AggrDatapoint, settings: AggregatorSettings): Aggregator = { datapoint.expr match { + case af: DataExpr.Sum if datapoint.tags.contains(aggrTagKey) => + new GaugeSumAggregator(datapoint, aggrOp(af), settings) + case af: DataExpr.Count if datapoint.tags.contains(aggrTagKey) => + new GaugeSumAggregator(datapoint, aggrOp(af), settings) case af: AggregateFunction => new SimpleAggregator(datapoint, aggrOp(af), settings) case _: GroupBy => diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala index c636ebb3a..19f696ad3 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/AggrDatapointSuite.scala @@ -40,6 +40,18 @@ class AggrDatapointSuite extends FunSuite { } } + private def createGaugeDatapoints(expr: DataExpr, t: Long, nodes: Int): List[AggrDatapoint] = { + (0 until nodes).toList.map { i => + val k = i % 2 + val node = f"i-$k%08d" + val tags = Map("name" -> "cpu", "atlas.aggr" -> k.toString) + if (!expr.isInstanceOf[DataExpr.AggregateFunction]) + AggrDatapoint(t, step, expr, i.toString, tags + ("node" -> node), i) + else + AggrDatapoint(t, step, expr, i.toString, tags, i) + } + } + test("aggregate empty") { assertEquals( AggrDatapoint.aggregate(Nil, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)), @@ -82,6 +94,32 @@ class AggrDatapointSuite extends FunSuite { assertEquals(result.head.value, 45.0) } + test("aggregate gauges sum") { + val expr = DataExpr.Sum(Query.True) + val dataset = createGaugeDatapoints(expr, 0, 10) + val aggregator = + AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)) + val result = aggregator.get.datapoints + + assertEquals(result.size, 1) + assertEquals(result.head.timestamp, 0L) + assertEquals(result.head.tags, Map("name" -> "cpu")) + assertEquals(result.head.value, 17.0) + } + + test("aggregate gauges count") { + val expr = DataExpr.Count(Query.True) + val dataset = createGaugeDatapoints(expr, 0, 10) + val aggregator = + AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)) + val result = aggregator.get.datapoints + + assertEquals(result.size, 1) + assertEquals(result.head.timestamp, 0L) + assertEquals(result.head.tags, Map("name" -> "cpu")) + assertEquals(result.head.value, 17.0) + } + test("aggregate group by") { val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node")) val dataset = createDatapoints(expr, 0, 10) @@ -96,6 +134,20 @@ class AggrDatapointSuite extends FunSuite { } } + test("aggregate gauges group by") { + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node")) + val dataset = createGaugeDatapoints(expr, 0, 10) + val aggregator = + AggrDatapoint.aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)) + val result = aggregator.get.datapoints + + assertEquals(result.size, 2) + result.foreach { d => + val v = d.tags("node").substring(2).toInt + assertEquals(d.value, if (v % 2 == 0) 8.0 else 9.0) + } + } + test("aggregate group by exceeds input data points") { val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node")) val dataset = createDatapoints(expr, 0, 10)