From 50840e7ef2ed9db0f4297ba3096ab6dfbad3fc56 Mon Sep 17 00:00:00 2001 From: brharrington Date: Thu, 8 Feb 2024 10:33:09 -0600 Subject: [PATCH] eval: add counter for unknown messages (#1605) This will get inremented when we get messages from the LWC API service and do not recognize the id. --- .../eval/stream/LwcToAggrDatapoint.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala index 6d5d273be..28027ec43 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala @@ -37,6 +37,8 @@ import com.netflix.atlas.eval.model.LwcSubscription private[stream] class LwcToAggrDatapoint(context: StreamContext) extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] { + private val unknown = context.registry.counter("atlas.eval.unknownMessages") + private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in") private val out = Outlet[List[AggrDatapoint]]("LwcToAggrDatapoint.out") @@ -75,18 +77,23 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) } private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = { - state.get(dp.id).map { sub => - // TODO, put in source, for now make it random to avoid dedup - nextSource += 1 - val expr = sub.expr - val step = sub.step - AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value) + state.get(dp.id) match { + case Some(sub) => + // TODO, put in source, for now make it random to avoid dedup + nextSource += 1 + val expr = sub.expr + val step = sub.step + Some(AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value)) + case None => + unknown.increment() + None } } private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = { - state.get(diagMsg.id).foreach { sub => - context.log(sub.expr, diagMsg.message) + state.get(diagMsg.id) match { + case Some(sub) => context.log(sub.expr, diagMsg.message) + case None => unknown.increment() } }