Skip to content
This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #20 from datasift/develop
Browse files Browse the repository at this point in the history
Release sent-items metric
  • Loading branch information
jamesbloomer committed Jul 20, 2015
2 parents 18bd3d9 + 675e7de commit eccc704
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 24 deletions.
Binary file modified chef/cookbooks/datasift-stats/files/default/grafana.db
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ protected void logStatistics() {
msg += "Total Kafka items read: "
+ metrics.readKafkaItem.getCount() + ". ";
msg += "Total items sent to endpoint: "
+ metrics.sendSuccess.getCount() + ".";
+ metrics.sentItems.getCount() + ".";

log.info(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ protected String getAuthorizationToken(final DataSiftConfig config) {
public final void run() {
while (isRunning()) {
try {
String data = read();
send(data);
BulkReadValues readValues = read();
send(readValues);
} catch (InterruptedException e) {
log.error("Run loop interrupted ", e);
}
Expand All @@ -183,7 +183,7 @@ public final void run() {
*/
@VisibleForTesting
@SuppressWarnings("checkstyle:designforextension")
protected String read() {
protected BulkReadValues read() {
final long start = System.nanoTime();
StringBuilder buffer = new StringBuilder();
int loop = 0;
Expand All @@ -200,7 +200,7 @@ protected String read() {
log.debug("Read {} items from Kafka", read);
metrics.readKafkaItemsFromConsumer.mark();

return buffer.toString();
return new BulkReadValues(read, buffer.toString());
}

/**
Expand Down Expand Up @@ -230,28 +230,29 @@ protected boolean getDataFromKafka(final StringBuilder buffer) {
/**
* Reads from Kafka and sends to the DataSift ingestion endpoint.
* Deals with back-offs if unsuccessful.
* @param data the data to post
* @param readValues the data to post
* @throws InterruptedException if the waits are interrupted
*/
@SuppressWarnings("checkstyle:designforextension")
protected void send(final String data) throws InterruptedException {
protected void send(final BulkReadValues readValues) throws InterruptedException {
log.debug("send()");
HttpResponse response = null;

try {
if (data.equals("")) {
if (readValues.getData().equals("")) {
return;
}

final Timer.Context context = metrics.bulkPostTime.time();
response = post(data);
response = post(readValues.getData());
context.stop();

int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
simpleConsumerManager.commit();
String body = EntityUtils.toString(response.getEntity());
metrics.sendSuccess.mark();
metrics.sentItems.mark(readValues.getItemsRead());
backoff.reset();
log.trace("Data successfully sent to ingestion endpoint: {}", body);
log.debug("Data successfully sent to ingestion endpoint: hash {}", body.hashCode());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.datasift.connector.writer;

/**
* Container class to return values read.
*/
/**
 * Immutable container pairing the number of items read from Kafka
 * with the concatenated data that was read.
 */
public class BulkReadValues {

    /**
     * The number of items read. Final: assigned once at construction.
     */
    private final int itemsRead;

    /**
     * The data read. Final: assigned once at construction.
     */
    private final String data;

    /**
     * The constructor.
     * @param itemsRead the number of items read
     * @param data the data
     */
    public BulkReadValues(final int itemsRead, final String data) {
        this.itemsRead = itemsRead;
        this.data = data;
    }

    /**
     * Get the number of items read.
     * @return the number of items read
     */
    public final int getItemsRead() {
        return this.itemsRead;
    }

    /**
     * Get the data.
     * @return the data
     */
    public final String getData() {
        return this.data;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public Metrics(final MetricRegistry registry) {
registry.meter("http-connection-closed");
this.sendAttempt = registry.meter("sent-attempt");
this.sendSuccess = registry.meter("send-success");
this.sentItems = registry.meter("sent-items");
this.sendError = registry.meter("send-error");
this.sendRateLimit = registry.meter("send-rate-limit");
this.sendException = registry.meter("send-exception");
Expand Down Expand Up @@ -97,10 +98,15 @@ public MetricRegistry getRegistry() {
public Meter sendAttempt;

/**
* The meter for measuring items successfully sent to the HTTP endpoint.
     * The meter for measuring successful posts sent to the HTTP endpoint.
*/
public Meter sendSuccess;

/**
* The meter for measuring items successfully sent to the HTTP endpoint.
*/
public Meter sentItems;

/**
* The meter for measuring errors sending to the HTTP endpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void send_should_reset_backoff_on_success() throws Exception {

BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);
bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

verify(log).debug("Reset backoff time");
}
Expand Down Expand Up @@ -137,7 +137,7 @@ public void send_should_wait_until_if_413() throws Exception {

BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);
bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

verify(backoff).waitUntil(1000);
ArgumentCaptor<Date> arg = ArgumentCaptor.forClass(Date.class);
Expand Down Expand Up @@ -169,7 +169,7 @@ public void send_should_backoff_exponentially_if_status_code_400_plus() throws I

BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);
bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

verify(backoff).exponentialBackoff();
verify(log).error("Error code returned from ingestion endpoint, status = {}", 503);
Expand All @@ -190,7 +190,7 @@ public void send_should_backoff_linearly_on_exception() throws InterruptedExcept
SimpleConsumerManager consumer = mock(SimpleConsumerManager.class);
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);
bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

verify(backoff).linearBackoff();
verify(log).error(eq("Could not connect to ingestion endpoint"), any(Exception.class));
Expand All @@ -199,7 +199,7 @@ public void send_should_backoff_linearly_on_exception() throws InterruptedExcept
@Test
public void should_not_send_if_buffer_empty() throws Exception {
BulkManager bm = new BulkManager(null, null, null, null);
bm.send(new StringBuilder().toString());
bm.send(new BulkReadValues(0, new StringBuilder().toString()));
// Will throw a NullPointerException if allowed to carry on
}

Expand Down Expand Up @@ -263,13 +263,13 @@ public void send_should_time_post() throws Exception {
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);

bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

verify(context).stop();
}

@Test
public void should_mark_sent_success_on_200() throws Exception {
public void should_mark_success_metrics_on_200() throws Exception {
Thread.sleep(1000); // Give WireMock time
stubFor(post(urlEqualTo("/SOURCEID"))
.willReturn(aResponse()
Expand All @@ -294,9 +294,10 @@ public void should_mark_sent_success_on_200() throws Exception {
Backoff backoff = mock(Backoff.class);
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);

bm.send("{}");
bm.send(new BulkReadValues(3, "{}"));

assertEquals(1, metrics.sendSuccess.getCount());
assertEquals(3, metrics.sentItems.getCount());
}

@Test
Expand Down Expand Up @@ -325,7 +326,7 @@ public void should_mark_sent_error_on_400() throws Exception {
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);

bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

assertEquals(1, metrics.sendError.getCount());
}
Expand Down Expand Up @@ -355,7 +356,7 @@ public void should_mark_sent_error_on_413() throws Exception {
Metrics metrics = new Metrics();
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);

bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

assertEquals(1, metrics.sendRateLimit.getCount());
}
Expand Down Expand Up @@ -383,7 +384,7 @@ public void should_mark_sent_exception_on_exception() throws Exception {
BulkManager bm = new BulkManager(config, consumer, backoff, metrics);
bm.setLogger(log);

bm.send("{}");
bm.send(new BulkReadValues(0, "{}"));

assertEquals(1, metrics.sendException.getCount());
}
Expand Down Expand Up @@ -472,7 +473,7 @@ public void read_should_read_for_configured_interval() {
bm.setLogger(log);

long before = System.nanoTime();
String data = bm.read();
String data = bm.read().getData();
long after = System.nanoTime();

assertTrue(after - before >= TimeUnit.MILLISECONDS.toNanos(config.bulkInterval));
Expand All @@ -495,7 +496,7 @@ public void read_should_read_for_configured_items() {
BulkManager bm = new BulkManager(config, scm, null, metrics);
bm.setLogger(log);

String data = bm.read();
String data = bm.read().getData();

assertEquals("ONE\r\nTWO", data);
assertEquals(1, metrics.readKafkaItemsFromConsumer.getCount());
Expand All @@ -518,7 +519,7 @@ public void read_should_read_for_configured_size() {
BulkManager bm = new BulkManager(config, scm, null, metrics);
bm.setLogger(log);

String data = bm.read();
String data = bm.read().getData();

assertEquals("ONE\r\nTWO", data);
assertEquals(1, metrics.readKafkaItemsFromConsumer.getCount());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.datasift.connector.writer;

import org.junit.Test;

import static org.junit.Assert.*;

/**
 * Unit tests for {@link BulkReadValues}.
 */
public class TestBulkReadValues {

    @Test
    public void can_set_and_get_properties() {
        // Construct with known values and check both accessors echo them back.
        final int expectedItems = 42;
        final String expectedData = "DATA";
        final BulkReadValues readValues =
                new BulkReadValues(expectedItems, expectedData);
        assertEquals(expectedItems, readValues.getItemsRead());
        assertEquals(expectedData, readValues.getData());
    }
}

0 comments on commit eccc704

Please sign in to comment.