Skip to content

Commit

Permalink
core: add sample-count operation
Browse files Browse the repository at this point in the history
Adds a `sample-count` operation that can be used to estimate
the number of samples received for a percentile approximation
baed on a range of the distribution. This estimate is computed
by adjusting the query to restrict the set of buckets used to
model the distribution to those for the specified range.

One use-case for this is to be able to track number of requests
that are not within an SLO based on latency. If more precise
tracking is needed, then a bucket counter could be used with a
bucket boundary that matches the SLO threshold. However, this
operation can be used to get an estimate if an existing
percentile timer exists.
  • Loading branch information
brharrington committed Dec 13, 2023
1 parent 1c7b6a4 commit 49defcc
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.netflix.atlas.core.model

import java.time.ZoneId
import java.time.ZoneOffset

import com.netflix.atlas.core.model.DataExpr.AggregateFunction
import com.netflix.atlas.core.model.MathExpr.AggrMathExpr
import com.netflix.atlas.core.model.MathExpr.NamedRewrite
Expand All @@ -26,6 +25,7 @@ import com.netflix.atlas.core.stacklang.SimpleWord
import com.netflix.atlas.core.stacklang.StandardVocabulary.Macro
import com.netflix.atlas.core.stacklang.Vocabulary
import com.netflix.atlas.core.stacklang.Word
import com.netflix.spectator.api.histogram.PercentileBuckets

object MathVocabulary extends Vocabulary {

Expand Down Expand Up @@ -75,6 +75,7 @@ object MathVocabulary extends Vocabulary {
Min,
Max,
Percentiles,
SampleCount,
Macro(
"avg",
List(
Expand Down Expand Up @@ -1240,4 +1241,65 @@ object MathVocabulary extends Vocabulary {
)
}

/**
* Compute the estimated number of samples within a range of the distribution for a
* percentile approximation.
*/
case object SampleCount extends Word {

override def name: String = "sample-count"

override def isStable: Boolean = false

override def matches(stack: List[Any]): Boolean = {
stack match {
case DoubleType(_) :: DoubleType(_) :: (_: Query) :: _ => true
case _ => false
}
}

private def bucketLabel(prefix: String, v: Double): String = {
val idx = PercentileBuckets.indexOf(v.toLong)
f"$prefix$idx%04X"
}

private def bucketQuery(prefix: String, min: Double, max: Double): Query = {
val ge = Query.GreaterThanEqual(TagKey.percentile, bucketLabel(prefix, min))
val le = Query.LessThanEqual(TagKey.percentile, bucketLabel(prefix, max))
ge.and(le)
}

private def bucketQuery(min: Double, max: Double): Query = {
// Verify values are reasonable
require(min < max, s"min >= max (min=$min, max=$max)")
require(min >= 0.0, s"min < 0 (min=$min)")

// Query for the ranges of both distribution summaries and timers. This allows it
// to work for either type.
val distQuery = bucketQuery("D", min, max)
val timerQuery = bucketQuery("T", min * 1e9, max * 1e9)
distQuery.or(timerQuery)
}

override def execute(context: Context): Context = {
context.stack match {
case DoubleType(max) :: DoubleType(min) :: (q: Query) :: stack =>
val rangeQuery = q.and(bucketQuery(min, max))
val expr = DataExpr.Sum(rangeQuery)
context.copy(stack = expr :: stack)
case _ =>
invalidStack
}
}

override def signature: String = "Query Double Double -- DataExpr"

override def summary: String =
"""
|Estimate the number of samples for a percentile approximation within a range of
|the distribution.
|""".stripMargin

override def examples: List[String] = Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package com.netflix.atlas.core.model

import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

import com.netflix.atlas.core.stacklang.Interpreter
import com.netflix.atlas.core.util.Features
import com.netflix.spectator.api.Counter
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.histogram.PercentileBuckets
Expand Down Expand Up @@ -50,6 +50,14 @@ class PercentilesSuite extends FunSuite {
expr.eval(context, input).data
}

def evalUnstable(str: String, input: List[TimeSeries]): List[TimeSeries] = {
val expr = interpreter.execute(str, Map.empty[String, Any], Features.UNSTABLE).stack match {
case (v: TimeSeriesExpr) :: _ => v
case _ => throw new IllegalArgumentException("invalid expr")
}
expr.eval(context, input).data
}

private val input100 = {
(0 until 100).map { i =>
val bucket = f"D${PercentileBuckets.indexOf(i)}%04X"
Expand Down Expand Up @@ -351,4 +359,76 @@ class PercentilesSuite extends FunSuite {
assertEquals(ts.head.tags, Map("name" -> "NO_DATA"))
assertEquals(ts.head.label, "NO DATA")
}

test("sample-count: distribution summary, range") {
val data = evalUnstable("name,test,:eq,50,100,:sample-count", input100)
assertEquals(data.size, 1)
val t = data.head
assertEqualsDouble(t.data(0L), 0.9, 1e-6)
}

test("sample-count: distribution summary, 0 - N") {
val data = evalUnstable("name,test,:eq,0,50,:sample-count", input100)
assertEquals(data.size, 1)
val t = data.head
assertEqualsDouble(t.data(0L), 0.85, 1e-6)
}

test("sample-count: distribution summary, N - Max") {
val data = evalUnstable("name,test,:eq,50,Infinity,:sample-count", input100)
assertEquals(data.size, 1)
val t = data.head
assertEqualsDouble(t.data(0L), 0.9, 1e-6)
}

test("sample-count: distribution summary, Min >= Max") {
val e = intercept[IllegalArgumentException] {
evalUnstable("name,test,:eq,5,5,:sample-count", input100)
}
assertEquals(e.getMessage, "requirement failed: min >= max (min=5.0, max=5.0)")
}

test("sample-count: distribution summary, Min < 0") {
val e = intercept[IllegalArgumentException] {
evalUnstable("name,test,:eq,-5,5,:sample-count", input100)
}
assertEquals(e.getMessage, "requirement failed: min < 0 (min=-5.0)")
}

test("sample-count: distribution summary, NaN - 100") {
val e = intercept[IllegalArgumentException] {
evalUnstable("name,test,:eq,NaN,100,:sample-count", input100)
}
assertEquals(e.getMessage, "requirement failed: min >= max (min=NaN, max=100.0)")
}

test("sample-count: distribution summary, 0 - NaN") {
val e = intercept[IllegalArgumentException] {
evalUnstable("name,test,:eq,0,NaN,:sample-count", input100)
}
assertEquals(e.getMessage, "requirement failed: min >= max (min=0.0, max=NaN)")
}

test("sample-count: distribution summary, NaN - NaN") {
val e = intercept[IllegalArgumentException] {
evalUnstable("name,test,:eq,NaN,NaN,:sample-count", input100)
}
assertEquals(e.getMessage, "requirement failed: min >= max (min=NaN, max=NaN)")
}

test("sample-count: timer, range too high") {
// Timer range is in seconds, sample data is 0-100 ns
val data = evalUnstable("name,test,:eq,50,100,:sample-count", inputTimer100)
assertEquals(data.size, 1)
val t = data.head
assert(t.data(0L).isNaN)
}

test("sample-count: timer, range") {
// Timer range is in seconds, sample data is 0-100 ns
val data = evalUnstable("name,test,:eq,50e-9,100e-9,:sample-count", inputTimer100)
assertEquals(data.size, 1)
val t = data.head
assertEqualsDouble(t.data(0L), 0.9, 1e-6)
}
}

0 comments on commit 49defcc

Please sign in to comment.