diff --git a/README.md b/README.md index e7fd5409..433af4eb 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.0' + compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.1' } ``` @@ -26,7 +26,7 @@ repositories { } dependencies { - testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.8.0' + testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.8.1' } ``` @@ -51,7 +51,7 @@ configurations.all { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.1-SNAPSHOT', changing: true + compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.2-SNAPSHOT', changing: true } ``` diff --git a/build.gradle b/build.gradle index f8d5ab72..cba93e9b 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.8.0' + version = '0.8.1' group = 'org.radarcns' ext.githubRepoName = 'RADAR-CNS/RADAR-Commons' diff --git a/src/main/java/org/radarcns/producer/rest/RestTopicSender.java b/src/main/java/org/radarcns/producer/rest/RestTopicSender.java index 9f36fbb4..f0fbf7d3 100644 --- a/src/main/java/org/radarcns/producer/rest/RestTopicSender.java +++ b/src/main/java/org/radarcns/producer/rest/RestTopicSender.java @@ -85,7 +85,7 @@ public void send(RecordData records) throws IOException { logger.debug("Added message to topic {} -> {}", topic, responseBody(response)); } - } else if (response.code() == 401 || response.code() == 403 || response.code() == 422) { + } else if (response.code() == 401 || response.code() == 403) { state.wasUnauthorized(); } else if (response.code() == 415 && Objects.equals(request.header("Accept"), KAFKA_REST_ACCEPT_ENCODING)) { diff --git a/src/main/java/org/radarcns/stream/collector/NumericAggregateCollector.java b/src/main/java/org/radarcns/stream/collector/NumericAggregateCollector.java index 7f949461..a06ff945 100644 --- a/src/main/java/org/radarcns/stream/collector/NumericAggregateCollector.java +++ b/src/main/java/org/radarcns/stream/collector/NumericAggregateCollector.java @@ -16,41 +16,44 @@ package org.radarcns.stream.collector; -import static org.radarcns.util.Serialization.floatToDouble; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.specific.SpecificRecord; +import java.math.BigDecimal; +import java.util.List; +import java.util.Objects; + +import static org.radarcns.util.Serialization.floatToDouble; + /** * Java class to aggregate data using Kafka Streams. Double is the base type. - * Only the sum and sorted history are collected, other values are calculated on request. + * Only the sum and sorted history are collected, other getSamples are calculated on request. */ +@JsonDeserialize(builder = NumericAggregateCollector.Builder.class) public class NumericAggregateCollector implements RecordCollector { private final String name; private final int pos; private final Type fieldType; + private double min; + private double max; private BigDecimal sum; - private final List history; + private final UniformSamplingReservoir reservoir; - @JsonCreator - public NumericAggregateCollector( - @JsonProperty("name") String name, @JsonProperty("pos") int pos, - @JsonProperty("fieldType") Type fieldType, @JsonProperty("sum") BigDecimal sum, - @JsonProperty("history") List history) { - this.name = name; - this.pos = pos; - this.fieldType = fieldType; - this.sum = sum; - this.history = new ArrayList<>(history); + public NumericAggregateCollector(Builder builder) { + this.name = builder.nameValue; + this.pos = builder.posValue; + this.fieldType = builder.fieldTypeValue; + this.min = builder.minValue; + this.max = builder.maxValue; + this.sum = builder.sumValue; + this.reservoir = builder.reservoirValue; } public NumericAggregateCollector(String fieldName) { @@ -59,38 +62,44 @@ public NumericAggregateCollector(String fieldName) { public NumericAggregateCollector(String fieldName, Schema schema) { sum = BigDecimal.ZERO; - this.history = new ArrayList<>(); + min = Double.POSITIVE_INFINITY; + max = Double.NEGATIVE_INFINITY; + reservoir = new UniformSamplingReservoir(); - this.name = fieldName; + name = fieldName; if (schema == null) { - this.pos = -1; - this.fieldType = null; + pos = -1; + fieldType = null; } else { Field field = schema.getField(fieldName); if (field == null) { throw new IllegalArgumentException( "Field " + fieldName + " does not exist in schema " + schema.getFullName()); } - this.pos = field.pos(); - - Type apparentType = field.schema().getType(); - if (apparentType == Type.UNION) { - for (Schema subSchema : field.schema().getTypes()) { - if (subSchema.getType() != Type.NULL) { - apparentType = subSchema.getType(); - break; - } + pos = field.pos(); + fieldType = getType(field); + } + } + + private static Type getType(Field field) { + Type apparentType = field.schema().getType(); + if (apparentType == Type.UNION) { + for (Schema subSchema : field.schema().getTypes()) { + if (subSchema.getType() != Type.NULL) { + apparentType = subSchema.getType(); + break; } } - fieldType = apparentType; + } - if (fieldType != Type.DOUBLE - && fieldType != Type.FLOAT - && fieldType != Type.INT - && fieldType != Type.LONG) { - throw new IllegalArgumentException("Field " + fieldName + " is not a number type."); - } + if (apparentType != Type.DOUBLE + && apparentType != Type.FLOAT + && apparentType != Type.INT + && apparentType != Type.LONG) { + throw new IllegalArgumentException("Field " + field.name() + " is not a number type."); } + + return apparentType; } @Override @@ -120,12 +129,12 @@ public NumericAggregateCollector add(float value) { */ public NumericAggregateCollector add(double value) { sum = sum.add(BigDecimal.valueOf(value)); - - int index = Collections.binarySearch(history, value); - if (index >= 0) { - history.add(index, value); - } else { - history.add(-index - 1, value); + reservoir.add(value); + if (value > max) { + max = value; + } + if (value < min) { + min = value; } return this; @@ -138,18 +147,17 @@ public String toString() { + ", min=" + getMin() + ", max=" + getMax() + ", sum=" + getSum() - + ", count=" + getCount() + ", mean=" + getMean() + ", quartile=" + getQuartile() - + ", history=" + history + '}'; + + ", reservoir=" + reservoir + '}'; } public double getMin() { - return history.get(0); + return min; } public double getMax() { - return history.get(history.size() - 1); + return max; } public double getSum() { @@ -157,38 +165,15 @@ public double getSum() { } public int getCount() { - return history.size(); + return reservoir.getCount(); } public double getMean() { - return sum.doubleValue() / history.size(); + return sum.doubleValue() / getCount(); } public List getQuartile() { - int length = history.size(); - - List quartiles; - if (length == 1) { - Double elem = history.get(0); - quartiles = Arrays.asList(elem, elem, elem); - } else { - quartiles = new ArrayList<>(3); - for (int i = 1; i <= 3; i++) { - double pos = i * (length + 1) / 4.0d; // == i * 25 * (length + 1) / 100 - int intPos = (int) pos; - if (intPos == 0) { - quartiles.add(history.get(0)); - } else if (intPos == length) { - quartiles.add(history.get(length - 1)); - } else { - double diff = pos - intPos; - double base = history.get(intPos - 1); - quartiles.add(base + diff * (history.get(intPos) - base)); - } - } - } - - return quartiles; + return reservoir.getQuartiles(); } public double getInterQuartileRange() { @@ -200,4 +185,106 @@ public double getInterQuartileRange() { public String getName() { return name; } + + protected UniformSamplingReservoir getReservoir() { + return reservoir; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NumericAggregateCollector that = (NumericAggregateCollector) o; + return pos == that.pos + && Double.compare(that.min, min) == 0 + && Double.compare(that.max, max) == 0 + && Objects.equals(name, that.name) + && fieldType == that.fieldType + && Objects.equals(sum, that.sum) + && Objects.equals(reservoir, that.reservoir); + } + + @Override + public int hashCode() { + return Objects.hash(name, pos, fieldType, min, max, sum, reservoir); + } + + @JsonPOJOBuilder(withPrefix = "") + public static class Builder { + private double maxValue = Double.NEGATIVE_INFINITY; + private double minValue = Double.POSITIVE_INFINITY; + private final String nameValue; + private int posValue = -1; + private Type fieldTypeValue = null; + private BigDecimal sumValue = BigDecimal.ZERO; + private UniformSamplingReservoir reservoirValue = new UniformSamplingReservoir(); + + @JsonCreator + public Builder(@JsonProperty("name") String name) { + this.nameValue = Objects.requireNonNull(name); + } + + @JsonSetter + public Builder pos(int pos) { + posValue = pos; + return this; + } + + @JsonSetter + public Builder fieldType(Type fieldType) { + fieldTypeValue = fieldType; + return this; + } + + @JsonSetter + public Builder min(double min) { + if (min < minValue) { + minValue = min; + } + return this; + } + + @JsonSetter + public Builder max(double max) { + if (max > maxValue) { + maxValue = max; + } + return this; + } + + @JsonSetter + public Builder sum(BigDecimal sum) { + sumValue = sum; + return this; + } + + @JsonSetter + public Builder reservoir(UniformSamplingReservoir reservoir) { + this.reservoirValue = reservoir; + return this; + } + + /** + * For backwards compatibility purposes, convert a full history to a reservoir. + * @param history stored history. + * @return the current builder. + * @deprecated use reservoir instead. + */ + @Deprecated + @JsonSetter + public Builder history(List history) { + min(history.get(0)); + max(history.get(history.size() - 1)); + reservoir(new UniformSamplingReservoir(history)); + return this; + } + + public NumericAggregateCollector build() { + return new NumericAggregateCollector(this); + } + } } diff --git a/src/main/java/org/radarcns/stream/collector/UniformSamplingReservoir.java b/src/main/java/org/radarcns/stream/collector/UniformSamplingReservoir.java new file mode 100644 index 00000000..7df655f5 --- /dev/null +++ b/src/main/java/org/radarcns/stream/collector/UniformSamplingReservoir.java @@ -0,0 +1,179 @@ +package org.radarcns.stream.collector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Uniform sampling reservoir for streaming. This should capture the input distribution in order + * to compute quartiles, using so-called Algorithm R. + * + *

The maximum size of the reservoir can be increased to get more accurate quartile estimations. + * As long as the number of samples is lower than the maximum size of the reservoir, the quartiles + * are computed exactly. + */ +public class UniformSamplingReservoir { + private final List samples; + private final int maxSize; + private int count; + private static final int MAX_SIZE_DEFAULT = 999; + + /** Empty reservoir with default maximum size. */ + public UniformSamplingReservoir() { + this(Collections.emptyList(), 0, MAX_SIZE_DEFAULT); + } + + /** + * Create a reservoir that samples from given values. + * @param allValues list of values to sample from. + * @throws NullPointerException if given allValues are {@code null}. + */ + public UniformSamplingReservoir(List allValues) { + this(allValues, allValues.size(), MAX_SIZE_DEFAULT); + } + + /** + * Create a reservoir that samples from given values. + * @param samples list of values to sample from. + * @param count current size of the number of samples that the reservoir represents. + * @param maxSize maximum reservoir size. + * @throws NullPointerException if given allValues are {@code null} + */ + @JsonCreator + public UniformSamplingReservoir( + @JsonProperty("samples") List samples, + @JsonProperty("count") int count, + @JsonProperty("maxSize") int maxSize) { + this.samples = new ArrayList<>(Objects.requireNonNull(samples)); + + if (maxSize <= 0) { + throw new IllegalArgumentException("Reservoir size must be strictly positive"); + } + this.maxSize = maxSize; + + if (count < 0) { + throw new IllegalArgumentException("Reservoir size must be positive"); + } + this.count = count; + + + int toRemove = this.samples.size() - maxSize; + if (toRemove > 0) { + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < toRemove; i++) { + this.samples.remove(random.nextInt(this.samples.size())); + } + } + Collections.sort(this.samples); + } + + /** Add a sample to the reservoir. */ + public void add(double value) { + boolean doAdd; + int removeIndex; + + if (count < maxSize) { + doAdd = true; + } else { + removeIndex = ThreadLocalRandom.current().nextInt(count); + if (removeIndex < maxSize) { + samples.remove(removeIndex); + doAdd = true; + } else { + doAdd = false; + } + } + + if (doAdd) { + int index = Collections.binarySearch(samples, value); + if (index >= 0) { + samples.add(index, value); + } else { + samples.add(-index - 1, value); + } + } + + count++; + } + + /** + * Get the quartiles of the underlying distribution. If the number of samples is larger than + * the maximum size of the reservoir, this will be an estimate. + * @return list with size three, of the 25, 50 and 75 percentiles. + */ + public List getQuartiles() { + int length = samples.size(); + + List quartiles; + if (length == 1) { + Double elem = samples.get(0); + quartiles = Arrays.asList(elem, elem, elem); + } else { + quartiles = new ArrayList<>(3); + for (int i = 1; i <= 3; i++) { + double pos = i * (length + 1) * 0.25d; // 25 percentile steps + int intPos = (int) pos; + if (intPos == 0) { + quartiles.add(samples.get(0)); + } else if (intPos == length) { + quartiles.add(samples.get(length - 1)); + } else { + double diff = pos - intPos; + double base = samples.get(intPos - 1); + quartiles.add(base + diff * (samples.get(intPos) - base)); + } + } + } + + return quartiles; + } + + /** Get the currently stored samples. */ + public List getSamples() { + return Collections.unmodifiableList(samples); + } + + /** Get the maximum size of this reservoir. */ + public int getMaxSize() { + return maxSize; + } + + /** Get the number of samples that are being represented by the reservoir. */ + public int getCount() { + return count; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UniformSamplingReservoir that = (UniformSamplingReservoir) o; + return count == that.count + && maxSize == that.maxSize + && Objects.equals(samples, that.samples); + } + + @Override + public int hashCode() { + return Objects.hash(samples, maxSize, count); + } + + @Override + public String toString() { + return "UniformSamplingReservoir{" + + "samples=" + samples + + ", maxSize=" + maxSize + + ", count=" + count + + '}'; + } +} diff --git a/src/test/java/org/radarcns/stream/collector/AggregateListCollectorTest.java b/src/test/java/org/radarcns/stream/collector/AggregateListCollectorTest.java index 63602c88..1f7199bc 100644 --- a/src/test/java/org/radarcns/stream/collector/AggregateListCollectorTest.java +++ b/src/test/java/org/radarcns/stream/collector/AggregateListCollectorTest.java @@ -31,7 +31,11 @@ public void add() { new String[]{"a", "b", "c", "d"}); double[] arrayvalues = {0.15d, 1.0d, 2.0d, 3.0d}; arrayCollector.add(arrayvalues); - assertEquals("[DoubleValueCollector{name=a, min=0.15, max=0.15, sum=0.15, count=1, mean=0.15, quartile=[0.15, 0.15, 0.15], history=[0.15]}, DoubleValueCollector{name=b, min=1.0, max=1.0, sum=1.0, count=1, mean=1.0, quartile=[1.0, 1.0, 1.0], history=[1.0]}, DoubleValueCollector{name=c, min=2.0, max=2.0, sum=2.0, count=1, mean=2.0, quartile=[2.0, 2.0, 2.0], history=[2.0]}, DoubleValueCollector{name=d, min=3.0, max=3.0, sum=3.0, count=1, mean=3.0, quartile=[3.0, 3.0, 3.0], history=[3.0]}]" , arrayCollector.toString()); + assertEquals(4, arrayCollector.getCollectors().size()); + assertEquals(0.15, arrayCollector.getCollectors().get(0).getMin(), 0.0d); + assertEquals(1.0, arrayCollector.getCollectors().get(1).getMin(), 0.0d); + assertEquals(2.0, arrayCollector.getCollectors().get(2).getMin(), 0.0d); + assertEquals(3.0, arrayCollector.getCollectors().get(3).getMin(), 0.0d); } @Test @@ -39,6 +43,10 @@ public void addRecord() { AggregateListCollector arrayCollector = new AggregateListCollector(new String[] {"x", "y", "z"}, EmpaticaE4Acceleration.getClassSchema()); arrayCollector.add(new EmpaticaE4Acceleration(0d, 0d, 0.15f, 1.0f, 2.0f)); - assertEquals("[DoubleValueCollector{name=x, min=0.15, max=0.15, sum=0.15, count=1, mean=0.15, quartile=[0.15, 0.15, 0.15], history=[0.15]}, DoubleValueCollector{name=y, min=1.0, max=1.0, sum=1.0, count=1, mean=1.0, quartile=[1.0, 1.0, 1.0], history=[1.0]}, DoubleValueCollector{name=z, min=2.0, max=2.0, sum=2.0, count=1, mean=2.0, quartile=[2.0, 2.0, 2.0], history=[2.0]}]" , arrayCollector.toString()); + + assertEquals(3, arrayCollector.getCollectors().size()); + assertEquals(0.15, arrayCollector.getCollectors().get(0).getMin(), 0.0d); + assertEquals(1.0, arrayCollector.getCollectors().get(1).getMin(), 0.0d); + assertEquals(2.0, arrayCollector.getCollectors().get(2).getMin(), 0.0d); } } diff --git a/src/test/java/org/radarcns/stream/collector/NumericAggregateCollectorTest.java b/src/test/java/org/radarcns/stream/collector/NumericAggregateCollectorTest.java index c8fd03f7..8d2bde35 100644 --- a/src/test/java/org/radarcns/stream/collector/NumericAggregateCollectorTest.java +++ b/src/test/java/org/radarcns/stream/collector/NumericAggregateCollectorTest.java @@ -18,6 +18,9 @@ import static org.junit.Assert.assertEquals; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Before; import org.junit.Test; import org.radarcns.kafka.AggregateKey; @@ -25,16 +28,21 @@ import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse; import org.radarcns.passive.phone.PhoneBatteryLevel; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; + /** * Created by nivethika on 20-12-16. */ public class NumericAggregateCollectorTest { - private NumericAggregateCollector valueCollector ; + private NumericAggregateCollector valueCollector; @Before public void setUp() { - this.valueCollector = new NumericAggregateCollector("test"); + this.valueCollector = new NumericAggregateCollector.Builder("test") + .build(); } @Test @@ -154,4 +162,41 @@ public void testAddRecordWithNull() { assertEquals(2, valueCollector.getCount()); assertEquals(1.5d, valueCollector.getMean(), 1e-5d); } + + @Test + public void testSerialization() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + + mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + + valueCollector = new NumericAggregateCollector.Builder("test") + .history(Arrays.asList(-1d, 15d)) + .sum(BigDecimal.valueOf(14)) + .build(); + + String valueString = mapper.writeValueAsString(valueCollector); + System.out.println(valueString); + assertEquals(valueCollector, mapper.readValue(valueString, NumericAggregateCollector.class)); + } + + @Test + public void testReservoirBuilder() { + valueCollector = new NumericAggregateCollector.Builder("test") + .reservoir(new UniformSamplingReservoir(Arrays.asList(-1d, 15d), 2, 1)) + .build(); + + assertEquals(2, valueCollector.getCount()); + assertEquals(1, valueCollector.getReservoir().getSamples().size()); + } + + @Test + public void testReservoirBuilderUnlimited() { + valueCollector = new NumericAggregateCollector.Builder("name") + .reservoir(new UniformSamplingReservoir(Arrays.asList(-1d, 15d), 2, 1000)) + .build(); + + assertEquals(2, valueCollector.getCount()); + assertEquals(2, valueCollector.getReservoir().getSamples().size()); + } } diff --git a/src/test/java/org/radarcns/stream/collector/UniformSamplingReservoirTest.java b/src/test/java/org/radarcns/stream/collector/UniformSamplingReservoirTest.java new file mode 100644 index 00000000..136740eb --- /dev/null +++ b/src/test/java/org/radarcns/stream/collector/UniformSamplingReservoirTest.java @@ -0,0 +1,70 @@ +package org.radarcns.stream.collector; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.*; + +public class UniformSamplingReservoirTest { + @Test + public void add() { + UniformSamplingReservoir reservoir = new UniformSamplingReservoir(Arrays.asList(0.1, 0.3, 0.5), 3, 3); + reservoir.add(0.7); + assertEquals(3, reservoir.getSamples().size()); + assertEquals(3, reservoir.getMaxSize()); + reservoir.add(0.7); + assertEquals(3, reservoir.getSamples().size()); + reservoir.add(0.7); + assertEquals(3, reservoir.getSamples().size()); + } + + @Test + public void addRandom() { + UniformSamplingReservoir reservoir = new UniformSamplingReservoir(Collections.emptyList(), 0, 50); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < 100; i++) { + reservoir.add(random.nextDouble(-1.0, 1.0)); + assertTrue(isOrdered(reservoir.getSamples())); + assertTrue(reservoir.getSamples().size() <= 50); + } + assertEquals(50, reservoir.getSamples().size()); + } + + @Test + public void addFromRandom() { + UniformSamplingReservoir reservoir = new UniformSamplingReservoir(Collections.emptyList(), 0, 50); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + + double[] chooseFrom = {-0.1, Double.NEGATIVE_INFINITY, Double.NaN, 1.0}; + + for (int i = 0; i < 100; i++) { + reservoir.add(chooseFrom[random.nextInt(chooseFrom.length)]); + assertTrue(isOrdered(reservoir.getSamples())); + assertTrue(reservoir.getSamples().size() <= 50); + } + assertEquals(50, reservoir.getSamples().size()); + } + + private static > boolean isOrdered(List list) { + Iterator iterator = list.iterator(); + if (!iterator.hasNext()) { + return true; + } + T previous = iterator.next(); + while (iterator.hasNext()) { + T current = iterator.next(); + if (previous.compareTo(current) > 0) { + return false; + } + previous = current; + } + return true; + } +} \ No newline at end of file