diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala index f762e0d6e..be3bdf9d7 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala @@ -41,6 +41,12 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { @volatile private var traceHandlersTS: Map[Subscription, TraceTimeSeries] = Map.empty + /** + * Called to force flushing the data. Implementations should override if they have + * some buffering. + */ + protected def flush(): Unit = {} + protected def sync(subscriptions: Subscriptions): Unit = { val diff = Subscriptions.diff(currentSubs, subscriptions) currentSubs = subscriptions @@ -142,6 +148,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient { event match { case LwcEvent.HeartbeatLwcEvent(timestamp) => handlers.foreach(_.flush(timestamp)) + flush() case _ => index.forEachMatch(k => event.tagValue(k), h => handleMatch(event, h)) } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala index b46fbefa8..971ca6220 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala @@ -30,6 +30,7 @@ import java.time.Duration import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.locks.ReentrantLock import scala.util.Using class RemoteLwcEventClient(registry: Registry, config: Config) @@ -60,6 +61,8 @@ class RemoteLwcEventClient(registry: Registry, config: Config) private val droppedSend = registry.counter("lwc.events", "id", "dropped", "error", "http") private val droppedTooBig = registry.counter("lwc.events", "id", "dropped", "error", "too-big") + private val flushLock = new ReentrantLock() + private var scheduler: Scheduler = _ private var executorService: ExecutorService = _ @@ -132,24 +135,30 @@ class RemoteLwcEventClient(registry: Registry, config: Config) flush() } - private def flush(): Unit = { - val events = new java.util.ArrayList[Event](buffer.size()) - buffer.drainTo(events) - if (!events.isEmpty) { - import scala.jdk.CollectionConverters.* - - // Write out datapoints that need to be batch by timestamp - val ds = events.asScala.collect { - case Event(_, e: DatapointEvent) => e - }.toList - flushDatapoints(ds) - - // Write out other events for pass through - val now = registry.clock().wallTime() - batch( - events.asScala.filterNot(_.isDatapoint).toList, - es => send(EvalPayload(now, events = es)) - ) + override protected def flush(): Unit = { + if (flushLock.tryLock()) { + try { + val events = new java.util.ArrayList[Event](buffer.size()) + buffer.drainTo(events) + if (!events.isEmpty) { + import scala.jdk.CollectionConverters.* + + // Write out datapoints that need to be batch by timestamp + val ds = events.asScala.collect { + case Event(_, e: DatapointEvent) => e + }.toList + flushDatapoints(ds) + + // Write out other events for pass through + val now = registry.clock().wallTime() + batch( + events.asScala.filterNot(_.isDatapoint).toList, + es => send(EvalPayload(now, events = es)) + ) + } + } finally { + flushLock.unlock() + } } } @@ -197,7 +206,7 @@ class RemoteLwcEventClient(registry: Registry, config: Config) } } - private def send(payload: EvalPayload): Unit = { + protected def send(payload: EvalPayload): Unit = { val task: Runnable = () => { try { val response = HttpClient.DEFAULT_CLIENT diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/RemoveLwcEventClientSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/RemoveLwcEventClientSuite.scala index b025ad6c2..59317edfe 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/RemoveLwcEventClientSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/RemoveLwcEventClientSuite.scala @@ -16,22 +16,41 @@ package com.netflix.atlas.lwc.events import com.netflix.spectator.api.DefaultRegistry -import com.netflix.spectator.api.NoopRegistry import com.netflix.spectator.api.Registry import com.netflix.spectator.api.Utils import com.typesafe.config.ConfigFactory import munit.FunSuite +import java.util.concurrent.CopyOnWriteArrayList + class RemoveLwcEventClientSuite extends FunSuite { private val config = ConfigFactory.load() + private var payloads: java.util.List[RemoteLwcEventClient.EvalPayload] = _ private var registry: Registry = _ private var client: RemoteLwcEventClient = _ override def beforeEach(context: BeforeEach): Unit = { + payloads = new CopyOnWriteArrayList[RemoteLwcEventClient.EvalPayload]() registry = new DefaultRegistry() client = new RemoteLwcEventClient(registry, config) { - override def start(): Unit = () + override def start(): Unit = { + val subs = Subscriptions(events = + List( + Subscription( + "test", + 0L, + ":true", + "EVENTS" + ) + ) + ) + sync(subs) + } + + override protected def send(payload: RemoteLwcEventClient.EvalPayload): Unit = { + payloads.add(payload) + } } } @@ -68,4 +87,19 @@ class RemoveLwcEventClientSuite extends FunSuite { assertEquals(c.count(), 1L) } } + + test("flush on heartbeat") { + client.start() + + val str = "1234567890" + val event = LwcEvent(str, _ => str) + val events = (0 until 100).map(_ => event).toList + + events.foreach(client.process) + assert(payloads.isEmpty) + + client.process(LwcEvent.HeartbeatLwcEvent(registry.clock().wallTime())) + assertEquals(payloads.size(), 1) + assertEquals(payloads.get(0).size, 100) + } }