diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala index ae8ee26fa..bd95a9e74 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala @@ -28,6 +28,7 @@ import com.typesafe.scalalogging.StrictLogging import java.time.Duration import java.util.Locale import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock /** * Helper to convert a sequence of events into a data point. @@ -161,13 +162,24 @@ private[events] object DatapointConverter { private val buffer = new StepDouble(Double.NaN, params.clock, params.step) + // Lock to avoid race condition between update and flush that can otherwise result + // in the samples occasionally being missing for an aggregate. + private val lock = new ReentrantLock() private val sampleMapper: LwcEvent => List[Any] = params.sampleMapper.orNull - @volatile private var sample: List[Any] = Nil + private var sample: List[Any] = Nil override def update(event: LwcEvent): Unit = { - update(params.valueMapper(event)) - if (sampleMapper != null && sample.isEmpty) { - sample = sampleMapper(event) + if (sampleMapper == null) { + update(params.valueMapper(event)) + } else { + lock.lock() + try { + update(params.valueMapper(event)) + if (sample.isEmpty) + sample = sampleMapper(event) + } finally { + lock.unlock() + } } } @@ -177,14 +189,25 @@ private[events] object DatapointConverter { } } + private def getAndResetSample(): List[List[Any]] = { + if (sampleMapper == null) { + Nil + } else { + lock.lock() + try { + val s = sample + sample = Nil + List(s) + } finally { + lock.unlock() + } + } + } + override def flush(timestamp: Long): Unit = { val value = buffer.pollAsRate(timestamp) if (value.isFinite) { - var s = List.empty[List[Any]] - if (sampleMapper != null) { - s = List(sample) - sample = Nil - } + val s = getAndResetSample() val ts = timestamp / params.step * params.step val event = DatapointEvent(params.id, params.tags, ts, value, s) params.consumer(params.id, event)