eval: simplify data point source (#1627)
The source field was originally intended for some deduping, but was never utilized. Its only function right now is to indicate a synthetic data point generated from a heartbeat. This change removes the deduping overhead keyed on the source, which would never actually dedup anything.
brharrington authored Mar 19, 2024
1 parent 9f90bac commit e4fefc1
Showing 3 changed files with 5 additions and 51 deletions.
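To make the simplified meaning of source concrete, here is a minimal sketch of constructing the two kinds of points. The constructor follows the case class shown in the diff below; the "heartbeat" marker string and the NaN value for the synthetic point are illustrative assumptions, only the "datapoint" constant actually appears in this commit.

import com.netflix.atlas.core.model.{DataExpr, Query}
import com.netflix.atlas.eval.model.AggrDatapoint

// A real measurement, tagged with the constant source set by LwcToAggrDatapoint.
val actual = AggrDatapoint(
  timestamp = 0L,
  step = 60000L,
  expr = DataExpr.Sum(Query.True),
  source = "datapoint",
  tags = Map("name" -> "cpu"),
  value = 42.0
)

// A synthetic point generated from a heartbeat; the marker value is assumed.
val synthetic = actual.copy(source = "heartbeat", value = Double.NaN)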
AggrDatapoint.scala
@@ -43,9 +43,8 @@ import com.netflix.spectator.api.Registry
  * Data expression associated with the value. This is needed if further aggregation
  * is necessary and later for matching in the final evaluation phase.
  * @param source
- *   The source combined with the expression are used for deduping the intermediate
- *   aggregates. This can be ignored at the risk of some values being included in the
- *   final result multiple times.
+ *   Indicates whether it is an actual data point or a synthetic data point generated
+ *   from a heartbeat.
  * @param tags
  *   Tags associated with the datapoint.
  * @param value
@@ -60,9 +59,6 @@ case class AggrDatapoint(
   value: Double
 ) {
 
-  /** Identifier used for deduping intermediate aggregates. */
-  def id: String = s"$source:$expr"
-
   /**
    * Converts this value to a time series type that can be used for the final evaluation
    * phase.
@@ -307,22 +303,12 @@ object AggrDatapoint {
   def aggregate(values: List[AggrDatapoint], settings: AggregatorSettings): Option[Aggregator] = {
     if (values.isEmpty) Option.empty
     else {
-      val vs = dedup(values)
-      val aggr = newAggregator(vs.head, settings)
-      val aggregator = vs.tail
+      val aggr = newAggregator(values.head, settings)
+      val aggregator = values.tail
         .foldLeft(aggr) { (acc, d) =>
           acc.aggregate(d)
         }
       Some(aggregator)
     }
   }
-
-  /**
-   * Dedup the values using the ids for each value. This will take into account the
-   * group by keys such that values with different grouping keys will not be considered
-   * as duplicates.
-   */
-  private def dedup(values: List[AggrDatapoint]): List[AggrDatapoint] = {
-    values.groupBy(_.id).map(_._2.head).toList
-  }
 }
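With the dedup step gone, aggregation is a plain fold over the input list. A usage sketch of the simplified method; the points mirror the test data removed below, and the settings value is assumed to be built like the suite's settings(...) helper:

val expr = DataExpr.Sum(Query.True)
val points = List(
  AggrDatapoint(0L, 60000L, expr, "datapoint", Map("name" -> "cpu"), 1.0),
  AggrDatapoint(0L, 60000L, expr, "datapoint", Map("name" -> "cpu"), 2.0)
)

// Duplicates are no longer filtered: passing the list twice now sums to 6.0,
// where the removed dedup logic would have produced 3.0.
val aggregator = AggrDatapoint.aggregate(points ::: points, settings)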
LwcToAggrDatapoint.scala
@@ -49,9 +49,6 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
 
   private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, LwcDataExpr]
 
-  // HACK: needed until we can plumb the actual source through the system
-  private var nextSource: Int = 0
-
   override def onPush(): Unit = {
     val builder = List.newBuilder[AggrDatapoint]
     grab(in).foreach {
@@ -79,11 +76,9 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
   private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
     state.get(dp.id) match {
       case Some(sub) =>
-        // TODO, put in source, for now make it random to avoid dedup
-        nextSource += 1
         val expr = sub.expr
         val step = sub.step
-        Some(AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value))
+        Some(AggrDatapoint(dp.timestamp, step, expr, "datapoint", dp.tags, dp.value))
       case None =>
         unknown.increment()
         None
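Every real data point now carries the constant source "datapoint", replacing the incrementing counter that existed only to defeat dedup. A minimal sketch of how a downstream consumer could separate the synthetic heartbeat points; the "heartbeat" marker is an assumption, only the "datapoint" constant is established by this commit:

// Partition a batch of AggrDatapoints by the source marker; the second list
// holds the synthetic heartbeat-generated points.
def splitHeartbeats(batch: List[AggrDatapoint]): (List[AggrDatapoint], List[AggrDatapoint]) =
  batch.partition(_.source == "datapoint")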
AggrDatapointSuite.scala
@@ -81,19 +81,6 @@ class AggrDatapointSuite extends FunSuite {
     assert(aggregator.get.limitExceeded)
   }
 
-  test("aggregate dedups using source") {
-    val expr = DataExpr.Sum(Query.True)
-    val dataset = createDatapoints(expr, 0, 10)
-    val aggregator =
-      AggrDatapoint.aggregate(dataset ::: dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
-    val result = aggregator.get.datapoints
-
-    assertEquals(result.size, 1)
-    assertEquals(result.head.timestamp, 0L)
-    assertEquals(result.head.tags, Map("name" -> "cpu"))
-    assertEquals(result.head.value, 45.0)
-  }
-
   test("aggregate gauges sum") {
     val expr = DataExpr.Sum(Query.True)
     val dataset = createGaugeDatapoints(expr, 0, 10)
@@ -176,20 +163,6 @@ class AggrDatapointSuite extends FunSuite {
     assert(aggregator.get.limitExceeded)
   }
 
-  test("aggregate, dedup and group by") {
-    val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
-    val dataset = createDatapoints(expr, 0, 10)
-    val aggregator =
-      AggrDatapoint.aggregate(dataset ::: dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
-    val result = aggregator.get.datapoints
-
-    assertEquals(result.size, 10)
-    result.foreach { d =>
-      val v = d.tags("node").substring(2).toDouble
-      assertEquals(d.value, v)
-    }
-  }
-
   test("aggregate all") {
     val expr = DataExpr.All(Query.True)
     val dataset = createDatapoints(expr, 0, 10)
