diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathVocabulary.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathVocabulary.scala
index 9de6c10d1..a36eda2b1 100644
--- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathVocabulary.scala
+++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathVocabulary.scala
@@ -17,7 +17,6 @@ package com.netflix.atlas.core.model
 
 import java.time.ZoneId
 import java.time.ZoneOffset
-
 import com.netflix.atlas.core.model.DataExpr.AggregateFunction
 import com.netflix.atlas.core.model.MathExpr.AggrMathExpr
 import com.netflix.atlas.core.model.MathExpr.NamedRewrite
@@ -26,6 +25,7 @@ import com.netflix.atlas.core.stacklang.SimpleWord
 import com.netflix.atlas.core.stacklang.StandardVocabulary.Macro
 import com.netflix.atlas.core.stacklang.Vocabulary
 import com.netflix.atlas.core.stacklang.Word
+import com.netflix.spectator.api.histogram.PercentileBuckets
 
 object MathVocabulary extends Vocabulary {
 
@@ -75,6 +75,7 @@ object MathVocabulary extends Vocabulary {
     Min,
     Max,
     Percentiles,
+    SampleCount,
     Macro(
       "avg",
       List(
@@ -1240,4 +1241,65 @@ object MathVocabulary extends Vocabulary {
     )
   }
 
+  /**
+    * Compute the estimated number of samples within a range of the distribution for a
+    * percentile approximation.
+    */
+  case object SampleCount extends Word {
+
+    override def name: String = "sample-count"
+
+    override def isStable: Boolean = false
+
+    override def matches(stack: List[Any]): Boolean = {
+      stack match {
+        case DoubleType(_) :: DoubleType(_) :: (_: Query) :: _ => true
+        case _ => false
+      }
+    }
+
+    private def bucketLabel(prefix: String, v: Double): String = {
+      val idx = PercentileBuckets.indexOf(v.toLong)
+      f"$prefix$idx%04X"
+    }
+
+    private def bucketQuery(prefix: String, min: Double, max: Double): Query = {
+      val ge = Query.GreaterThanEqual(TagKey.percentile, bucketLabel(prefix, min))
+      val le = Query.LessThanEqual(TagKey.percentile, bucketLabel(prefix, max))
+      ge.and(le)
+    }
+
+    private def bucketQuery(min: Double, max: Double): Query = {
+      // Verify values are reasonable
+      require(min < max, s"min >= max (min=$min, max=$max)")
+      require(min >= 0.0, s"min < 0 (min=$min)")
+
+      // Query for the ranges of both distribution summaries and timers. This allows it
+      // to work for either type.
+      val distQuery = bucketQuery("D", min, max)
+      val timerQuery = bucketQuery("T", min * 1e9, max * 1e9)
+      distQuery.or(timerQuery)
+    }
+
+    override def execute(context: Context): Context = {
+      context.stack match {
+        case DoubleType(max) :: DoubleType(min) :: (q: Query) :: stack =>
+          val rangeQuery = q.and(bucketQuery(min, max))
+          val expr = DataExpr.Sum(rangeQuery)
+          context.copy(stack = expr :: stack)
+        case _ =>
+          invalidStack
+      }
+    }
+
+    override def signature: String = "Query Double Double -- DataExpr"
+
+    override def summary: String =
+      """
+        |Estimate the number of samples for a percentile approximation within a range of
+        |the distribution.
+ |""".stripMargin + + override def examples: List[String] = Nil + } } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/PercentilesSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/PercentilesSuite.scala index 3ce5a3623..9f7e8766e 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/PercentilesSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/PercentilesSuite.scala @@ -17,8 +17,8 @@ package com.netflix.atlas.core.model import java.util.concurrent.TimeUnit import java.util.stream.Collectors - import com.netflix.atlas.core.stacklang.Interpreter +import com.netflix.atlas.core.util.Features import com.netflix.spectator.api.Counter import com.netflix.spectator.api.DefaultRegistry import com.netflix.spectator.api.histogram.PercentileBuckets @@ -50,6 +50,14 @@ class PercentilesSuite extends FunSuite { expr.eval(context, input).data } + def evalUnstable(str: String, input: List[TimeSeries]): List[TimeSeries] = { + val expr = interpreter.execute(str, Map.empty[String, Any], Features.UNSTABLE).stack match { + case (v: TimeSeriesExpr) :: _ => v + case _ => throw new IllegalArgumentException("invalid expr") + } + expr.eval(context, input).data + } + private val input100 = { (0 until 100).map { i => val bucket = f"D${PercentileBuckets.indexOf(i)}%04X" @@ -351,4 +359,76 @@ class PercentilesSuite extends FunSuite { assertEquals(ts.head.tags, Map("name" -> "NO_DATA")) assertEquals(ts.head.label, "NO DATA") } + + test("sample-count: distribution summary, range") { + val data = evalUnstable("name,test,:eq,50,100,:sample-count", input100) + assertEquals(data.size, 1) + val t = data.head + assertEqualsDouble(t.data(0L), 0.9, 1e-6) + } + + test("sample-count: distribution summary, 0 - N") { + val data = evalUnstable("name,test,:eq,0,50,:sample-count", input100) + assertEquals(data.size, 1) + val t = data.head + assertEqualsDouble(t.data(0L), 0.85, 1e-6) + } + + test("sample-count: distribution summary, N - Max") { + val data = evalUnstable("name,test,:eq,50,Infinity,:sample-count", input100) + assertEquals(data.size, 1) + val t = data.head + assertEqualsDouble(t.data(0L), 0.9, 1e-6) + } + + test("sample-count: distribution summary, Min >= Max") { + val e = intercept[IllegalArgumentException] { + evalUnstable("name,test,:eq,5,5,:sample-count", input100) + } + assertEquals(e.getMessage, "requirement failed: min >= max (min=5.0, max=5.0)") + } + + test("sample-count: distribution summary, Min < 0") { + val e = intercept[IllegalArgumentException] { + evalUnstable("name,test,:eq,-5,5,:sample-count", input100) + } + assertEquals(e.getMessage, "requirement failed: min < 0 (min=-5.0)") + } + + test("sample-count: distribution summary, NaN - 100") { + val e = intercept[IllegalArgumentException] { + evalUnstable("name,test,:eq,NaN,100,:sample-count", input100) + } + assertEquals(e.getMessage, "requirement failed: min >= max (min=NaN, max=100.0)") + } + + test("sample-count: distribution summary, 0 - NaN") { + val e = intercept[IllegalArgumentException] { + evalUnstable("name,test,:eq,0,NaN,:sample-count", input100) + } + assertEquals(e.getMessage, "requirement failed: min >= max (min=0.0, max=NaN)") + } + + test("sample-count: distribution summary, NaN - NaN") { + val e = intercept[IllegalArgumentException] { + evalUnstable("name,test,:eq,NaN,NaN,:sample-count", input100) + } + assertEquals(e.getMessage, "requirement failed: min >= max (min=NaN, max=NaN)") + } + + test("sample-count: timer, range too high") { + // Timer range is in seconds, 
sample data is 0-100 ns + val data = evalUnstable("name,test,:eq,50,100,:sample-count", inputTimer100) + assertEquals(data.size, 1) + val t = data.head + assert(t.data(0L).isNaN) + } + + test("sample-count: timer, range") { + // Timer range is in seconds, sample data is 0-100 ns + val data = evalUnstable("name,test,:eq,50e-9,100e-9,:sample-count", inputTimer100) + assertEquals(data.size, 1) + val t = data.head + assertEqualsDouble(t.data(0L), 0.9, 1e-6) + } }
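
Usage sketch (not part of the patch): with unstable features enabled, the new word can be exercised the same way as the tests above. The range is given in base units, so for timers it is specified in seconds and multiplied by 1e9 to line up with the nanosecond "T" percentile buckets, while distribution summaries use the raw values against the "D" buckets. The word then pushes a DataExpr.Sum over the matching :percentile buckets, so the estimate is only as precise as the PercentileBuckets boundaries. The range below is made up for illustration.

  // Hypothetical example, reusing the suite's evalUnstable helper and timer
  // fixture: estimate how many samples fell between 10ns and 50ns (range in
  // seconds, since the data is from a timer).
  val estimate = evalUnstable("name,test,:eq,10e-9,50e-9,:sample-count", inputTimer100)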