Skip to content

Commit

Permalink
Manually include PR internetitem#45
Browse files Browse the repository at this point in the history
  • Loading branch information
cgoIT committed Jun 22, 2020
1 parent 6517bf9 commit f6bc5ac
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 7 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ In your `logback.xml`:
<logsToStderr>false</logsToStderr> <!-- optional (default false) -->
<maxQueueSize>104857600</maxQueueSize> <!-- optional (default 104857600) -->
<maxRetries>3</maxRetries> <!-- optional (default 3) -->
<maxEvents>100</maxEvents> <!-- optional (default -1) -->
<readTimeout>30000</readTimeout> <!-- optional (in ms, default 30000) -->
<sleepTime>250</sleepTime> <!-- optional (in ms, default 250) -->
<rawJsonMessage>false</rawJsonMessage> <!-- optional (default false) -->
Expand Down Expand Up @@ -105,6 +106,7 @@ Configuration Reference
* `errorsToStderr` (optional, default false): If set to `true`, any errors in communicating with Elasticsearch will also be dumped to stderr (normally they are only reported to the internal Logback Status system, in order to prevent a feedback loop)
* `logsToStderr` (optional, default false): If set to `true`, dump the raw Elasticsearch messages to stderr
* `maxQueueSize` (optional, default 104,857,600 = 100MB): Maximum size (in characters) of the send buffer. After this point, *logs will be dropped*. This should only happen if Elasticsearch is down, but this is a self-protection mechanism to ensure that the logging system doesn't cause the main process to run out of memory. Note that this maximum is approximate; once the maximum is hit, no new logs will be accepted until it shrinks, but any logs already accepted to be processed will still be added to the buffer
* `maxEvents` (optional, default -1, i.e. unlimited): Maximum number of logging events to buffer for later sending. Once the limit is exceeded, the oldest buffered event is dropped to make room for the new one.
* `loggerName` (optional): If set, raw ES-formatted log data will be sent to this logger
* `errorLoggerName` (optional): If set, any internal errors or problems will be logged to this logger
* `rawJsonMessage` (optional, default false): If set to `true`, the log message is interpreted as pre-formatted raw JSON message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,8 @@ public void setMaxMessageSize(int maxMessageSize) {
/**
 * Delegates the {@code enableContextMap} flag to the backing {@link Settings}
 * instance; this appender setter exists so the option can be configured from
 * {@code logback.xml}.
 *
 * @param enableContextMap new value of the flag, stored as-is in the settings
 */
public void setEnableContextMap(boolean enableContextMap) {
    this.settings.setEnableContextMap(enableContextMap);
}

/**
 * Delegates the {@code maxEvents} limit (maximum number of buffered log
 * events; -1 means unlimited) to the backing {@link Settings} instance, so
 * the option can be configured from {@code logback.xml}.
 *
 * @param maxEvents new queue limit, stored as-is in the settings
 */
public void setMaxEvents(int maxEvents) {
    this.settings.setMaxEvents(maxEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -42,7 +43,7 @@ public abstract class AbstractElasticsearchPublisher<T> implements Runnable {

public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, HttpRequestHeaders headers) throws IOException {
this.errorReporter = errorReporter;
this.events = new ArrayList<>();
this.events = new LinkedList<>();
this.lock = new Object();
this.settings = settings;

Expand Down Expand Up @@ -97,8 +98,14 @@ public void addEvent(T event) {
return;
}

int max = settings.getMaxEvents();

synchronized (lock) {
events.add(event);
if (max > 0 && events.size() > max) {
errorReporter.logWarning("Max events in queue reached - log messages will be lost until the queue is processed");
((LinkedList<T>)events).removeFirst();
}
if (!working) {
working = true;
Thread thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement());
Expand All @@ -118,7 +125,7 @@ public void run() {
synchronized (lock) {
if (!events.isEmpty()) {
eventsCopy = events;
events = new ArrayList<>();
events = new LinkedList<>();
currentTry = 1;
}

Expand Down Expand Up @@ -185,4 +192,7 @@ private void serializeEvent(JsonGenerator gen, T event, List<AbstractPropertyAnd

protected abstract void serializeCommonFields(JsonGenerator gen, T event) throws IOException;

/**
 * Returns a point-in-time snapshot of the events currently queued for
 * publication.
 *
 * The copy is taken while holding the queue lock: the worker thread both
 * mutates this list in {@code addEvent} and swaps the {@code events}
 * reference in {@code run()} under the same lock, so reading the field
 * without it is a data race, and handing out the live list would let
 * callers corrupt the queue.
 *
 * @return a new list containing the events buffered at the time of the call
 */
public List<T> getEvents() {
    synchronized (lock) {
        return new ArrayList<>(this.events);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class Settings {
private Authentication authentication;
private int maxMessageSize = -1;
private boolean enableContextMap;
private int maxEvents = -1;

public String getIndex() {
return index;
Expand Down Expand Up @@ -171,4 +172,12 @@ public boolean isEnableContextMap() {
/**
 * Sets the {@code enableContextMap} flag.
 *
 * @param enableContextMap new value of the flag
 */
public void setEnableContextMap(boolean enableContextMap) {
    this.enableContextMap = enableContextMap;
}

/**
 * Returns the configured maximum number of buffered events. The default is
 * -1; non-positive values mean the queue size is not limited.
 *
 * @return the configured event limit
 */
public int getMaxEvents() {
    return this.maxEvents;
}

/**
 * Sets the maximum number of events kept in the send queue. Non-positive
 * values (the default is -1) disable the limit.
 *
 * @param maxEvents new event limit
 */
public void setMaxEvents(int maxEvents) {
    this.maxEvents = maxEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.internetitem.logback.elasticsearch;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Context;
import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties;
import com.internetitem.logback.elasticsearch.config.Settings;
import com.internetitem.logback.elasticsearch.util.ErrorReporter;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
 * Tests the {@code maxEvents} bounded-queue behaviour of
 * {@link ClassicElasticsearchPublisher}: with a positive limit the oldest
 * event is dropped once the queue exceeds it; with the default (-1) the
 * queue grows without bound.
 *
 * NOTE(review): the class name carries a historic typo ("Elasticesearch");
 * it is kept unchanged so external references to the test keep working.
 */
public class ElasticesearchPublisherTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticesearchPublisherTest.class);
    private static final String LOGGER_NAME = "es-logger";
    private static final int MAX_EVENTS = 100_000;

    // Real publisher under test, (re)created by setupPublisher() in each test.
    // It must NOT be a @Mock: the tests exercise its actual queue behaviour.
    private ClassicElasticsearchPublisher elasticsearchPublisher;
    @Mock
    private ElasticsearchProperties elasticsearchProperties;
    @Mock
    private Context mockedContext;

    /**
     * Initializes the {@code @Mock} fields. Without this (or a Mockito
     * runner/rule) the annotated fields would remain {@code null}.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void should_remove_the_right_element_if_max_elements_is_reached() throws IOException {
        setupPublisher(MAX_EVENTS);

        publishEvents(0);

        int maxI = MAX_EVENTS;
        assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
        assertEquals("First event should have message 'Message 1'", "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
        assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());

        // Publish another full batch: for every event over the limit the
        // oldest buffered event must be removed, so the window slides forward.
        LOGGER.info("Publish another additional message.");
        publishEvents(maxI);

        maxI = maxI + MAX_EVENTS;
        assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
        assertEquals(String.format("First event should have message 'Message %s'", MAX_EVENTS + 1), "Message " + (MAX_EVENTS + 1), elasticsearchPublisher.getEvents().get(0).getMessage());
        assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
    }

    @Test
    public void should_not_remove_any_element_if_max_elements_is_not_set() throws IOException {
        setupPublisher(-1);

        publishEvents(0);

        int maxI = MAX_EVENTS;
        assertEquals(String.format("Event count should be %s", MAX_EVENTS), MAX_EVENTS, elasticsearchPublisher.getEvents().size());
        assertEquals("First event should have message 'Message 1'", "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
        assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());

        // Publish another full batch. Since no limit is configured, nothing
        // may be dropped: 'Message 1' stays first and the queue doubles.
        LOGGER.info("Publish another additional message.");
        publishEvents(maxI);

        maxI = maxI + MAX_EVENTS;
        assertEquals(String.format("Event count should be %s", maxI), maxI, elasticsearchPublisher.getEvents().size());
        assertEquals(String.format("First event should have message 'Message %s'", 1), "Message 1", elasticsearchPublisher.getEvents().get(0).getMessage());
        assertEquals(String.format("Last event should have message 'Message %s'", maxI), "Message " + maxI, elasticsearchPublisher.getEvents().get(elasticsearchPublisher.getEvents().size() - 1).getMessage());
    }

    /**
     * Creates a real publisher with the given {@code maxEvents} limit. A huge
     * sleep time keeps the worker thread from actually flushing the queue
     * while the assertions inspect it.
     */
    private void setupPublisher(int maxEvents) throws IOException {
        Settings settings = new Settings();
        settings.setSleepTime(1000 * 60 * 60); // since we don't want to really publish the events
        settings.setLoggerName(LOGGER_NAME);
        settings.setMaxEvents(maxEvents);

        ErrorReporter errorReporter = new ErrorReporter(settings, mockedContext);

        elasticsearchPublisher = new ClassicElasticsearchPublisher(mockedContext, errorReporter, settings, elasticsearchProperties, null);
    }

    /**
     * Publishes {@code MAX_EVENTS} mocked logging events whose messages are
     * numbered consecutively starting at {@code offset + 1}.
     */
    private void publishEvents(int offset) {
        LOGGER.info("Try to publish {} events.", MAX_EVENTS);
        long start = System.currentTimeMillis();
        int maxI = offset + MAX_EVENTS;
        for (int i = offset + 1; i <= maxI; i++) {
            ILoggingEvent eventToPublish = mock(ILoggingEvent.class);
            given(eventToPublish.getLoggerName()).willReturn(LOGGER_NAME);
            given(eventToPublish.getMessage()).willReturn(String.format("Message %s", i));
            elasticsearchPublisher.addEvent(eventToPublish);
        }
        LOGGER.info("Messages published. Time={}ms", System.currentTimeMillis() - start);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.net.URL;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -171,7 +171,6 @@ public Context getContext() {
assertThat(errorReporter.getContext(), is(mockedContext));
}


@Test
public void should_delegate_setters_to_settings() throws MalformedURLException {
ElasticsearchAppender appender = new ElasticsearchAppender(settings);
Expand All @@ -190,6 +189,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
int readTimeout = 10000;
int connectTimeout = 5000;
boolean enableContextMap = true;
int maxEvents = 1000;

appender.setIncludeCallerData(includeCallerData);
appender.setSleepTime(aSleepTime);
Expand All @@ -207,6 +207,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
appender.setRawJsonMessage(rawJsonMessage);
appender.setIncludeMdc(includeMdc);
appender.setEnableContextMap(enableContextMap);
appender.setMaxEvents(maxEvents);

verify(settings, times(1)).setReadTimeout(readTimeout);
verify(settings, times(1)).setSleepTime(aSleepTime);
Expand All @@ -224,6 +225,7 @@ public void should_delegate_setters_to_settings() throws MalformedURLException {
verify(settings, times(1)).setRawJsonMessage(rawJsonMessage);
verify(settings, times(1)).setIncludeMdc(includeMdc);
verify(settings, times(1)).setEnableContextMap(enableContextMap);
verify(settings, times(1)).setMaxEvents(maxEvents);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public abstract class IntegrationTest {

private static final String INDEX = "log_entries";
private static final int WAIT_FOR_DOCUMENTS_MAX_RETRIES = 10;
private static final int WAIT_FOR_DOCUMENTS_SLEEP_INTERVAL = 500;
private static final int WAIT_FOR_DOCUMENTS_SLEEP_INTERVAL = 1000;
protected static final String ELASTICSEARCH_LOGGER_NAME = "ES_LOGGER";
private static final String ELASTICSEARCH_APPENDER_NAME = "ES_APPENDER";

Expand Down Expand Up @@ -94,8 +94,8 @@ protected void checkLogEntries(long desiredCount) throws IOException {
}
}

LOG.debug("Found {} documents. Desired count is {}. Retry...", hits.getTotalHits().value, desiredCount);
assertEquals(desiredCount, hits.getTotalHits().value);
LOG.debug("Found {} documents. Desired count is {}.", hits.getTotalHits().value, desiredCount);
assertEquals(String.format("Document count should be %s", desiredCount), desiredCount, hits.getTotalHits().value);
}

private static void configureElasticSearchAppender() throws MalformedURLException {
Expand Down

0 comments on commit f6bc5ac

Please sign in to comment.