Skip to content

Commit

Permalink
eval: add counter for unknown messages (#1605)
Browse files Browse the repository at this point in the history
This will get inremented when we get messages from the
LWC API service and do not recognize the id.
  • Loading branch information
brharrington authored Feb 8, 2024
1 parent c51f04a commit 50840e7
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import com.netflix.atlas.eval.model.LwcSubscription
private[stream] class LwcToAggrDatapoint(context: StreamContext)
extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] {

private val unknown = context.registry.counter("atlas.eval.unknownMessages")

private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in")
private val out = Outlet[List[AggrDatapoint]]("LwcToAggrDatapoint.out")

Expand Down Expand Up @@ -75,18 +77,23 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}

private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
state.get(dp.id).map { sub =>
// TODO, put in source, for now make it random to avoid dedup
nextSource += 1
val expr = sub.expr
val step = sub.step
AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value)
state.get(dp.id) match {
case Some(sub) =>
// TODO, put in source, for now make it random to avoid dedup
nextSource += 1
val expr = sub.expr
val step = sub.step
Some(AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value))
case None =>
unknown.increment()
None
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id).foreach { sub =>
context.log(sub.expr, diagMsg.message)
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
case None => unknown.increment()
}
}

Expand Down

0 comments on commit 50840e7

Please sign in to comment.