diff --git a/README.md b/README.md
index 4c785930..1626885c 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ repositories {
 }
 
 dependencies {
-    compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.3'
+    compile group: 'org.radarcns', name: 'radar-commons', version: '0.7'
 }
 ```
@@ -26,7 +26,7 @@ repositories {
 }
 
 dependencies {
-    testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.3'
+    testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.7'
 }
 ```
@@ -51,7 +51,7 @@ configurations.all {
 }
 
 dependencies {
-    compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.4-SNAPSHOT', changing: true
+    compile group: 'org.radarcns', name: 'radar-commons', version: '0.7.1-SNAPSHOT', changing: true
 }
 ```
diff --git a/build.gradle b/build.gradle
index 4c5c87d0..58562ae1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,8 +16,8 @@ plugins {
 
     // Get bintray version
-    id 'com.jfrog.bintray' version '1.7.3'
-    id 'com.jfrog.artifactory' version '4.4.18'
+    id 'com.jfrog.bintray' version '1.8.0'
+    id 'com.jfrog.artifactory' version '4.5.4'
 }
 
 allprojects {
@@ -36,23 +36,21 @@ allprojects {
     // Configuration                                                          //
     //----------------------------------------------------------------------------//
-    version = '0.6.3'
+    version = '0.7'
     group = 'org.radarcns'
     ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'
 
-    ext.slf4jVersion = '1.7.21'
+    ext.slf4jVersion = '1.7.25'
     ext.kafkaVersion = '0.11.0.1'
     ext.avroVersion = '1.8.2'
-    ext.confluentVersion = '3.3.0'
-    ext.log4jVersion = '2.7'
-    ext.jacksonVersion = '2.8.5'
-    ext.okhttpVersion = '3.8.0'
+    ext.confluentVersion = '3.3.1'
+    ext.jacksonVersion = '2.9.3'
+    ext.okhttpVersion = '3.9.1'
     ext.junitVersion = '4.12'
-    ext.mockitoVersion = '2.2.29'
-    ext.mathVersion = '3.0'
+    ext.mockitoVersion = '2.13.0'
     ext.hamcrestVersion = '1.3'
-    ext.codacyVersion = '1.0.10'
-    ext.radarSchemasVersion = '0.2'
+    ext.codacyVersion = '2.0.1'
+    ext.radarSchemasVersion = '0.2.3'
     ext.orgJsonVersion = '20170516'
 
     ext.githubUrl = 'https://github.com/' + githubRepoName + '.git'
@@ -366,6 +364,6 @@ artifactoryPublish {
 }
 
 task wrapper(type: Wrapper) {
-    gradleVersion = '4.1'
+    gradleVersion = '4.4'
     distributionType 'all'
 }
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 7a3265ee..01b8bf6b 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index bf1b63c3..b6517bb1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-all.zip
diff --git a/radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java
index 976f48b3..99c1011d 100644
--- a/radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java
+++ b/radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java
@@ -39,7 +39,7 @@ public class MockDevice<K extends SpecificRecord> extends Thread {
     private static final Logger logger = LoggerFactory.getLogger(MockDevice.class);
 
     private final int baseFrequency;
-    private final KafkaSender<K, SpecificRecord> sender;
+    private final KafkaSender sender;
     private final AtomicBoolean stopping;
     private final List<RecordGenerator<K>> generators;
     private final K key;
@@ -52,8 +52,7 @@ public class MockDevice<K extends SpecificRecord> extends Thread {
      * @param key key to send all messages with
      * @param generators data generators that produce the data we send
      */
-    public MockDevice(KafkaSender<K, SpecificRecord> sender, K key,
-            List<RecordGenerator<K>> generators) {
+    public MockDevice(KafkaSender sender, K key, List<RecordGenerator<K>> generators) {
         this.generators = generators;
         this.key = key;
         baseFrequency = computeBaseFrequency(generators);
@@ -86,7 +85,7 @@ public void run() {
                 int frequency = generators.get(i).getConfig().getFrequency();
                 if (frequency > 0 && beat % (baseFrequency / frequency) == 0) {
                     Record<K, SpecificRecord> record = recordIterators.get(i).next();
-                    topicSenders.get(i).send(record.offset, record.key, record.value);
+                    topicSenders.get(i).send(record.key, record.value);
                 }
             }
         }
diff --git a/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java
index 3bf5926a..7ae4c94e 100644
--- a/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java
+++ b/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java
@@ -16,14 +16,16 @@
 
 package org.radarcns.mock;
 
-import java.io.IOException;
-import org.apache.avro.specific.SpecificRecord;
+import org.radarcns.data.AvroRecordData;
 import org.radarcns.data.Record;
-import org.radarcns.kafka.ObservationKey;
 import org.radarcns.mock.data.MockCsvParser;
 import org.radarcns.producer.KafkaSender;
 import org.radarcns.producer.KafkaTopicSender;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
 /**
  * Send mock data from a CSV file.
  *
@@ -33,8 +35,7 @@ public class MockFileSender {
     private final KafkaSender sender;
     private final MockCsvParser parser;
 
-    public MockFileSender(KafkaSender sender,
-            MockCsvParser parser) {
+    public MockFileSender(KafkaSender sender, MockCsvParser parser) {
         this.parser = parser;
         this.sender = sender;
     }
@@ -46,10 +47,11 @@ public MockFileSender(KafkaSender sender,
     @SuppressWarnings("unchecked")
     public void send() throws IOException {
         try (KafkaTopicSender topicSender = sender.sender(parser.getTopic())) {
+            Collection<Record> records = new ArrayList<>();
             while (parser.hasNext()) {
-                Record<ObservationKey, SpecificRecord> record = parser.next();
-                topicSender.send(record.offset, record.key, record.value);
+                records.add(parser.next());
             }
+            topicSender.send(new AvroRecordData(parser.getTopic(), records));
         }
     }
 }
diff --git a/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java
index 0ab94ed7..8f4d1198 100644
--- a/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java
+++ b/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java
@@ -16,46 +16,45 @@
 
 package org.radarcns.mock;
 
-import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.radarcns.util.serde.AbstractKafkaAvroSerde.SCHEMA_REGISTRY_CONFIG;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import
org.apache.avro.specific.SpecificRecord; import org.radarcns.config.ServerConfig; import org.radarcns.config.YamlConfigLoader; -import org.radarcns.data.SpecificRecordEncoder; +import org.radarcns.kafka.ObservationKey; +import org.radarcns.mock.config.BasicMockConfig; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.MockCsvParser; +import org.radarcns.mock.data.RecordGenerator; import org.radarcns.passive.empatica.EmpaticaE4Acceleration; import org.radarcns.passive.empatica.EmpaticaE4BatteryLevel; import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse; import org.radarcns.passive.empatica.EmpaticaE4ElectroDermalActivity; import org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval; import org.radarcns.passive.empatica.EmpaticaE4Temperature; -import org.radarcns.kafka.ObservationKey; -import org.radarcns.mock.config.BasicMockConfig; -import org.radarcns.mock.config.MockDataConfig; -import org.radarcns.mock.data.MockCsvParser; -import org.radarcns.mock.data.RecordGenerator; +import org.radarcns.producer.BatchedKafkaSender; import org.radarcns.producer.KafkaSender; -import org.radarcns.producer.rest.SchemaRetriever; import org.radarcns.producer.direct.DirectSender; -import org.radarcns.producer.BatchedKafkaSender; import org.radarcns.producer.rest.ConnectionState; import org.radarcns.producer.rest.ManagedConnectionPool; import org.radarcns.producer.rest.RestSender; +import org.radarcns.producer.rest.SchemaRetriever; import org.radarcns.util.serde.KafkaAvroSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.radarcns.util.serde.AbstractKafkaAvroSerde.SCHEMA_REGISTRY_CONFIG; + /** * A Mock Producer class that can be used to stream data. It can use MockFileSender and MockDevice * for testing purposes, with direct or indirect streaming. 
@@ -66,7 +65,7 @@ public class MockProducer { private final List> devices; private final List files; - private final List> senders; + private final List senders; private final SchemaRetriever retriever; /** @@ -89,7 +88,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { int numDevices = mockConfig.getNumberOfDevices(); retriever = new SchemaRetriever(mockConfig.getSchemaRegistry(), 10); - List> tmpSenders = null; + List tmpSenders = null; try { devices = new ArrayList<>(numDevices); @@ -127,7 +126,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { } } catch (Exception ex) { if (tmpSenders != null) { - for (KafkaSender sender : tmpSenders) { + for (KafkaSender sender : tmpSenders) { sender.close(); } } @@ -138,7 +137,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { senders = tmpSenders; } - private List> createSenders( + private List createSenders( BasicMockConfig mockConfig, int numDevices) { if (mockConfig.isDirectProducer()) { @@ -150,9 +149,9 @@ private List> createSenders( } /** Create senders that directly produce data to Kafka. */ - private List> createDirectSenders(int numDevices, + private List createDirectSenders(int numDevices, SchemaRetriever retriever, String brokerPaths) { - List> result = new ArrayList<>(numDevices); + List result = new ArrayList<>(numDevices); for (int i = 0; i < numDevices; i++) { Properties properties = new Properties(); properties.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); @@ -160,32 +159,29 @@ private List> createDirectSenders(in properties.put(SCHEMA_REGISTRY_CONFIG, retriever); properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerPaths); - result.add(new DirectSender(properties)); + result.add(new DirectSender(properties)); } return result; } /** Create senders that produce data to Kafka via the REST proxy. 
*/ - private List> createRestSenders(int numDevices, + private List createRestSenders(int numDevices, SchemaRetriever retriever, ServerConfig restProxy, boolean useCompression) { - List> result = new ArrayList<>(numDevices); + List result = new ArrayList<>(numDevices); ConnectionState sharedState = new ConnectionState(10, TimeUnit.SECONDS); - RestSender.Builder restBuilder = - new RestSender.Builder() + RestSender.Builder restBuilder = + new RestSender.Builder() .server(restProxy) .schemaRetriever(retriever) .useCompression(useCompression) - .encoders(new SpecificRecordEncoder(false), - new SpecificRecordEncoder(false)) .connectionState(sharedState) .connectionTimeout(10, TimeUnit.SECONDS); for (int i = 0; i < numDevices; i++) { - RestSender firstSender = restBuilder + RestSender firstSender = restBuilder .connectionPool(new ManagedConnectionPool()) .build(); - - result.add(new BatchedKafkaSender<>(firstSender, 1_000, 1000)); + result.add(new BatchedKafkaSender(firstSender, 1000, 1000)); } return result; } @@ -214,7 +210,7 @@ public void shutdown() throws IOException, InterruptedException { device.join(5_000L); } logger.info("Closing channels"); - for (KafkaSender sender : senders) { + for (KafkaSender sender : senders) { sender.close(); } retriever.close(); diff --git a/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java index 25556e68..5c9c0ce1 100644 --- a/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java @@ -50,7 +50,6 @@ public class MockCsvParser implements Closeable { private final BufferedReader bufferedReader; private final FileReader fileReader; private List currentLine; - private long offset; /** * Base constructor. @@ -62,7 +61,7 @@ public MockCsvParser(MockDataConfig config, File root) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException { //noinspection unchecked - topic = (AvroTopic) config.parseAvroTopic(); + topic = config.parseAvroTopic(); fileReader = new FileReader(config.getDataFile(root)); bufferedReader = new BufferedReader(fileReader); @@ -73,7 +72,6 @@ public MockCsvParser(MockDataConfig config, File root) headerMap.put(header.get(i), i); } currentLine = csvReader.parseLine(); - offset = 0; } public AvroTopic getTopic() { @@ -99,7 +97,7 @@ public Record next() throws IOException { currentLine = csvReader.parseLine(); - return new Record<>(offset++, key, value); + return new Record<>(key, value); } /** diff --git a/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java index 8d7d270b..112f9f88 100644 --- a/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java @@ -65,8 +65,7 @@ public RecordGenerator(MockDataConfig config, Class keyClass) this.config = config; // doing type checking below. 
- //noinspection unchecked - topic = (AvroTopic) config.parseAvroTopic(); + topic = config.parseAvroTopic(); if (!topic.getKeyClass().equals(keyClass)) { throw new IllegalArgumentException( "RecordGenerator only generates ObservationKey keys, not " @@ -147,7 +146,7 @@ public Iterator> iterateRawValues(K key, long duration) { * @return list containing simulated values */ public Iterator> iterateValues(final K key, final long duration) { - return new RecordIterator<>(duration, key); + return new RecordIterator(duration, key); } /** @@ -212,17 +211,14 @@ public AvroTopic getTopic() { return topic; } - private class RecordIterator implements - Iterator> { + private class RecordIterator implements Iterator> { private final Metronome timestamps; private final K key; - private long offset; public RecordIterator(long duration, K key) { this.key = key; timestamps = new Metronome(duration * config.getFrequency() / 1000L, config.getFrequency()); - offset = 0; } @Override @@ -268,7 +264,7 @@ public Record next() { value.put(f.pos(), f.defaultVal()); } - return new Record<>(offset++, key, value); + return new Record<>(key, value); } @Override diff --git a/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java b/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java index 73201b48..f230e428 100644 --- a/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java +++ b/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java @@ -48,7 +48,6 @@ public void generate() throws Exception { Iterator> iter = generator .iterateValues(new ObservationKey("test", "a", "b"), 0); Record record = iter.next(); - assertEquals(0, record.offset); assertEquals(new ObservationKey("test", "a", "b"), record.key); float x = ((EmpaticaE4Acceleration)record.value).getX(); assertTrue(x >= 0.1f && x < 9.9f); @@ -61,7 +60,6 @@ public void generate() throws Exception { && time <= System.currentTimeMillis() / 1000d); Record nextRecord = iter.next(); - assertEquals(1, nextRecord.offset); assertEquals(time + 0.1d, (Double)nextRecord.value.get(0), 1e-6); } diff --git a/src/main/java/org/radarcns/data/AvroRecordData.java b/src/main/java/org/radarcns/data/AvroRecordData.java new file mode 100644 index 00000000..1573b5d0 --- /dev/null +++ b/src/main/java/org/radarcns/data/AvroRecordData.java @@ -0,0 +1,81 @@ +package org.radarcns.data; + +import org.radarcns.topic.AvroTopic; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Iterator; + +public class AvroRecordData implements RecordData { + private final AvroTopic topic; + private final Collection> records; + private final AvroEncoder.AvroWriter keyEncoder; + private final AvroEncoder.AvroWriter valueEncoder; + + public AvroRecordData(AvroTopic topic, Collection> records) + throws IOException { + this.topic = topic; + this.records = records; + SpecificRecordEncoder encoder = new SpecificRecordEncoder(false); + this.keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass()); + this.valueEncoder = encoder.writer(topic.getValueSchema(), topic.getValueClass()); + } + + public AvroTopic getTopic() { + return topic; + } + + @Override + public Iterator> iterator() { + return records.iterator(); + } + + public boolean isEmpty() { + return records.isEmpty(); + } + + @Override + public Iterator rawIterator() { + return new InputStreamIterator(); + } + + private class InputStreamIterator implements Iterator { 
+        private V value = null;
+        private final Iterator<Record<K, V>> recordIterator;
+
+        InputStreamIterator() {
+            recordIterator = records.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return value != null || recordIterator.hasNext();
+        }
+
+        @Override
+        public InputStream next() {
+            try {
+                byte[] encoded;
+                if (value == null) {
+                    Record<K, V> record = recordIterator.next();
+                    value = record.value;
+                    encoded = keyEncoder.encode(record.key);
+                } else {
+                    encoded = valueEncoder.encode(value);
+                    value = null;
+                }
+                return new ByteArrayInputStream(encoded);
+            } catch (IOException ex) {
+                throw new IllegalStateException("Cannot encode record", ex);
+            }
+        }
+
+        @Override
+        public void remove() {
+            recordIterator.remove();
+            value = null;
+        }
+    }
+}
diff --git a/src/main/java/org/radarcns/data/Record.java b/src/main/java/org/radarcns/data/Record.java
index 91167ad1..595f72ad 100644
--- a/src/main/java/org/radarcns/data/Record.java
+++ b/src/main/java/org/radarcns/data/Record.java
@@ -25,13 +25,11 @@
  * @param <V> value type
  */
 public class Record<K, V> {
-    public final long offset;
     public final K key;
     public final V value;
     public final long milliTimeAdded;
 
-    public Record(long offset, K key, V value) {
-        this.offset = offset;
+    public Record(K key, V value) {
         this.key = key;
         this.value = value;
         this.milliTimeAdded = System.currentTimeMillis();
diff --git a/src/main/java/org/radarcns/data/RecordData.java b/src/main/java/org/radarcns/data/RecordData.java
new file mode 100644
index 00000000..4c8ffc55
--- /dev/null
+++ b/src/main/java/org/radarcns/data/RecordData.java
@@ -0,0 +1,15 @@
+package org.radarcns.data;
+
+import org.radarcns.topic.AvroTopic;
+
+import java.io.InputStream;
+import java.util.Iterator;
+
+public interface RecordData<K, V> extends Iterable<Record<K, V>> {
+
+    AvroTopic<K, V> getTopic();
+
+    Iterator<InputStream> rawIterator();
+
+    boolean isEmpty();
+}
diff --git a/src/main/java/org/radarcns/producer/BatchedKafkaSender.java b/src/main/java/org/radarcns/producer/BatchedKafkaSender.java
index 0e57f276..e8f3bd97 100644
--- a/src/main/java/org/radarcns/producer/BatchedKafkaSender.java
+++ b/src/main/java/org/radarcns/producer/BatchedKafkaSender.java
@@ -16,7 +16,9 @@
 
 package org.radarcns.producer;
 
+import org.radarcns.data.AvroRecordData;
 import org.radarcns.data.Record;
+import org.radarcns.data.RecordData;
 import org.radarcns.topic.AvroTopic;
 
 import java.io.IOException;
@@ -29,23 +31,20 @@
  * flush or close are not called within this given age, the data will also not be sent. Calling
  * {@link #close()} will not flush or close the KafkaTopicSender that were created. That must be
  * done separately.
- *
- * @param <K> base key class
- * @param <V> base value class
  */
-public class BatchedKafkaSender<K, V> implements KafkaSender<K, V> {
-    private final KafkaSender<K, V> wrappedSender;
+public class BatchedKafkaSender implements KafkaSender {
+    private final KafkaSender wrappedSender;
     private final int ageMillis;
     private final int maxBatchSize;
 
-    public BatchedKafkaSender(KafkaSender<K, V> sender, int ageMillis, int maxBatchSize) {
+    public BatchedKafkaSender(KafkaSender sender, int ageMillis, int maxBatchSize) {
         this.wrappedSender = sender;
         this.ageMillis = ageMillis;
         this.maxBatchSize = maxBatchSize;
     }
 
     @Override
-    public <L extends K, W extends V> KafkaTopicSender<L, W> sender(final AvroTopic<L, W> topic)
+    public <K, V> KafkaTopicSender<K, V> sender(final AvroTopic<K, V> topic)
             throws IOException {
         return new BatchedKafkaTopicSender<>(topic);
     }
@@ -65,53 +64,49 @@ public synchronized void close() throws IOException {
         wrappedSender.close();
     }
 
-    private class BatchedKafkaTopicSender<L extends K, W extends V> implements
-            KafkaTopicSender<L, W> {
-        private final List<Record<L, W>> cache;
-        private final KafkaTopicSender<L, W> topicSender;
+    private class BatchedKafkaTopicSender<K, V> implements
+            KafkaTopicSender<K, V> {
+        private final List<Record<K, V>> cache;
+        private final KafkaTopicSender<K, V> topicSender;
+        private final AvroTopic<K, V> topic;
 
-        private BatchedKafkaTopicSender(AvroTopic<L, W> topic) throws IOException {
+        private BatchedKafkaTopicSender(AvroTopic<K, V> topic) throws IOException {
             cache = new ArrayList<>();
+            this.topic = topic;
             topicSender = wrappedSender.sender(topic);
         }
 
         @Override
-        public void send(long offset, L key, W value) throws IOException {
+        public void send(K key, V value) throws IOException {
             if (!isConnected()) {
                 throw new IOException("Cannot send records to unconnected producer.");
             }
-            cache.add(new Record<>(offset, key, value));
-
-            if (exceedsBuffer(cache)) {
-                topicSender.send(cache);
-                cache.clear();
-            }
+            trySend(new Record<>(key, value));
         }
 
         @Override
-        public void send(List<Record<L, W>> records) throws IOException {
+        public void send(RecordData<K, V> records) throws IOException {
             if (records.isEmpty()) {
                 return;
             }
 
-            if (cache.isEmpty()) {
-                if (exceedsBuffer(records)) {
-                    topicSender.send(records);
-                } else {
-                    cache.addAll(records);
-                }
-            } else {
-                cache.addAll(records);
-
-                if (exceedsBuffer(cache)) {
-                    topicSender.send(cache);
-                    cache.clear();
-                }
+            for (Record<K, V> record : records) {
+                trySend(record);
             }
         }
 
-        @Override
-        public long getLastSentOffset() {
-            return topicSender.getLastSentOffset();
+        private void trySend(Record<K, V> record) throws IOException {
+            boolean doSend;
+            if (record == null) {
+                doSend = !cache.isEmpty();
+            } else {
+                cache.add(record);
+                doSend = exceedsBuffer(cache);
+            }
+
+            if (doSend) {
+                topicSender.send(new AvroRecordData<>(topic, cache));
+                cache.clear();
+            }
         }
 
         @Override
@@ -122,10 +117,7 @@ public void clear() {
 
         @Override
         public void flush() throws IOException {
-            if (!cache.isEmpty()) {
-                topicSender.send(cache);
-                cache.clear();
-            }
+            trySend(null);
             topicSender.flush();
         }
 
@@ -138,7 +130,7 @@ public void close() throws IOException {
         }
     }
 
-    private boolean exceedsBuffer(List<Record<L, W>> records) {
+    private boolean exceedsBuffer(List<Record<K, V>> records) {
         return records.size() >= maxBatchSize
                 || System.currentTimeMillis() - records.get(0).milliTimeAdded >= ageMillis;
     }
diff --git a/src/main/java/org/radarcns/producer/KafkaSender.java b/src/main/java/org/radarcns/producer/KafkaSender.java
index 68c30945..a9696261 100644
--- a/src/main/java/org/radarcns/producer/KafkaSender.java
+++ b/src/main/java/org/radarcns/producer/KafkaSender.java
@@ -16,18 +16,18 @@
 
 package org.radarcns.producer;
 
+import org.radarcns.topic.AvroTopic;
+
 import java.io.Closeable;
 import java.io.IOException;
-import org.radarcns.topic.AvroTopic;
 
 /**
  * Thread-safe sender. Calling {@link #close()} must be done after all {@link KafkaTopicSender}
  * senders created with {@link #sender(AvroTopic)} have been called.
  */
-public interface KafkaSender<K, V> extends Closeable {
+public interface KafkaSender extends Closeable {
     /** Get a non thread-safe sender instance. */
-    <L extends K, W extends V> KafkaTopicSender<L, W> sender(AvroTopic<L, W> topic)
-            throws IOException;
+    <K, V> KafkaTopicSender<K, V> sender(AvroTopic<K, V> topic) throws IOException;
 
     /**
      * If the sender is no longer connected, try to reconnect.
diff --git a/src/main/java/org/radarcns/producer/KafkaTopicSender.java b/src/main/java/org/radarcns/producer/KafkaTopicSender.java
index 8530b30a..433f4bc3 100644
--- a/src/main/java/org/radarcns/producer/KafkaTopicSender.java
+++ b/src/main/java/org/radarcns/producer/KafkaTopicSender.java
@@ -1,39 +1,20 @@
-/*
- * Copyright 2017 The Hyve and King's College London
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.radarcns.producer;
 
-import org.radarcns.data.Record;
+import org.radarcns.data.RecordData;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 
 public interface KafkaTopicSender<K, V> extends Closeable {
     /**
-     * Send a message to Kafka eventually. Given offset must be strictly monotonically increasing
-     * for subsequent calls.
+     * Send a message to Kafka eventually.
      *
-     * @param offset local offset, monotonically increasing
      * @param key key of a kafka record to send
      * @param value value of a kafka record to send
      * @throws AuthenticationException if the client failed to authenticate itself
     * @throws IOException if the client could not send a message
      */
-    void send(long offset, K key, V value) throws IOException;
+    void send(K key, V value) throws IOException;
 
     /**
      * Send a message to Kafka eventually.
@@ -45,12 +26,7 @@ public interface KafkaTopicSender<K, V> extends Closeable {
      * @throws AuthenticationException if the client failed to authenticate itself
      * @throws IOException if the client could not send a message
      */
-    void send(List<Record<K, V>> records) throws IOException;
-
-    /**
-     * Get the latest offsets actually sent for a given topic. Returns -1L for unknown offsets.
-     */
-    long getLastSentOffset();
+    void send(RecordData<K, V> records) throws IOException;
 
     /**
      * Clears any messages still in cache.
diff --git a/src/main/java/org/radarcns/producer/ThreadedKafkaSender.java b/src/main/java/org/radarcns/producer/ThreadedKafkaSender.java
deleted file mode 100644
index 81ce3ef8..00000000
--- a/src/main/java/org/radarcns/producer/ThreadedKafkaSender.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Copyright 2017 The Hyve and King's College London
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.producer; - -import org.radarcns.data.Record; -import org.radarcns.producer.rest.ConnectionState; -import org.radarcns.topic.AvroTopic; -import org.radarcns.util.RollingTimeAverage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * Send Avro Records to a Kafka REST Proxy. This queues messages for a specified amount of time - * and then sends all messages up to that time. - */ -public class ThreadedKafkaSender implements KafkaSender { - private static final Logger logger = LoggerFactory.getLogger(ThreadedKafkaSender.class); - private static final int RETRIES = 3; - private static final long HEARTBEAT_TIMEOUT_MILLIS = 60_000L; - private static final long HEARTBEAT_TIMEOUT_MARGIN = HEARTBEAT_TIMEOUT_MILLIS + 10_000L; - - private final KafkaSender wrappedSender; - private final ScheduledExecutorService executor; - private final RollingTimeAverage opsSent; - private final RollingTimeAverage opsRequests; - private final ConnectionState state; - - /** - * Create a REST producer that caches some values - * - * @param sender Actual KafkaSender - */ - public ThreadedKafkaSender(KafkaSender sender) { - this.wrappedSender = sender; - this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread( Runnable r) { - return new Thread(r, "Kafka REST Producer"); - } - }); - opsSent = new RollingTimeAverage(20000L); - opsRequests = new RollingTimeAverage(20000L); - state = new ConnectionState(HEARTBEAT_TIMEOUT_MARGIN, TimeUnit.MILLISECONDS); - - executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - opsRequests.add(1); - - try { - boolean success = sendHeartbeat(); - if (success) { - state.didConnect(); - } else { - logger.error("Failed to send message"); - state.didDisconnect(); - } - } catch (AuthenticationException ex) { - logger.error("Unauthorized"); - state.wasUnauthorized(); - } - - if (opsSent.hasAverage() && opsRequests.hasAverage()) { - logger.info("Sending {} messages in {} requests per second", - (int) Math.round(opsSent.getAverage()), - (int) Math.round(opsRequests.getAverage())); - } - } - }, 0L, HEARTBEAT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } - - private class ThreadedTopicSender - implements KafkaTopicSender, Runnable { - private final KafkaTopicSender topicSender; - private final List>> topicQueue; - private final List>> threadLocalQueue; - private Future topicFuture; - - private ThreadedTopicSender(AvroTopic topic) throws IOException { - topicSender = wrappedSender.sender(topic); - topicQueue = new ArrayList<>(); - threadLocalQueue = new ArrayList<>(); - topicFuture = null; - } - - /** - * Send given key and record to a topic. - * @param key key - * @param value value with schema - * @throws IOException if the producer is not connected. 
- */ - @Override - public void send(long offset, L key, W value) throws IOException { - List> recordList = new ArrayList<>(1); - recordList.add(new Record<>(offset, key, value)); - send(recordList); - } - - @Override - public synchronized void send(List> records) throws IOException { - if (records.isEmpty()) { - return; - } - if (!isConnected()) { - throw new IOException("Producer is not connected"); - } - synchronized (this) { - topicQueue.add(records); - if (topicFuture == null) { - topicFuture = executor.submit(this); - } - } - notifyAll(); - } - - @Override - public void clear() { - synchronized (this) { - topicFuture.cancel(false); - topicFuture = null; - topicQueue.clear(); - } - topicSender.clear(); - } - - - @Override - public long getLastSentOffset() { - return this.topicSender.getLastSentOffset(); - } - - @Override - public void flush() throws IOException { - Future localFuture = null; - synchronized (this) { - if (topicFuture != null) { - localFuture = topicFuture; - } - } - if (localFuture != null) { - try { - localFuture.wait(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - topicSender.flush(); - } - - @Override - public void close() throws IOException { - flush(); - topicSender.close(); - } - - @Override - public void run() { - synchronized (this) { - threadLocalQueue.addAll(topicQueue); - topicQueue.clear(); - topicFuture = null; - } - - opsRequests.add(1); - - for (List> records : threadLocalQueue) { - opsSent.add(records.size()); - - IOException exception = null; - for (int i = 0; i < RETRIES; i++) { - try { - topicSender.send(records); - break; - } catch (IOException ex) { - exception = ex; - } - } - - if (exception == null) { - state.didConnect(); - } else if (exception instanceof AuthenticationException) { - logger.error("Authentication failed"); - state.wasUnauthorized(); - break; - } else { - logger.error("Failed to send message"); - state.didDisconnect(); - break; - } - } - - threadLocalQueue.clear(); - } - } - - private boolean sendHeartbeat() throws AuthenticationException { - boolean success = false; - for (int i = 0; !success && i < RETRIES; i++) { - success = wrappedSender.resetConnection(); - } - return success; - } - - @Override - public synchronized boolean isConnected() throws AuthenticationException { - switch (state.getState()) { - case CONNECTED: - return true; - case DISCONNECTED: - return false; - case UNAUTHORIZED: - throw new AuthenticationException("Authorization invalid"); - case UNKNOWN: - state.didDisconnect(); - return false; - default: - throw new IllegalStateException("Illegal connection state"); - } - } - - @Override - public boolean resetConnection() throws AuthenticationException { - if (isConnected()) { - return true; - } - - try { - if (wrappedSender.resetConnection()) { - state.didConnect(); - return true; - } else { - state.didDisconnect(); - return false; - } - } catch (AuthenticationException ex) { - state.wasUnauthorized(); - throw ex; - } - } - - @Override - public KafkaTopicSender sender(AvroTopic topic) - throws IOException { - return new ThreadedTopicSender<>(topic); - } - - @Override - public void close() throws IOException { - executor.shutdown(); - } -} diff --git a/src/main/java/org/radarcns/producer/direct/DirectSender.java b/src/main/java/org/radarcns/producer/direct/DirectSender.java index 58e14413..30757d92 100644 --- a/src/main/java/org/radarcns/producer/direct/DirectSender.java +++ b/src/main/java/org/radarcns/producer/direct/DirectSender.java @@ -16,28 +16,29 @@ package 
org.radarcns.producer.direct; -import java.io.IOException; -import java.util.List; -import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.radarcns.data.Record; +import org.radarcns.data.RecordData; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.KafkaTopicSender; import org.radarcns.topic.AvroTopic; +import java.io.IOException; +import java.util.Properties; + /** * Directly sends a message to Kafka using a KafkaProducer */ -public class DirectSender implements KafkaSender { - private final KafkaProducer producer; +public class DirectSender implements KafkaSender { + private final KafkaProducer producer; public DirectSender(Properties properties) { - producer = new KafkaProducer<>(properties); + producer = new KafkaProducer(properties); } @Override - public KafkaTopicSender sender(final AvroTopic topic) + public KafkaTopicSender sender(final AvroTopic topic) throws IOException { return new DirectTopicSender<>(topic); } @@ -58,32 +59,26 @@ public void close() { producer.close(); } - private class DirectTopicSender implements KafkaTopicSender { - private long lastOffset = -1L; - private final AvroTopic topic; + @SuppressWarnings("unchecked") + private class DirectTopicSender implements KafkaTopicSender { + private final String name; - private DirectTopicSender(AvroTopic topic) { - this.topic = topic; + private DirectTopicSender(AvroTopic topic) { + name = topic.getName(); } @Override - public void send(long offset, L key, W value) throws IOException { - producer.send(new ProducerRecord<>(topic.getName(), (K)key, (V)value)); - - lastOffset = offset; + public void send(K key, V value) throws IOException { + producer.send(new ProducerRecord<>(name, key, value)); + producer.flush(); } @Override - public void send(List> records) throws IOException { - for (Record record : records) { - producer.send(new ProducerRecord(topic.getName(), record.key, record.value)); + public void send(RecordData records) throws IOException { + for (Record record : records) { + producer.send(new ProducerRecord<>(name, record.key, record.value)); } - lastOffset = records.get(records.size() - 1).offset; - } - - @Override - public long getLastSentOffset() { - return lastOffset; + producer.flush(); } @Override @@ -93,12 +88,12 @@ public void clear() { @Override public void flush() throws IOException { - producer.flush(); + // noop } @Override public void close() throws IOException { - producer.flush(); + // noop } } } diff --git a/src/main/java/org/radarcns/producer/rest/RestSender.java b/src/main/java/org/radarcns/producer/rest/RestSender.java index 8ff9f21f..53d96ea9 100644 --- a/src/main/java/org/radarcns/producer/rest/RestSender.java +++ b/src/main/java/org/radarcns/producer/rest/RestSender.java @@ -17,19 +17,13 @@ package org.radarcns.producer.rest; import okhttp3.Headers; -import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.Request; import okhttp3.Response; -import org.apache.avro.Schema; import org.radarcns.config.ServerConfig; -import org.radarcns.data.AvroEncoder; -import org.radarcns.data.Record; import org.radarcns.producer.AuthenticationException; -import org.radarcns.producer.BatchedKafkaSender; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.KafkaTopicSender; -import org.radarcns.producer.ThreadedKafkaSender; import org.radarcns.producer.rest.ConnectionState.State; import org.radarcns.topic.AvroTopic; import org.slf4j.Logger; @@ -37,30 +31,18 @@ import 
java.io.IOException; import java.net.MalformedURLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.TimeUnit; import static org.radarcns.producer.rest.RestClient.responseBody; -import static org.radarcns.producer.rest.TopicRequestBody.topicRequestContent; /** * RestSender sends records to the Kafka REST Proxy. It does so using an Avro JSON encoding. A new * sender must be constructed with {@link #sender(AvroTopic)} per AvroTopic. This implementation is - * blocking and unbuffered, so flush, clear and close do not do anything. To get a non-blocking - * sender, wrap this in a {@link ThreadedKafkaSender}, for a buffered sender, wrap it in a - * {@link BatchedKafkaSender}. - * - * @param base key class - * @param base value class + * blocking and unbuffered, so flush, clear and close do not do anything. */ -@SuppressWarnings("PMD.GodClass") -public class RestSender implements KafkaSender { +public class RestSender implements KafkaSender { private static final Logger logger = LoggerFactory.getLogger(RestSender.class); - private static final int LOG_CONTENT_LENGTH = 1024; public static final String KAFKA_REST_ACCEPT_ENCODING = "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json"; @@ -70,42 +52,27 @@ public class RestSender implements KafkaSender { MediaType.parse("application/vnd.kafka.avro.v2+json; charset=utf-8"); public static final MediaType KAFKA_REST_AVRO_LEGACY_ENCODING = MediaType.parse("application/vnd.kafka.avro.v1+json; charset=utf-8"); + private RequestProperties requestProperties; - private final AvroEncoder keyEncoder; - private final AvroEncoder valueEncoder; - - private HttpUrl schemalessKeyUrl; - private HttpUrl schemalessValueUrl; private Request.Builder isConnectedRequest; private SchemaRetriever schemaRetriever; private RestClient httpClient; - private String acceptType; - private MediaType contentType; - private boolean useCompression; private final ConnectionState state; - private Headers additionalHeaders; /** * Construct a RestSender. 
* @param httpClient client to send requests with * @param schemaRetriever non-null Retriever of avro schemas - * @param keyEncoder non-null Avro encoder for keys - * @param valueEncoder non-null Avro encoder for values * @param useCompression use compression to send data * @param sharedState shared connection state * @param additionalHeaders headers to add to requests */ private RestSender(RestClient httpClient, SchemaRetriever schemaRetriever, - AvroEncoder keyEncoder, AvroEncoder valueEncoder, boolean useCompression, - ConnectionState sharedState, Headers additionalHeaders) { + boolean useCompression, ConnectionState sharedState, Headers additionalHeaders) { this.schemaRetriever = schemaRetriever; - this.keyEncoder = keyEncoder; - this.valueEncoder = valueEncoder; - this.useCompression = useCompression; - this.acceptType = KAFKA_REST_ACCEPT_ENCODING; - this.contentType = KAFKA_REST_AVRO_ENCODING; + this.requestProperties = new RequestProperties(KAFKA_REST_ACCEPT_ENCODING, + KAFKA_REST_AVRO_ENCODING, useCompression, additionalHeaders); this.state = sharedState; - this.additionalHeaders = additionalHeaders; setRestClient(httpClient); } @@ -132,8 +99,6 @@ public synchronized void setKafkaConfig(ServerConfig kafkaConfig) { private void setRestClient(RestClient newClient) { try { - schemalessKeyUrl = newClient.getRelativeUrl("topics/schemaless-key"); - schemalessValueUrl = newClient.getRelativeUrl("topics/schemaless-value"); isConnectedRequest = newClient.requestBuilder("").head(); } catch (MalformedURLException ex) { throw new IllegalArgumentException("Schemaless topics do not have a valid URL", ex); @@ -146,224 +111,40 @@ public final synchronized void setSchemaRetriever(SchemaRetriever retriever) { this.schemaRetriever = retriever; } - private synchronized RestClient getRestClient() { + public synchronized RestClient getRestClient() { return httpClient; } - private synchronized SchemaRetriever getSchemaRetriever() { + public synchronized SchemaRetriever getSchemaRetriever() { return this.schemaRetriever; } - private synchronized HttpUrl getSchemalessValueUrl() { - return schemalessValueUrl; - } - - private synchronized HttpUrl getSchemalessKeyUrl() { - return schemalessKeyUrl; - } - private synchronized Request getIsConnectedRequest() { - return isConnectedRequest.headers(additionalHeaders).build(); + return isConnectedRequest.headers(requestProperties.headers).build(); } public synchronized void setCompression(boolean useCompression) { - this.useCompression = useCompression; - } - - private synchronized boolean hasCompression() { - return this.useCompression; + this.requestProperties = new RequestProperties(requestProperties.acceptType, + requestProperties.contentType, useCompression, requestProperties.headers); } public synchronized Headers getHeaders() { - return additionalHeaders; + return requestProperties.headers; } public synchronized void setHeaders(Headers additionalHeaders) { - this.additionalHeaders = additionalHeaders; + this.requestProperties = new RequestProperties(requestProperties.acceptType, + requestProperties.contentType, requestProperties.useCompression, additionalHeaders); this.state.reset(); } - private class RestTopicSender implements KafkaTopicSender { - private long lastOffsetSent = -1L; - private final AvroTopic topic; - private final HttpUrl url; - private final TopicRequestData requestData; - - private RestTopicSender(AvroTopic topic) throws IOException { - this.topic = topic; - url = getRestClient().getRelativeUrl("topics/" + topic.getName()); - requestData = 
new TopicRequestData<>(topic, keyEncoder, valueEncoder); - } - - @Override - public void send(long offset, L key, W value) throws IOException { - List> records = new ArrayList<>(1); - records.add(new Record<>(offset, key, value)); - send(records); - } - - /** - * Actually make a REST request to the Kafka REST server and Schema Registry. - * - * @param records values to send - * @throws IOException if records could not be sent - */ - @Override - public void send(List> records) throws IOException { - if (records.isEmpty()) { - return; - } - - Request request = buildRequest(records); - - boolean doResend = false; - try (Response response = getRestClient().request(request)) { - // Evaluate the result - if (response.isSuccessful()) { - state.didConnect(); - if (logger.isDebugEnabled()) { - logger.debug("Added message to topic {} -> {}", - topic, responseBody(response)); - } - lastOffsetSent = records.get(records.size() - 1).offset; - } else if (response.code() == 401) { - throw new AuthenticationException("Cannot authenticate"); - } else if (response.code() == 403 || response.code() == 422) { - throw new AuthenticationException("Data does not match authentication"); - } else if (response.code() == 415 - && request.header("Accept").equals(KAFKA_REST_ACCEPT_ENCODING)) { - state.didConnect(); - logger.warn("Latest Avro encoding is not supported. Switching to legacy " - + "encoding."); - synchronized (RestSender.this) { - contentType = KAFKA_REST_AVRO_LEGACY_ENCODING; - acceptType = KAFKA_REST_ACCEPT_LEGACY_ENCODING; - } - doResend = true; - } else { - logFailure(request, response, null); - } - } catch (IOException ex) { - logFailure(request, null, ex); - } finally { - requestData.reset(); - } - - if (doResend) { - send(records); - } - } - - @SuppressWarnings("ConstantConditions") - private void logFailure(Request request, Response response, Exception ex) - throws IOException { - state.didDisconnect(); - String content = response == null ? null : responseBody(response); - int code = response == null ? 
-1 : response.code(); - String requestContent = topicRequestContent(request); - if (requestContent != null) { - requestContent = requestContent.substring(0, - Math.min(requestContent.length(), LOG_CONTENT_LENGTH)); - } - logger.error("FAILED to transmit message: {} -> {}...", - content, requestContent); - throw new IOException("Failed to submit (HTTP status code " + code - + "): " + content, ex); - } - - private Request buildRequest(List> records) throws IOException { - HttpUrl sendToUrl = updateRequestData(records); - - MediaType currentContentType; - String currentAcceptType; - Headers currentHeaders; - - synchronized (RestSender.this) { - currentContentType = contentType; - currentAcceptType = acceptType; - currentHeaders = additionalHeaders; - } - - TopicRequestBody requestBody; - Request.Builder requestBuilder = new Request.Builder() - .url(sendToUrl) - .headers(currentHeaders) - .addHeader("Accept", currentAcceptType); - - if (hasCompression()) { - requestBody = new GzipTopicRequestBody(requestData, currentContentType); - requestBuilder.addHeader("Content-Encoding", "gzip"); - } else { - requestBody = new TopicRequestBody(requestData, currentContentType); - } - - return requestBuilder.post(requestBody).build(); - } - - private HttpUrl updateRequestData(List> records) { - // Get schema IDs - Schema valueSchema = topic.getValueSchema(); - String sendTopic = topic.getName(); - - HttpUrl sendToUrl = url; - - try { - ParsedSchemaMetadata metadata = getSchemaRetriever() - .getOrSetSchemaMetadata(sendTopic, false, topic.getKeySchema(), -1); - requestData.setKeySchemaId(metadata.getId()); - } catch (IOException ex) { - logger.error("Failed to get schema for key {} of topic {}", - topic.getKeyClass().getName(), topic, ex); - sendToUrl = getSchemalessKeyUrl(); - } - if (requestData.getKeySchemaId() == null) { - requestData.setKeySchemaString(topic.getKeySchema().toString()); - } - - try { - ParsedSchemaMetadata metadata = getSchemaRetriever().getOrSetSchemaMetadata( - sendTopic, true, valueSchema, -1); - requestData.setValueSchemaId(metadata.getId()); - } catch (IOException ex) { - logger.error("Failed to get schema for value {} of topic {}", - topic.getValueClass().getName(), topic, ex); - sendToUrl = getSchemalessValueUrl(); - } - if (requestData.getValueSchemaId() == null) { - requestData.setValueSchemaString(topic.getValueSchema().toString()); - } - requestData.setRecords(records); - - return sendToUrl; - } - - @Override - public long getLastSentOffset() { - return lastOffsetSent; - } - - - @Override - public void clear() { - // noop - } - - @Override - public void flush() { - // noop - } - - @Override - public void close() { - // noop - } - + @Override + public KafkaTopicSender sender(AvroTopic topic) { + return new RestTopicSender<>(topic, this, state); } - @Override - public KafkaTopicSender sender(AvroTopic topic) - throws IOException { - return new RestTopicSender<>(topic); + public synchronized RequestProperties getRequestProperties() { + return requestProperties; } @Override @@ -415,77 +196,64 @@ public void close() { httpClient.close(); } - public static class Builder { + public synchronized void useLegacyEncoding() { + this.requestProperties = new RequestProperties(KAFKA_REST_ACCEPT_LEGACY_ENCODING, + KAFKA_REST_AVRO_LEGACY_ENCODING, requestProperties.useCompression, + requestProperties.headers); + } + + public static class Builder { private ServerConfig kafkaConfig; private SchemaRetriever retriever; - private AvroEncoder keyEncoder; - private AvroEncoder valueEncoder; private 
boolean compression = false; private long timeout = 10; private ConnectionState state; private ManagedConnectionPool pool; private Headers.Builder additionalHeaders = new Headers.Builder(); - public Builder server(ServerConfig kafkaConfig) { + public Builder server(ServerConfig kafkaConfig) { this.kafkaConfig = kafkaConfig; return this; } - public Builder schemaRetriever(SchemaRetriever schemaRetriever) { + public Builder schemaRetriever(SchemaRetriever schemaRetriever) { this.retriever = schemaRetriever; return this; } - public Builder encoders(AvroEncoder keyEncoder, AvroEncoder valueEncoder) { - this.keyEncoder = keyEncoder; - this.valueEncoder = valueEncoder; - return this; - } - - public Builder useCompression(boolean compression) { + public Builder useCompression(boolean compression) { this.compression = compression; return this; } - public Builder connectionState(ConnectionState state) { + public Builder connectionState(ConnectionState state) { this.state = state; return this; } - public Builder connectionTimeout(long timeout, TimeUnit unit) { + public Builder connectionTimeout(long timeout, TimeUnit unit) { this.timeout = TimeUnit.SECONDS.convert(timeout, unit); return this; } - public Builder connectionPool(ManagedConnectionPool pool) { + public Builder connectionPool(ManagedConnectionPool pool) { this.pool = pool; return this; } - public Builder headers(Headers headers) { + public Builder headers(Headers headers) { additionalHeaders = headers.newBuilder(); return this; } - @Deprecated - public Builder headers(List> headers) { - additionalHeaders = new Headers.Builder(); - for (Entry header : headers) { - additionalHeaders.add(header.getKey(), header.getValue()); - } - return this; - } - - public Builder addHeader(String header, String value) { + public Builder addHeader(String header, String value) { additionalHeaders.add(header, value); return this; } - public RestSender build() { + public RestSender build() { Objects.requireNonNull(kafkaConfig); Objects.requireNonNull(retriever); - Objects.requireNonNull(keyEncoder); - Objects.requireNonNull(valueEncoder); if (timeout <= 0) { throw new IllegalStateException("Connection timeout must be strictly positive"); } @@ -503,9 +271,23 @@ public RestSender build() { usePool = ManagedConnectionPool.GLOBAL_POOL; } - return new RestSender<>(new RestClient(kafkaConfig, timeout, usePool), - retriever, keyEncoder, valueEncoder, compression, useState, - additionalHeaders.build()); + return new RestSender(new RestClient(kafkaConfig, timeout, usePool), + retriever, compression, useState, additionalHeaders.build()); + } + } + + public static final class RequestProperties { + public final String acceptType; + public final MediaType contentType; + public final boolean useCompression; + public final Headers headers; + + RequestProperties(String acceptType, MediaType contentType, boolean useCompression, + Headers headers) { + this.acceptType = acceptType; + this.contentType = contentType; + this.useCompression = useCompression; + this.headers = headers; } } } diff --git a/src/main/java/org/radarcns/producer/rest/RestTopicSender.java b/src/main/java/org/radarcns/producer/rest/RestTopicSender.java new file mode 100644 index 00000000..9f36fbb4 --- /dev/null +++ b/src/main/java/org/radarcns/producer/rest/RestTopicSender.java @@ -0,0 +1,190 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.producer.rest; + +import okhttp3.HttpUrl; +import okhttp3.Request; +import okhttp3.Response; +import org.radarcns.data.AvroRecordData; +import org.radarcns.data.Record; +import org.radarcns.data.RecordData; +import org.radarcns.producer.AuthenticationException; +import org.radarcns.producer.KafkaTopicSender; +import org.radarcns.topic.AvroTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +import static org.radarcns.producer.rest.RestClient.responseBody; +import static org.radarcns.producer.rest.RestSender.KAFKA_REST_ACCEPT_ENCODING; +import static org.radarcns.producer.rest.TopicRequestBody.topicRequestContent; + +class RestTopicSender implements KafkaTopicSender { + private static final Logger logger = LoggerFactory.getLogger(RestTopicSender.class); + private static final int LOG_CONTENT_LENGTH = 1024; + + private final AvroTopic topic; + private final TopicRequestData requestData; + private final RestSender sender; + private final ConnectionState state; + + RestTopicSender(AvroTopic topic, RestSender sender, ConnectionState state) { + this.topic = topic; + this.sender = sender; + this.state = state; + this.requestData = new TopicRequestData(); + } + + @Override + public void send(K key, V value) throws IOException { + send(new AvroRecordData<>(topic, Collections.singleton(new Record<>(key, value)))); + } + + /** + * Actually make a REST request to the Kafka REST server and Schema Registry. + * + * @param records values to send + * @throws IOException if records could not be sent + */ + @Override + public void send(RecordData records) throws IOException { + if (records.isEmpty()) { + return; + } + + RestClient restClient; + RestSender.RequestProperties requestProperties; + synchronized (sender) { + restClient = sender.getRestClient(); + requestProperties = sender.getRequestProperties(); + } + Request request = buildRequest(restClient, requestProperties, records); + + boolean doResend = false; + try (Response response = restClient.request(request)) { + if (response.isSuccessful()) { + state.didConnect(); + if (logger.isDebugEnabled()) { + logger.debug("Added message to topic {} -> {}", + topic, responseBody(response)); + } + } else if (response.code() == 401 || response.code() == 403 || response.code() == 422) { + state.wasUnauthorized(); + } else if (response.code() == 415 + && Objects.equals(request.header("Accept"), KAFKA_REST_ACCEPT_ENCODING)) { + state.didConnect(); + logger.warn("Latest Avro encoding is not supported. 
Switching to legacy " + + "encoding."); + sender.useLegacyEncoding(); + doResend = true; + } else { + logFailure(request, response, null); + } + } catch (IOException ex) { + logFailure(request, null, ex); + } finally { + requestData.reset(); + } + + if (state.getState() == ConnectionState.State.UNAUTHORIZED) { + throw new AuthenticationException("Request unauthorized"); + } + + if (doResend) { + send(records); + } + } + + private Request buildRequest(RestClient restClient, RestSender.RequestProperties properties, + RecordData records) + throws IOException { + HttpUrl sendToUrl = updateRequestData(restClient, records); + + TopicRequestBody requestBody; + Request.Builder requestBuilder = new Request.Builder() + .url(sendToUrl) + .headers(properties.headers) + .header("Accept", properties.acceptType); + + if (properties.useCompression) { + requestBody = new GzipTopicRequestBody(requestData, properties.contentType); + requestBuilder.addHeader("Content-Encoding", "gzip"); + } else { + requestBody = new TopicRequestBody(requestData, properties.contentType); + } + + return requestBuilder.post(requestBody).build(); + } + + private HttpUrl updateRequestData(RestClient restClient, RecordData records) + throws IOException { + // Get schema IDs + String sendTopic = topic.getName(); + + SchemaRetriever retriever = sender.getSchemaRetriever(); + try { + ParsedSchemaMetadata metadata = retriever.getOrSetSchemaMetadata( + sendTopic, false, topic.getKeySchema(), -1); + requestData.setKeySchemaId(metadata.getId()); + + metadata = retriever.getOrSetSchemaMetadata( + sendTopic, true, topic.getValueSchema(), -1); + requestData.setValueSchemaId(metadata.getId()); + } catch (IOException ex) { + throw new IOException("Failed to get schemas for topic " + topic, ex); + } + + requestData.setRecords(records); + + return restClient.getRelativeUrl("topics/" + sendTopic); + } + + @SuppressWarnings("ConstantConditions") + private void logFailure(Request request, Response response, Exception ex) + throws IOException { + state.didDisconnect(); + String content = response == null ? null : responseBody(response); + int code = response == null ? -1 : response.code(); + String requestContent = topicRequestContent(request); + if (requestContent != null) { + requestContent = requestContent.substring(0, + Math.min(requestContent.length(), LOG_CONTENT_LENGTH)); + } + logger.error("FAILED to transmit message: {} -> {}...", + content, requestContent); + throw new IOException("Failed to submit (HTTP status code " + code + + "): " + content, ex); + } + + @Override + public void clear() { + // nothing + } + + @Override + public void flush() throws IOException { + // nothing + } + + @Override + public void close() { + // noop + } +} diff --git a/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java b/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java index a5f29e12..9212b5ab 100644 --- a/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java +++ b/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java @@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static org.radarcns.util.Strings.utf8; + /** Retriever of an Avro Schema. 
diff --git a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java
index 3b3b719b..ada3e3aa 100644
--- a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java
+++ b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java
@@ -17,118 +17,92 @@
 package org.radarcns.producer.rest;
 
 import org.json.JSONObject;
-import org.radarcns.data.AvroEncoder;
-import org.radarcns.data.Record;
-import org.radarcns.topic.AvroTopic;
+import org.radarcns.data.RecordData;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.List;
+import java.util.Iterator;
+
+import static org.radarcns.util.Strings.utf8;
 
 /**
  * Request data to submit records to the Kafka REST proxy.
 */
-class TopicRequestData<K, V> {
-    private final AvroEncoder.AvroWriter<K> keyWriter;
-    private final AvroEncoder.AvroWriter<V> valueWriter;
+class TopicRequestData {
+    private static final byte[] KEY_SCHEMA_ID = utf8("\"key_schema_id\":");
+    private static final byte[] VALUE_SCHEMA_ID = utf8(",\"value_schema_id\":");
+    private static final byte[] RECORDS = utf8(",\"records\":[");
+    private static final byte[] KEY = utf8("{\"key\":");
+    private static final byte[] VALUE = utf8(",\"value\":");
+    private static final byte[] END = utf8("]}");
+
+    private final byte[] buffer;
 
-    private Integer keySchemaId;
-    private Integer valueSchemaId;
-    private String keySchemaString;
-    private String valueSchemaString;
+    private int keySchemaId;
+    private int valueSchemaId;
 
-    private List<Record<K, V>> records;
+    private RecordData<?, ?> records;
 
-    TopicRequestData(AvroTopic<K, V> topic, AvroEncoder keyEncoder, AvroEncoder valueEncoder)
-            throws IOException {
-        keyWriter = keyEncoder.writer(topic.getKeySchema(), topic.getKeyClass());
-        valueWriter = valueEncoder.writer(topic.getValueSchema(), topic.getValueClass());
+    TopicRequestData() {
+        buffer = new byte[1024];
     }
 
     /**
      * Writes the current topic to a stream. This implementation does not use any JSON writers to
-     * write the data, but writes it directly to a stream. {@link JSONObject#quote(String, Writer)}
+     * write the data, but writes it directly to a stream. {@link JSONObject#quote(String)}
      * is used to get the correct formatting. This makes the method as lean as possible.
      * @param out OutputStream to write to. It is assumed to be buffered.
      * @throws IOException if a superimposing stream could not be created
      */
     void writeToStream(OutputStream out) throws IOException {
-        try (OutputStreamWriter writer = new OutputStreamWriter(out)) {
-            writer.append('{');
-            if (keySchemaId != null) {
-                writer.append("\"key_schema_id\":").append(keySchemaId.toString());
-            } else {
-                writer.append("\"key_schema\":");
-                writer.append(JSONObject.quote(keySchemaString));
-            }
-            if (valueSchemaId != null) {
-                writer.append(",\"value_schema_id\":").append(valueSchemaId.toString());
+        out.write('{');
+        out.write(KEY_SCHEMA_ID);
+        out.write(utf8(String.valueOf(keySchemaId)));
+        out.write(VALUE_SCHEMA_ID);
+        out.write(utf8(String.valueOf(valueSchemaId)));
+
+        out.write(RECORDS);
+
+        boolean first = true;
+        Iterator<InputStream> iterator = records.rawIterator();
+        while (iterator.hasNext()) {
+            if (first) {
+                first = false;
             } else {
-                writer.append(",\"value_schema\":");
-                writer.append(JSONObject.quote(valueSchemaString));
+                out.write(',');
             }
-            writer.append(",\"records\":[");
-
-            for (int i = 0; i < records.size(); i++) {
-                Record<K, V> record = records.get(i);
-
-                if (i == 0) {
-                    writer.append("{\"key\":");
-                } else {
-                    writer.append(",{\"key\":");
-                }
-
-                // flush writer and write raw bytes to underlying stream
-                // flush so the data do not overlap.
-                writer.flush();
-                out.write(keyWriter.encode(record.key));
-
-                writer.append(",\"value\":");
-                // flush writer and write raw bytes to underlying stream
-                // flush so the data do not overlap.
-                writer.flush();
-                out.write(valueWriter.encode(record.value));
-                writer.append('}');
-            }
-            writer.append("]}");
+            out.write(KEY);
+            copyStream(iterator.next(), out);
+
+            out.write(VALUE);
+            copyStream(iterator.next(), out);
+            out.write('}');
         }
+        out.write(END);
     }
 
     void reset() {
-        keySchemaId = null;
-        keySchemaString = null;
-        valueSchemaId = null;
-        valueSchemaString = null;
         records = null;
     }
 
-    void setKeySchemaId(Integer keySchemaId) {
+    void setKeySchemaId(int keySchemaId) {
         this.keySchemaId = keySchemaId;
     }
 
-    void setValueSchemaId(Integer valueSchemaId) {
+    void setValueSchemaId(int valueSchemaId) {
         this.valueSchemaId = valueSchemaId;
     }
 
-    void setKeySchemaString(String keySchemaString) {
-        this.keySchemaString = keySchemaString;
-    }
-
-    void setValueSchemaString(String valueSchemaString) {
-        this.valueSchemaString = valueSchemaString;
-    }
-
-    void setRecords(List<Record<K, V>> records) {
+    void setRecords(RecordData<?, ?> records) {
         this.records = records;
     }
 
-    Integer getKeySchemaId() {
-        return keySchemaId;
-    }
-
-    Integer getValueSchemaId() {
-        return valueSchemaId;
+    private void copyStream(InputStream in, OutputStream out) throws IOException {
+        int len = in.read(buffer);
+        while (len != -1) {
+            out.write(buffer, 0, len);
+            len = in.read(buffer);
+        }
     }
 }
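The rewritten writer streams precomputed UTF-8 fragments and raw Avro-JSON bytes, so no intermediate `String` is built per batch; `rawIterator()` alternates key and value streams, which is why the loop calls `iterator.next()` twice per record. A hypothetical in-package check of the envelope it produces (`records` is a `RecordData` built elsewhere; the class is package-private, so this only compiles under `org.radarcns.producer.rest`, inside a method that throws `IOException`):

```
ByteArrayOutputStream out = new ByteArrayOutputStream();
TopicRequestData data = new TopicRequestData();
data.setKeySchemaId(1);
data.setValueSchemaId(2);
data.setRecords(records);
data.writeToStream(out);
// out now holds:
// {"key_schema_id":1,"value_schema_id":2,"records":[{"key":...,"value":...}]}
```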
diff --git a/src/main/java/org/radarcns/util/Strings.java b/src/main/java/org/radarcns/util/Strings.java
index 976ef541..f2490a36 100644
--- a/src/main/java/org/radarcns/util/Strings.java
+++ b/src/main/java/org/radarcns/util/Strings.java
@@ -16,6 +16,7 @@
 
 package org.radarcns.util;
 
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.regex.Pattern;
@@ -24,6 +25,7 @@
  * String utilities.
  */
 public final class Strings {
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
 
     private Strings() {
         // utility class
@@ -65,6 +67,10 @@ public static boolean findAny(Pattern[] patterns, CharSequence value) {
         return false;
     }
 
+    public static byte[] utf8(String value) {
+        return value.getBytes(UTF_8);
+    }
+
     /** Whether given value is null or empty. */
     public static boolean isNullOrEmpty(String value) {
         return value == null || value.isEmpty();
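`Strings.utf8` caches the `Charset` lookup once; `Charset.forName` is presumably used instead of `java.nio.charset.StandardCharsets` to keep older Android API levels supported. Equivalent call, for illustration:

```
byte[] fragment = Strings.utf8(",\"records\":[");
// same bytes as ",\"records\":[".getBytes(Charset.forName("UTF-8"))
```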
diff --git a/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java b/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java
index 8139aabd..d5dd5db5 100644
--- a/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java
+++ b/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java
@@ -18,14 +18,23 @@
 
 import junit.framework.TestCase;
 
+import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
 import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
 import org.radarcns.kafka.ObservationKey;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.radarcns.passive.phone.PhoneAcceleration;
 import org.radarcns.topic.AvroTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SpecificRecordEncoderTest extends TestCase {
+    private static final Logger logger = LoggerFactory.getLogger(SpecificRecordEncoderTest.class);
+
     public void testJson() throws IOException {
         SpecificRecordEncoder encoder = new SpecificRecordEncoder(false);
         AvroTopic<ObservationKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys",
                 ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(),
                 ObservationKey.class, EmpaticaE4BloodVolumePulse.class);
@@ -67,4 +76,36 @@ public static String byteArrayToHex(byte[] a) {
             sb.append(String.format("%02x", b & 0xff));
         return sb.toString();
     }
+
+    public void testSize() throws IOException {
+        int n = 100;
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        AvroTopic<ObservationKey, PhoneAcceleration> topic = new AvroTopic<>("testie",
+                ObservationKey.getClassSchema(), PhoneAcceleration.getClassSchema(),
+                ObservationKey.class, PhoneAcceleration.class);
+        ObservationKey key = new ObservationKey("my project",
+                UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        double now = System.currentTimeMillis() / 1000d;
+
+        SpecificRecordEncoder binEncoder = new SpecificRecordEncoder(true);
+        AvroEncoder.AvroWriter<ObservationKey> binKeyEncoder =
+                binEncoder.writer(topic.getKeySchema(), topic.getKeyClass());
+        AvroEncoder.AvroWriter<PhoneAcceleration> binValueEncoder =
+                binEncoder.writer(topic.getValueSchema(), topic.getValueClass());
+
+        int binaryLength = n * binKeyEncoder.encode(key).length;
+        for (int i = 0; i < 100; i++) {
+            binaryLength += binValueEncoder.encode(new PhoneAcceleration(now, now,
+                    random.nextFloat(), random.nextFloat(), random.nextFloat())).length;
+            now += 0.001;
+        }
+
+        SpecificRecordEncoder encoder = new SpecificRecordEncoder(false);
+        AvroEncoder.AvroWriter<ObservationKey> keyEncoder =
+                encoder.writer(topic.getKeySchema(), topic.getKeyClass());
+        AvroEncoder.AvroWriter<PhoneAcceleration> valueEncoder =
+                encoder.writer(topic.getValueSchema(), topic.getValueClass());
+
+        int normalLength = n * (keyEncoder.encode(key).length + "{\"key\":".length());
+        for (int i = 0; i < 100; i++) {
+            normalLength += ",\"value\":},".length();
+            normalLength += valueEncoder.encode(new PhoneAcceleration(now, now,
+                    random.nextFloat(), random.nextFloat(), random.nextFloat())).length;
+            now += 0.001;
+        }
+        logger.info("Binary length: {}. Normal length: {}", binaryLength, normalLength);
+        assertTrue(binaryLength < normalLength);
+    }
 }
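The new `testSize` quantifies what the binary encoding saves: in Avro's binary format a `PhoneAcceleration` value (two doubles and three floats) occupies a fixed 8 + 8 + 4 + 4 + 4 = 28 bytes, while its JSON form repeats every field name and spells each number out in decimal, which is typically several times larger, so `binaryLength < normalLength` should hold by a wide margin.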
Normal length: {}", binaryLength, normalLength); + assertTrue(binaryLength < normalLength); + } } diff --git a/src/test/java/org/radarcns/producer/rest/RestSenderTest.java b/src/test/java/org/radarcns/producer/rest/RestSenderTest.java index 53f19f73..b8d2c194 100644 --- a/src/test/java/org/radarcns/producer/rest/RestSenderTest.java +++ b/src/test/java/org/radarcns/producer/rest/RestSenderTest.java @@ -25,13 +25,12 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.radarcns.config.ServerConfig; +import org.radarcns.data.AvroRecordData; import org.radarcns.data.Record; -import org.radarcns.data.SpecificRecordEncoder; import org.radarcns.kafka.ObservationKey; import org.radarcns.passive.phone.PhoneLight; import org.radarcns.producer.AuthenticationException; @@ -43,12 +42,18 @@ import java.util.Arrays; import java.util.zip.GZIPInputStream; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class RestSenderTest { private SchemaRetriever retriever; - private RestSender sender; + private RestSender sender; @Rule public MockWebServer webServer = new MockWebServer(); @@ -56,13 +61,11 @@ public class RestSenderTest { @Before public void setUp() { this.retriever = mock(SchemaRetriever.class); - SpecificRecordEncoder encoder = new SpecificRecordEncoder(false); ServerConfig config = new ServerConfig(webServer.url("/").url()); - this.sender = new RestSender.Builder() + this.sender = new RestSender.Builder() .server(config) .schemaRetriever(retriever) - .encoders(encoder, encoder) .connectionPool(new ManagedConnectionPool()) .build(); } @@ -96,7 +99,7 @@ public void sender() throws Exception { .setHeader("Content-Type", "application/json; charset=utf-8") .setBody("{\"offset\": 100}")); - topicSender.send(1, key, value); + topicSender.send(key, value); verify(retriever, times(1)) .getOrSetSchemaMetadata("test", false, keySchema, -1); @@ -141,9 +144,9 @@ public void sendTwo() throws Exception { .setHeader("Content-Type", "application/json; charset=utf-8") .setBody("{\"offset\": 100}")); - topicSender.send(Arrays.asList( - new Record<>(1, key, value), - new Record<>(2, key, value))); + topicSender.send(new AvroRecordData<>(topic, Arrays.asList( + new Record<>(key, value), + new Record<>(key, value)))); verify(retriever, times(1)) .getOrSetSchemaMetadata("test", false, keySchema, -1); @@ -244,7 +247,7 @@ public void withCompression() throws IOException, InterruptedException { .getOrSetSchemaMetadata("test", true, valueSchema, -1)) .thenReturn(valueSchemaMetadata); - topicSender.send(1, key, value); + topicSender.send(key, value); RecordedRequest request = webServer.takeRequest(); assertEquals("gzip", request.getHeader("Content-Encoding"));