Skip to content

Commit

Permalink
lwc-events: batch based on estimated payload size (#1653)
Browse files Browse the repository at this point in the history
For pass-through events the sizes can vary wildly from one
event to the next. Rather than use a fixed size, use an
estimated size for the event.
  • Loading branch information
brharrington authored Apr 26, 2024
1 parent c75e8fb commit c68c210
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 6 deletions.
3 changes: 3 additions & 0 deletions atlas-lwc-events/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ atlas.lwc.events {

// Batch size for publishing data
batch-size = 5000

// Max payload size for events data. Default max payload size for server is 8MiB.
payload-size = 7MiB
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ trait LwcEvent {
w.toString
}
}

/**
* Estimates the size of the events in bytes. This is used for batching to ensure that
* the payload is not too big for the backend. The default implementation just encodes
* to a JSON string. If possible, override with a more efficient implementation.
*/
def estimatedSizeInBytes: Int = {
toJson.length
}
}

object LwcEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RemoteLwcEventClient(registry: Registry, config: Config)
private val bufferSize = scopedConfig.getInt("buffer-size")
private val flushSize = scopedConfig.getInt("flush-size")
private val batchSize = scopedConfig.getInt("batch-size")
private val payloadSize = scopedConfig.getBytes("payload-size")

private val buffer = new ArrayBlockingQueue[Event](bufferSize)

Expand All @@ -54,6 +55,7 @@ class RemoteLwcEventClient(registry: Registry, config: Config)
private val droppedQueueFull =
registry.counter("lwc.events", "id", "dropped", "error", "queue-full")
private val droppedSend = registry.counter("lwc.events", "id", "dropped", "error", "http")
private val droppedTooBig = registry.counter("lwc.events", "id", "dropped", "error", "too-big")

private var scheduler: Scheduler = _

Expand Down Expand Up @@ -141,13 +143,45 @@ class RemoteLwcEventClient(registry: Registry, config: Config)

// Write out other events for pass through
val now = registry.clock().wallTime()
events.asScala
.filterNot(_.isDatapoint)
.toList
.grouped(batchSize)
.foreach { vs =>
send(EvalPayload(now, events = vs))
batch(
events.asScala.filterNot(_.isDatapoint).toList,
es => send(EvalPayload(now, events = es))
)
}
}

private[events] def batch(events: List[Event], sink: List[Event] => Unit): Unit = {
var size = 0L
val eventBatch = List.newBuilder[Event]
val it = events.iterator
while (it.hasNext) {
val event = it.next()
val estimatedSize = event.payload.estimatedSizeInBytes

// Flush batch if size is met
if (size + estimatedSize > payloadSize) {
val es = eventBatch.result()
eventBatch.clear()
size = 0L
if (es.nonEmpty) {
sink(es)
}
}

// Verify that a single message doesn't exceed the size
if (estimatedSize > payloadSize) {
logger.warn(s"dropping event with excessive size ($estimatedSize > $payloadSize)")
droppedTooBig.increment()
} else {
size += estimatedSize
eventBatch += event
}
}

// Flush final batch
val es = eventBatch.result()
if (es.nonEmpty) {
sink(es)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.lwc.events

import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Utils
import com.typesafe.config.ConfigFactory
import munit.FunSuite

class RemoveLwcEventClientSuite extends FunSuite {

private val config = ConfigFactory.load()
private var registry: Registry = _
private var client: RemoteLwcEventClient = _

override def beforeEach(context: BeforeEach): Unit = {
registry = new DefaultRegistry()
client = new RemoteLwcEventClient(registry, config) {
override def start(): Unit = ()
}
}

test("batch events by size") {
val str = "1234567890" * 100_000
val event = LwcEvent(str, _ => str)
val events = (0 until 100).map(_ => RemoteLwcEventClient.Event("_", event)).toList
val results = List.newBuilder[List[RemoteLwcEventClient.Event]]
client.batch(events, results.addOne)

val batches = results.result()
assertEquals(batches.size, 100 / 7 + 1)
batches.foreach { batch =>
assert(batch.size <= 7)
}
}

test("batch events, single event is too big") {
val str = "1234567890" * 1_000_000
val event = LwcEvent(str, _ => str)
val events = List(RemoteLwcEventClient.Event("_", event))
val results = List.newBuilder[List[RemoteLwcEventClient.Event]]
client.batch(events, results.addOne)

val batches = results.result()
assertEquals(batches.size, 0)

val errors = registry
.counters()
.filter(c => c.id.name == "lwc.events" && Utils.getTagValue(c.id, "error") == "too-big")
.toList
assertEquals(errors.size(), 1)
errors.forEach { c =>
assertEquals(c.count(), 1L)
}
}
}

0 comments on commit c68c210

Please sign in to comment.