Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lwc-events: reuse handlers when syncing expressions #1640

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,46 @@ import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.atlas.impl.QueryIndex

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {

import AbstractLwcEventClient.*

@volatile private var handlers: List[EventHandler] = _
private val subHandlers = new ConcurrentHashMap[Subscription, (SpectatorQuery, EventHandler)]()

private val index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)

@volatile private var index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)
@volatile private var currentSubs: Subscriptions = Subscriptions()

@volatile private var handlers: List[EventHandler] = _

@volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty

@volatile private var traceHandlersTS: Map[Subscription, TraceQuery.SpanTimeSeries] = Map.empty

protected def sync(subscriptions: Subscriptions): Unit = {
val diff = Subscriptions.diff(currentSubs, subscriptions)
currentSubs = subscriptions

val flushableHandlers = List.newBuilder[EventHandler]
val idx = QueryIndex.newInstance[EventHandler](new NoopRegistry)

// Pass-through events
subscriptions.passThrough.foreach { sub =>
diff.added.events.foreach { sub =>
val expr = ExprUtils.parseEventExpr(sub.expression)
val q = removeValueClause(expr.query)
val handler = expr match {
case EventExpr.Raw(_) => EventHandler(sub, e => List(e))
case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs)))
}
idx.add(q, handler)
index.add(q, handler)
subHandlers.put(sub, q -> handler)
}
diff.removed.events.foreach(removeSubscription)

// Analytics based on events
subscriptions.analytics.foreach { sub =>
diff.added.timeSeries.foreach { sub =>
val expr = ExprUtils.parseDataExpr(sub.expression)
val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit)
val q = removeValueClause(expr.query)
Expand All @@ -63,19 +74,55 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
},
Some(converter)
)
idx.add(q, handler)
index.add(q, handler)
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.removed.timeSeries.foreach(removeSubscription)

// Trace pass-through
traceHandlers = subscriptions.tracePassThrough.map { sub =>
traceHandlers = subscriptions.traceEvents.map { sub =>
sub -> ExprUtils.parseTraceEventsQuery(sub.expression)
}.toMap

index = idx
// Analytics based on traces
diff.added.traceTimeSeries.foreach { sub =>
val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val dataExpr = tq.expr.expr.dataExprs.head
val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, submit)
val q = removeValueClause(dataExpr.query)
val handler = EventHandler(
sub,
event => {
converter.update(event)
Nil
},
Some(converter)
)
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.unchanged.traceTimeSeries.foreach { sub =>
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null)
flushableHandlers += handlerMeta._2
}
diff.removed.traceTimeSeries.foreach(sub => subHandlers.remove(sub))
traceHandlersTS = subscriptions.traceTimeSeries.map { sub =>
sub -> ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
}.toMap

handlers = flushableHandlers.result()
}

private def removeSubscription(sub: Subscription): Unit = {
val handlerMeta = subHandlers.remove(sub)
if (handlerMeta != null) {
val (q, handler) = handlerMeta
index.remove(q, handler)
}
}

private def removeValueClause(query: Query): SpectatorQuery = {
val q = query
.rewrite {
Expand Down Expand Up @@ -109,6 +156,18 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
}
}
}
traceHandlersTS.foreachEntry { (sub, tq) =>
if (TraceMatcher.matches(tq.q, trace)) {
val f = tq.expr.expr.dataExprs.head.query
val filtered = trace.filter(event => ExprUtils.matches(f, event.tagValue))
if (filtered.nonEmpty) {
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null) {
filtered.foreach(handlerMeta._2.mapper)
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,19 @@ private[events] object ExprUtils {

private val traceInterpreter = Interpreter(TraceVocabulary.allWords)

private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = {
TraceQuery.SpanFilter(q, Query.True)
}

/** Parse a single trace events query expression. */
def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => matchAllSpans(q)
case (f: TraceQuery.SpanFilter) :: Nil => f
case _ => throw new IllegalArgumentException(str)
case TraceFilterType(tq) :: Nil => tq
case _ => throw new IllegalArgumentException(str)
}
}

private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = {
TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty))
}

/** Parse a single trace time series query expression. */
def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => sumAllSpans(q)
case (f: TraceQuery.SpanTimeSeries) :: Nil => f
case _ => throw new IllegalArgumentException(str)
case TraceTimeSeriesType(tq) :: Nil => tq
case _ => throw new IllegalArgumentException(str)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ package com.netflix.atlas.lwc.events
* Step size for the subscription when mapped into a time series.
* @param expression
* Expression for matching events and mapping into the expected output.
* @param exprType
* Type for the expression.
*/
case class Subscription(id: String, step: Long, expression: String)
case class Subscription(id: String, step: Long, expression: String, exprType: String)
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,63 @@ package com.netflix.atlas.lwc.events
/**
* Set of subscriptions to receive event data.
*
* @param passThrough
* @param events
* Subscriptions looking for the raw events to be passed through.
* @param analytics
* @param timeSeries
* Subscriptions that should be mapped into time series.
* @param tracePassThrough
* @param traceEvents
* Trace subscriptions looking for the trace spans to be passed through.
* @param traceAnalytics
* @param traceTimeSeries
* Trace subscriptions that should map the selected spans into time-series.
*/
case class Subscriptions(
passThrough: List[Subscription] = Nil,
analytics: List[Subscription] = Nil,
tracePassThrough: List[Subscription] = Nil,
traceAnalytics: List[Subscription] = Nil
events: List[Subscription] = Nil,
timeSeries: List[Subscription] = Nil,
traceEvents: List[Subscription] = Nil,
traceTimeSeries: List[Subscription] = Nil
)

object Subscriptions {

val Events = "EVENTS"
val TimeSeries = "TIME_SERIES"
val TraceEvents = "TRACE_EVENTS"
val TraceTimeSeries = "TRACE_TIME_SERIES"

/**
* Create instance from a flattened list with types based on the ExprType enum
* from the eval library.
*/
def fromTypedList(subs: List[Subscription]): Subscriptions = {
val groups = subs.groupBy(_.exprType)
Subscriptions(
events = groups.getOrElse(Events, Nil),
timeSeries = groups.getOrElse(TimeSeries, Nil),
traceEvents = groups.getOrElse(TraceEvents, Nil),
traceTimeSeries = groups.getOrElse(TraceTimeSeries, Nil)
)
}

/** Compute set of added and removed expressions between the two sets. */
def diff(a: Subscriptions, b: Subscriptions): Diff = {
val (addedE, removedE, unchangedE) = diff(a.events, b.events)
val (addedTS, removedTS, unchangedTS) = diff(a.timeSeries, b.timeSeries)
val (addedTE, removedTE, unchangedTE) = diff(a.traceEvents, b.traceEvents)
val (addedTTS, removedTTS, unchangedTTS) = diff(a.traceTimeSeries, b.traceTimeSeries)
val added = Subscriptions(addedE, addedTS, addedTE, addedTTS)
val removed = Subscriptions(removedE, removedTS, removedTE, removedTTS)
val unchanged = Subscriptions(unchangedE, unchangedTS, unchangedTE, unchangedTTS)
Diff(added, removed, unchanged)
}

private def diff[T](a: List[T], b: List[T]): (List[T], List[T], List[T]) = {
val setA = a.toSet
val setB = b.toSet
val added = setB.diff(setA)
val removed = setA.diff(setB)
val unchanged = setA.intersect(setB)
(added.toList, removed.toList, unchanged.toList)
}

case class Diff(added: Subscriptions, removed: Subscriptions, unchanged: Subscriptions)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class LwcEventClientSuite extends FunSuite {
private val sampleLwcEvent: LwcEvent = LwcEvent(sampleSpan, extractSpanValue(sampleSpan))

test("pass-through") {
val subs = Subscriptions(passThrough =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", 60000, "app,foo,:eq"),
Subscription("2", 60000, "app,www,:eq")
Subscription("1", 60000, "app,foo,:eq", Subscriptions.Events),
Subscription("2", 60000, "app,www,:eq", Subscriptions.Events)
)
)
val output = List.newBuilder[String]
Expand All @@ -54,10 +54,10 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, basic aggregate") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum"),
Subscription("2", step, "app,www,:eq,:sum")
Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
Expand All @@ -70,9 +70,14 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, basic aggregate extract value") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,www,:eq,value,duration,:eq,:and,:sum")
Subscription(
"1",
step,
"app,www,:eq,value,duration,:eq,:and,:sum",
Subscriptions.TimeSeries
)
)
)
val output = List.newBuilder[String]
Expand All @@ -87,10 +92,10 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, group by") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by"),
Subscription("2", step, "app,www,:eq,:sum,(,node,),:by")
Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
Expand All @@ -105,14 +110,29 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, group by missing key") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by")
Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
val client = LwcEventClient(subs, output.addOne)
client.process(sampleLwcEvent)
assert(output.result().isEmpty)
}

test("trace analytics, basic aggregate") {
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,www,:eq", Subscriptions.TraceTimeSeries)
)
)
val output = List.newBuilder[String]
val client = LwcEventClient(subs, output.addOne, clock)
client.processTrace(Seq(new TestSpan(sampleSpan)))
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
val vs = output.result()
assertEquals(vs.size, 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ object LwcEventSuite {
case k => span.tags.getOrElse(k, null)
}
}

class TestSpan(event: TestEvent) extends LwcEvent.Span {

override def spanId: String = "test"

override def parentId: String = "parent"

override def rawEvent: Any = event

override def timestamp: Long = 0L

override def extractValue(key: String): Any = {
extractSpanValue(event)(key)
}
}
}
8 changes: 4 additions & 4 deletions atlas-spring-lwc-events/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ atlas.lwc.events {
// subscription = "app,foo,:eq,:sum"
// }
subscriptions {
pass-through = []
analytics = []
trace-pass-through = []
trace-analytics = []
events = []
time-series = []
trace-events = []
trace-time-series = []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ class LwcEventConfiguration {
private def toSubscriptions(config: Config): Subscriptions = {
val cfg = config.getConfig("atlas.lwc.events.subscriptions")
Subscriptions(
passThrough = toSubscriptions(cfg.getConfigList("pass-through")),
analytics = toSubscriptions(cfg.getConfigList("analytics")),
tracePassThrough = toSubscriptions(cfg.getConfigList("trace-pass-through")),
traceAnalytics = toSubscriptions(cfg.getConfigList("trace-analytics"))
events = toSubscriptions(cfg.getConfigList("events"), Subscriptions.Events),
timeSeries = toSubscriptions(cfg.getConfigList("time-series"), Subscriptions.TimeSeries),
traceEvents = toSubscriptions(cfg.getConfigList("trace-events"), Subscriptions.TraceEvents),
traceTimeSeries =
toSubscriptions(cfg.getConfigList("trace-time-series"), Subscriptions.TraceTimeSeries)
)
}

private def toSubscriptions(configs: java.util.List[? <: Config]): List[Subscription] = {
private def toSubscriptions(
configs: java.util.List[? <: Config],
exprType: String
): List[Subscription] = {
configs.asScala.toList.map { c =>
val step = if (c.hasPath("step")) c.getDuration("step").toMillis else 60_000L
Subscription(c.getString("id"), step, c.getString("expression"))
Subscription(c.getString("id"), step, c.getString("expression"), exprType)
}
}
}
Loading