Skip to content

Commit

Permalink
eval: add counter for unknown messages
Browse files Browse the repository at this point in the history
This will get inremented when we get messages from the
LWC API service and do not recognize the id.
  • Loading branch information
brharrington committed Feb 8, 2024
1 parent c51f04a commit 89f577b
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import com.netflix.atlas.eval.model.LwcSubscription
private[stream] class LwcToAggrDatapoint(context: StreamContext)
extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] {

private val unknown = context.registry.counter("atlas.eval.unknownMessages")

private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in")
private val out = Outlet[List[AggrDatapoint]]("LwcToAggrDatapoint.out")

Expand Down Expand Up @@ -75,18 +77,23 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}

private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
state.get(dp.id).map { sub =>
// TODO, put in source, for now make it random to avoid dedup
nextSource += 1
val expr = sub.expr
val step = sub.step
AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value)
state.get(dp.id) match {
case Some(sub) =>
// TODO, put in source, for now make it random to avoid dedup
nextSource += 1
val expr = sub.expr
val step = sub.step
Some(AggrDatapoint(dp.timestamp, step, expr, nextSource.toString, dp.tags, dp.value))
case None =>
unknown.increment()
None
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id).foreach { sub =>
context.log(sub.expr, diagMsg.message)
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
case None => unknown.increment()
}
}

Expand Down

0 comments on commit 89f577b

Please sign in to comment.