Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue #84: Added support for TCP-Keepalive #113

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/main/java/org/graylog2/GelfTCPSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,41 @@ public class GelfTCPSender implements GelfSender {
private boolean shutdown = false;
private InetAddress host;
private int port;
private boolean keepalive;
private Socket socket;
private OutputStream os;

public GelfTCPSender() {
}

public GelfTCPSender(String host, int port) throws IOException {
public GelfTCPSender(String host, int port, boolean keepalive) throws IOException {
this.host = InetAddress.getByName(host);
this.port = port;
this.socket = new Socket(host, port);
this.keepalive = keepalive;
initConnection();
}

private void initConnection() throws IOException {
this.socket = createSocket();
this.os = socket.getOutputStream();
}

private Socket createSocket() throws UnknownHostException, IOException {
Socket socket = new Socket(host, port);
socket.setKeepAlive(keepalive);
return socket;
}

public GelfSenderResult sendMessage(GelfMessage message) {
if (shutdown || !message.isValid()) {
return GelfSenderResult.MESSAGE_NOT_VALID_OR_SHUTTING_DOWN;
}

try {
// reconnect if necessary
if (socket == null || os == null) {
socket = new Socket(host, port);
os = socket.getOutputStream();
if (!isConnectionInitialized()) {
initConnection();
}

os.write(message.toTCPBuffer().array());

return GelfSenderResult.OK;
Expand All @@ -43,6 +53,10 @@ public GelfSenderResult sendMessage(GelfMessage message) {
}
}

private boolean isConnectionInitialized() {
return socket != null && os != null;
}

public void close() {
shutdown = true;
try {
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/org/graylog2/log/GelfAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class GelfAppender extends AppenderSkeleton implements GelfMessageProvide
private int amqpMaxRetries = 0;
private static String originHost;
private int graylogPort = 12201;
private boolean tcpKeepalive;
private String facility;
private GelfSender gelfSender;
private boolean extractStacktrace;
Expand All @@ -59,6 +60,14 @@ public int getGraylogPort() {
public void setGraylogPort(int graylogPort) {
this.graylogPort = graylogPort;
}

public boolean isTcpKeepalive() {
return tcpKeepalive;
}

public void setTcpKeepalive(boolean tcpKeepalive) {
this.tcpKeepalive = tcpKeepalive;
}

public String getGraylogHost() {
return graylogHost;
Expand Down Expand Up @@ -177,7 +186,7 @@ public void activateOptions() {
try {
if (graylogHost != null && graylogHost.startsWith("tcp:")) {
String tcpGraylogHost = graylogHost.substring(4);
gelfSender = getGelfTCPSender(tcpGraylogHost, graylogPort);
gelfSender = getGelfTCPSender(tcpGraylogHost, graylogPort, tcpKeepalive);
} else if (graylogHost != null && graylogHost.startsWith("udp:")) {
String udpGraylogHost = graylogHost.substring(4);
gelfSender = getGelfUDPSender(udpGraylogHost, graylogPort);
Expand Down Expand Up @@ -206,8 +215,8 @@ protected GelfUDPSender getGelfUDPSender(String udpGraylogHost, int graylogPort)
return new GelfUDPSender(udpGraylogHost, graylogPort);
}

protected GelfTCPSender getGelfTCPSender(String tcpGraylogHost, int graylogPort) throws IOException {
return new GelfTCPSender(tcpGraylogHost, graylogPort);
protected GelfTCPSender getGelfTCPSender(String tcpGraylogHost, int graylogPort, boolean tcpKeepalive) throws IOException {
return new GelfTCPSender(tcpGraylogHost, graylogPort, tcpKeepalive);
}

protected GelfAMQPSender getGelfAMQPSender(String amqpURI, String amqpExchangeName, String amqpRoutingKey, int amqpMaxRetries) throws IOException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/graylog2/logging/GelfHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class GelfHandler
private String amqpRoutingKey;
private int amqpMaxRetries;
private String originHost;
private boolean tcpKeepalive;
private int graylogPort;
private String facility;
private GelfSender gelfSender;
Expand All @@ -40,6 +41,7 @@ public GelfHandler() {
graylogHost = manager.getProperty(prefix + ".graylogHost");
final String port = manager.getProperty(prefix + ".graylogPort");
graylogPort = null == port ? 12201 : Integer.parseInt(port);
tcpKeepalive = "true".equalsIgnoreCase(manager.getProperty(prefix + ".tcpKeepalive"));
originHost = manager.getProperty(prefix + ".originHost");
extractStacktrace = "true".equalsIgnoreCase(manager.getProperty(prefix + ".extractStacktrace"));
int fieldNumber = 0;
Expand Down Expand Up @@ -128,7 +130,7 @@ public synchronized void publish(final LogRecord record) {
try {
if (graylogHost.startsWith("tcp:")) {
String tcpGraylogHost = graylogHost.substring(4, graylogHost.length());
gelfSender = new GelfTCPSender(tcpGraylogHost, graylogPort);
gelfSender = new GelfTCPSender(tcpGraylogHost, graylogPort, tcpKeepalive);
} else if (graylogHost.startsWith("udp:")) {
String udpGraylogHost = graylogHost.substring(4, graylogHost.length());
gelfSender = new GelfUDPSender(udpGraylogHost, graylogPort);
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/org/graylog2/log/GelfAppenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ protected GelfUDPSender getGelfUDPSender(String udpGraylogHost, int port) throws
}

@Override
protected GelfTCPSender getGelfTCPSender(String tcpGraylogHost, int port) throws IOException {
return new MockGelfTCPSender(tcpGraylogHost, port);
protected GelfTCPSender getGelfTCPSender(String tcpGraylogHost, int port, boolean tcpKeepalive) throws IOException {
return new MockGelfTCPSender(tcpGraylogHost, port, tcpKeepalive);
}

};
Expand Down Expand Up @@ -264,7 +264,7 @@ private MockGelfUDPSender(String host, int port) throws IOException {

private class MockGelfTCPSender extends GelfTCPSender {

private MockGelfTCPSender(String host, int port) throws IOException {
private MockGelfTCPSender(String host, int port, boolean keepalive) throws IOException {
if (host.contains("tcp:")) {
throw new UnknownHostException("tcp: found in host");
}
Expand Down