Skip to content

Commit

Permalink
extract a class for the UDP sending, making it easier to play with ba…
Browse files Browse the repository at this point in the history
…tching implementations for issue #15
  • Loading branch information
scarytom committed Aug 26, 2014
1 parent 26765c9 commit be0b4fd
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 59 deletions.
64 changes: 5 additions & 59 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package com.timgroup.statsd;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* A simple StatsD client implementation facilitating metrics recording.
Expand Down Expand Up @@ -46,18 +39,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
};

private final String prefix;
private final DatagramChannel clientSocket;
private final StatsDClientErrorHandler handler;

private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setDaemon(true);
return result;
}
});
private final NonBlockingUdpSender sender;

/**
* Create a new StatsD client communicating with a StatsD instance on the
Expand Down Expand Up @@ -106,11 +88,9 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
*/
public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");
this.handler = errorHandler;


try {
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
this.sender = new NonBlockingUdpSender(hostname, port, STATS_D_ENCODING, errorHandler);
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
Expand All @@ -122,23 +102,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC
*/
@Override
public void stop() {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (Exception e) {
handler.handle(e);
}
finally {
if (clientSocket != null) {
try {
clientSocket.close();
}
catch (Exception e) {
handler.handle(e);
}
}
}
sender.stop();
}

/**
Expand Down Expand Up @@ -239,25 +203,7 @@ private String messageFor(String aspect, String value, String type, double sampl
}

private void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
}
catch (Exception e) {
handler.handle(e);
}
}

private void blockingSend(String message) {
try {
final byte[] sendData = message.getBytes(STATS_D_ENCODING);
clientSocket.write(ByteBuffer.wrap(sendData));
} catch (Exception e) {
handler.handle(e);
}
sender.send(message);
}

private String stringValueOf(double value) {
Expand Down
77 changes: 77 additions & 0 deletions src/main/java/com/timgroup/statsd/NonBlockingUdpSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public final class NonBlockingUdpSender {
private final Charset encoding;
private final DatagramChannel clientSocket;
private final ExecutorService executor;
private StatsDClientErrorHandler handler;

public NonBlockingUdpSender(String hostname, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
this.encoding = encoding;
this.handler = handler;
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));

this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setDaemon(true);
return result;
}
});
}

public void stop() {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (Exception e) {
handler.handle(e);
}
finally {
if (clientSocket != null) {
try {
clientSocket.close();
}
catch (Exception e) {
handler.handle(e);
}
}
}
}

public void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
}
catch (Exception e) {
handler.handle(e);
}
}

private void blockingSend(String message) {
try {
final byte[] sendData = message.getBytes(encoding);
clientSocket.write(ByteBuffer.wrap(sendData));
} catch (Exception e) {
handler.handle(e);
}
}
}

0 comments on commit be0b4fd

Please sign in to comment.