lwcapi: use subv2 type for events (Netflix#1635)
Update the subscribe API to send back the subscription
V2 message when using event types other than time series.
Time series can be converted once all clients are updated.
brharrington authored and manolama committed May 22, 2024
1 parent 2fb4f04 commit 9ddd346
Showing 8 changed files with 171 additions and 65 deletions.
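The heart of the change is in SubscribeApi.scala below: the subscription response message now depends on the expression type. A minimal sketch of that dispatch, assuming the Atlas eval model classes are available (argument shapes follow the diff; the surrounding plumbing is omitted):

    import com.netflix.atlas.eval.model.ExprType
    import com.netflix.atlas.eval.model.LwcDataExpr
    import com.netflix.atlas.eval.model.LwcSubscription
    import com.netflix.atlas.eval.model.LwcSubscriptionV2

    // Sketch: v1 subscription messages carry no expression type, so they are kept for
    // TIME_SERIES to avoid breaking older eval clients; every other type gets the v2
    // message, which carries the type alongside the data expressions.
    def subscriptionMessage(
        exprType: ExprType,
        expression: String,
        exprInfo: List[LwcDataExpr]
    ) = {
      if (exprType == ExprType.TIME_SERIES)
        LwcSubscription(expression, exprInfo) // v1, backwards compatible
      else
        LwcSubscriptionV2(expression, exprType, exprInfo) // v2, includes the type
    }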
ExprInterpreter.scala
@@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Expr
import com.netflix.atlas.core.model.FilterExpr
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
@@ -37,10 +38,12 @@ import com.typesafe.config.Config

import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {
class ExprInterpreter(config: Config) {

import ExprInterpreter.*

private val tsInterpreter = Interpreter(new CustomVocabulary(config).allWords)

private val eventInterpreter = Interpreter(
new CustomVocabulary(config, List(EventVocabulary)).allWords
)
@@ -60,23 +63,32 @@ private[stream] class ExprInterpreter(config: Config) {
// time shifts, filters, and integral. The filters and integral are excluded because
// they can be confusing as the time window for evaluation is not bounded.
val results = graphCfg.exprs.flatMap(_.perOffset)
results.foreach { result =>
results.foreach(validate)

// Perform host rewrites based on the Atlas hostname
val host = uri.authority.host.toString()
val rewritten = hostRewriter.rewrite(host, results)
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

/**
* Check that data expressions are supported. The streaming path doesn't support
* time shifts, filters, and integral. The filters and integral are excluded because
* they can be confusing as the time window for evaluation is not bounded.
*/
private def validate(styleExpr: StyleExpr): Unit = {
styleExpr.perOffset.foreach { s =>
// Use rewrite as a helper for searching the expression for invalid operations
result.expr.rewrite {
s.expr.rewrite {
case op: StatefulExpr.Integral => invalidOperator(op); op
case op: FilterExpr => invalidOperator(op); op
case op: DataExpr if !op.offset.isZero => invalidOperator(op); op
}

// Double check all data expressions do not have an offset. In some cases for named rewrites
// the check above may not detect the offset.
result.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
s.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
}

// Perform host rewrites based on the Atlas hostname
val host = uri.authority.host.toString()
val rewritten = hostRewriter.rewrite(host, results)
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

private def invalidOperator(expr: Expr): Unit = {
@@ -103,45 +115,80 @@
}

private def invalidValue(value: Any): IllegalArgumentException = {
new IllegalArgumentException(s"invalid value on stack; $value")
new IllegalArgumentException(s"invalid value on stack: $value")
}

private def parseTimeSeries(query: String): List[StyleExpr] = {
val exprs = tsInterpreter.execute(query).stack.map {
case ModelExtractors.PresentationType(t) => t
case value => throw invalidValue(value)
}
exprs.foreach(validate)
exprs
}

private def parseEvents(query: String): List[EventExpr] = {
eventInterpreter.execute(query).stack.map {
case ModelExtractors.EventExprType(t) => t
case value => throw invalidValue(value)
}
}

private def evalEvents(uri: Uri): List[EventExpr] = {
uri.query().get("q") match {
case Some(query) =>
eventInterpreter.execute(query).stack.map {
case ModelExtractors.EventExprType(t) => t
case value => throw invalidValue(value)
}
parseEvents(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

private def parseTraceEvents(query: String): List[TraceQuery.SpanFilter] = {
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceFilterType(t) => t
case value => throw invalidValue(value)
}
}

private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = {
uri.query().get("q") match {
case Some(query) =>
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceFilterType(t) => t
case value => throw invalidValue(value)
}
parseTraceEvents(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

private def parseTraceTimeSeries(query: String): List[TraceQuery.SpanTimeSeries] = {
val exprs = traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceTimeSeriesType(t) => t
case value => throw invalidValue(value)
}
exprs.foreach(t => validate(t.expr))
exprs
}

private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = {
uri.query().get("q") match {
case Some(query) =>
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceTimeSeriesType(t) => t
case value => throw invalidValue(value)
}
parseTraceTimeSeries(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

/** Parse an expression based on the type. */
def parseQuery(expr: String, exprType: ExprType): List[Expr] = {
val exprs = exprType match {
case ExprType.TIME_SERIES => parseTimeSeries(expr)
case ExprType.EVENTS => parseEvents(expr)
case ExprType.TRACE_EVENTS => parseTraceEvents(expr)
case ExprType.TRACE_TIME_SERIES => parseTraceTimeSeries(expr)
}
exprs.distinct
}

/** Parse an expression that is part of a URI. */
def parseQuery(uri: Uri): (ExprType, List[Expr]) = {
val exprType = determineExprType(uri)
val exprs = exprType match {
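Since ExprInterpreter is now public, other modules (such as ExpressionSplitter below) can parse a query for a given type directly. A hypothetical usage, with made-up query strings:

    import com.netflix.atlas.eval.model.ExprType
    import com.netflix.atlas.eval.stream.ExprInterpreter
    import com.typesafe.config.ConfigFactory

    // Hypothetical calls: each one picks the vocabulary-specific interpreter for the
    // type, validates where applicable, and de-duplicates the parsed expressions.
    val interpreter = new ExprInterpreter(ConfigFactory.load())
    val tsExprs     = interpreter.parseQuery("name,ssCpuUser,:eq,:sum", ExprType.TIME_SERIES)
    val eventExprs  = interpreter.parseQuery("app,www,:eq", ExprType.EVENTS)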
ExpressionMetadata.scala
@@ -40,15 +40,16 @@ case class ExpressionMetadata(
object ExpressionMetadata {

def apply(expression: String, exprType: ExprType, step: Long): ExpressionMetadata = {
val f = if (step > 0) step else ApiSettings.defaultStep
new ExpressionMetadata(expression, exprType, f, computeId(expression, f))
val dfltStep = if (exprType.isTimeSeriesType) ApiSettings.defaultStep else 0L
val f = if (step > 0) step else dfltStep
new ExpressionMetadata(expression, exprType, f, computeId(expression, exprType, f))
}

def apply(expression: String): ExpressionMetadata = {
apply(expression, ExprType.TIME_SERIES, ApiSettings.defaultStep)
}

def computeId(e: String, f: Long): String = {
Strings.zeroPad(Hash.sha1bytes(s"$f~$e"), 40)
def computeId(e: String, t: ExprType, f: Long): String = {
Strings.zeroPad(Hash.sha1bytes(s"$f~$t~$e"), 40)
}
}
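Folding the expression type into the hash means the same expression subscribed with different types yields different ids, which is also why the expected ids in the test suites further down all change. A rough standalone equivalent of the new computeId, assuming Hash.sha1bytes is a plain SHA-1 digest and Strings.zeroPad pads the hex form to 40 characters:

    import java.security.MessageDigest

    // Sketch: SHA-1 over "frequency~exprType~expression", rendered as 40 hex characters.
    def computeId(expression: String, exprType: String, frequency: Long): String = {
      val input = s"$frequency~$exprType~$expression"
      val bytes = MessageDigest.getInstance("SHA-1").digest(input.getBytes("UTF-8"))
      bytes.map(b => f"${b & 0xff}%02x").mkString
    }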
ExpressionSplitter.scala
@@ -17,13 +17,12 @@ package com.netflix.atlas.lwcapi

import java.util.concurrent.TimeUnit
import com.github.benmanes.caffeine.cache.Caffeine
import com.netflix.atlas.core.model.CustomVocabulary
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.Query.KeyQuery
import com.netflix.atlas.core.stacklang.Interpreter
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.eval.model.ExprType
import com.netflix.atlas.eval.stream.ExprInterpreter
import com.netflix.spectator.ipc.ServerGroup
import com.typesafe.config.Config

@@ -42,7 +41,7 @@ class ExpressionSplitter(config: Config) {

private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack")

private val interpreter = Interpreter(new CustomVocabulary(config).allWords)
private val interpreter = new ExprInterpreter(config)

/**
* Processing the expressions can be quite expensive. In particular compiling regular
@@ -115,25 +114,31 @@ class ExpressionSplitter(config: Config) {
}
}

private def parse(expression: String): Try[List[DataExprMeta]] = Try {
val context = interpreter.execute(expression)
val dataExprs = context.stack.flatMap {
case ModelExtractors.PresentationType(t) => t.perOffset.flatMap(_.expr.dataExprs)
case _ => throw new IllegalArgumentException("expression is invalid")
}

// Offsets are not supported
dataExprs.foreach { dataExpr =>
if (!dataExpr.offset.isZero) {
throw new IllegalArgumentException(
s":offset not supported for streaming evaluation [[$dataExpr]]"
)
}
}

dataExprs.distinct.map { e =>
val q = intern(compress(e.query))
DataExprMeta(e, e.toString, q)
private def parse(expression: String, exprType: ExprType): Try[List[DataExprMeta]] = Try {
val parsedExpressions = interpreter.parseQuery(expression, exprType)
exprType match {
case ExprType.EVENTS =>
parsedExpressions.collect {
case e: EventExpr =>
val q = intern(compress(e.query))
DataExprMeta(e.toString, q)
}
case ExprType.TIME_SERIES =>
parsedExpressions
.collect {
case se: StyleExpr => se.expr.dataExprs
}
.flatten
.distinct
.map { e =>
val q = intern(compress(e.query))
DataExprMeta(e.toString, q)
}
case ExprType.TRACE_EVENTS | ExprType.TRACE_TIME_SERIES =>
parsedExpressions.map { e =>
// Tracing cannot be scoped to specific infrastructure, so always use True
DataExprMeta(e.toString, Query.True)
}
}
}

@@ -142,19 +147,20 @@
* contention and most threads are blocked. This just does a get/put which potentially
* recomputes some values, but for this case that is preferable.
*/
private def getFromCache(k: String): Try[List[DataExprMeta]] = {
val value = exprCache.getIfPresent(k)
private def getFromCache(k: String, exprType: ExprType): Try[List[DataExprMeta]] = {
val key = s"$k:$exprType"
val value = exprCache.getIfPresent(key)
if (value == null) {
val tmp = parse(k)
exprCache.put(k, tmp)
val tmp = parse(k, exprType)
exprCache.put(key, tmp)
tmp
} else {
value
}
}

def split(expression: String, exprType: ExprType, frequency: Long): List[Subscription] = {
getFromCache(expression) match {
getFromCache(expression, exprType) match {
case Success(exprs: List[?]) => exprs.map(e => toSubscription(e, exprType, frequency))
case Failure(t) => throw t
}
@@ -207,5 +213,5 @@
}

object ExpressionSplitter {
private case class DataExprMeta(expr: DataExpr, exprString: String, compressedQuery: Query)
private case class DataExprMeta(exprString: String, compressedQuery: Query)
}
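Because one query string can now parse differently depending on the type, the parse cache must key on both. A minimal sketch of the get/put pattern used by getFromCache above, with an illustrative Caffeine configuration (not the project's actual settings):

    import java.util.concurrent.TimeUnit
    import com.github.benmanes.caffeine.cache.Caffeine

    // Sketch: key the cache by expression AND type so the TIME_SERIES parse of an
    // expression does not shadow its EVENTS parse. A plain get/put is used instead of
    // a computing get; concurrent threads may recompute a value, which is preferable
    // to lock contention here.
    val cache = Caffeine.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES) // illustrative expiry
      .build[String, AnyRef]()

    def getOrCompute(expression: String, exprType: String)(parse: => AnyRef): AnyRef = {
      val key   = s"$expression:$exprType"
      val value = cache.getIfPresent(key)
      if (value == null) {
        val tmp = parse
        cache.put(key, tmp)
        tmp
      } else value
    }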
SubscribeApi.scala
@@ -15,6 +15,7 @@
*/
package com.netflix.atlas.lwcapi

import com.netflix.atlas.eval.model.ExprType
import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage
import org.apache.pekko.http.scaladsl.model.ws.Message
@@ -31,6 +32,7 @@ import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcMessages
import com.netflix.atlas.eval.model.LwcSubscription
import com.netflix.atlas.eval.model.LwcSubscriptionV2
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.CustomDirectives.*
import com.netflix.atlas.pekko.DiagnosticMessage
@@ -163,7 +165,11 @@ class SubscribeApi(
.flatMapConcat { _ =>
val steps = sm
.subscriptionsForStream(streamId)
.map(_.metadata.frequency)
.map { sub =>
// For events, where the step doesn't really matter, use 5s as that is the typical
// heartbeat frequency. This only gets used for the time associated with the heartbeat messages.
if (sub.metadata.frequency == 0L) 5_000L else sub.metadata.frequency
}
.distinct
.map { step =>
// To account for some delays for data coming from real systems, the heartbeat
@@ -198,7 +204,13 @@
val subMessages = addedSubs.map { sub =>
val meta = sub.metadata
val exprInfo = LwcDataExpr(meta.id, meta.expression, meta.frequency)
LwcSubscription(expr.expression, List(exprInfo))
if (expr.exprType == ExprType.TIME_SERIES) {
// For backwards compatibility with older versions of the eval library, use the v1
// subscription response when it is a time series type
LwcSubscription(expr.expression, List(exprInfo))
} else {
LwcSubscriptionV2(expr.expression, expr.exprType, List(exprInfo))
}
}
queue.offer(subMessages)

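Event subscriptions now carry a frequency of 0 (see the ExpressionMetadata change above), so the heartbeat path substitutes a stand-in step. A small sketch of that mapping, using the 5s default from the diff:

    // Sketch: derive the distinct heartbeat steps from the subscription frequencies.
    // Event subscriptions have frequency 0; substitute the typical 5s heartbeat since
    // the value only affects the timestamps on the heartbeat messages.
    def heartbeatSteps(frequencies: List[Long]): List[Long] =
      frequencies.map(f => if (f == 0L) 5000L else f).distinct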
ExpressionApiSuite.scala
@@ -172,11 +172,11 @@ class ExpressionApiSuite extends MUnitRouteSuite {
}

private val skanCount =
"""{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"6278fa6047c07316d7e265a1004882ab9e1007af"}"""
"""{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"039722fefa66c2cdd0147595fb9b9a50351f90f0"}"""

private val skanSum =
"""{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"36e0a2c61b48e062bba5361d059afd313c82c674"}"""
"""{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"17e0dc5b1224c825c81cf033c46d0f0490c1ca7f"}"""

private val brhMax =
"""{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"16f1b0930c0eeae0225374ea88c01e161e589aff"}"""
"""{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"b19b2aa2c802c29216d4aa36024f71a3c92f84db"}"""
}
ExpressionMetadataSuite.scala
@@ -48,7 +48,7 @@ class ExpressionMetadataSuite extends FunSuite {

test("computes id") {
val expr = ExpressionMetadata("test")
assertEquals(expr.id, "2684d3c5cb245bd2fd6ee4ea30a500e97ace8141")
assertEquals(expr.id, "59ec3895749cc3fb279d41dabc1d4943361de999")
}

test("id computation considers frequency") {
@@ -114,14 +114,14 @@

test("renders as json with default frequency") {
val expected =
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
val json = ExpressionMetadata("this").toJson
assertEquals(expected, json)
}

test("renders as json with frequency of 0") {
val expected =
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
val json = ExpressionMetadata("this", ExprType.TIME_SERIES, 0).toJson
assertEquals(expected, json)
}
ExpressionSplitterSuite.scala
@@ -54,7 +54,7 @@ class ExpressionSplitterSuite extends FunSuite {
val msg = intercept[IllegalArgumentException] {
splitter.split("foo", ExprType.TIME_SERIES, frequency1)
}
assertEquals(msg.getMessage, "expression is invalid")
assertEquals(msg.getMessage, "invalid value on stack: foo")
}

test("throws IAE for expressions with offset") {