diff --git a/chef/cookbooks/datasift-stats/files/default/grafana.db b/chef/cookbooks/datasift-stats/files/default/grafana.db
index 72a77b1..1bdfb2d 100644
Binary files a/chef/cookbooks/datasift-stats/files/default/grafana.db and b/chef/cookbooks/datasift-stats/files/default/grafana.db differ
diff --git a/datasift-writer/src/main/java/com/datasift/connector/DataSiftWriter.java b/datasift-writer/src/main/java/com/datasift/connector/DataSiftWriter.java
index 39c9d16..374092e 100644
--- a/datasift-writer/src/main/java/com/datasift/connector/DataSiftWriter.java
+++ b/datasift-writer/src/main/java/com/datasift/connector/DataSiftWriter.java
@@ -345,7 +345,7 @@ protected void logStatistics() {
         msg += "Total Kafka items read: "
                 + metrics.readKafkaItem.getCount() + ". ";
         msg += "Total items sent to endpoint: "
-                + metrics.sendSuccess.getCount() + ".";
+                + metrics.sentItems.getCount() + ".";
         log.info(msg);
     }

diff --git a/datasift-writer/src/main/java/com/datasift/connector/writer/BulkManager.java b/datasift-writer/src/main/java/com/datasift/connector/writer/BulkManager.java
index e4f196d..8f4efc2 100644
--- a/datasift-writer/src/main/java/com/datasift/connector/writer/BulkManager.java
+++ b/datasift-writer/src/main/java/com/datasift/connector/writer/BulkManager.java
@@ -167,8 +167,8 @@ protected String getAuthorizationToken(final DataSiftConfig config) {
     public final void run() {
         while (isRunning()) {
             try {
-                String data = read();
-                send(data);
+                BulkReadValues readValues = read();
+                send(readValues);
             } catch (InterruptedException e) {
                 log.error("Run loop interrupted ", e);
             }
@@ -183,7 +183,7 @@ public final void run() {
      */
     @VisibleForTesting
     @SuppressWarnings("checkstyle:designforextension")
-    protected String read() {
+    protected BulkReadValues read() {
         final long start = System.nanoTime();
         StringBuilder buffer = new StringBuilder();
         int loop = 0;
@@ -200,7 +200,7 @@ protected String read() {
         log.debug("Read {} items from Kafka", read);
         metrics.readKafkaItemsFromConsumer.mark();

-        return buffer.toString();
+        return new BulkReadValues(read, buffer.toString());
     }

     /**
@@ -230,21 +230,21 @@ protected boolean getDataFromKafka(final StringBuilder buffer) {
     /**
      * Reads from Kafka and sends to the DataSift ingestion endpoint.
      * Deals with back-offs if unsuccessful.
-     * @param data the data to post
+     * @param readValues the data to post
      * @throws InterruptedException if the waits are interrupted
      */
     @SuppressWarnings("checkstyle:designforextension")
-    protected void send(final String data) throws InterruptedException {
+    protected void send(final BulkReadValues readValues) throws InterruptedException {
         log.debug("send()");

         HttpResponse response = null;
         try {
-            if (data.equals("")) {
+            if (readValues.getData().equals("")) {
                 return;
             }

             final Timer.Context context = metrics.bulkPostTime.time();
-            response = post(data);
+            response = post(readValues.getData());
             context.stop();

             int statusCode = response.getStatusLine().getStatusCode();
@@ -252,6 +252,7 @@ protected void send(final String data) throws InterruptedException {
                 simpleConsumerManager.commit();
                 String body = EntityUtils.toString(response.getEntity());
                 metrics.sendSuccess.mark();
+                metrics.sentItems.mark(readValues.getItemsRead());
                 backoff.reset();
                 log.trace("Data successfully sent to ingestion endpoint: {}", body);
                 log.debug("Data successfully sent to ingestion endpoint: hash {}", body.hashCode());
diff --git a/datasift-writer/src/main/java/com/datasift/connector/writer/BulkReadValues.java b/datasift-writer/src/main/java/com/datasift/connector/writer/BulkReadValues.java
new file mode 100644
index 0000000..8b233a1
--- /dev/null
+++ b/datasift-writer/src/main/java/com/datasift/connector/writer/BulkReadValues.java
@@ -0,0 +1,43 @@
+package com.datasift.connector.writer;
+
+/**
+ * Container class to return values read.
+ */
+public class BulkReadValues {
+
+    /**
+     * The number of items read.
+     */
+    private int itemsRead;
+
+    /**
+     * The data read.
+     */
+    private String data;
+
+    /**
+     * Get the number of items read.
+     * @return the number of items read
+     */
+    public final int getItemsRead() {
+        return this.itemsRead;
+    }
+
+    /**
+     * Get the data.
+     * @return the data
+     */
+    public final String getData() {
+        return this.data;
+    }
+
+    /**
+     * The constructor.
+     * @param itemsRead the number of items read
+     * @param data the data
+     */
+    public BulkReadValues(final int itemsRead, final String data) {
+        this.itemsRead = itemsRead;
+        this.data = data;
+    }
+}
diff --git a/datasift-writer/src/main/java/com/datasift/connector/writer/Metrics.java b/datasift-writer/src/main/java/com/datasift/connector/writer/Metrics.java
index 0b29184..fcc72cf 100644
--- a/datasift-writer/src/main/java/com/datasift/connector/writer/Metrics.java
+++ b/datasift-writer/src/main/java/com/datasift/connector/writer/Metrics.java
@@ -37,6 +37,7 @@ public Metrics(final MetricRegistry registry) {
         registry.meter("http-connection-closed");
         this.sendAttempt = registry.meter("sent-attempt");
         this.sendSuccess = registry.meter("send-success");
+        this.sentItems = registry.meter("sent-items");
         this.sendError = registry.meter("send-error");
         this.sendRateLimit = registry.meter("send-rate-limit");
         this.sendException = registry.meter("send-exception");
@@ -97,10 +98,15 @@ public MetricRegistry getRegistry() {
     public Meter sendAttempt;

     /**
-     * The meter for measuring items successfully sent to the HTTP endpoint.
+     * The meter for measuring successful posts sent to the HTTP endpoint.
      */
     public Meter sendSuccess;

+    /**
+     * The meter for measuring items successfully sent to the HTTP endpoint.
+     */
+    public Meter sentItems;
+
     /**
      * The meter for measuring errors sending to the HTTP endpoint.
      */
diff --git a/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkManager.java b/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkManager.java
index a0c9701..c00c473 100644
--- a/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkManager.java
+++ b/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkManager.java
@@ -108,7 +108,7 @@ public void send_should_reset_backoff_on_success() throws Exception {
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         verify(log).debug("Reset backoff time");
     }
@@ -137,7 +137,7 @@ public void send_should_wait_until_if_413() throws Exception {
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         verify(backoff).waitUntil(1000);
         ArgumentCaptor arg = ArgumentCaptor.forClass(Date.class);
@@ -169,7 +169,7 @@ public void send_should_backoff_exponentially_if_status_code_400_plus() throws I
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         verify(backoff).exponentialBackoff();
         verify(log).error("Error code returned from ingestion endpoint, status = {}", 503);
@@ -190,7 +190,7 @@ public void send_should_backoff_linearly_on_exception() throws InterruptedExcept
         SimpleConsumerManager consumer = mock(SimpleConsumerManager.class);
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);
-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         verify(backoff).linearBackoff();
         verify(log).error(eq("Could not connect to ingestion endpoint"), any(Exception.class));
@@ -199,7 +199,7 @@ public void send_should_backoff_linearly_on_exception() throws InterruptedExcept
     @Test
     public void should_not_send_if_buffer_empty() throws Exception {
         BulkManager bm = new BulkManager(null, null, null, null);
-        bm.send(new StringBuilder().toString());
+        bm.send(new BulkReadValues(0, new StringBuilder().toString()));
         // Will throw a NullPointerException if allowed to carry on
     }
@@ -263,13 +263,13 @@ public void send_should_time_post() throws Exception {
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         verify(context).stop();
     }

     @Test
-    public void should_mark_sent_success_on_200() throws Exception {
+    public void should_mark_success_metrics_on_200() throws Exception {
         Thread.sleep(1000); // Give WireMock time
         stubFor(post(urlEqualTo("/SOURCEID"))
                 .willReturn(aResponse()
@@ -294,9 +294,10 @@ public void should_mark_sent_success_on_200() throws Exception {
         Backoff backoff = mock(Backoff.class);
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);

-        bm.send("{}");
+        bm.send(new BulkReadValues(3, "{}"));

         assertEquals(1, metrics.sendSuccess.getCount());
+        assertEquals(3, metrics.sentItems.getCount());
     }

     @Test
@@ -325,7 +326,7 @@ public void should_mark_sent_error_on_400() throws Exception {
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         assertEquals(1, metrics.sendError.getCount());
     }
@@ -355,7 +356,7 @@ public void should_mark_sent_error_on_413() throws Exception {
         Metrics metrics = new Metrics();
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         assertEquals(1, metrics.sendRateLimit.getCount());
     }
@@ -383,7 +384,7 @@ public void should_mark_sent_exception_on_exception() throws Exception {
         BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
         bm.setLogger(log);

-        bm.send("{}");
+        bm.send(new BulkReadValues(0, "{}"));

         assertEquals(1, metrics.sendException.getCount());
     }
@@ -472,7 +473,7 @@ public void read_should_read_for_configured_interval() {
         bm.setLogger(log);

         long before = System.nanoTime();
-        String data = bm.read();
+        String data = bm.read().getData();
         long after = System.nanoTime();

         assertTrue(after - before >= TimeUnit.MILLISECONDS.toNanos(config.bulkInterval));
@@ -495,7 +496,7 @@ public void read_should_read_for_configured_items() {
         BulkManager bm = new BulkManager(config, scm, null, metrics);
         bm.setLogger(log);

-        String data = bm.read();
+        String data = bm.read().getData();

         assertEquals("ONE\r\nTWO", data);
         assertEquals(1, metrics.readKafkaItemsFromConsumer.getCount());
@@ -518,7 +519,7 @@ public void read_should_read_for_configured_size() {
         BulkManager bm = new BulkManager(config, scm, null, metrics);
         bm.setLogger(log);

-        String data = bm.read();
+        String data = bm.read().getData();

         assertEquals("ONE\r\nTWO", data);
         assertEquals(1, metrics.readKafkaItemsFromConsumer.getCount());
diff --git a/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkReadValues.java b/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkReadValues.java
new file mode 100644
index 0000000..af6a519
--- /dev/null
+++ b/datasift-writer/src/test/java/com/datasift/connector/writer/TestBulkReadValues.java
@@ -0,0 +1,15 @@
+package com.datasift.connector.writer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestBulkReadValues {
+
+    @Test
+    public void can_set_and_get_properties() {
+        BulkReadValues brv = new BulkReadValues(42, "DATA");
+        assertEquals(42, brv.getItemsRead());
+        assertEquals("DATA", brv.getData());
+    }
+}
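
For reviewers, a minimal standalone sketch of the metric semantics this patch introduces. It is not part of the patch; it assumes only the Dropwizard Metrics API that Metrics.java already uses, and the class name MeterSemanticsSketch is made up for illustration. The point: send-success is marked once per successful bulk POST, while the new sent-items meter is marked with the number of Kafka items carried by that POST, so the two counts diverge whenever a bulk request batches more than one item.

    import com.codahale.metrics.Meter;
    import com.codahale.metrics.MetricRegistry;

    public class MeterSemanticsSketch {
        public static void main(final String[] args) {
            MetricRegistry registry = new MetricRegistry();
            Meter sendSuccess = registry.meter("send-success");
            Meter sentItems = registry.meter("sent-items");

            // One successful bulk POST carrying three Kafka items,
            // as exercised by should_mark_success_metrics_on_200 above.
            sendSuccess.mark();   // one post
            sentItems.mark(3);    // three items in that post

            System.out.println(sendSuccess.getCount()); // 1
            System.out.println(sentItems.getCount());   // 3
        }
    }

This mirrors the updated test, which sends one post built from three items and asserts counts of 1 and 3 respectively; it is also why logStatistics() now reports sentItems rather than sendSuccess for "Total items sent to endpoint".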