Set no offsets on store custom (#518)
* Remove unused code from s3 connector
* Make store_custom in s3, OS and ES connector set no offsets
* Update changelog
ppcad authored Feb 5, 2024
1 parent 9e19caa commit 87e9e21
Showing 6 changed files with 56 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@
* make the s3 connector raise `FatalOutputError` instead of warnings
* make the s3 connector blocking by removing threading
* revert the change from v9.0.0 to always check the existence of a field for negated key-value based lucene filter expressions
* make store_custom in s3, opensearch and elasticsearch connector not call `batch_finished_callback` to prevent data loss that could be caused by partially processed events

### Bugfix

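The changelog entry above is the core of this change: `batch_finished_callback` signals the input connector that a batch is done (and that offsets may be committed), so it must only fire for events that have run through the whole pipeline. A minimal sketch of that contract, assuming a simplified output class; none of the names below except `store`, `store_custom` and `batch_finished_callback` are taken from Logprep itself:

class SketchOutput:
    """Illustrative only: why store_custom must not trigger offset commits."""

    def __init__(self, backlog_limit, input_connector):
        self._backlog = []
        self._backlog_limit = backlog_limit
        self.input_connector = input_connector  # exposes batch_finished_callback()

    def store(self, document):
        # Called once an event has been fully processed; flushing here is safe.
        self._backlog.append(document)
        if len(self._backlog) >= self._backlog_limit:
            self._flush()
            self.input_connector.batch_finished_callback()  # offsets may be committed

    def store_custom(self, document, target):
        # Can be called mid-pipeline, before the event is fully processed.
        # Deliberately no flush and no callback: committing the offset now could
        # lose the event if Logprep terminates before processing finishes.
        document["_target"] = target
        self._backlog.append(document)

    def _flush(self):
        ...  # write self._backlog to the backend, then clear it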
23 changes: 12 additions & 11 deletions logprep/connector/elasticsearch/output.py
@@ -212,10 +212,16 @@ def store(self, document: dict):

self._add_dates(document)
self.metrics.number_of_processed_events += 1
self._write_to_search_context(document)
self._message_backlog.append(document)
self._write_to_search_context()

def store_custom(self, document: dict, target: str):
"""Write document to Elasticsearch into the target index.
"""Store document into backlog to be written into Elasticsearch with the target index.
Only add to backlog instead of writing the batch and calling batch_finished_callback,
since store_custom can be called before the event has been fully processed.
Setting the offset or committing before fully processing an event can lead to data loss if
Logprep terminates.
Parameters
----------
@@ -232,7 +238,7 @@ def store_custom(self, document: dict, target: str):
document["_index"] = target
self._add_dates(document)
self.metrics.number_of_processed_events += 1
self._write_to_search_context(document)
self._message_backlog.append(document)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
"""Write errors into error topic for documents that failed processing.
@@ -256,7 +262,8 @@ def store_failed(self, error_message: str, document_received: dict, document_pro
"_index": self._config.error_index,
}
self._add_dates(error_document)
self._write_to_search_context(error_document)
self._message_backlog.append(error_document)
self._write_to_search_context()

def _build_failed_index_document(self, message_document: dict, reason: str):
document = {
@@ -278,25 +285,19 @@ def _add_dates(self, document):
formatted_date = now.strftime(date_format_match[2:-1])
document["_index"] = re.sub(date_format_match, formatted_date, document["_index"])

def _write_to_search_context(self, document):
def _write_to_search_context(self):
"""Writes documents from a buffer into Elasticsearch indices.
Writes documents in bulk if the document buffer limit has been reached.
This reduces connections to Elasticsearch.
The target index is determined per document by the value of the meta field '_index'.
A configured default index is used if '_index' hasn't been set.
Parameters
----------
document : dict
Document to store.
Returns
-------
Returns True to inform the pipeline to call the batch_finished_callback method in the
configured input
"""
self._message_backlog.append(document)
if len(self._message_backlog) >= self._config.message_backlog_size:
self._write_backlog()

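Net effect for the Elasticsearch output: all three store methods now go through `_message_backlog`, but only `store` and `store_failed` trigger the flush check. A test-style sketch of the resulting behaviour, assuming an already constructed `output` instance and mocking `_write_backlog` so nothing is actually sent; the event contents are made up:

from unittest import mock

# Assumed: `output` is an ElasticsearchOutput instance created the usual way.
output._config.message_backlog_size = 1
output._write_backlog = mock.MagicMock()

# store() still flushes once the backlog limit is reached ...
output.store({"message": "fully processed event"})
output._write_backlog.assert_called()
output._write_backlog.reset_mock()

# ... while store_custom() only queues the document and never flushes, so no
# offset can be committed on behalf of a partially processed event.
event = {"message": "partially processed event"}
output.store_custom(event, target="some_index")
output._write_backlog.assert_not_called()
assert event in output._message_backlog  # queued with "_index" set to "some_index"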
34 changes: 19 additions & 15 deletions logprep/connector/s3/output.py
@@ -137,7 +137,6 @@ class Metrics(Output.Metrics):
def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._message_backlog = defaultdict(list)
self._writing_thread = None
self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
self._s3_resource = None
self._setup_s3_resource()
@@ -195,29 +194,28 @@ def _add_dates(self, prefix):
return prefix

@Metric.measure_time()
def _write_to_s3_resource(self, document: dict, prefix: str):
"""Writes a document into s3 bucket using given prefix.
def _write_to_s3_resource(self):
"""Writes a document into s3 bucket using given prefix."""
if self._backlog_size >= self._config.message_backlog_size:
self._write_backlog()

def _add_to_backlog(self, document: dict, prefix: str):
"""Adds document to backlog and adds a a prefix.
Parameters
----------
document : dict
Document to store.
Document to store in backlog.
"""
prefix = self._add_dates(prefix)
prefix = f"{self._base_prefix}{prefix}"
self._message_backlog[prefix].append(document)

if self._backlog_size >= self._config.message_backlog_size:
self._write_backlog()

def _write_backlog(self):
"""Write to s3 if it is not already writing."""
if not self._message_backlog:
return

self._bulk()

def _bulk(self):
self._logger.info("Writing %s documents to s3", self._backlog_size)
for prefix_mb, document_batch in self._message_backlog.items():
self._write_document_batch(document_batch, f"{prefix_mb}/{time()}-{uuid4()}")
@@ -264,8 +262,8 @@ def store(self, document: dict):
document, f"Prefix field '{self._config.prefix_field}' empty or missing in document"
)
prefix_value = self._config.default_prefix

self._write_to_s3_resource(document, prefix_value)
self._add_to_backlog(document, prefix_value)
self._write_to_s3_resource()

@staticmethod
def _build_no_prefix_document(message_document: dict, reason: str):
@@ -280,7 +278,12 @@ def _build_no_prefix_document(message_document: dict, reason: str):
return document

def store_custom(self, document: dict, target: str):
"""Write document into s3 bucket using the target prefix.
"""Store document into backlog to be written into s3 bucket using the target prefix.
Only add to backlog instead of writing the batch and calling batch_finished_callback,
since store_custom can be called before the event has been fully processed.
Setting the offset or committing before fully processing an event can lead to data loss if
Logprep terminates.
Parameters
----------
@@ -291,7 +294,7 @@ def store_custom(self, document: dict, target: str):
"""
self.metrics.number_of_processed_events += 1
self._write_to_s3_resource(document, target)
self._add_to_backlog(document, target)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
"""Write errors into s3 bucket using error prefix for documents that failed processing.
@@ -313,4 +316,5 @@ def store_failed(self, error_message: str, document_received: dict, document_pro
"processed": document_processed,
"@timestamp": TimeParser.now().isoformat(),
}
self._write_to_s3_resource(error_document, self._config.error_prefix)
self._add_to_backlog(error_document, self._config.error_prefix)
self._write_to_s3_resource()
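The S3 connector now separates queueing from flushing as well: `_add_to_backlog` resolves the prefix and stores the document, while `_write_to_s3_resource` only checks the backlog size and writes. A sketch of the resulting call pattern, assuming an `s3_output` instance configured with a `message_backlog_size` of 2, with `_write_document_batch` and the input connector mocked so no bucket or offsets are touched:

from unittest import mock

# Assumed: `s3_output` is an S3Output instance with message_backlog_size == 2.
s3_output._write_document_batch = mock.MagicMock()
s3_output.input_connector = mock.MagicMock()

# store() and store_failed() queue and then check whether to flush ...
s3_output._add_to_backlog({"event": 1}, "some/prefix")
s3_output._write_to_s3_resource()                # below the limit, nothing written
s3_output._write_document_batch.assert_not_called()

s3_output._add_to_backlog({"event": 2}, "some/prefix")
s3_output._write_to_s3_resource()                # limit reached, backlog is written
s3_output._write_document_batch.assert_called_once()

# ... while store_custom() only queues, so a partially processed event can never
# trigger a flush (and therefore no batch_finished_callback) on its own.
s3_output.store_custom({"event": 3}, "custom/prefix")
s3_output._write_document_batch.assert_called_once()  # still just the one write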
9 changes: 6 additions & 3 deletions tests/unit/connector/test_elasticsearch_output.py
@@ -138,7 +138,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
):
self.object._config.message_backlog_size = 1
self.object._handle_serialization_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_serialization_error.assert_called()

@mock.patch(
@@ -148,7 +149,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
self.object._config.message_backlog_size = 1
self.object._handle_connection_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_connection_error.assert_called()

@mock.patch(
@@ -158,7 +160,8 @@ def test_write_to_search_context_calls_handle_connection_error_if_connection_err
def test_write_to_search_context_calls_handle_bulk_index_error_if_bulk_index_error(self, _):
self.object._config.message_backlog_size = 1
self.object._handle_bulk_index_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_bulk_index_error.assert_called()

@mock.patch("elasticsearch.helpers.bulk")
9 changes: 6 additions & 3 deletions tests/unit/connector/test_opensearch_output.py
@@ -152,7 +152,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
):
self.object._config.message_backlog_size = 1
self.object._handle_serialization_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_serialization_error.assert_called()

@mock.patch(
@@ -162,7 +163,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
self.object._config.message_backlog_size = 1
self.object._handle_connection_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_connection_error.assert_called()

@mock.patch(
@@ -172,7 +174,8 @@ def test_write_to_search_context_calls_handle_connection_error_if_connection_err
def test_write_to_search_context_calls_handle_bulk_index_error_if_bulk_index_error(self, _):
self.object._config.message_backlog_size = 1
self.object._handle_bulk_index_error = mock.MagicMock()
self.object._write_to_search_context({"dummy": "event"})
self.object._message_backlog.append({"dummy": "event"})
self.object._write_to_search_context()
self.object._handle_bulk_index_error.assert_called()

@mock.patch("opensearchpy.helpers.parallel_bulk")
28 changes: 12 additions & 16 deletions tests/unit/connector/test_s3_output.py
@@ -191,7 +191,8 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_below_max_backlog(s
s3_output = Factory.create({"s3": s3_config}, self.logger)
assert self._calculate_backlog_size(s3_output) == 0
for idx in range(1, message_backlog_size):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
s3_output._write_to_s3_resource()
assert self._calculate_backlog_size(s3_output) == idx

def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self):
@@ -205,27 +206,27 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self

# Backlog not full
for idx in range(message_backlog_size - 1):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
self._wait_for_writing_thread(s3_output)
s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
s3_output._write_to_s3_resource()
assert self._calculate_backlog_size(s3_output) == idx + 1
s3_output._write_document_batch.assert_not_called()

# Backlog full then cleared
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
self._wait_for_writing_thread(s3_output)
s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
s3_output._write_to_s3_resource()
s3_output._write_document_batch.assert_called_once()
assert self._calculate_backlog_size(s3_output) == 0

# Backlog not full
for idx in range(message_backlog_size - 1):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
self._wait_for_writing_thread(s3_output)
s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
s3_output._write_to_s3_resource()
assert self._calculate_backlog_size(s3_output) == idx + 1
s3_output._write_document_batch.assert_called_once()

# Backlog full then cleared
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
self._wait_for_writing_thread(s3_output)
s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
s3_output._write_to_s3_resource()
assert s3_output._write_document_batch.call_count == 2
assert self._calculate_backlog_size(s3_output) == 0

Expand All @@ -237,7 +238,6 @@ def test_store_calls_batch_finished_callback(self):
self.object._s3_resource = mock.MagicMock()
self.object.input_connector = mock.MagicMock()
self.object.store({"message": "my event message"})
self._wait_for_writing_thread(self.object)
self.object.input_connector.batch_finished_callback.assert_called()

def test_store_does_not_call_batch_finished_callback_if_disabled(self):
@@ -252,7 +252,8 @@ def test_store_does_not_call_batch_finished_callback_if_disabled(self):
def test_write_to_s3_resource_replaces_dates(self):
expected_prefix = f'base_prefix/prefix-{TimeParser.now().strftime("%y:%m:%d")}'
self.object._write_backlog = mock.MagicMock()
self.object._write_to_s3_resource({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
self.object._add_to_backlog({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
self.object._write_to_s3_resource()
resulting_prefix = next(iter(self.object._message_backlog.keys()))

assert expected_prefix == resulting_prefix
@@ -270,11 +271,6 @@ def test_store_failed_counts_failed_events(self):
self.object._write_backlog = mock.MagicMock()
super().test_store_failed_counts_failed_events()

@staticmethod
def _wait_for_writing_thread(s3_output):
if s3_output._writing_thread is not None:
s3_output._writing_thread.join()

@staticmethod
def _calculate_backlog_size(s3_output):
return sum(len(values) for values in s3_output._message_backlog.values())
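Given the new contract, a regression test pinning it down would sit naturally next to the tests above; a possible sketch that mirrors `test_store_calls_batch_finished_callback` (the test itself is not part of this diff):

def test_store_custom_does_not_call_batch_finished_callback(self):
    # store_custom can run before an event is fully processed, so it must not
    # flush the backlog or commit offsets via the input connector.
    self.object._config.message_backlog_size = 1
    self.object._s3_resource = mock.MagicMock()
    self.object.input_connector = mock.MagicMock()
    self.object.store_custom({"message": "my event message"}, "custom_prefix")
    self.object.input_connector.batch_finished_callback.assert_not_called()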
