diff --git a/atlas-eval/src/main/resources/reference.conf b/atlas-eval/src/main/resources/reference.conf index e0bbf789f..3ee809eca 100644 --- a/atlas-eval/src/main/resources/reference.conf +++ b/atlas-eval/src/main/resources/reference.conf @@ -20,6 +20,11 @@ atlas.eval { max-input-datapoints = 2147483647 // Maximum number of datapoints resulting from a group by. Defaults to Integer.MaxValue max-intermediate-datapoints = 2147483647 + + // Maximum step size allowed for streaming. Large step sizes require a large amount of time + // to accumulate data and could complicate deployments if not data loss is desirable. Limit + // to avoid usage. + max-step = 60s } // Broad tag keys that should be ignored for the purposes of dropping expensive queries diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala index 50d533408..802a84ae7 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala @@ -32,6 +32,7 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.util.HostRewriter import com.typesafe.config.Config +import java.time.Duration import scala.util.Success private[stream] class ExprInterpreter(config: Config) { @@ -42,6 +43,8 @@ private[stream] class ExprInterpreter(config: Config) { private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite")) + private val maxStep = config.getDuration("atlas.eval.stream.limits.max-step") + // Use simple legends for expressions private val simpleLegendsEnabled: Boolean = config.getBoolean("atlas.eval.graph.simple-legends-enabled") @@ -57,6 +60,12 @@ private[stream] class ExprInterpreter(config: Config) { def eval(uri: Uri): GraphConfig = { val graphCfg = grapher.toGraphConfig(uri) + // Check step size is within bounds + if (graphCfg.stepSize > maxStep.toMillis) { + val step = Duration.ofMillis(graphCfg.stepSize) + throw new IllegalArgumentException(s"max allowed step size exceeded ($step > $maxStep)") + } + // Check that data expressions are supported. The streaming path doesn't support // time shifts, filters, and integral. The filters and integral are excluded because // they can be confusing as the time window for evaluation is not bounded. diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index 817a84d0e..55677091c 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -363,6 +363,14 @@ class EvaluatorSuite extends FunSuite { testError(ds1, msg) } + test("create processor, reject large step size") { + val expr = "name,foo,:eq,:sum" + val uri = s"http://test/api/v1/graph?q=$expr&step=5m" + val msg = s"IllegalArgumentException: max allowed step size exceeded (PT5M > PT1M)" + val ds1 = Evaluator.DataSources.of(ds("one", uri)) + testError(ds1, msg) + } + test("processor handles multiple steps") { val evaluator = new Evaluator(config, registry, system) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala index 5132dbc91..d3e50e58f 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TestContext.scala @@ -56,6 +56,7 @@ object TestContext { | limits { | max-input-datapoints = 50000 | max-intermediate-datapoints = 10000000 + | max-step = 60s | } | | expression-limit = 50000