From 4bef43c137d80d8c866ca2e582bf73ed80b4a63f Mon Sep 17 00:00:00 2001 From: brharrington Date: Fri, 11 Nov 2022 09:41:55 -0600 Subject: [PATCH] batch updater for counter, timer, and dist summary (#1003) Adds a helper that can be used to batch updates to the delegate meter. The main use-case for this is timers or distributions summaries that need to be updated from a single thread with a high throughput. For counters, this can easily be achieved by the user with a local variable to track the count and then calling increment with the amount when complete or at the desired batch interval. However, for timers and distribution summaries that doesn't work because the implementation needs the raw samples. In #941 a method was added so the samples could be accumulated in an array and posted in a batch. This helps a bit, but it is cumbersome to use, incurs memory overhead, and the full set of samples have to be traversed and processed when recorded. This change introduces batch updaters that can be customized for a given implementation. So for registries like Atlas where it is possible to accumulate the stats without the memory overhead, it can provide an optimized implementation to do so. --- .../com/netflix/spectator/api/Counter.java | 47 +++++- .../spectator/api/CounterBatchUpdater.java | 55 +++++++ .../netflix/spectator/api/DefaultTimer.java | 4 +- .../api/DistSummaryBatchUpdater.java | 53 +++++++ .../spectator/api/DistributionSummary.java | 38 ++++- .../netflix/spectator/api/SwapCounter.java | 16 +- .../api/SwapDistributionSummary.java | 16 +- .../com/netflix/spectator/api/SwapTimer.java | 16 +- .../java/com/netflix/spectator/api/Timer.java | 52 ++++++- .../spectator/api/TimerBatchUpdater.java | 55 +++++++ .../spectator/api/DefaultCounterTest.java | 62 +++++++- .../api/DefaultDistributionSummaryTest.java | 36 ++++- .../spectator/api/DefaultTimerTest.java | 46 +++++- .../spectator/api/ExpiringRegistry.java | 76 ++++++++++ .../netflix/spectator/api/SwapMeterTest.java | 63 ++++++++ .../netflix/spectator/atlas/BatchUpdates.java | 143 ++++++++++++++++++ .../atlas/AtlasDistSummaryBatchUpdater.java | 82 ++++++++++ .../atlas/AtlasDistributionSummary.java | 19 ++- .../netflix/spectator/atlas/AtlasTimer.java | 17 +++ .../atlas/AtlasTimerBatchUpdater.java | 83 ++++++++++ .../atlas/AtlasDistributionSummaryTest.java | 24 ++- .../spectator/atlas/AtlasTimerTest.java | 26 +++- 22 files changed, 1010 insertions(+), 19 deletions(-) create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java create mode 100644 spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java create mode 100644 spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java create mode 100644 spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java b/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java index 2a2df0595..bef9e9c15 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,4 +50,49 @@ default long count() { * often a counter is reset depends on the underlying registry implementation. */ double actualCount(); + + /** + * Returns a helper that can be used to more efficiently update the counter within a + * single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new CounterBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** Update the counter by one. */ + default void increment() { + add(1.0); + } + + /** + * Update the counter by {@code amount}. + * + * @param amount + * Amount to add to the counter. + */ + default void increment(long amount) { + add(amount); + } + + /** Update the counter by the specified amount. */ + void add(double amount); + + /** Push updates to the associated counter. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java new file mode 100644 index 000000000..9e50c5c17 --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +final class CounterBatchUpdater implements Counter.BatchUpdater { + + private final Counter counter; + private final int batchSize; + + private int count; + private double sum; + + CounterBatchUpdater(Counter counter, int batchSize) { + this.counter = counter; + this.batchSize = batchSize; + this.count = 0; + this.sum = 0.0; + } + + @Override + public void add(double amount) { + if (Double.isFinite(amount) && amount > 0.0) { + sum += amount; + ++count; + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + counter.add(sum); + sum = 0.0; + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java b/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java index 9d3392ade..07bba0de3 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ final class DefaultTimer extends AbstractTimer { } @Override public void record(long amount, TimeUnit unit) { - if (amount >= 0) { + if (amount >= 0L) { final long nanos = TimeUnit.NANOSECONDS.convert(amount, unit); totalTime.addAndGet(nanos); count.incrementAndGet(); diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java new file mode 100644 index 000000000..12165eb2e --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java @@ -0,0 +1,53 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +class DistSummaryBatchUpdater implements DistributionSummary.BatchUpdater { + + private final DistributionSummary distSummary; + private final int batchSize; + + private int count; + private final long[] amounts; + + DistSummaryBatchUpdater(DistributionSummary distSummary, int batchSize) { + this.distSummary = distSummary; + this.batchSize = batchSize; + this.count = 0; + this.amounts = new long[batchSize]; + } + + @Override + public void record(long amount) { + if (amount >= 0) { + amounts[count++] = amount; + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + distSummary.record(amounts, count); + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java b/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java index 897ec8992..95a90d9eb 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,4 +67,40 @@ default void record(long[] amounts, int n) { * How often a timer is reset depends on the underlying registry implementation. */ long totalAmount(); + + /** + * Returns a helper that can be used to more efficiently update the distribution summary + * within a single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new DistSummaryBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** + * Updates the statistics kept by the summary with the specified amount. + * + * @param amount + * Amount for an event being measured. For example, if the size in bytes of responses + * from a server. If the amount is less than 0 the value will be dropped. + */ + void record(long amount); + + /** Push updates to the associated timer. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java index 45869ab46..7ecfbf732 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ import com.netflix.spectator.impl.SwapMeter; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another counter allowing the underlying type to be swapped. */ final class SwapCounter extends SwapMeter implements Counter { @@ -38,4 +40,16 @@ final class SwapCounter extends SwapMeter implements Counter { @Override public double actualCount() { return get().actualCount(); } + + @SuppressWarnings("unchecked") + @Override public BatchUpdater batchUpdater(int batchSize) { + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to allow the + // meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java index 644813796..a1bb02b8c 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ import com.netflix.spectator.impl.SwapMeter; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another distribution summary allowing the underlying type to be swapped. */ final class SwapDistributionSummary extends SwapMeter implements DistributionSummary { @@ -52,4 +54,16 @@ public long count() { public long totalAmount() { return get().totalAmount(); } + + @SuppressWarnings("unchecked") + @Override public BatchUpdater batchUpdater(int batchSize) { + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to + // allow the meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java index 6873c29f7..1e5ff4823 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another timer allowing the underlying type to be swapped. */ final class SwapTimer extends SwapMeter implements Timer { @@ -64,4 +66,16 @@ final class SwapTimer extends SwapMeter implements Timer { @Override public long totalTime() { return get().totalTime(); } + + @SuppressWarnings("unchecked") + @Override public BatchUpdater batchUpdater(int batchSize) { + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to allow the + // meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java b/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java index 96134257b..8eed5eb51 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java @@ -31,7 +31,7 @@ */ public interface Timer extends Meter { /** - * Updates the statistics kept by the counter with the specified amount. + * Updates the statistics kept by the timer with the specified amount. * * @param amount * Duration of a single event being measured by this timer. If the amount is less than 0 @@ -42,7 +42,7 @@ public interface Timer extends Meter { void record(long amount, TimeUnit unit); /** - * Updates the statistics kept by the counter with the specified amount. + * Updates the statistics kept by the timer with the specified amount. * * @param amount * Duration of a single event being measured by this timer. @@ -124,4 +124,52 @@ default void record(Duration[] amounts, int n) { * How often a timer is reset depends on the underlying registry implementation. */ long totalTime(); + + /** + * Returns a helper that can be used to more efficiently update the timer within a + * single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new TimerBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** + * Updates the statistics kept by the timer with the specified amount. + * + * @param amount + * Duration of a single event being measured by this timer. If the amount is less than 0 + * the value will be dropped. + * @param unit + * Time unit for the amount being recorded. + */ + void record(long amount, TimeUnit unit); + + /** + * Updates the statistics kept by the timer with the specified amount. + * + * @param amount + * Duration of a single event being measured by this timer. + */ + default void record(Duration amount) { + record(amount.toNanos(), TimeUnit.NANOSECONDS); + } + + /** Push updates to the associated timer. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java new file mode 100644 index 000000000..f1d58bfb9 --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +import java.util.concurrent.TimeUnit; + +final class TimerBatchUpdater implements Timer.BatchUpdater { + + private final Timer timer; + private final int batchSize; + + private int count; + private final long[] amounts; + + TimerBatchUpdater(Timer timer, int batchSize) { + this.timer = timer; + this.batchSize = batchSize; + this.count = 0; + this.amounts = new long[batchSize]; + } + + @Override + public void record(long amount, TimeUnit unit) { + if (amount >= 0L) { + amounts[count++] = unit.toNanos(amount); + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + timer.record(amounts, count, TimeUnit.NANOSECONDS); + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java index 1fbff8832..1ced810cf 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,20 @@ public void testIncrement() { Assertions.assertEquals(c.count(), 3L); } + @Test + public void testIncrementBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(); + Assertions.assertEquals(c.count(), 0L); + b.increment(); + Assertions.assertEquals(c.count(), 2L); + b.increment(); + Assertions.assertEquals(c.count(), 2L); + } + Assertions.assertEquals(c.count(), 3L); + } + @Test public void testIncrementAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -45,6 +59,15 @@ public void testIncrementAmount() { Assertions.assertEquals(c.count(), 42L); } + @Test + public void testIncrementAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(42); + } + Assertions.assertEquals(c.count(), 42L); + } + @Test public void testAddAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -52,6 +75,15 @@ public void testAddAmount() { Assertions.assertEquals(c.actualCount(), 42.0, 1e-12); } + @Test + public void testAddAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(42.0); + } + Assertions.assertEquals(c.actualCount(), 42.0, 1e-12); + } + @Test public void testAddNegativeAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -59,6 +91,15 @@ public void testAddNegativeAmount() { Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); } + @Test + public void testAddNegativeAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(-42.0); + } + Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); + } + @Test public void testAddNaN() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -67,6 +108,16 @@ public void testAddNaN() { Assertions.assertEquals(c.actualCount(), 1.0, 1e-12); } + @Test + public void testAddNaNBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(1.0); + b.add(Double.NaN); + } + Assertions.assertEquals(c.actualCount(), 1.0, 1e-12); + } + @Test public void testAddInfinity() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -74,6 +125,15 @@ public void testAddInfinity() { Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); } + @Test + public void testAddInfinityBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(Double.POSITIVE_INFINITY); + } + Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); + } + @Test public void testMeasure() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java index 92d4cd956..5d5c88400 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,20 @@ public void testRecord() { Assertions.assertEquals(t.totalAmount(), 42L); } + @Test + public void testRecordBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(42); + b.record(42); + Assertions.assertEquals(t.count(), 2L); + Assertions.assertEquals(t.totalAmount(), 84L); + b.record(1); + } + Assertions.assertEquals(t.count(), 3L); + Assertions.assertEquals(t.totalAmount(), 85L); + } + @Test public void testRecordNegative() { DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); @@ -46,6 +60,16 @@ public void testRecordNegative() { Assertions.assertEquals(t.totalAmount(), 0L); } + @Test + public void testRecordNegativeBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(-42); + } + Assertions.assertEquals(t.count(), 0L); + Assertions.assertEquals(t.totalAmount(), 0L); + } + @Test public void testRecordZero() { DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); @@ -54,6 +78,16 @@ public void testRecordZero() { Assertions.assertEquals(t.totalAmount(), 0L); } + @Test + public void testRecordZeroBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(0); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalAmount(), 0L); + } + @Test public void testMeasure() { DistributionSummary t = new DefaultDistributionSummary(clock, new DefaultId("foo")); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java index 8507ca405..fde213a3c 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,20 @@ public void testRecord() { Assertions.assertEquals(t.totalTime(), 42000000L); } + @Test + public void testRecordBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(42, TimeUnit.MILLISECONDS); + b.record(42, TimeUnit.MILLISECONDS); + Assertions.assertEquals(t.count(), 2L); + Assertions.assertEquals(t.totalTime(), 84000000L); + b.record(1, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 3L); + Assertions.assertEquals(t.totalTime(), 85000000L); + } + @Test public void testRecordDuration() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -49,6 +63,16 @@ public void testRecordDuration() { Assertions.assertEquals(t.totalTime(), 42000000L); } + @Test + public void testRecordDurationBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(Duration.ofMillis(42)); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalTime(), 42000000L); + } + @Test public void testRecordNegative() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -57,6 +81,16 @@ public void testRecordNegative() { Assertions.assertEquals(t.totalTime(), 0L); } + @Test + public void testRecordNegativeBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(-42, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 0L); + Assertions.assertEquals(t.totalTime(), 0L); + } + @Test public void testRecordZero() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -65,6 +99,16 @@ public void testRecordZero() { Assertions.assertEquals(t.totalTime(), 0L); } + @Test + public void testRecordZeroBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(0, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalTime(), 0L); + } + @Test public void testRecordCallable() throws Exception { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java b/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java index c916442e3..7aff950b0 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java @@ -17,6 +17,8 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; public class ExpiringRegistry extends AbstractRegistry { @@ -48,9 +50,36 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new CounterUpdater(); + } }; } + private static class CounterUpdater implements Counter.BatchUpdater, Consumer> { + + private Supplier counterSupplier; + private double sum; + + @Override public void add(double amount) { + sum += amount; + } + + @Override public void flush() { + counterSupplier.get().add(sum); + sum = 0.0; + } + + @Override public void close() throws Exception { + flush(); + } + + @Override public void accept(Supplier counterSupplier) { + this.counterSupplier = counterSupplier; + } + } + @Override protected DistributionSummary newDistributionSummary(Id id) { return new DistributionSummary() { private final long creationTime = clock().wallTime(); @@ -79,9 +108,33 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new DistributionSummaryUpdater(); + } }; } + private static class DistributionSummaryUpdater + implements DistributionSummary.BatchUpdater, Consumer> { + + private Supplier distSummarySupplier; + + @Override public void record(long amount) { + distSummarySupplier.get().record(amount); + } + + @Override public void flush() { + } + + @Override public void close() throws Exception { + } + + @Override public void accept(Supplier distSummarySupplier) { + this.distSummarySupplier = distSummarySupplier; + } + } + @Override protected Timer newTimer(Id id) { return new AbstractTimer(clock()) { private final long creationTime = clock().wallTime(); @@ -110,9 +163,32 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new TimerUpdater(); + } }; } + private static class TimerUpdater implements Timer.BatchUpdater, Consumer> { + + private Supplier timerSupplier; + + @Override public void record(long amount, TimeUnit unit) { + timerSupplier.get().record(amount, unit); + } + + @Override public void flush() { + } + + @Override public void close() throws Exception { + } + + @Override public void accept(Supplier timerSupplier) { + this.timerSupplier = timerSupplier; + } + } + @Override protected Gauge newGauge(Id id) { return new Gauge() { private final long creationTime = clock().wallTime(); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java index 188f70c75..de26dd5c5 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java @@ -60,6 +60,69 @@ public void wrapExpiredCounter() { Assertions.assertEquals(1, s1.count()); } + @Test + public void wrappedCounterBatchUpdater() throws Exception { + Counter c = new DefaultCounter(clock, counterId); + SwapCounter sc = new SwapCounter(registry, VERSION, counterId, c); + try (Counter.BatchUpdater b = sc.batchUpdater(2)) { + b.increment(); + } + Assertions.assertEquals(1, c.count()); + Assertions.assertEquals(1, sc.count()); + } + + @Test + public void wrappedCounterBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + Counter c = registry.counter(counterId); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(); + } + Assertions.assertEquals(1, c.count()); + } + + @Test + public void wrappedTimerBatchUpdater() throws Exception { + Timer t = new DefaultTimer(clock, timerId); + SwapTimer st = new SwapTimer(registry, VERSION, timerId, t); + try (Timer.BatchUpdater b = st.batchUpdater(2)) { + b.record(1, TimeUnit.NANOSECONDS); + } + Assertions.assertEquals(1, t.count()); + Assertions.assertEquals(1, st.count()); + } + + @Test + public void wrappedTimerBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + Timer t = registry.timer(timerId); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(1, TimeUnit.NANOSECONDS); + } + Assertions.assertEquals(1, t.count()); + } + + @Test + public void wrappedDistributionSummaryBatchUpdater() throws Exception { + DistributionSummary d = new DefaultDistributionSummary(clock, distSummaryId); + SwapDistributionSummary sd = new SwapDistributionSummary(registry, VERSION, distSummaryId, d); + try (DistributionSummary.BatchUpdater b = sd.batchUpdater(2)) { + b.record(1); + } + Assertions.assertEquals(1, d.count()); + Assertions.assertEquals(1, sd.count()); + } + + @Test + public void wrappedDistributionSummaryBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + DistributionSummary d = registry.distributionSummary(distSummaryId); + try (DistributionSummary.BatchUpdater b = d.batchUpdater(2)) { + b.record(1); + } + Assertions.assertEquals(1, d.count()); + } + @Test public void wrapExpiredTimer() { ExpiringRegistry registry = new ExpiringRegistry(clock); diff --git a/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java b/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java new file mode 100644 index 000000000..d3c3ad834 --- /dev/null +++ b/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java @@ -0,0 +1,143 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.Clock; +import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.DistributionSummary; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.Timer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + +/** + *
+ * Benchmark                          Mode  Cnt      Score      Error   Units
+ *
+ * noInstrumentation                 thrpt    5   3978.248 ±  136.863   ops/s
+ *
+ * counter                           thrpt    5     14.138 ±    0.229   ops/s
+ * counterBatch                      thrpt    5    464.445 ±    8.175   ops/s
+ *
+ * distSummary                       thrpt    5      9.383 ±    0.732   ops/s
+ * distSummaryBatch                  thrpt    5    353.769 ±   10.698   ops/s
+ *
+ * timer                             thrpt    5     10.505 ±    0.170   ops/s
+ * timerBatch                        thrpt    5    336.505 ±    3.538   ops/s
+ * 
+ */ +@State(Scope.Thread) +public class BatchUpdates { + + private Registry registry; + + @Setup + public void setup() { + registry = new AtlasRegistry(Clock.SYSTEM, System::getProperty); + } + + @TearDown + public void tearDown() { + registry = null; + } + + @Benchmark + public void noInstrumentation(Blackhole bh) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + } + bh.consume(sum); + } + + @Benchmark + public void counter(Blackhole bh) { + Counter c = registry.counter("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + c.increment(); + } + bh.consume(sum); + } + + @Benchmark + public void counterBatch(Blackhole bh) throws Exception { + Counter c = registry.counter("test"); + try (Counter.BatchUpdater b = c.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.increment(); + } + bh.consume(sum); + } + } + + @Benchmark + public void timer(Blackhole bh) { + Timer t = registry.timer("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + t.record(i, TimeUnit.MILLISECONDS); + } + bh.consume(sum); + } + + @Benchmark + public void timerBatch(Blackhole bh) throws Exception { + Timer t = registry.timer("test"); + try (Timer.BatchUpdater b = t.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.record(i, TimeUnit.MILLISECONDS); + } + bh.consume(sum); + } + } + + @Benchmark + public void distSummary(Blackhole bh) { + DistributionSummary d = registry.distributionSummary("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + d.record(i); + } + bh.consume(sum); + } + + @Benchmark + public void distSummaryBatch(Blackhole bh) throws Exception { + DistributionSummary d = registry.distributionSummary("test"); + try (DistributionSummary.BatchUpdater b = d.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.record(i); + } + bh.consume(sum); + } + } +} diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java new file mode 100644 index 000000000..8eecc1e34 --- /dev/null +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java @@ -0,0 +1,82 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.DistributionSummary; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +final class AtlasDistSummaryBatchUpdater + implements DistributionSummary.BatchUpdater, Consumer> { + + private Supplier distSummarySupplier; + private final int batchSize; + + private int count; + private long total; + private double totalOfSquares; + private long max; + + AtlasDistSummaryBatchUpdater(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void accept(Supplier distSummarySupplier) { + this.distSummarySupplier = distSummarySupplier; + } + + private AtlasDistributionSummary getDistributionSummary() { + if (distSummarySupplier != null) { + DistributionSummary d = distSummarySupplier.get(); + return (d instanceof AtlasDistributionSummary) ? (AtlasDistributionSummary) d : null; + } + return null; + } + + @Override + public void record(long amount) { + ++count; + if (amount > 0L) { + total += amount; + totalOfSquares += (double) amount * amount; + if (amount > max) { + max = amount; + } + } + if (count >= batchSize) { + flush(); + } + } + + @Override + public void flush() { + AtlasDistributionSummary distSummary = getDistributionSummary(); + if (distSummary != null) { + distSummary.update(count, total, totalOfSquares, max); + count = 0; + total = 0L; + totalOfSquares = 0.0; + max = 0L; + } + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java index 639b3f057..b67ae54ad 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,4 +142,21 @@ private void updateMax(AtomicLong maxValue, long v) { @Override public long totalAmount() { return total.poll(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + AtlasDistSummaryBatchUpdater updater = new AtlasDistSummaryBatchUpdater(batchSize); + updater.accept(() -> this); + return updater; + } + + /** + * Helper to allow the batch updater to directly update the individual stats. + */ + void update(long count, long total, double totalOfSquares, long max) { + long now = clock.wallTime(); + this.count.getCurrent(now).addAndGet(count); + this.total.getCurrent(now).addAndGet(total); + this.totalOfSquares.getCurrent(now).addAndGet(totalOfSquares); + updateMax(this.max.getCurrent(now), max); + } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java index 7cfba4bea..a8eb0c952 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java @@ -194,4 +194,21 @@ private void updateMax(AtomicLong maxValue, long v) { // unit tests so it is rarely a problem in practice. API can be revisited in 2.0. return (long) total.poll(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + AtlasTimerBatchUpdater updater = new AtlasTimerBatchUpdater(batchSize); + updater.accept(() -> this); + return updater; + } + + /** + * Helper to allow the batch updater to directly update the individual stats. + */ + void update(long count, double total, double totalOfSquares, long max) { + long now = clock.wallTime(); + this.count.getCurrent(now).addAndGet(count); + this.total.getCurrent(now).addAndGet(total); + this.totalOfSquares.getCurrent(now).addAndGet(totalOfSquares); + updateMax(this.max.getCurrent(now), max); + } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java new file mode 100644 index 000000000..859f920be --- /dev/null +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java @@ -0,0 +1,83 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.Timer; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +final class AtlasTimerBatchUpdater implements Timer.BatchUpdater, Consumer> { + + private Supplier timerSupplier; + private final int batchSize; + + private int count; + private double total; + private double totalOfSquares; + private long max; + + AtlasTimerBatchUpdater(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void accept(Supplier timerSupplier) { + this.timerSupplier = timerSupplier; + } + + private AtlasTimer getTimer() { + if (timerSupplier != null) { + Timer t = timerSupplier.get(); + return (t instanceof AtlasTimer) ? (AtlasTimer) t : null; + } + return null; + } + + @Override + public void record(long amount, TimeUnit unit) { + ++count; + if (amount > 0L) { + final long nanos = unit.toNanos(amount); + total += nanos; + totalOfSquares += (double) nanos * nanos; + if (nanos > max) { + max = nanos; + } + } + if (count >= batchSize) { + flush(); + } + } + + @Override + public void flush() { + AtlasTimer timer = getTimer(); + if (timer != null) { + timer.update(count, total, totalOfSquares, max); + count = 0; + total = 0.0; + totalOfSquares = 0.0; + max = 0L; + } + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java index 6da965966..e0d09e2a9 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java @@ -18,6 +18,7 @@ import java.util.Arrays; import com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Measurement; import com.netflix.spectator.api.Registry; @@ -29,9 +30,8 @@ public class AtlasDistributionSummaryTest { private final CountingManualClock clock = new CountingManualClock(); - private final Registry registry = new DefaultRegistry(); private final long step = 10000L; - private final AtlasDistributionSummary dist = new AtlasDistributionSummary(registry.createId("test"), clock, step, step); + private final AtlasDistributionSummary dist = new AtlasDistributionSummary(Id.create("test"), clock, step, step); private void checkValue(long count, long amount, long square, long max) { int num = 0; @@ -110,6 +110,26 @@ public void recordSeveralValues() { checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); } + public void recordSeveralValuesBatch(int batchSize) throws Exception { + try (DistributionSummary.BatchUpdater b = dist.batchUpdater(batchSize)) { + b.record(1); + b.record(2); + b.record(3); + b.record(1); + } + clock.setWallTime(step + 1); + checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); + } + + @Test + public void recordSeveralValuesBatch() throws Exception { + recordSeveralValuesBatch(1); + recordSeveralValuesBatch(2); + recordSeveralValuesBatch(3); + recordSeveralValuesBatch(4); + recordSeveralValuesBatch(5); + } + @Test public void recordBatchMismatchedLengths() { dist.record(new long[0], 1); diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java index 72dd59a6f..a13274ec2 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java @@ -15,10 +15,9 @@ */ package com.netflix.spectator.atlas; -import com.netflix.spectator.api.DefaultRegistry; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Measurement; -import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.Timer; import com.netflix.spectator.api.Utils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -30,9 +29,8 @@ public class AtlasTimerTest { private final CountingManualClock clock = new CountingManualClock(); - private final Registry registry = new DefaultRegistry(); private final long step = 10000L; - private final AtlasTimer dist = new AtlasTimer(registry.createId("test"), clock, step, step); + private final AtlasTimer dist = new AtlasTimer(Id.create("test"), clock, step, step); private void checkValue(long count, double amount, double square, long max) { int num = 0; @@ -144,6 +142,26 @@ public void recordSeveralValues() { checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); } + public void recordSeveralValuesBatch(int batchSize) throws Exception { + try (Timer.BatchUpdater b = dist.batchUpdater(batchSize)) { + b.record(1, TimeUnit.NANOSECONDS); + b.record(2, TimeUnit.NANOSECONDS); + b.record(3, TimeUnit.NANOSECONDS); + b.record(1, TimeUnit.NANOSECONDS); + } + clock.setWallTime(step + 1); + checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); + } + + @Test + public void recordSeveralValuesBatch() throws Exception { + recordSeveralValuesBatch(1); + recordSeveralValuesBatch(2); + recordSeveralValuesBatch(3); + recordSeveralValuesBatch(4); + recordSeveralValuesBatch(5); + } + @Test public void recordBatchMismatchedLengths() { dist.record(new long[0], 1, TimeUnit.NANOSECONDS);