Skip to content

Commit

Permalink
lwc-events: reuse handlers when syncing expressions (Netflix#1640)
Browse files Browse the repository at this point in the history
Update the sync logic to reuse the existing handlers rather
than completely replace them. For data point converters this
helps avoid losing state during the sync.

Also changes some of the naming to be more consistent with
the expressions response.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent 90e507e commit ef23011
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,46 @@ import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.atlas.impl.QueryIndex

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {

import AbstractLwcEventClient.*

@volatile private var handlers: List[EventHandler] = _
private val subHandlers = new ConcurrentHashMap[Subscription, (SpectatorQuery, EventHandler)]()

private val index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)

@volatile private var index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)
@volatile private var currentSubs: Subscriptions = Subscriptions()

@volatile private var handlers: List[EventHandler] = _

@volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty

@volatile private var traceHandlersTS: Map[Subscription, TraceQuery.SpanTimeSeries] = Map.empty

protected def sync(subscriptions: Subscriptions): Unit = {
val diff = Subscriptions.diff(currentSubs, subscriptions)
currentSubs = subscriptions

val flushableHandlers = List.newBuilder[EventHandler]
val idx = QueryIndex.newInstance[EventHandler](new NoopRegistry)

// Pass-through events
subscriptions.passThrough.foreach { sub =>
diff.added.events.foreach { sub =>
val expr = ExprUtils.parseEventExpr(sub.expression)
val q = removeValueClause(expr.query)
val handler = expr match {
case EventExpr.Raw(_) => EventHandler(sub, e => List(e))
case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs)))
}
idx.add(q, handler)
index.add(q, handler)
subHandlers.put(sub, q -> handler)
}
diff.removed.events.foreach(removeSubscription)

// Analytics based on events
subscriptions.analytics.foreach { sub =>
diff.added.timeSeries.foreach { sub =>
val expr = ExprUtils.parseDataExpr(sub.expression)
val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit)
val q = removeValueClause(expr.query)
Expand All @@ -63,19 +74,55 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
},
Some(converter)
)
idx.add(q, handler)
index.add(q, handler)
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.removed.timeSeries.foreach(removeSubscription)

// Trace pass-through
traceHandlers = subscriptions.tracePassThrough.map { sub =>
traceHandlers = subscriptions.traceEvents.map { sub =>
sub -> ExprUtils.parseTraceEventsQuery(sub.expression)
}.toMap

index = idx
// Analytics based on traces
diff.added.traceTimeSeries.foreach { sub =>
val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val dataExpr = tq.expr.expr.dataExprs.head
val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, submit)
val q = removeValueClause(dataExpr.query)
val handler = EventHandler(
sub,
event => {
converter.update(event)
Nil
},
Some(converter)
)
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.unchanged.traceTimeSeries.foreach { sub =>
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null)
flushableHandlers += handlerMeta._2
}
diff.removed.traceTimeSeries.foreach(sub => subHandlers.remove(sub))
traceHandlersTS = subscriptions.traceTimeSeries.map { sub =>
sub -> ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
}.toMap

handlers = flushableHandlers.result()
}

private def removeSubscription(sub: Subscription): Unit = {
val handlerMeta = subHandlers.remove(sub)
if (handlerMeta != null) {
val (q, handler) = handlerMeta
index.remove(q, handler)
}
}

private def removeValueClause(query: Query): SpectatorQuery = {
val q = query
.rewrite {
Expand Down Expand Up @@ -109,6 +156,18 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
}
}
}
traceHandlersTS.foreachEntry { (sub, tq) =>
if (TraceMatcher.matches(tq.q, trace)) {
val f = tq.expr.expr.dataExprs.head.query
val filtered = trace.filter(event => ExprUtils.matches(f, event.tagValue))
if (filtered.nonEmpty) {
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null) {
filtered.foreach(handlerMeta._2.mapper)
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,19 @@ private[events] object ExprUtils {

private val traceInterpreter = Interpreter(TraceVocabulary.allWords)

private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = {
TraceQuery.SpanFilter(q, Query.True)
}

/** Parse a single trace events query expression. */
def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => matchAllSpans(q)
case (f: TraceQuery.SpanFilter) :: Nil => f
case _ => throw new IllegalArgumentException(str)
case TraceFilterType(tq) :: Nil => tq
case _ => throw new IllegalArgumentException(str)
}
}

private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = {
TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty))
}

/** Parse a single trace time series query expression. */
def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => sumAllSpans(q)
case (f: TraceQuery.SpanTimeSeries) :: Nil => f
case _ => throw new IllegalArgumentException(str)
case TraceTimeSeriesType(tq) :: Nil => tq
case _ => throw new IllegalArgumentException(str)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ package com.netflix.atlas.lwc.events
* Step size for the subscription when mapped into a time series.
* @param expression
* Expression for matching events and mapping into the expected output.
* @param exprType
* Type for the expression.
*/
case class Subscription(id: String, step: Long, expression: String)
case class Subscription(id: String, step: Long, expression: String, exprType: String)
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,63 @@ package com.netflix.atlas.lwc.events
/**
* Set of subscriptions to receive event data.
*
* @param passThrough
* @param events
* Subscriptions looking for the raw events to be passed through.
* @param analytics
* @param timeSeries
* Subscriptions that should be mapped into time series.
* @param tracePassThrough
* @param traceEvents
* Trace subscriptions looking for the trace spans to be passed through.
* @param traceAnalytics
* @param traceTimeSeries
* Trace subscriptions that should map the selected spans into time-series.
*/
case class Subscriptions(
passThrough: List[Subscription] = Nil,
analytics: List[Subscription] = Nil,
tracePassThrough: List[Subscription] = Nil,
traceAnalytics: List[Subscription] = Nil
events: List[Subscription] = Nil,
timeSeries: List[Subscription] = Nil,
traceEvents: List[Subscription] = Nil,
traceTimeSeries: List[Subscription] = Nil
)

object Subscriptions {

val Events = "EVENTS"
val TimeSeries = "TIME_SERIES"
val TraceEvents = "TRACE_EVENTS"
val TraceTimeSeries = "TRACE_TIME_SERIES"

/**
* Create instance from a flattened list with types based on the ExprType enum
* from the eval library.
*/
def fromTypedList(subs: List[Subscription]): Subscriptions = {
val groups = subs.groupBy(_.exprType)
Subscriptions(
events = groups.getOrElse(Events, Nil),
timeSeries = groups.getOrElse(TimeSeries, Nil),
traceEvents = groups.getOrElse(TraceEvents, Nil),
traceTimeSeries = groups.getOrElse(TraceTimeSeries, Nil)
)
}

/** Compute set of added and removed expressions between the two sets. */
def diff(a: Subscriptions, b: Subscriptions): Diff = {
val (addedE, removedE, unchangedE) = diff(a.events, b.events)
val (addedTS, removedTS, unchangedTS) = diff(a.timeSeries, b.timeSeries)
val (addedTE, removedTE, unchangedTE) = diff(a.traceEvents, b.traceEvents)
val (addedTTS, removedTTS, unchangedTTS) = diff(a.traceTimeSeries, b.traceTimeSeries)
val added = Subscriptions(addedE, addedTS, addedTE, addedTTS)
val removed = Subscriptions(removedE, removedTS, removedTE, removedTTS)
val unchanged = Subscriptions(unchangedE, unchangedTS, unchangedTE, unchangedTTS)
Diff(added, removed, unchanged)
}

private def diff[T](a: List[T], b: List[T]): (List[T], List[T], List[T]) = {
val setA = a.toSet
val setB = b.toSet
val added = setB.diff(setA)
val removed = setA.diff(setB)
val unchanged = setA.intersect(setB)
(added.toList, removed.toList, unchanged.toList)
}

case class Diff(added: Subscriptions, removed: Subscriptions, unchanged: Subscriptions)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class LwcEventClientSuite extends FunSuite {
private val sampleLwcEvent: LwcEvent = LwcEvent(sampleSpan, extractSpanValue(sampleSpan))

test("pass-through") {
val subs = Subscriptions(passThrough =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", 60000, "app,foo,:eq"),
Subscription("2", 60000, "app,www,:eq")
Subscription("1", 60000, "app,foo,:eq", Subscriptions.Events),
Subscription("2", 60000, "app,www,:eq", Subscriptions.Events)
)
)
val output = List.newBuilder[String]
Expand All @@ -54,10 +54,10 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, basic aggregate") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum"),
Subscription("2", step, "app,www,:eq,:sum")
Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
Expand All @@ -70,9 +70,14 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, basic aggregate extract value") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,www,:eq,value,duration,:eq,:and,:sum")
Subscription(
"1",
step,
"app,www,:eq,value,duration,:eq,:and,:sum",
Subscriptions.TimeSeries
)
)
)
val output = List.newBuilder[String]
Expand All @@ -87,10 +92,10 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, group by") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by"),
Subscription("2", step, "app,www,:eq,:sum,(,node,),:by")
Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
Expand All @@ -105,14 +110,29 @@ class LwcEventClientSuite extends FunSuite {
}

test("analytics, group by missing key") {
val subs = Subscriptions(analytics =
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by")
Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
val client = LwcEventClient(subs, output.addOne)
client.process(sampleLwcEvent)
assert(output.result().isEmpty)
}

test("trace analytics, basic aggregate") {
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,www,:eq", Subscriptions.TraceTimeSeries)
)
)
val output = List.newBuilder[String]
val client = LwcEventClient(subs, output.addOne, clock)
client.processTrace(Seq(new TestSpan(sampleSpan)))
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
val vs = output.result()
assertEquals(vs.size, 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ object LwcEventSuite {
case k => span.tags.getOrElse(k, null)
}
}

class TestSpan(event: TestEvent) extends LwcEvent.Span {

override def spanId: String = "test"

override def parentId: String = "parent"

override def rawEvent: Any = event

override def timestamp: Long = 0L

override def extractValue(key: String): Any = {
extractSpanValue(event)(key)
}
}
}
8 changes: 4 additions & 4 deletions atlas-spring-lwc-events/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ atlas.lwc.events {
// subscription = "app,foo,:eq,:sum"
// }
subscriptions {
pass-through = []
analytics = []
trace-pass-through = []
trace-analytics = []
events = []
time-series = []
trace-events = []
trace-time-series = []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ class LwcEventConfiguration {
private def toSubscriptions(config: Config): Subscriptions = {
val cfg = config.getConfig("atlas.lwc.events.subscriptions")
Subscriptions(
passThrough = toSubscriptions(cfg.getConfigList("pass-through")),
analytics = toSubscriptions(cfg.getConfigList("analytics")),
tracePassThrough = toSubscriptions(cfg.getConfigList("trace-pass-through")),
traceAnalytics = toSubscriptions(cfg.getConfigList("trace-analytics"))
events = toSubscriptions(cfg.getConfigList("events"), Subscriptions.Events),
timeSeries = toSubscriptions(cfg.getConfigList("time-series"), Subscriptions.TimeSeries),
traceEvents = toSubscriptions(cfg.getConfigList("trace-events"), Subscriptions.TraceEvents),
traceTimeSeries =
toSubscriptions(cfg.getConfigList("trace-time-series"), Subscriptions.TraceTimeSeries)
)
}

private def toSubscriptions(configs: java.util.List[? <: Config]): List[Subscription] = {
private def toSubscriptions(
configs: java.util.List[? <: Config],
exprType: String
): List[Subscription] = {
configs.asScala.toList.map { c =>
val step = if (c.hasPath("step")) c.getDuration("step").toMillis else 60_000L
Subscription(c.getString("id"), step, c.getString("expression"))
Subscription(c.getString("id"), step, c.getString("expression"), exprType)
}
}
}

0 comments on commit ef23011

Please sign in to comment.