Skip to content

Commit

Permalink
Merge pull request #29 from RADAR-CNS/v0.5_release
Browse files Browse the repository at this point in the history
V0.5 release
  • Loading branch information
blootsvoets authored Aug 31, 2017
2 parents 97144de + e88d15c commit ac8631f
Show file tree
Hide file tree
Showing 25 changed files with 116 additions and 420 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.4.2'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.5'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.4.2'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.5'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.5-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.5.1-SNAPSHOT', changing: true
}
```

Expand Down
18 changes: 4 additions & 14 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.4.2'
version = '0.5'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

Expand Down Expand Up @@ -217,24 +217,14 @@ dependencies {
// Direct producer uses KafkaAvroSerializer if initialized
testImplementation (group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentVersion) {
exclude group: 'com.101tec'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
testImplementation group: 'org.radarcns', name: 'radar-schemas-commons', version: radarSchemasVersion
testImplementation group: 'junit', name: 'junit', version: junitVersion
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest-all', version: hamcrestVersion
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: okhttpVersion

// For Topic name validation based on Kafka classes
testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
exclude group: 'net.sf.jopt-simple'
exclude group: 'com.yammer.metrics'
exclude group: 'org.scala-lang.modules'
exclude group: 'org.slf4j'
exclude group: 'com.101tec'
exclude group: 'org.apache.zookeeper'
}

codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: codacyVersion
}

Expand Down Expand Up @@ -369,6 +359,6 @@ artifactoryPublish {
}

task wrapper(type: Wrapper) {
gradleVersion = '3.4.1'
distributionUrl distributionUrl.replace("bin", "all")
gradleVersion = '4.1'
distributionType 'all'
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 1 addition & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Fri Mar 17 09:52:45 CET 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.4.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip
6 changes: 3 additions & 3 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
Expand Down Expand Up @@ -155,7 +155,7 @@ if $cygwin ; then
fi

# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
* limitations under the License.
*/

package org.radarcns.producer.rest;
package org.radarcns.producer;

import org.radarcns.data.Record;
import org.radarcns.topic.AvroTopic;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;

import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
* limitations under the License.
*/

package org.radarcns.producer.rest;
package org.radarcns.producer;

import org.radarcns.data.Record;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;
import org.radarcns.producer.rest.ConnectionState;
import org.radarcns.topic.AvroTopic;
import org.radarcns.util.RollingTimeAverage;
import org.slf4j.Logger;
Expand All @@ -34,9 +33,8 @@
import java.util.concurrent.TimeUnit;

/**
* Send Avro Records to a Kafka REST Proxy.
*
* This queues messages for a specified amount of time and then sends all messages up to that time.
* Send Avro Records to a Kafka REST Proxy. This queues messages for a specified amount of time
* and then sends all messages up to that time.
*/
public class ThreadedKafkaSender<K, V> implements KafkaSender<K, V> {
private static final Logger logger = LoggerFactory.getLogger(ThreadedKafkaSender.class);
Expand Down
75 changes: 35 additions & 40 deletions src/main/java/org/radarcns/producer/rest/RestSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,7 @@
package org.radarcns.producer.rest;

import com.fasterxml.jackson.core.JsonFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.Request;
Expand All @@ -37,14 +27,25 @@
import org.radarcns.data.AvroEncoder;
import org.radarcns.data.Record;
import org.radarcns.producer.AuthenticationException;
import org.radarcns.producer.BatchedKafkaSender;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;
import org.radarcns.producer.SchemaRetriever;
import org.radarcns.producer.ThreadedKafkaSender;
import org.radarcns.producer.rest.ConnectionState.State;
import org.radarcns.topic.AvroTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* RestSender sends records to the Kafka REST Proxy. It does so using an Avro JSON encoding. A new
* sender must be constructed with {@link #sender(AvroTopic)} per AvroTopic. This implementation is
Expand All @@ -58,6 +59,8 @@
@SuppressWarnings("PMD.GodClass")
public class RestSender<K, V> implements KafkaSender<K, V> {
private static final Logger logger = LoggerFactory.getLogger(RestSender.class);
private static final int LOG_CONTENT_LENGTH = 1024;

public static final String KAFKA_REST_ACCEPT_ENCODING =
"application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json";
public static final String KAFKA_REST_ACCEPT_LEGACY_ENCODING =
Expand All @@ -73,14 +76,14 @@ public class RestSender<K, V> implements KafkaSender<K, V> {

private HttpUrl schemalessKeyUrl;
private HttpUrl schemalessValueUrl;
private Request isConnectedRequest;
private Request.Builder isConnectedRequest;
private SchemaRetriever schemaRetriever;
private RestClient httpClient;
private String acceptType;
private MediaType contentType;
private boolean useCompression;
private final ConnectionState state;
private List<Entry<String, String>> additionalHeaders;
private Headers additionalHeaders;

/**
* Construct a RestSender.
Expand All @@ -90,11 +93,11 @@ public class RestSender<K, V> implements KafkaSender<K, V> {
* @param valueEncoder non-null Avro encoder for values
* @param useCompression use compression to send data
* @param sharedState shared connection state
* @param additionalHeaders
* @param additionalHeaders headers to add to requests
*/
private RestSender(RestClient httpClient, SchemaRetriever schemaRetriever,
AvroEncoder keyEncoder, AvroEncoder valueEncoder, boolean useCompression,
ConnectionState sharedState, List<Entry<String, String>> additionalHeaders) {
ConnectionState sharedState, Headers additionalHeaders) {
this.schemaRetriever = schemaRetriever;
this.keyEncoder = keyEncoder;
this.valueEncoder = valueEncoder;
Expand Down Expand Up @@ -132,7 +135,7 @@ private void setRestClient(RestClient newClient) {
try {
schemalessKeyUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-key"));
schemalessValueUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-value"));
isConnectedRequest = newClient.requestBuilder("").head().build();
isConnectedRequest = newClient.requestBuilder("").head();
} catch (MalformedURLException ex) {
throw new IllegalArgumentException("Schemaless topics do not have a valid URL", ex);
}
Expand Down Expand Up @@ -160,7 +163,7 @@ private synchronized HttpUrl getSchemalessKeyUrl() {
}

private synchronized Request getIsConnectedRequest() {
return isConnectedRequest;
return isConnectedRequest.headers(additionalHeaders).build();
}

public synchronized void setCompression(boolean useCompression) {
Expand All @@ -171,11 +174,11 @@ private synchronized boolean hasCompression() {
return this.useCompression;
}

public synchronized List<Entry<String, String>> getHeaders() {
public synchronized Headers getHeaders() {
return additionalHeaders;
}

public synchronized void setHeaders(List<Entry<String, String>> additionalHeaders) {
public synchronized void setHeaders(Headers additionalHeaders) {
this.additionalHeaders = additionalHeaders;
}

Expand Down Expand Up @@ -243,7 +246,7 @@ public void send(List<Record<L, W>> records) throws IOException {
String content = response.body().string();
String requestContent = ((TopicRequestBody)request.body()).content();
requestContent = requestContent.substring(0,
Math.min(requestContent.length(), 255));
Math.min(requestContent.length(), LOG_CONTENT_LENGTH));
logger.error("FAILED to transmit message: {} -> {}...",
content, requestContent);
throw new IOException("Failed to submit (HTTP status code " + response.code()
Expand All @@ -253,7 +256,7 @@ public void send(List<Record<L, W>> records) throws IOException {
state.didDisconnect();
String requestContent = ((TopicRequestBody)request.body()).content();
requestContent = requestContent.substring(0,
Math.min(requestContent.length(), 255));
Math.min(requestContent.length(), LOG_CONTENT_LENGTH));
logger.error("FAILED to transmit message:\n{}...", requestContent);
throw ex;
} finally {
Expand All @@ -270,7 +273,7 @@ private Request buildRequest(List<Record<L, W>> records) throws IOException {

MediaType currentContentType;
String currentAcceptType;
List<Entry<String, String>> currentHeaders;
Headers currentHeaders;

synchronized (RestSender.this) {
currentContentType = contentType;
Expand All @@ -281,12 +284,9 @@ private Request buildRequest(List<Record<L, W>> records) throws IOException {
TopicRequestBody requestBody;
Request.Builder requestBuilder = new Request.Builder()
.url(sendToUrl)
.headers(currentHeaders)
.addHeader("Accept", currentAcceptType);

for (Map.Entry<String, String> header : currentHeaders) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}

if (hasCompression()) {
requestBody = new GzipTopicRequestBody(requestData, currentContentType);
requestBuilder.addHeader("Content-Encoding", "gzip");
Expand Down Expand Up @@ -413,7 +413,7 @@ public static class Builder<K, V> {
private long timeout = 10;
private ConnectionState state;
private ManagedConnectionPool pool;
private List<Entry<String, String>> additionalHeaders;
private Headers.Builder additionalHeaders = new Headers.Builder();

public Builder<K, V> server(ServerConfig kafkaConfig) {
this.kafkaConfig = kafkaConfig;
Expand Down Expand Up @@ -452,15 +452,15 @@ public Builder<K, V> connectionPool(ManagedConnectionPool pool) {
}

public Builder<K, V> headers(List<Map.Entry<String, String>> headers) {
this.additionalHeaders = headers;
additionalHeaders = new Headers.Builder();
for (Entry<String, String> header : headers) {
additionalHeaders.add(header.getKey(), header.getValue());
}
return this;
}

public Builder<K, V> addHeader(String header, String value) {
if (additionalHeaders == null) {
additionalHeaders = new ArrayList<>();
}
additionalHeaders.add(new AbstractMap.SimpleImmutableEntry<>(header, value));
additionalHeaders.add(header, value);
return this;
}

Expand All @@ -474,7 +474,6 @@ public RestSender<K, V> build() {
}
ConnectionState useState;
ManagedConnectionPool usePool;
List<Entry<String, String>> useHeaders;

if (state != null) {
useState = state;
Expand All @@ -486,14 +485,10 @@ public RestSender<K, V> build() {
} else {
usePool = ManagedConnectionPool.GLOBAL_POOL;
}
if (additionalHeaders != null) {
useHeaders = additionalHeaders;
} else {
useHeaders = Collections.emptyList();
}

return new RestSender<>(new RestClient(kafkaConfig, timeout, usePool),
retriever, keyEncoder, valueEncoder, compression, useState, useHeaders);
retriever, keyEncoder, valueEncoder, compression, useState,
additionalHeaders.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.radarcns.producer;
package org.radarcns.producer.rest;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
Expand All @@ -39,9 +39,6 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericContainer;
import org.radarcns.config.ServerConfig;
import org.radarcns.producer.rest.ManagedConnectionPool;
import org.radarcns.producer.rest.ParsedSchemaMetadata;
import org.radarcns.producer.rest.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Loading

0 comments on commit ac8631f

Please sign in to comment.