Implement various fixes and new features #76

Open · wants to merge 4 commits into master
5 changes: 5 additions & 0 deletions README.md
@@ -39,6 +39,9 @@ In your `logback.xml`:
<includeMdc>false</includeMdc> <!-- optional (default false) -->
<maxMessageSize>100</maxMessageSize> <!-- optional (default -1) -->
<authentication class="com.internetitem.logback.elasticsearch.config.BasicAuthentication" /> <!-- optional -->
<objectSerialization>true</objectSerialization> <!-- optional (default false) -->
<keyPrefix>data.</keyPrefix> <!-- optional (default None) -->

<properties>
<property>
<name>host</name>
@@ -110,6 +113,8 @@ Configuration Reference
* `includeMdc` (optional, default false): If set to `true`, then all [MDC](http://www.slf4j.org/api/org/slf4j/MDC.html) values will be mapped to properties on the JSON payload.
* `maxMessageSize` (optional, default -1): If set to a number greater than 0, truncate messages larger than this length, then append "`..`" to denote that the message was truncated
* `authentication` (optional): Add the ability to send authentication headers (see below)
* `objectSerialization` (optional, default false): If set to `true`, objects passed as structured logging arguments are serialized to JSON by Jackson (POJO to JSON) instead of being rendered with `toString()` (see the usage sketch below)
* `keyPrefix` (optional, default none): Objects logged within a message are also written as separate fields, with this prefix prepended to each field name

The fields `@timestamp` and `message` are always sent and can not currently be configured. Additional fields can be sent by adding `<property>` elements to the `<properties>` set.

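A usage sketch (not part of this diff), assuming the `StructuredArgsElasticsearchAppender` added by this PR is configured in `logback.xml` with `<objectSerialization>true</objectSerialization>` and `<keyPrefix>data.</keyPrefix>` as in the snippet above; the `OrderService` and `Order` names are illustrative:

import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // Illustrative POJO; with objectSerialization=true it is serialized to JSON by
    // Jackson instead of being rendered through toString().
    static class Order {
        public String id = "o-42";
        public double total = 19.90;
    }

    void placeOrder() {
        // With keyPrefix set to "data." the indexed document gains a "data.order"
        // field holding the serialized Order, alongside the usual message field.
        log.info("order placed {}", StructuredArguments.keyValue("order", new Order()));
    }
}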
11 changes: 11 additions & 0 deletions pom.xml
@@ -67,6 +67,11 @@
<artifactId>jackson-core</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
@@ -85,6 +90,12 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
</dependencies>

<build>
AbstractElasticsearchAppender.java
@@ -22,12 +22,28 @@ public abstract class AbstractElasticsearchAppender<T> extends UnsynchronizedApp
public AbstractElasticsearchAppender() {
this.settings = new Settings();
this.headers = new HttpRequestHeaders();
registerShutdownHook();
}

public AbstractElasticsearchAppender(Settings settings) {
this.settings = settings;
registerShutdownHook();
}

private void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
}

private class ShutdownHook implements Runnable {
@Override
public void run() {
stop();
if(publisher != null) {
publisher.close();
}
}
}

@Override
public void start() {
super.start();
@@ -138,4 +154,13 @@ public void setAuthentication(Authentication auth) {
public void setMaxMessageSize(int maxMessageSize) {
settings.setMaxMessageSize(maxMessageSize);
}

public void setKeyPrefix(String keyPrefix) {
settings.setKeyPrefix(keyPrefix);
}

public void setObjectSerialization(boolean objectSerialization) {
settings.setObjectSerialization(objectSerialization);
}

}
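As an aside, a minimal standalone sketch (not part of this diff) of the wake-on-shutdown pattern introduced above: the shutdown hook interrupts the publisher's worker thread so its sleep ends early and queued events can be flushed before the JVM exits. The `join()` here only makes the demo deterministic; the hook in this PR simply calls `stop()` and `publisher.close()`.

public class WakeOnShutdownDemo {

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // analogous to settings.getSleepTime()
            } catch (InterruptedException e) {
                // interrupted by the shutdown hook: wake up early
            }
            System.out.println("flushed pending events");
        });
        worker.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            worker.interrupt();   // what publisher.close() does
            try {
                worker.join();    // only to make this demo deterministic
            } catch (InterruptedException ignored) {
            }
        }));

        System.exit(0); // triggers the hook; the worker wakes and "flushes"
    }
}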
AbstractElasticsearchPublisher.java
@@ -3,6 +3,7 @@
import ch.qos.logback.core.Context;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
import com.internetitem.logback.elasticsearch.config.Property;
@@ -51,6 +52,8 @@ protected DateFormat initialValue() {

private final PropertySerializer propertySerializer;

private Thread thread;

public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, HttpRequestHeaders headers) throws IOException {
this.errorReporter = errorReporter;
this.events = new ArrayList<T>();
@@ -59,14 +62,29 @@ public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReport

this.outputAggregator = configureOutputAggregator(settings, errorReporter, headers);

this.jf = new JsonFactory();
this.jf = buildJsonFactory(settings);

this.jf.setRootValueSeparator(null);
this.jsonGenerator = jf.createGenerator(outputAggregator);

this.indexPattern = buildPropertyAndEncoder(context, new Property("<index>", settings.getIndex(), false));
this.propertyList = generatePropertyList(context, properties);

this.propertySerializer = new PropertySerializer();
}

public void close() {
if(thread != null) {
thread.interrupt();
}
}

private JsonFactory buildJsonFactory(Settings settings) {
if(settings.isObjectSerialization()) {
ObjectMapper mapper = new ObjectMapper();
return mapper.getFactory();
}
return new JsonFactory();
}

private static ElasticsearchOutputAggregator configureOutputAggregator(Settings settings, ErrorReporter errorReporter, HttpRequestHeaders httpRequestHeaders) {
@@ -108,7 +126,7 @@ public void addEvent(T event) {
events.add(event);
if (!working) {
working = true;
Thread thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
thread.start();
}
}
@@ -119,7 +137,11 @@ public void run() {
int maxRetries = settings.getMaxRetries();
while (true) {
try {
Thread.sleep(settings.getSleepTime());
try {
Thread.sleep(settings.getSleepTime());
} catch(InterruptedException e) {
// interrupted, typically by close() from the shutdown hook; wake up and flush pending events
}

List<T> eventsCopy = null;
synchronized (lock) {
@@ -152,6 +174,7 @@
if (!outputAggregator.sendData()) {
currentTry++;
}

} catch (Exception e) {
errorReporter.logError("Internal error handling log data: " + e.getMessage(), e);
currentTry++;
StructuredArgsElasticsearchAppender.java (new file)
@@ -0,0 +1,21 @@
package com.internetitem.logback.elasticsearch;

import com.internetitem.logback.elasticsearch.config.Settings;

import java.io.IOException;

public class StructuredArgsElasticsearchAppender extends ElasticsearchAppender {

public StructuredArgsElasticsearchAppender() {
}

public StructuredArgsElasticsearchAppender(Settings settings) {
super(settings);
}

protected StructuredArgsElasticsearchPublisher buildElasticsearchPublisher() throws IOException {
return new StructuredArgsElasticsearchPublisher(this.getContext(), this.errorReporter, this.settings,
this.elasticsearchProperties, this.headers);
}

}
StructuredArgsElasticsearchPublisher.java (new file)
@@ -0,0 +1,67 @@
package com.internetitem.logback.elasticsearch;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Context;
import com.fasterxml.jackson.core.JsonGenerator;
import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
import com.internetitem.logback.elasticsearch.config.Settings;
import com.internetitem.logback.elasticsearch.util.ErrorReporter;
import net.logstash.logback.marker.ObjectAppendingMarker;

import java.io.IOException;
import java.lang.reflect.Field;

public class StructuredArgsElasticsearchPublisher extends ClassicElasticsearchPublisher {
private String keyPrefix;
private Field field;
private ErrorReporter errorReporter;

public StructuredArgsElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties,
HttpRequestHeaders headers) throws IOException {
super(context, errorReporter, settings, properties, headers);

this.errorReporter = errorReporter;

keyPrefix = "";
if(settings != null && settings.getKeyPrefix() != null) {
keyPrefix = settings.getKeyPrefix();
}

try {
field = ObjectAppendingMarker.class.getDeclaredField("object");
field.setAccessible(true);
} catch (NoSuchFieldException e) {
// message will be logged without object
errorReporter.logError("error in logging with object serialization", e);
}
}

protected void serializeCommonFields(JsonGenerator gen, ILoggingEvent event) throws IOException {
super.serializeCommonFields(gen, event);

if(event.getArgumentArray() != null) {
Object[] eventArgs = event.getArgumentArray();
for(Object eventArg:eventArgs) {
if(eventArg instanceof ObjectAppendingMarker) {
ObjectAppendingMarker marker = (ObjectAppendingMarker) eventArg;
// a rendered value containing "@" (the default ClassName@hashCode form of Object.toString())
// is taken to mean the argument has no custom toString(), so the underlying POJO is serialized
if(field != null && settings != null && settings.isObjectSerialization() &&
marker.getFieldValue().toString().contains("@")) {
try {
Object obj = field.get(marker);
if(obj != null) {
gen.writeObjectField(keyPrefix + marker.getFieldName(), obj);
}
} catch (IllegalAccessException e) {
// message will be logged without object
errorReporter.logError("error in logging with object serialization", e);
}
}
else {
gen.writeObjectField(keyPrefix + marker.getFieldName(), marker.getFieldValue());
}
}
}
}
}

}
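For context, a minimal standalone sketch (not part of this diff) of the heuristic the `contains("@")` check above relies on: the default `Object.toString()` renders as `ClassName@hashCode`, which is taken as a sign that the argument has no meaningful string form and should be serialized as a POJO. Class names here are illustrative.

public class ToStringHeuristicDemo {

    // No custom toString(): renders as something like "ToStringHeuristicDemo$Point@6d06d69c",
    // so the publisher above would serialize the object itself via Jackson.
    static class Point {
        int x = 1;
        int y = 2;
    }

    // Custom toString(): no "@" in the rendered value, so the publisher writes the
    // value returned by getFieldValue() as-is.
    static class Label {
        @Override
        public String toString() {
            return "label";
        }
    }

    public static void main(String[] args) {
        System.out.println(new Point());  // e.g. ToStringHeuristicDemo$Point@6d06d69c
        System.out.println(new Label());  // label
    }
}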
Settings.java
@@ -23,6 +23,8 @@ public class Settings {
private int maxQueueSize = 100 * 1024 * 1024;
private Authentication authentication;
private int maxMessageSize = -1;
private String keyPrefix;
private boolean objectSerialization;

public String getIndex() {
return index;
@@ -162,4 +164,21 @@ public int getMaxMessageSize() {
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

public String getKeyPrefix() {
return this.keyPrefix;
}

public void setKeyPrefix(String keyPrefix) {
this.keyPrefix = keyPrefix;
}

public boolean isObjectSerialization() {
return objectSerialization;
}

public void setObjectSerialization(boolean objectSerialization) {
this.objectSerialization = objectSerialization;
}

}
ElasticsearchWriter.java
@@ -8,11 +8,13 @@
import java.net.HttpURLConnection;
import java.util.Collection;
import java.util.Collections;
import java.util.zip.GZIPOutputStream;

import com.internetitem.logback.elasticsearch.config.HttpRequestHeader;
import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders;
import com.internetitem.logback.elasticsearch.config.Settings;
import com.internetitem.logback.elasticsearch.util.ErrorReporter;
import org.apache.http.HttpHeaders;

public class ElasticsearchWriter implements SafeWriter {

@@ -23,6 +25,7 @@ public class ElasticsearchWriter implements SafeWriter {
private Collection<HttpRequestHeader> headerList;

private boolean bufferExceeded;
private boolean compressedTransfer;

public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpRequestHeaders headers) {
this.errorReporter = errorReporter;
@@ -32,6 +35,13 @@ public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpR
: Collections.<HttpRequestHeader>emptyList();

this.sendBuffer = new StringBuilder();
compressedTransfer = false;
for(HttpRequestHeader header : this.headerList) {
if(header.getName().toLowerCase().equals(HttpHeaders.CONTENT_ENCODING.toLowerCase()) && header.getValue().equals("gzip")) {
compressedTransfer = true;
break;
}
}
}

public void write(char[] cbuf, int off, int len) {
@@ -72,10 +82,7 @@ public void sendData() throws IOException {
settings.getAuthentication().addAuth(urlConnection, body);
}

Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8");
writer.write(body);
writer.flush();
writer.close();
writeData(urlConnection, body);

int rc = urlConnection.getResponseCode();
if (rc != 200) {
@@ -98,23 +105,37 @@ public boolean hasPendingData() {
}

private static String slurpErrors(HttpURLConnection urlConnection) {
try {
InputStream stream = urlConnection.getErrorStream();
try (InputStream stream = urlConnection.getErrorStream()) {
if (stream == null) {
return "<no data>";
}

StringBuilder builder = new StringBuilder();
InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
char[] buf = new char[2048];
int numRead;
while ((numRead = reader.read(buf)) > 0) {
builder.append(buf, 0, numRead);
try(InputStreamReader reader = new InputStreamReader(stream, "UTF-8")) {
char[] buf = new char[2048];
int numRead;
while ((numRead = reader.read(buf)) > 0) {
builder.append(buf, 0, numRead);
}
}
return builder.toString();
} catch (Exception e) {
return "<error retrieving data: " + e.getMessage() + ">";
}
}

private void writeData(HttpURLConnection urlConnection, String body) throws IOException {
if(this.compressedTransfer) {
try(Writer writer = new OutputStreamWriter(new GZIPOutputStream(urlConnection.getOutputStream()), "UTF-8")) {
writer.write(body);
writer.flush();
}
} else {
try(Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8")) {
writer.write(body);
writer.flush();
}
}
}

}
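Finally, a minimal standalone sketch (not part of this diff) of what the compressed path above produces: when a `Content-Encoding: gzip` request header is configured, `writeData` wraps the connection's output stream in a `GZIPOutputStream`, so the bulk body goes over the wire gzip-compressed. The payload string here is illustrative.

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipBodyDemo {

    public static void main(String[] args) throws Exception {
        // Illustrative Elasticsearch bulk payload.
        String body = "{\"index\":{}}\n{\"message\":\"hello\"}\n";

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Mirrors ElasticsearchWriter.writeData(): wrap the output stream in a
        // GZIPOutputStream when compressed transfer is enabled.
        try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(out), StandardCharsets.UTF_8)) {
            writer.write(body);
        }

        System.out.println("uncompressed=" + body.getBytes(StandardCharsets.UTF_8).length
                + " bytes, gzipped=" + out.size() + " bytes");
    }
}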