diff --git a/README.md b/README.md
index 74be8fe..a060144 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ In your `logback.xml`:
     <logsToStderr>false</logsToStderr>
     <maxQueueSize>104857600</maxQueueSize>
     <maxRetries>3</maxRetries>
+    <maxEvents>100</maxEvents>
     <readTimeout>30000</readTimeout>
     <sleepTime>250</sleepTime>
     <rawJsonMessage>false</rawJsonMessage>
@@ -105,6 +106,7 @@ Configuration Reference
* `errorsToStderr` (optional, default false): If set to `true`, any errors in communicating with Elasticsearch will also be dumped to stderr (normally they are only reported to the internal Logback Status system, in order to prevent a feedback loop)
* `logsToStderr` (optional, default false): If set to `true`, dump the raw Elasticsearch messages to stderr
* `maxQueueSize` (optional, default 104,857,600 = 200MB): Maximum size (in characters) of the send buffer. After this point, *logs will be dropped*. This should only happen if Elasticsearch is down, but this is a self-protection mechanism to ensure that the logging system doesn't cause the main process to run out of memory. Note that this maximum is approximate; once the maximum is hit, no new logs will be accepted until it shrinks, but any logs already accepted to be processed will still be added to the buffer
+ * `maxEvents` (optional, default -1 i.e. not limited): Maximum number of logging events to be stored for later sending. When the limit is exceeded, the oldest queued events are dropped.
* `loggerName` (optional): If set, raw ES-formatted log data will be sent to this logger
* `errorLoggerName` (optional): If set, any internal errors or problems will be logged to this logger
* `rawJsonMessage` (optional, default false): If set to `true`, the log message is interpreted as pre-formatted raw JSON message.
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
index f13c765..a232b23 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java
@@ -137,4 +137,8 @@ public void setMaxMessageSize(int maxMessageSize) {
public void setEnableContextMap(boolean enableContextMap) {
settings.setEnableContextMap(enableContextMap);
}
+
+ public void setMaxEvents(int maxEvents) {
+ settings.setMaxEvents(maxEvents);
+ }
}
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
index 1ec28ee..1bb5ae1 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java
@@ -18,6 +18,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +43,7 @@ public abstract class AbstractElasticsearchPublisher implements Runnable {
public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, HttpRequestHeaders headers) throws IOException {
this.errorReporter = errorReporter;
- this.events = new ArrayList<>();
+ this.events = new LinkedList<>();
this.lock = new Object();
this.settings = settings;
@@ -97,8 +98,14 @@ public void addEvent(T event) {
return;
}
+ int max = settings.getMaxEvents();
+
synchronized (lock) {
events.add(event);
+ if (max > 0 && events.size() > max) {
+ errorReporter.logWarning("Max events in queue reached - log messages will be lost until the queue is processed");
+ ((LinkedList)events).removeFirst();
+ }
if (!working) {
working = true;
Thread thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
@@ -118,7 +125,7 @@ public void run() {
synchronized (lock) {
if (!events.isEmpty()) {
eventsCopy = events;
- events = new ArrayList<>();
+ events = new LinkedList<>();
currentTry = 1;
}
@@ -185,4 +192,7 @@ private void serializeEvent(JsonGenerator gen, T event, List<AbstractPropertyAndEncoder<T>> propertyList) throws IOException {
+
+	public List<T> getEvents() {
+		return this.events;
+	}
 }
diff --git a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
index 61c2e20..9bb524f 100644
--- a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
+++ b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java
@@ -24,6 +24,7 @@ public class Settings {
private Authentication authentication;
private int maxMessageSize = -1;
private boolean enableContextMap;
+ private int maxEvents = -1;
public String getIndex() {
return index;
@@ -171,4 +172,12 @@ public boolean isEnableContextMap() {
public void setEnableContextMap(boolean enableContextMap) {
this.enableContextMap = enableContextMap;
}
+
+ public int getMaxEvents() {
+ return maxEvents;
+ }
+
+ public void setMaxEvents(int maxEvents) {
+ this.maxEvents = maxEvents;
+ }
}
diff --git a/src/test/java/com/internetitem/logback/elasticsearch/ElasticesearchPublisherTest.java b/src/test/java/com/internetitem/logback/elasticsearch/ElasticesearchPublisherTest.java
new file mode 100644
index 0000000..9324f61
--- /dev/null
+++ b/src/test/java/com/internetitem/logback/elasticsearch/ElasticesearchPublisherTest.java
@@ -0,0 +1,97 @@
+package com.internetitem.logback.elasticsearch;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Context;
+import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
+import com.internetitem.logback.elasticsearch.config.Settings;
+import com.internetitem.logback.elasticsearch.util.ErrorReporter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public class ElasticesearchPublisherTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticesearchPublisherTest.class);
+ private static final String LOGGER_NAME = "es-logger";
+ private static final int MAX_EVENTS = 100_000;
+ @Mock
+ private ClassicElasticsearchPublisher elasticsearchPublisher;
+ @Mock
+ private ElasticsearchProperties elasticsearchProperties;
+ @Mock
+ private Context mockedContext;
+
+ @Test
+ public void should_remove_the_right_element_if_max_elements_is_reached() throws IOException {
+ setupPublisher(MAX_EVENTS);
+
+ publishEvents(0);
+
+ int maxI = MAX_EVENTS;
+ assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
+ assertEquals("First event should have message 'Message 1'", "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
+ assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
+
+        // Publish MAX_EVENTS more events; the oldest MAX_EVENTS events should be dropped from the queue
+ LOGGER.info("Publish another additional message.");
+ publishEvents(maxI);
+
+ maxI = maxI + MAX_EVENTS;
+ assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
+ assertEquals(String.format("First event should have message 'Message %s'", MAX_EVENTS + 1), "Message " + (MAX_EVENTS + 1), elasticsearchPublisher.getEvents().get(0).getMessage());
+ assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
+ }
+
+ @Test
+ public void should_not_remove_any_element_if_max_elements_is_not_set() throws IOException {
+ setupPublisher(-1);
+
+ publishEvents(0);
+
+ int maxI = MAX_EVENTS;
+ assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
+ assertEquals("First event should have message 'Message 1'", "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
+ assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
+
+        // Publish MAX_EVENTS more events; since no limit is set, no events should be removed
+ LOGGER.info("Publish another additional message.");
+ publishEvents(maxI);
+
+ maxI = maxI + MAX_EVENTS;
+ assertEquals(String.format("Event count should be %s", maxI), maxI, elasticsearchPublisher.getEvents().size());
+ assertEquals(String.format("First event should have message 'Message %s'", 1), "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
+ assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
+ }
+
+ private void setupPublisher(int maxEvents) throws IOException {
+ Settings settings = new Settings();
+ settings.setSleepTime(1000 * 60 * 60); // since we don't want to really publish the events
+ settings.setLoggerName(LOGGER_NAME);
+ settings.setMaxEvents(maxEvents);
+
+ ErrorReporter errorReporter = new ErrorReporter(settings, mockedContext);
+
+ elasticsearchPublisher = new ClassicElasticsearchPublisher(mockedContext, errorReporter, settings, elasticsearchProperties, null);
+ }
+
+ private void publishEvents(int offset) {
+ LOGGER.info("Try to publish {} events.", MAX_EVENTS);
+ long start = System.currentTimeMillis();
+ int maxI = offset + MAX_EVENTS;
+ for (int i = offset + 1; i <= maxI; i++) {
+ ILoggingEvent eventToPublish = mock(ILoggingEvent.class);
+ given(eventToPublish.getLoggerName()).willReturn(LOGGER_NAME);
+ given(eventToPublish.getMessage()).willReturn(String.format("Message %s", i));
+ elasticsearchPublisher.addEvent(eventToPublish);
+ }
+ LOGGER.info("Messages published. Time={}ms", System.currentTimeMillis() - start);
+ }
+}
diff --git a/src/test/java/com/internetitem/logback/elasticsearch/ElasticsearchAppenderTest.java b/src/test/java/com/internetitem/logback/elasticsearch/ElasticsearchAppenderTest.java
index cb15521..f775aa4 100644
--- a/src/test/java/com/internetitem/logback/elasticsearch/ElasticsearchAppenderTest.java
+++ b/src/test/java/com/internetitem/logback/elasticsearch/ElasticsearchAppenderTest.java
@@ -18,7 +18,7 @@
import java.net.URL;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -171,7 +171,6 @@ public Context getContext() {
assertThat(errorReporter.getContext(), is(mockedContext));
}
-
@Test
public void should_delegate_setters_to_settings() throws MalformedURLException {
ElasticsearchAppender appender = new ElasticsearchAppender(settings);
@@ -190,6 +189,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
int readTimeout = 10000;
int connectTimeout = 5000;
boolean enableContextMap = true;
+ int maxEvents = 1000;
appender.setIncludeCallerData(includeCallerData);
appender.setSleepTime(aSleepTime);
@@ -207,6 +207,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
appender.setRawJsonMessage(rawJsonMessage);
appender.setIncludeMdc(includeMdc);
appender.setEnableContextMap(enableContextMap);
+ appender.setMaxEvents(maxEvents);
verify(settings, times(1)).setReadTimeout(readTimeout);
verify(settings, times(1)).setSleepTime(aSleepTime);
@@ -224,6 +225,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
verify(settings, times(1)).setRawJsonMessage(rawJsonMessage);
verify(settings, times(1)).setIncludeMdc(includeMdc);
verify(settings, times(1)).setEnableContextMap(enableContextMap);
+ verify(settings, times(1)).setMaxEvents(maxEvents);
}
diff --git a/src/test/java/com/internetitem/logback/elasticsearch/it/IntegrationTest.java b/src/test/java/com/internetitem/logback/elasticsearch/it/IntegrationTest.java
index 5c27be5..040b71e 100644
--- a/src/test/java/com/internetitem/logback/elasticsearch/it/IntegrationTest.java
+++ b/src/test/java/com/internetitem/logback/elasticsearch/it/IntegrationTest.java
@@ -32,7 +32,7 @@ public abstract class IntegrationTest {
private static final String INDEX = "log_entries";
private static final int WAIT_FOR_DOCUMENTS_MAX_RETRIES = 10;
- private static final int WAIT_FOR_DOCUMENTS_SLEEP_INTERVAL = 500;
+ private static final int WAIT_FOR_DOCUMENTS_SLEEP_INTERVAL = 1000;
protected static final String ELASTICSEARCH_LOGGER_NAME = "ES_LOGGER";
private static final String ELASTICSEARCH_APPENDER_NAME = "ES_APPENDER";
@@ -94,8 +94,8 @@ protected void checkLogEntries(long desiredCount) throws IOException {
}
}
- LOG.debug("Found {} documents. Desired count is {}. Retry...", hits.getTotalHits().value, desiredCount);
- assertEquals(desiredCount, hits.getTotalHits().value);
+ LOG.debug("Found {} documents. Desired count is {}.", hits.getTotalHits().value, desiredCount);
+ assertEquals(String.format("Document count should be %s", desiredCount), desiredCount, hits.getTotalHits().value);
}
private static void configureElasticSearchAppender() throws MalformedURLException {