From 5033700beae99886f1894646db55d6a555fcf40a Mon Sep 17 00:00:00 2001
From: brharrington
Date: Fri, 29 Mar 2024 12:02:12 -0500
Subject: [PATCH] lwc-events: add remote client (#1641)

Update config so that when disabled it will still use the local client
to allow for debugging. The remote client that talks to the lwcapi
service will be used when enabled.
---
 .../src/main/resources/reference.conf         |  16 ++
 .../lwc/events/AbstractLwcEventClient.scala   |   3 +-
 .../lwc/events/RemoteLwcEventClient.scala     | 264 ++++++++++++++++++
 .../atlas/lwc/events/Subscription.scala       |   7 +-
 .../src/main/resources/reference.conf         |   6 +-
 .../lwc/events/LwcEventConfiguration.scala    |  17 +-
 6 files changed, 304 insertions(+), 9 deletions(-)
 create mode 100644 atlas-lwc-events/src/main/resources/reference.conf
 create mode 100644 atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala

diff --git a/atlas-lwc-events/src/main/resources/reference.conf b/atlas-lwc-events/src/main/resources/reference.conf
new file mode 100644
index 000000000..e2c68bed9
--- /dev/null
+++ b/atlas-lwc-events/src/main/resources/reference.conf
@@ -0,0 +1,16 @@
+
+atlas.lwc.events {
+
+  base-uri = "http://localhost:7101/lwc/api/v1"
+  config-uri = ${atlas.lwc.events.base-uri}"/expressions"
+  eval-uri = ${atlas.lwc.events.base-uri}"/evaluate"
+
+  // Max buffer size before events will start getting dropped. Used to avoid OOM.
+  buffer-size = 1000000
+
+  // Buffer size to force a flush before the regular time interval.
+  flush-size = 100000
+
+  // Batch size for publishing data
+  batch-size = 5000
+}
\ No newline at end of file
diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
index 24f873433..66e71d6c0 100644
--- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
@@ -15,7 +15,6 @@
  */
 package com.netflix.atlas.lwc.events
 
-import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.EventExpr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.core.model.TraceQuery
@@ -36,7 +35,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
 
   @volatile private var currentSubs: Subscriptions = Subscriptions()
 
-  @volatile private var handlers: List[EventHandler] = _
+  @volatile private var handlers: List[EventHandler] = Nil
 
   @volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty
 
diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala
new file mode 100644
index 000000000..ec06cef01
--- /dev/null
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/RemoteLwcEventClient.scala
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.lwc.events
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.netflix.atlas.core.util.FastGzipOutputStream
+import com.netflix.atlas.json.Json
+import com.netflix.spectator.api.Registry
+import com.netflix.spectator.impl.Scheduler
+import com.netflix.spectator.ipc.http.HttpClient
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.StrictLogging
+
+import java.io.ByteArrayOutputStream
+import java.net.URI
+import java.time.Duration
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import scala.util.Using
+
+class RemoteLwcEventClient(registry: Registry, config: Config)
+  extends AbstractLwcEventClient(registry.clock())
+  with AutoCloseable
+  with StrictLogging {
+
+  import RemoteLwcEventClient.*
+
+  private val scopedConfig = config.getConfig("atlas.lwc.events")
+  private val configUri = URI.create(scopedConfig.getString("config-uri"))
+  private val evalUri = URI.create(scopedConfig.getString("eval-uri"))
+
+  private val bufferSize = scopedConfig.getInt("buffer-size")
+  private val flushSize = scopedConfig.getInt("flush-size")
+  private val batchSize = scopedConfig.getInt("batch-size")
+
+  private val buffer = new ArrayBlockingQueue[Event](bufferSize)
+
+  private val sentEvents = registry.counter("lwc.events", "id", "sent")
+
+  private val droppedQueueFull =
+    registry.counter("lwc.events", "id", "dropped", "error", "queue-full")
+  private val droppedSend = registry.counter("lwc.events", "id", "dropped", "error", "http")
+
+  private var scheduler: Scheduler = _
+
+  private var executorService: ExecutorService = _
+
+  def start(): Unit = {
+    if (scheduler == null) {
+      scheduler = new Scheduler(registry, "LwcEventClient", 2)
+
+      val refreshOptions = new Scheduler.Options()
+        .withFrequency(Scheduler.Policy.FIXED_DELAY, Duration.ofSeconds(10))
+        .withStopOnFailure(false)
+      scheduler.schedule(refreshOptions, () => refreshConfigs())
+
+      val heartbeatOptions = new Scheduler.Options()
+        .withFrequency(Scheduler.Policy.FIXED_DELAY, Duration.ofSeconds(2))
+        .withStopOnFailure(false)
+      scheduler.schedule(heartbeatOptions, () => sendHeartbeat())
+
+      executorService = createExecutorService
+    }
+  }
+
+  def stop(): Unit = {
+    if (scheduler != null) {
+      scheduler.shutdown()
+      scheduler = null
+      executorService.shutdown()
+    }
+  }
+
+  override def close(): Unit = {
+    stop()
+  }
+
+  override def submit(id: String, event: LwcEvent): Unit = {
+    if (!buffer.offer(Event(id, event)))
+      droppedQueueFull.increment()
+    if (buffer.size() > flushSize)
+      flush()
+  }
+
+  private def refreshConfigs(): Unit = {
+    try {
+      val response = HttpClient.DEFAULT_CLIENT
+        .get(configUri)
+        .acceptGzip()
+        .send()
+        .decompress()
+
+      if (response.status() == 200) {
+        try {
+          val subs = Json.decode[Expressions](response.entity())
+          sync(Subscriptions.fromTypedList(subs.expressions))
+        } catch {
+          case e: Exception =>
+            logger.warn("failed to process expressions payload", e)
+        }
+      } else if (response.status() >= 400) {
+        logger.warn(s"failed to refresh expressions, status code ${response.status()}")
+      }
+    } catch {
+      case e: Exception => logger.warn("failed to refresh expressions", e)
+    }
+  }
+
+  private def sendHeartbeat(): Unit = {
+    // Send explicit heartbeat used to flush data for time series
+    process(LwcEvent.HeartbeatLwcEvent(registry.clock().wallTime()))
+    if (!buffer.isEmpty)
+      flush()
+  }
+
+  private def flush(): Unit = {
+    val events = new java.util.ArrayList[Event](buffer.size())
+    buffer.drainTo(events)
+    if (!events.isEmpty) {
+      import scala.jdk.CollectionConverters.*
+
+      // Write out datapoints that need to be batched by timestamp
+      val ds = events.asScala.collect {
+        case Event(_, e: DatapointEvent) => e
+      }.toList
+      flushDatapoints(ds)
+
+      // Write out other events for pass through
+      val now = registry.clock().wallTime()
+      events.asScala
+        .filterNot(_.isDatapoint)
+        .toList
+        .grouped(batchSize)
+        .foreach { vs =>
+          send(EvalPayload(now, events = vs))
+        }
+    }
+  }
+
+  private def flushDatapoints(events: List[DatapointEvent]): Unit = {
+    events.groupBy(_.timestamp).foreach {
+      case (timestamp, ds) =>
+        ds.grouped(batchSize).foreach { vs =>
+          send(EvalPayload(timestamp, metrics = vs))
+        }
+    }
+  }
+
+  private def send(payload: EvalPayload): Unit = {
+    val task: Runnable = () => {
+      try {
+        val response = HttpClient.DEFAULT_CLIENT
+          .post(evalUri)
+          .addHeader("Content-Encoding", "gzip")
+          .withContent("application/x-jackson-smile", encodePayload(payload))
+          .send()
+
+        if (response.status() == 200) {
+          val numEvents = payload.size
+          logger.debug(s"sent $numEvents events")
+          sentEvents.increment(numEvents)
+        } else {
+          logger.warn(s"failed to send events, status code ${response.status()}")
+          droppedSend.increment(payload.size)
+        }
+      } catch {
+        case e: Exception =>
+          logger.warn("failed to send events", e)
+          droppedSend.increment(payload.size)
+      }
+    }
+    executorService.submit(task)
+  }
+
+  private def encodePayload(payload: EvalPayload): Array[Byte] = {
+    val baos = new ByteArrayOutputStream()
+    Using.resource(new FastGzipOutputStream(baos)) { out =>
+      Using.resource(Json.newSmileGenerator(out)) { gen =>
+        gen.writeStartObject()
+        gen.writeNumberField("timestamp", payload.timestamp)
+        gen.writeArrayFieldStart("metrics")
+        payload.metrics.foreach(m => encodeMetric(m, gen))
+        gen.writeEndArray()
+        gen.writeArrayFieldStart("events")
+        payload.events.foreach(e => encodeEvent(e, gen))
+        gen.writeEndArray()
+        gen.writeEndObject()
+      }
+    }
+    baos.toByteArray
+  }
+
+  private def encodeMetric(event: DatapointEvent, gen: JsonGenerator): Unit = {
+    gen.writeStartObject()
+    gen.writeStringField("id", event.id)
+    gen.writeObjectFieldStart("tags")
+    event.tags.foreachEntry { (k, v) =>
+      gen.writeStringField(k, v)
+    }
+    gen.writeEndObject()
+    gen.writeNumberField("value", event.value)
+    gen.writeEndObject()
+  }
+
+  private def encodeEvent(event: Event, gen: JsonGenerator): Unit = {
+    gen.writeStartObject()
+    gen.writeStringField("id", event.id)
+    gen.writeFieldName("payload")
+    event.payload.encode(gen)
+    gen.writeEndObject()
+  }
+}
+
+object RemoteLwcEventClient {
+
+  case class Expressions(expressions: List[Subscription])
+
+  case class EvalPayload(
+    timestamp: Long,
+    metrics: List[DatapointEvent] = Nil,
+    events: List[Event] = Nil
+  ) {
+
+    def size: Int = metrics.size + events.size
+  }
+
+  case class Event(id: String, payload: LwcEvent) {
+
+    def isDatapoint: Boolean = {
+      payload.isInstanceOf[DatapointEvent]
+    }
+  }
+
+  /**
+    * On JDK 21 this uses virtual threads, on earlier versions it uses a fixed thread
+    * pool based on the number of cores.
+    */
+  def createExecutorService: ExecutorService = {
+    try {
+      val cls = classOf[Executors]
+      val method = cls.getMethod("newVirtualThreadPerTaskExecutor")
+      method.invoke(null).asInstanceOf[ExecutorService]
+    } catch {
+      case _: Exception =>
+        val numProcessors = Runtime.getRuntime.availableProcessors()
+        val numThreads = Math.max(numProcessors / 2, 1)
+        Executors.newFixedThreadPool(numThreads)
+    }
+  }
+}
diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala
index 4ee7dc66e..25e1b6720 100644
--- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/Subscription.scala
@@ -20,11 +20,14 @@ package com.netflix.atlas.lwc.events
  *
  * @param id
  *   Id used to pair the received data with a given consumer.
- * @param step
+ * @param frequency
  *   Step size for the subscription when mapped into a time series.
  * @param expression
  *   Expression for matching events and mapping into the expected output.
  * @param exprType
  *   Type for the expression.
  */
-case class Subscription(id: String, step: Long, expression: String, exprType: String)
+case class Subscription(id: String, frequency: Long, expression: String, exprType: String) {
+
+  def step: Long = frequency
+}
diff --git a/atlas-spring-lwc-events/src/main/resources/reference.conf b/atlas-spring-lwc-events/src/main/resources/reference.conf
index 596239633..70797e16a 100644
--- a/atlas-spring-lwc-events/src/main/resources/reference.conf
+++ b/atlas-spring-lwc-events/src/main/resources/reference.conf
@@ -1,7 +1,11 @@
 
 atlas.lwc.events {
 
-  // Sample subscription. Step is optional.
+  // Whether remote processing is enabled. When disabled, a minimal local client can be
+  // used for debugging with a preconfigured set of expressions.
+  enabled = true
+
+  // Sample subscription. Step is optional. This block is only used in local mode.
   // {
   //   id = "foo"
   //   step = 60s
diff --git a/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala b/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala
index 5ceb3a357..e5166a23d 100644
--- a/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala
+++ b/atlas-spring-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventConfiguration.scala
@@ -15,6 +15,8 @@
  */
 package com.netflix.atlas.lwc.events
 
+import com.netflix.spectator.api.NoopRegistry
+import com.netflix.spectator.api.Registry
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import org.slf4j.LoggerFactory
@@ -29,11 +31,18 @@ class LwcEventConfiguration {
   import scala.jdk.CollectionConverters.*
 
   @Bean
-  def lwcEventClient(config: Optional[Config]): LwcEventClient = {
+  def lwcEventClient(registry: Optional[Registry], config: Optional[Config]): LwcEventClient = {
+    val r = registry.orElseGet(() => new NoopRegistry)
     val c = config.orElseGet(() => ConfigFactory.load())
-    val logger = LoggerFactory.getLogger(classOf[LwcEventClient])
-    val subs = toSubscriptions(c)
-    LwcEventClient(subs, logger.info)
+    if (c.getBoolean("atlas.lwc.events.enabled")) {
+      val client = new RemoteLwcEventClient(r, c)
+      client.start()
+      client
+    } else {
+      val logger = LoggerFactory.getLogger(classOf[LwcEventClient])
+      val subs = toSubscriptions(c)
+      LwcEventClient(subs, logger.info)
+    }
   }
 
   private def toSubscriptions(config: Config): Subscriptions = {
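
Usage sketch (not part of the patch): the snippet below shows one way an application that does not use the Spring wiring from LwcEventConfiguration might construct the remote client directly with the APIs added above. The object name, the config override values, and the comment about feeding events in are illustrative assumptions rather than anything defined by this change.

import com.netflix.atlas.lwc.events.RemoteLwcEventClient
import com.netflix.spectator.api.DefaultRegistry
import com.typesafe.config.ConfigFactory

object RemoteLwcEventClientSketch {

  def main(args: Array[String]): Unit = {
    // Hypothetical overrides layered on top of the reference.conf defaults above.
    val config = ConfigFactory
      .parseString(
        """
          |atlas.lwc.events {
          |  flush-size = 50000
          |  batch-size = 1000
          |}
          |""".stripMargin
      )
      .withFallback(ConfigFactory.load()) // pick up the library defaults
      .resolve()

    val registry = new DefaultRegistry
    val client = new RemoteLwcEventClient(registry, config)

    // start() schedules the subscription refresh (every 10s) and heartbeat (every 2s)
    // tasks and creates the executor used to post batches to the eval endpoint.
    client.start()
    try {
      // Events produced by the application would be fed to the client here, for
      // example via the inherited LwcEventClient API; omitted since it is outside
      // the scope of this patch.
    } finally {
      // close() delegates to stop(), shutting down the scheduler and executor.
      client.close()
    }
  }
}

In the Spring setup above, the lwcEventClient bean performs the equivalent wiring: it checks atlas.lwc.events.enabled, constructs the RemoteLwcEventClient with the injected registry and config, and calls start() before returning it.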