diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java index f957bab3b..38fe289b6 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java @@ -310,10 +310,7 @@ void sendToLWC() { EvalPayload payload = evaluator.eval(t); if (!payload.getMetrics().isEmpty()) { List> futures = new ArrayList<>(); - for (EvalPayload batch : payload.toBatches(batchSize)) { - CompletableFuture future = publisher.publish(batch); - futures.add(future); - } + payload.toBatches(batchSize, p -> futures.add(publisher.publish(p))); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } } catch (Exception e) { diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/EvalPayload.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/EvalPayload.java index 7ff5c9e8e..3a5c839a7 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/EvalPayload.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/EvalPayload.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * Wraps a list of measurements with a set of common tags. The common tags are @@ -70,18 +71,31 @@ public List getMessages() { * List of payloads that have at most {@code batchSize} metrics per payload. */ public List toBatches(int batchSize) { + List payloads = new ArrayList<>(metrics.size() / batchSize + 1); + toBatches(batchSize, payloads::add); + return payloads; + } + + /** + * Break the payload down to a set of batches to limit the size of requests going to the + * service. + * + * @param batchSize + * Size of the metric batches to create. + * @param consumer + * Consumer to receive an eval payload batch. + */ + public void toBatches(int batchSize, Consumer consumer) { int size = metrics.size(); if (size <= batchSize) { - return Collections.singletonList(this); + consumer.accept(this); } else { - List payloads = new ArrayList<>(size / batchSize + 1); for (int i = 0; i < size; i += batchSize) { List batch = metrics.subList(i, Math.min(size, i + batchSize)); // There shouldn't be many messages, stick in the first batch List msgs = (i == 0) ? messages : Collections.emptyList(); - payloads.add(new EvalPayload(timestamp, batch, msgs)); + consumer.accept(new EvalPayload(timestamp, batch, msgs)); } - return payloads; } }