From b55fc6a54ea776a817a2d7dd971b1d69b0d0cdf3 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 22 Nov 2017 10:33:42 +0100 Subject: [PATCH 1/4] Snapshot version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 9d4bc7a4..8d340d07 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.6' + version = '0.6.1-SNAPSHOT' group = 'org.radarcns' ext.githubRepoName = 'RADAR-CNS/RADAR-Commons' From 7772f84e7c084f93478492bcb07e5130b5e7c696 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 22 Nov 2017 10:52:28 +0100 Subject: [PATCH 2/4] More elegant URL/Response handling --- .../org/radarcns/config/ServerConfig.java | 53 ++++++++++++++----- .../radarcns/producer/rest/RestClient.java | 39 ++++++++++---- .../radarcns/producer/rest/RestSender.java | 53 ++++++++++--------- .../producer/rest/TopicRequestBody.java | 9 ++++ .../org/radarcns/config/ServerConfigTest.java | 25 +++++++++ .../producer/rest/RestClientTest.java | 10 ++-- 6 files changed, 135 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/radarcns/config/ServerConfig.java b/src/main/java/org/radarcns/config/ServerConfig.java index 13478e32..a23b8ca0 100644 --- a/src/main/java/org/radarcns/config/ServerConfig.java +++ b/src/main/java/org/radarcns/config/ServerConfig.java @@ -18,6 +18,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSetter; +import okhttp3.HttpUrl; + import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.Proxy; @@ -95,10 +97,34 @@ public static String getPaths(List configList) { * Get the server as a URL. * * @return URL to the server. - * @throws MalformedURLException if protocol is not set or the host name is invalid. + * @throws IllegalStateException if the URL is invalid */ - public URL getUrl() throws MalformedURLException { - return new URL(protocol, host, port, path == null ? "" : path); + public URL getUrl() { + try { + return new URL(protocol, host, port, path == null ? "" : path); + } catch (MalformedURLException ex) { + throw new IllegalStateException("Already parsed a URL but it turned out invalid", ex); + } + } + + /** + * Get the server as an HttpUrl. + * @return HttpUrl to the server + * @throws IllegalStateException if the URL is invalid + */ + public HttpUrl getHttpUrl() { + HttpUrl.Builder urlBuilder = new HttpUrl.Builder() + .scheme(protocol) + .host(host); + + if (port != -1) { + urlBuilder.port(port); + } + if (path != null) { + urlBuilder.encodedPath(path); + } + + return urlBuilder.build(); } /** @@ -185,11 +211,15 @@ public final void setPath(String path) { throw new IllegalArgumentException("Cannot set server path with query string"); } else { this.path = path.trim(); - if (!this.path.isEmpty() && this.path.charAt(0) != '/') { - this.path = '/' + this.path; - } - if (!this.path.isEmpty() && this.path.charAt(this.path.length() - 1) != '/') { - this.path += '/'; + if (this.path.isEmpty()) { + this.path = "/"; + } else { + if (this.path.charAt(0) != '/') { + this.path = '/' + this.path; + } + if (this.path.charAt(this.path.length() - 1) != '/') { + this.path += '/'; + } } } } @@ -218,12 +248,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - int result = host != null ? host.hashCode() : 0; - result = 31 * result + port; - result = 31 * result + (protocol != null ? protocol.hashCode() : 0); - result = 31 * result + (proxyHost != null ? proxyHost.hashCode() : 0); - result = 31 * result + proxyPort; - return result; + return Objects.hash(protocol, host, port); } public boolean isUnsafe() { diff --git a/src/main/java/org/radarcns/producer/rest/RestClient.java b/src/main/java/org/radarcns/producer/rest/RestClient.java index 7986806e..b9876408 100644 --- a/src/main/java/org/radarcns/producer/rest/RestClient.java +++ b/src/main/java/org/radarcns/producer/rest/RestClient.java @@ -16,6 +16,8 @@ package org.radarcns.producer.rest; +import okhttp3.Callback; +import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.OkHttpClient.Builder; import okhttp3.Request; @@ -32,7 +34,6 @@ import java.io.Closeable; import java.io.IOException; import java.net.MalformedURLException; -import java.net.URL; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; @@ -185,6 +186,17 @@ public Response request(Request request) throws IOException { return httpClient.newCall(request).execute(); } + /** + * Make an asynchronous request. + * @param request request, possibly built with {@link #requestBuilder(String)} + * @param callback callback to activate once the request is done. + */ + public void request(Request request, Callback callback) { + Objects.requireNonNull(request); + Objects.requireNonNull(callback); + httpClient.newCall(request).enqueue(callback); + } + /** * Make a request to given relative path. This does not set any request properties except the * URL. @@ -207,13 +219,8 @@ public Response request(String relativePath) throws IOException { */ public String requestString(Request request) throws IOException { try (Response response = request(request)) { - ResponseBody body = response.body(); - - String bodyString = null; + String bodyString = responseBody(response); - if (body != null) { - bodyString = body.string(); - } if (!response.isSuccessful() || bodyString == null) { throw new RestException(response.code(), bodyString); } @@ -223,7 +230,7 @@ public String requestString(Request request) throws IOException { } /** - * Create a OkHttp3 request builder with {@link Request.Builder#url(URL)} set. + * Create a OkHttp3 request builder with {@link Request.Builder#url(HttpUrl)} set. * Call{@link Request.Builder#build()} to make the actual request with * {@link #request(Request)}. * @@ -241,12 +248,16 @@ public Request.Builder requestBuilder(String relativePath) throws MalformedURLEx * @return URL * @throws MalformedURLException if the path is malformed */ - public URL getRelativeUrl(String path) throws MalformedURLException { + public HttpUrl getRelativeUrl(String path) throws MalformedURLException { String strippedPath = path; while (!strippedPath.isEmpty() && strippedPath.charAt(0) == '/') { strippedPath = strippedPath.substring(1); } - return new URL(getConfig().getUrl(), strippedPath); + HttpUrl.Builder builder = getConfig().getHttpUrl().newBuilder(strippedPath); + if (builder == null) { + throw new MalformedURLException(); + } + return builder.build(); } @Override @@ -285,4 +296,12 @@ public void close() { connectionPool.release(); } } + + public static String responseBody(Response response) throws IOException { + ResponseBody body = response.body(); + if (body == null) { + return null; + } + return body.string(); + } } diff --git a/src/main/java/org/radarcns/producer/rest/RestSender.java b/src/main/java/org/radarcns/producer/rest/RestSender.java index 9512bd81..40cd038c 100644 --- a/src/main/java/org/radarcns/producer/rest/RestSender.java +++ b/src/main/java/org/radarcns/producer/rest/RestSender.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.net.MalformedURLException; -import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -45,6 +44,9 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import static org.radarcns.producer.rest.RestClient.responseBody; +import static org.radarcns.producer.rest.TopicRequestBody.topicRequestContent; + /** * RestSender sends records to the Kafka REST Proxy. It does so using an Avro JSON encoding. A new * sender must be constructed with {@link #sender(AvroTopic)} per AvroTopic. This implementation is @@ -130,8 +132,8 @@ public synchronized void setKafkaConfig(ServerConfig kafkaConfig) { private void setRestClient(RestClient newClient) { try { - schemalessKeyUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-key")); - schemalessValueUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-value")); + schemalessKeyUrl = newClient.getRelativeUrl("topics/schemaless-key"); + schemalessValueUrl = newClient.getRelativeUrl("topics/schemaless-value"); isConnectedRequest = newClient.requestBuilder("").head(); } catch (MalformedURLException ex) { throw new IllegalArgumentException("Schemaless topics do not have a valid URL", ex); @@ -187,11 +189,7 @@ private class RestTopicSender implements KafkaTopicSen private RestTopicSender(AvroTopic topic) throws IOException { this.topic = topic; - URL rawUrl = getRestClient().getRelativeUrl("topics/" + topic.getName()); - url = HttpUrl.get(rawUrl); - if (url == null) { - throw new MalformedURLException("Cannot parse " + rawUrl); - } + url = getRestClient().getRelativeUrl("topics/" + topic.getName()); requestData = new TopicRequestData<>(topic, keyEncoder, valueEncoder); } @@ -223,7 +221,7 @@ public void send(List> records) throws IOException { state.didConnect(); if (logger.isDebugEnabled()) { logger.debug("Added message to topic {} -> {}", - topic, response.body().string()); + topic, responseBody(response)); } lastOffsetSent = records.get(records.size() - 1).offset; } else if (response.code() == 401) { @@ -239,23 +237,10 @@ public void send(List> records) throws IOException { } doResend = true; } else { - state.didDisconnect(); - String content = response.body().string(); - String requestContent = ((TopicRequestBody)request.body()).content(); - requestContent = requestContent.substring(0, - Math.min(requestContent.length(), LOG_CONTENT_LENGTH)); - logger.error("FAILED to transmit message: {} -> {}...", - content, requestContent); - throw new IOException("Failed to submit (HTTP status code " + response.code() - + "): " + content); + logFailure(request, response, null); } } catch (IOException ex) { - state.didDisconnect(); - String requestContent = ((TopicRequestBody)request.body()).content(); - requestContent = requestContent.substring(0, - Math.min(requestContent.length(), LOG_CONTENT_LENGTH)); - logger.error("FAILED to transmit message:\n{}...", requestContent); - throw ex; + logFailure(request, null, ex); } finally { requestData.reset(); } @@ -265,6 +250,23 @@ public void send(List> records) throws IOException { } } + @SuppressWarnings("ConstantConditions") + private void logFailure(Request request, Response response, Exception ex) + throws IOException { + state.didDisconnect(); + String content = response == null ? null : responseBody(response); + int code = response == null ? -1 : response.code(); + String requestContent = topicRequestContent(request); + if (requestContent != null) { + requestContent = requestContent.substring(0, + Math.min(requestContent.length(), LOG_CONTENT_LENGTH)); + } + logger.error("FAILED to transmit message: {} -> {}...", + content, requestContent); + throw new IOException("Failed to submit (HTTP status code " + code + + "): " + content, ex); + } + private Request buildRequest(List> records) throws IOException { HttpUrl sendToUrl = updateRequestData(records); @@ -371,8 +373,9 @@ public boolean resetConnection() { return true; } else { state.didDisconnect(); + String bodyString = responseBody(response); logger.warn("Failed to make heartbeat request to {} (HTTP status code {}): {}", - httpClient, response.code(), response.body().string()); + httpClient, response.code(), bodyString); return false; } } catch (IOException ex) { diff --git a/src/main/java/org/radarcns/producer/rest/TopicRequestBody.java b/src/main/java/org/radarcns/producer/rest/TopicRequestBody.java index bbd376d1..e5836324 100644 --- a/src/main/java/org/radarcns/producer/rest/TopicRequestBody.java +++ b/src/main/java/org/radarcns/producer/rest/TopicRequestBody.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import okhttp3.MediaType; +import okhttp3.Request; import okhttp3.RequestBody; import okio.BufferedSink; @@ -53,4 +54,12 @@ String content() throws IOException { return out.toString(); } } + + public static String topicRequestContent(Request request) throws IOException { + TopicRequestBody body = (TopicRequestBody) request.body(); + if (body == null) { + return null; + } + return body.content(); + } } diff --git a/src/test/java/org/radarcns/config/ServerConfigTest.java b/src/test/java/org/radarcns/config/ServerConfigTest.java index 1d0a352e..1f7041e9 100644 --- a/src/test/java/org/radarcns/config/ServerConfigTest.java +++ b/src/test/java/org/radarcns/config/ServerConfigTest.java @@ -23,7 +23,10 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.IOException; +import java.net.MalformedURLException; import java.net.URL; + +import okhttp3.HttpUrl; import org.junit.Test; /** @@ -60,4 +63,26 @@ public void jacksonUrl() throws IOException { + "path: /schema")) .getUrlString()); } + + @Test + public void getHttpUrl() throws MalformedURLException { + ServerConfig config = new ServerConfig("http://something.else/that"); + HttpUrl url = config.getHttpUrl(); + assertEquals("http://something.else/that/", url.toString()); + assertEquals("something.else", url.host()); + assertEquals("http", url.scheme()); + assertEquals(80, url.port()); + assertEquals("/that/", url.encodedPath()); + } + + @Test + public void getHttpUrlWitoutRoot() throws MalformedURLException { + ServerConfig config = new ServerConfig("http://something.else"); + HttpUrl url = config.getHttpUrl(); + assertEquals("http://something.else/", url.toString()); + assertEquals("something.else", url.host()); + assertEquals("http", url.scheme()); + assertEquals(80, url.port()); + assertEquals("/", url.encodedPath()); + } } diff --git a/src/test/java/org/radarcns/producer/rest/RestClientTest.java b/src/test/java/org/radarcns/producer/rest/RestClientTest.java index 00a4f551..6a07ed76 100644 --- a/src/test/java/org/radarcns/producer/rest/RestClientTest.java +++ b/src/test/java/org/radarcns/producer/rest/RestClientTest.java @@ -92,10 +92,10 @@ public void requestBuilder() throws Exception { @Test public void getRelativeUrl() throws Exception { - URL url = client.getRelativeUrl("myPath"); - assertEquals(server.getHostName(), url.getHost()); - assertEquals(server.getPort(), url.getPort()); - assertEquals("http", url.getProtocol()); - assertEquals("/base/myPath", url.getFile()); + HttpUrl url = client.getRelativeUrl("myPath"); + assertEquals(server.getHostName(), url.host()); + assertEquals(server.getPort(), url.port()); + assertEquals("http", url.scheme()); + assertEquals("/base/myPath", url.encodedPath()); } } \ No newline at end of file From dcf824d062a43800cae65c3b7d539844fb6aaa3a Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 22 Nov 2017 14:42:18 +0100 Subject: [PATCH 3/4] Fixed incompatibility with android JSON --- .../java/org/radarcns/producer/rest/TopicRequestData.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java index 2ca4eacd..3b3b719b 100644 --- a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java +++ b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java @@ -61,13 +61,13 @@ void writeToStream(OutputStream out) throws IOException { writer.append("\"key_schema_id\":").append(keySchemaId.toString()); } else { writer.append("\"key_schema\":"); - JSONObject.quote(keySchemaString, writer); + writer.append(JSONObject.quote(keySchemaString)); } if (valueSchemaId != null) { writer.append(",\"value_schema_id\":").append(valueSchemaId.toString()); } else { writer.append(",\"value_schema\":"); - JSONObject.quote(valueSchemaString, writer); + writer.append(JSONObject.quote(valueSchemaString)); } writer.append(",\"records\":["); From 42f3de356bc5a7321487e7bdec408ac8acc2f774 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Thu, 23 Nov 2017 09:56:01 +0100 Subject: [PATCH 4/4] Bumped version --- README.md | 6 +++--- build.gradle | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7d0f166b..1840fa69 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.6' + compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.1' } ``` @@ -26,7 +26,7 @@ repositories { } dependencies { - testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6' + testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.1' } ``` @@ -51,7 +51,7 @@ configurations.all { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.1-SNAPSHOT', changing: true + compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.2-SNAPSHOT', changing: true } ``` diff --git a/build.gradle b/build.gradle index 8d340d07..4999cde6 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.6.1-SNAPSHOT' + version = '0.6.1' group = 'org.radarcns' ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'