From 5eef485980a6e01239cc08211b937a8dc3c44210 Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Fri, 29 Mar 2024 08:25:32 -0500 Subject: [PATCH] lwc-events: reuse handlers when syncing expressions Update the sync logic to reuse the existing handlers rather than completely replace them. For data point converters this helps avoid losing state during the sync. Also changes some of the naming to be more consistent with the expressions response. --- .../lwc/events/AbstractLwcEventClient.scala | 77 ++++++++++++++++--- .../netflix/atlas/lwc/events/ExprUtils.scala | 18 +---- .../atlas/lwc/events/Subscription.scala | 4 +- .../atlas/lwc/events/Subscriptions.scala | 61 +++++++++++++-- .../lwc/events/LwcEventClientSuite.scala | 46 +++++++---- .../atlas/lwc/events/LwcEventSuite.scala | 15 ++++ .../src/main/resources/reference.conf | 8 +- .../lwc/events/LwcEventConfiguration.scala | 16 ++-- 8 files changed, 190 insertions(+), 55 deletions(-) diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala index d7d18de8b..24f873433 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala @@ -23,35 +23,46 @@ import com.netflix.spectator.api.Clock import com.netflix.spectator.api.NoopRegistry import com.netflix.spectator.atlas.impl.QueryIndex +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { import AbstractLwcEventClient.* - @volatile private var handlers: List[EventHandler] = _ + private val subHandlers = new ConcurrentHashMap[Subscription, (SpectatorQuery, EventHandler)]() + + private val index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry) - @volatile private var index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry) + @volatile private var currentSubs: Subscriptions = Subscriptions() + + @volatile private var handlers: List[EventHandler] = _ @volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty + @volatile private var traceHandlersTS: Map[Subscription, TraceQuery.SpanTimeSeries] = Map.empty + protected def sync(subscriptions: Subscriptions): Unit = { + val diff = Subscriptions.diff(currentSubs, subscriptions) + currentSubs = subscriptions + val flushableHandlers = List.newBuilder[EventHandler] - val idx = QueryIndex.newInstance[EventHandler](new NoopRegistry) // Pass-through events - subscriptions.passThrough.foreach { sub => + diff.added.events.foreach { sub => val expr = ExprUtils.parseEventExpr(sub.expression) val q = removeValueClause(expr.query) val handler = expr match { case EventExpr.Raw(_) => EventHandler(sub, e => List(e)) case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs))) } - idx.add(q, handler) + index.add(q, handler) + subHandlers.put(sub, q -> handler) } + diff.removed.events.foreach(removeSubscription) // Analytics based on events - subscriptions.analytics.foreach { sub => + diff.added.timeSeries.foreach { sub => val expr = ExprUtils.parseDataExpr(sub.expression) val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit) val q = removeValueClause(expr.query) @@ -63,19 +74,55 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { }, Some(converter) ) - idx.add(q, handler) + index.add(q, handler) + subHandlers.put(sub, q -> handler) flushableHandlers += handler } + diff.removed.timeSeries.foreach(removeSubscription) // Trace pass-through - traceHandlers = subscriptions.tracePassThrough.map { sub => + traceHandlers = subscriptions.traceEvents.map { sub => sub -> ExprUtils.parseTraceEventsQuery(sub.expression) }.toMap - index = idx + // Analytics based on traces + diff.added.traceTimeSeries.foreach { sub => + val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression) + val dataExpr = tq.expr.expr.dataExprs.head + val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, submit) + val q = removeValueClause(dataExpr.query) + val handler = EventHandler( + sub, + event => { + converter.update(event) + Nil + }, + Some(converter) + ) + subHandlers.put(sub, q -> handler) + flushableHandlers += handler + } + diff.unchanged.traceTimeSeries.foreach { sub => + val handlerMeta = subHandlers.get(sub) + if (handlerMeta != null) + flushableHandlers += handlerMeta._2 + } + diff.removed.traceTimeSeries.foreach(sub => subHandlers.remove(sub)) + traceHandlersTS = subscriptions.traceTimeSeries.map { sub => + sub -> ExprUtils.parseTraceTimeSeriesQuery(sub.expression) + }.toMap + handlers = flushableHandlers.result() } + private def removeSubscription(sub: Subscription): Unit = { + val handlerMeta = subHandlers.remove(sub) + if (handlerMeta != null) { + val (q, handler) = handlerMeta + index.remove(q, handler) + } + } + private def removeValueClause(query: Query): SpectatorQuery = { val q = query .rewrite { @@ -109,6 +156,18 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { } } } + traceHandlersTS.foreachEntry { (sub, tq) => + if (TraceMatcher.matches(tq.q, trace)) { + val f = tq.expr.expr.dataExprs.head.query + val filtered = trace.filter(event => ExprUtils.matches(f, event.tagValue)) + if (filtered.nonEmpty) { + val handlerMeta = subHandlers.get(sub) + if (handlerMeta != null) { + filtered.foreach(handlerMeta._2.mapper) + } + } + } + } } } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala index 0242aedc2..7e29696b3 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala @@ -56,29 +56,19 @@ private[events] object ExprUtils { private val traceInterpreter = Interpreter(TraceVocabulary.allWords) - private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = { - TraceQuery.SpanFilter(q, Query.True) - } - /** Parse a single trace events query expression. */ def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = { traceInterpreter.execute(str).stack match { - case TraceQueryType(q) :: Nil => matchAllSpans(q) - case (f: TraceQuery.SpanFilter) :: Nil => f - case _ => throw new IllegalArgumentException(str) + case TraceFilterType(tq) :: Nil => tq + case _ => throw new IllegalArgumentException(str) } } - private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = { - TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty)) - } - /** Parse a single trace time series query expression. */ def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = { traceInterpreter.execute(str).stack match { - case TraceQueryType(q) :: Nil => sumAllSpans(q) - case (f: TraceQuery.SpanTimeSeries) :: Nil => f - case _ => throw new IllegalArgumentException(str) + case TraceTimeSeriesType(tq) :: Nil => tq + case _ => throw new IllegalArgumentException(str) } } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala index d205378c4..4ee7dc66e 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala @@ -24,5 +24,7 @@ package com.netflix.atlas.lwc.events * Step size for the subscription when mapped into a time series. * @param expression * Expression for matching events and mapping into the expected output. + * @param exprType + * Type for the expression. */ -case class Subscription(id: String, step: Long, expression: String) +case class Subscription(id: String, step: Long, expression: String, exprType: String) diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscriptions.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscriptions.scala index 503912d43..1295611ca 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscriptions.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscriptions.scala @@ -18,18 +18,63 @@ package com.netflix.atlas.lwc.events /** * Set of subscriptions to receive event data. * - * @param passThrough + * @param events * Subscriptions looking for the raw events to be passed through. - * @param analytics + * @param timeSeries * Subscriptions that should be mapped into time series. - * @param tracePassThrough + * @param traceEvents * Trace subscriptions looking for the trace spans to be passed through. - * @param traceAnalytics + * @param traceTimeSeries * Trace subscriptions that should map the selected spans into time-series. */ case class Subscriptions( - passThrough: List[Subscription] = Nil, - analytics: List[Subscription] = Nil, - tracePassThrough: List[Subscription] = Nil, - traceAnalytics: List[Subscription] = Nil + events: List[Subscription] = Nil, + timeSeries: List[Subscription] = Nil, + traceEvents: List[Subscription] = Nil, + traceTimeSeries: List[Subscription] = Nil ) + +object Subscriptions { + + val Events = "EVENTS" + val TimeSeries = "TIME_SERIES" + val TraceEvents = "TRACE_EVENTS" + val TraceTimeSeries = "TRACE_TIME_SERIES" + + /** + * Create instance from a flattened list with types based on the ExprType enum + * from the eval library. + */ + def fromTypedList(subs: List[Subscription]): Subscriptions = { + val groups = subs.groupBy(_.exprType) + Subscriptions( + events = groups.getOrElse(Events, Nil), + timeSeries = groups.getOrElse(TimeSeries, Nil), + traceEvents = groups.getOrElse(TraceEvents, Nil), + traceTimeSeries = groups.getOrElse(TraceTimeSeries, Nil) + ) + } + + /** Compute set of added and removed expressions between the two sets. */ + def diff(a: Subscriptions, b: Subscriptions): Diff = { + val (addedE, removedE, unchangedE) = diff(a.events, b.events) + val (addedTS, removedTS, unchangedTS) = diff(a.timeSeries, b.timeSeries) + val (addedTE, removedTE, unchangedTE) = diff(a.traceEvents, b.traceEvents) + val (addedTTS, removedTTS, unchangedTTS) = diff(a.traceTimeSeries, b.traceTimeSeries) + val added = Subscriptions(addedE, addedTS, addedTE, addedTTS) + val removed = Subscriptions(removedE, removedTS, removedTE, removedTTS) + val unchanged = Subscriptions(unchangedE, unchangedTS, unchangedTE, unchangedTTS) + Diff(added, removed, unchanged) + } + + private def diff[T](a: List[T], b: List[T]): (List[T], List[T], List[T]) = { + val setA = a.toSet + val setB = b.toSet + val added = setB.diff(setA) + val removed = setA.diff(setB) + val unchanged = setA.intersect(setB) + (added.toList, removed.toList, unchanged.toList) + } + + case class Diff(added: Subscriptions, removed: Subscriptions, unchanged: Subscriptions) +} diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala index f039e7b3e..e17f5c133 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala @@ -38,10 +38,10 @@ class LwcEventClientSuite extends FunSuite { private val sampleLwcEvent: LwcEvent = LwcEvent(sampleSpan, extractSpanValue(sampleSpan)) test("pass-through") { - val subs = Subscriptions(passThrough = + val subs = Subscriptions.fromTypedList( List( - Subscription("1", 60000, "app,foo,:eq"), - Subscription("2", 60000, "app,www,:eq") + Subscription("1", 60000, "app,foo,:eq", Subscriptions.Events), + Subscription("2", 60000, "app,www,:eq", Subscriptions.Events) ) ) val output = List.newBuilder[String] @@ -54,10 +54,10 @@ class LwcEventClientSuite extends FunSuite { } test("analytics, basic aggregate") { - val subs = Subscriptions(analytics = + val subs = Subscriptions.fromTypedList( List( - Subscription("1", step, "app,foo,:eq,:sum"), - Subscription("2", step, "app,www,:eq,:sum") + Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries), + Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries) ) ) val output = List.newBuilder[String] @@ -70,9 +70,14 @@ class LwcEventClientSuite extends FunSuite { } test("analytics, basic aggregate extract value") { - val subs = Subscriptions(analytics = + val subs = Subscriptions.fromTypedList( List( - Subscription("1", step, "app,www,:eq,value,duration,:eq,:and,:sum") + Subscription( + "1", + step, + "app,www,:eq,value,duration,:eq,:and,:sum", + Subscriptions.TimeSeries + ) ) ) val output = List.newBuilder[String] @@ -87,10 +92,10 @@ class LwcEventClientSuite extends FunSuite { } test("analytics, group by") { - val subs = Subscriptions(analytics = + val subs = Subscriptions.fromTypedList( List( - Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by"), - Subscription("2", step, "app,www,:eq,:sum,(,node,),:by") + Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries), + Subscription("2", step, "app,www,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries) ) ) val output = List.newBuilder[String] @@ -105,9 +110,9 @@ class LwcEventClientSuite extends FunSuite { } test("analytics, group by missing key") { - val subs = Subscriptions(analytics = + val subs = Subscriptions.fromTypedList( List( - Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by") + Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by", Subscriptions.TimeSeries) ) ) val output = List.newBuilder[String] @@ -115,4 +120,19 @@ class LwcEventClientSuite extends FunSuite { client.process(sampleLwcEvent) assert(output.result().isEmpty) } + + test("trace analytics, basic aggregate") { + val subs = Subscriptions.fromTypedList( + List( + Subscription("1", step, "app,www,:eq", Subscriptions.TraceTimeSeries) + ) + ) + val output = List.newBuilder[String] + val client = LwcEventClient(subs, output.addOne, clock) + client.processTrace(Seq(new TestSpan(sampleSpan))) + clock.setWallTime(step) + client.process(LwcEvent.HeartbeatLwcEvent(step)) + val vs = output.result() + assertEquals(vs.size, 1) + } } diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventSuite.scala index 3eca92854..2c711723f 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventSuite.scala @@ -83,4 +83,19 @@ object LwcEventSuite { case k => span.tags.getOrElse(k, null) } } + + class TestSpan(event: TestEvent) extends LwcEvent.Span { + + override def spanId: String = "test" + + override def parentId: String = "parent" + + override def rawEvent: Any = event + + override def timestamp: Long = 0L + + override def extractValue(key: String): Any = { + extractSpanValue(event)(key) + } + } } diff --git a/atlas-spring-lwc-events/src/main/resources/reference.conf b/atlas-spring-lwc-events/src/main/resources/reference.conf index e4ae7445c..596239633 100644 --- a/atlas-spring-lwc-events/src/main/resources/reference.conf +++ b/atlas-spring-lwc-events/src/main/resources/reference.conf @@ -8,9 +8,9 @@ atlas.lwc.events { // subscription = "app,foo,:eq,:sum" // } subscriptions { - pass-through = [] - analytics = [] - trace-pass-through = [] - trace-analytics = [] + events = [] + time-series = [] + trace-events = [] + trace-time-series = [] } } \ No newline at end of file diff --git a/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala b/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala index eee1182e4..5ceb3a357 100644 --- a/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala +++ b/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala @@ -39,17 +39,21 @@ class LwcEventConfiguration { private def toSubscriptions(config: Config): Subscriptions = { val cfg = config.getConfig("atlas.lwc.events.subscriptions") Subscriptions( - passThrough = toSubscriptions(cfg.getConfigList("pass-through")), - analytics = toSubscriptions(cfg.getConfigList("analytics")), - tracePassThrough = toSubscriptions(cfg.getConfigList("trace-pass-through")), - traceAnalytics = toSubscriptions(cfg.getConfigList("trace-analytics")) + events = toSubscriptions(cfg.getConfigList("events"), Subscriptions.Events), + timeSeries = toSubscriptions(cfg.getConfigList("time-series"), Subscriptions.TimeSeries), + traceEvents = toSubscriptions(cfg.getConfigList("trace-events"), Subscriptions.TraceEvents), + traceTimeSeries = + toSubscriptions(cfg.getConfigList("trace-time-series"), Subscriptions.TraceTimeSeries) ) } - private def toSubscriptions(configs: java.util.List[? <: Config]): List[Subscription] = { + private def toSubscriptions( + configs: java.util.List[? <: Config], + exprType: String + ): List[Subscription] = { configs.asScala.toList.map { c => val step = if (c.hasPath("step")) c.getDuration("step").toMillis else 60_000L - Subscription(c.getString("id"), step, c.getString("expression")) + Subscription(c.getString("id"), step, c.getString("expression"), exprType) } } }