From 87e9e21a2be434be3ed6094b762f85f40d729bd8 Mon Sep 17 00:00:00 2001
From: ppcad <45867125+ppcad@users.noreply.github.com>
Date: Mon, 5 Feb 2024 07:51:48 +0100
Subject: [PATCH] Set no offsets on store custom (#518)

* Remove unused code from s3 connector
* Make store_custom in s3, OS and ES connectors set no offsets
* Update changelog
---
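Illustration (not part of the diff): the minimal Python sketch below models the changed
flushing behavior. The class, its constructor and the callback wiring are invented for this
example; only the names store, store_custom, _write_backlog and _message_backlog and the
backlog-size check mirror the connectors changed below.

class BacklogOutputSketch:
    """Simplified model: store() may flush and commit, store_custom() only buffers."""

    def __init__(self, message_backlog_size, batch_finished_callback):
        self.message_backlog_size = message_backlog_size
        self.batch_finished_callback = batch_finished_callback
        self._message_backlog = []

    def _write_backlog(self):
        # stand-in for the bulk write to Elasticsearch/OpenSearch/s3
        self._message_backlog.clear()
        # committing offsets here is safe because, in this sketch, the flush is
        # only reached via store(), i.e. after an event has been fully processed
        self.batch_finished_callback()

    def store(self, document):
        self._message_backlog.append(document)
        if len(self._message_backlog) >= self.message_backlog_size:
            self._write_backlog()

    def store_custom(self, document, target):
        # may be called while the event is still being processed: only buffer,
        # never flush, so no offset is committed for a partially processed event
        # (the real connectors use `target` as index or prefix)
        document["_index"] = target
        self._message_backlog.append(document)


output = BacklogOutputSketch(2, lambda: print("offsets committed"))
output.store({"event": 1})                               # backlog: 1, no flush
output.store_custom({"extra": "event"}, "custom-index")  # backlog: 2, still no flush
output.store({"event": 2})                               # limit reached -> flush and commit
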
 CHANGELOG.md                                  |  1 +
 logprep/connector/elasticsearch/output.py     | 23 +++++++------
 logprep/connector/s3/output.py                | 34 +++++++++++--------
 .../connector/test_elasticsearch_output.py    |  9 +++--
 .../unit/connector/test_opensearch_output.py  |  9 +++--
 tests/unit/connector/test_s3_output.py        | 28 +++++++--------
 6 files changed, 56 insertions(+), 48 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0836533a7..c675b8312 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
 * make the s3 connector raise `FatalOutputError` instead of warnings
 * make the s3 connector blocking by removing threading
 * revert the change from v9.0.0 to always check the existence of a field for negated key-value based lucene filter expressions
+* make `store_custom` in the s3, opensearch and elasticsearch connectors not call `batch_finished_callback` to prevent data loss that could be caused by partially processed events
 
 ### Bugfix
 
diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py
index 47992c0ca..4d2972ac9 100644
--- a/logprep/connector/elasticsearch/output.py
+++ b/logprep/connector/elasticsearch/output.py
@@ -212,10 +212,16 @@ def store(self, document: dict):
 
         self._add_dates(document)
         self.metrics.number_of_processed_events += 1
-        self._write_to_search_context(document)
+        self._message_backlog.append(document)
+        self._write_to_search_context()
 
     def store_custom(self, document: dict, target: str):
-        """Write document to Elasticsearch into the target index.
+        """Store document into backlog to be written into Elasticsearch with the target index.
+
+        Only add to backlog instead of writing the batch and calling batch_finished_callback,
+        since store_custom can be called before the event has been fully processed.
+        Setting the offset or comiting before fully processing an event can lead to data loss if
+        Logprep terminates.
 
         Parameters
         ----------
@@ -232,7 +238,7 @@ def store_custom(self, document: dict, target: str):
         document["_index"] = target
         self._add_dates(document)
         self.metrics.number_of_processed_events += 1
-        self._write_to_search_context(document)
+        self._message_backlog.append(document)
 
     def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
         """Write errors into error topic for documents that failed processing.
@@ -256,7 +262,8 @@ def store_failed(self, error_message: str, document_received: dict, document_pro
             "_index": self._config.error_index,
         }
         self._add_dates(error_document)
-        self._write_to_search_context(error_document)
+        self._message_backlog.append(error_document)
+        self._write_to_search_context()
 
     def _build_failed_index_document(self, message_document: dict, reason: str):
         document = {
@@ -278,7 +285,7 @@ def _add_dates(self, document):
                 formatted_date = now.strftime(date_format_match[2:-1])
                 document["_index"] = re.sub(date_format_match, formatted_date, document["_index"])
 
-    def _write_to_search_context(self, document):
+    def _write_to_search_context(self):
         """Writes documents from a buffer into Elasticsearch indices.
 
         Writes documents in a bulk if the document buffer limit has been reached.
@@ -286,17 +293,11 @@ def _write_to_search_context(self, document):
         The target index is determined per document by the value of the meta field '_index'.
         A configured default index is used if '_index' hasn't been set.
 
-        Parameters
-        ----------
-        document : dict
-           Document to store.
-
         Returns
         -------
         Returns True to inform the pipeline to call the batch_finished_callback method in the
         configured input
         """
-        self._message_backlog.append(document)
         if len(self._message_backlog) >= self._config.message_backlog_size:
             self._write_backlog()
 
diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py
index b0ca34127..2440baf4a 100644
--- a/logprep/connector/s3/output.py
+++ b/logprep/connector/s3/output.py
@@ -137,7 +137,6 @@ class Metrics(Output.Metrics):
     def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
         super().__init__(name, configuration, logger)
         self._message_backlog = defaultdict(list)
-        self._writing_thread = None
         self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
         self._s3_resource = None
         self._setup_s3_resource()
@@ -195,29 +194,28 @@ def _add_dates(self, prefix):
         return prefix
 
     @Metric.measure_time()
-    def _write_to_s3_resource(self, document: dict, prefix: str):
-        """Writes a document into s3 bucket using given prefix.
+    def _write_to_s3_resource(self):
+        """Writes a document into s3 bucket using given prefix."""
+        if self._backlog_size >= self._config.message_backlog_size:
+            self._write_backlog()
+
+    def _add_to_backlog(self, document: dict, prefix: str):
+        """Adds document to backlog and adds a a prefix.
 
         Parameters
         ----------
         document : dict
-           Document to store.
+           Document to store in backlog.
         """
         prefix = self._add_dates(prefix)
         prefix = f"{self._base_prefix}{prefix}"
         self._message_backlog[prefix].append(document)
 
-        if self._backlog_size >= self._config.message_backlog_size:
-            self._write_backlog()
-
     def _write_backlog(self):
         """Write to s3 if it is not already writing."""
         if not self._message_backlog:
             return
 
-        self._bulk()
-
-    def _bulk(self):
         self._logger.info("Writing %s documents to s3", self._backlog_size)
         for prefix_mb, document_batch in self._message_backlog.items():
             self._write_document_batch(document_batch, f"{prefix_mb}/{time()}-{uuid4()}")
@@ -264,8 +262,8 @@ def store(self, document: dict):
                 document, f"Prefix field '{self._config.prefix_field}' empty or missing in document"
             )
             prefix_value = self._config.default_prefix
-
-        self._write_to_s3_resource(document, prefix_value)
+        self._add_to_backlog(document, prefix_value)
+        self._write_to_s3_resource()
 
     @staticmethod
     def _build_no_prefix_document(message_document: dict, reason: str):
@@ -280,7 +278,12 @@ def _build_no_prefix_document(message_document: dict, reason: str):
         return document
 
     def store_custom(self, document: dict, target: str):
-        """Write document into s3 bucket using the target prefix.
+        """Store document into backlog to be written into s3 bucket using the target prefix.
+
+        Only add to backlog instead of writing the batch and calling batch_finished_callback,
+        since store_custom can be called before the event has been fully processed.
+        Setting the offset or comiting before fully processing an event can lead to data loss if
+        Logprep terminates.
 
         Parameters
         ----------
@@ -291,7 +294,7 @@ def store_custom(self, document: dict, target: str):
 
         """
         self.metrics.number_of_processed_events += 1
-        self._write_to_s3_resource(document, target)
+        self._add_to_backlog(document, target)
 
     def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
         """Write errors into s3 bucket using error prefix for documents that failed processing.
@@ -313,4 +316,5 @@ def store_failed(self, error_message: str, document_received: dict, document_pro
             "processed": document_processed,
             "@timestamp": TimeParser.now().isoformat(),
         }
-        self._write_to_s3_resource(error_document, self._config.error_prefix)
+        self._add_to_backlog(error_document, self._config.error_prefix)
+        self._write_to_s3_resource()
diff --git a/tests/unit/connector/test_elasticsearch_output.py b/tests/unit/connector/test_elasticsearch_output.py
index 3ea43e60d..1d3c36bea 100644
--- a/tests/unit/connector/test_elasticsearch_output.py
+++ b/tests/unit/connector/test_elasticsearch_output.py
@@ -138,7 +138,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
     ):
         self.object._config.message_backlog_size = 1
         self.object._handle_serialization_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_serialization_error.assert_called()
 
     @mock.patch(
@@ -148,7 +149,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
     def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
         self.object._config.message_backlog_size = 1
         self.object._handle_connection_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_connection_error.assert_called()
 
     @mock.patch(
@@ -158,7 +160,8 @@ def test_write_to_search_context_calls_handle_connection_error_if_connection_err
     def test_write_to_search_context_calls_handle_bulk_index_error_if_bulk_index_error(self, _):
         self.object._config.message_backlog_size = 1
         self.object._handle_bulk_index_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_bulk_index_error.assert_called()
 
     @mock.patch("elasticsearch.helpers.bulk")
diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py
index dce0cb178..8550a5286 100644
--- a/tests/unit/connector/test_opensearch_output.py
+++ b/tests/unit/connector/test_opensearch_output.py
@@ -152,7 +152,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
     ):
         self.object._config.message_backlog_size = 1
         self.object._handle_serialization_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_serialization_error.assert_called()
 
     @mock.patch(
@@ -162,7 +163,8 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati
     def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
         self.object._config.message_backlog_size = 1
         self.object._handle_connection_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_connection_error.assert_called()
 
     @mock.patch(
@@ -172,7 +174,8 @@ def test_write_to_search_context_calls_handle_connection_error_if_connection_err
     def test_write_to_search_context_calls_handle_bulk_index_error_if_bulk_index_error(self, _):
         self.object._config.message_backlog_size = 1
         self.object._handle_bulk_index_error = mock.MagicMock()
-        self.object._write_to_search_context({"dummy": "event"})
+        self.object._message_backlog.append({"dummy": "event"})
+        self.object._write_to_search_context()
         self.object._handle_bulk_index_error.assert_called()
 
     @mock.patch("opensearchpy.helpers.parallel_bulk")
diff --git a/tests/unit/connector/test_s3_output.py b/tests/unit/connector/test_s3_output.py
index c0c4a6776..301e34582 100644
--- a/tests/unit/connector/test_s3_output.py
+++ b/tests/unit/connector/test_s3_output.py
@@ -191,7 +191,8 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_below_max_backlog(s
         s3_output = Factory.create({"s3": s3_config}, self.logger)
         assert self._calculate_backlog_size(s3_output) == 0
         for idx in range(1, message_backlog_size):
-            s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
+            s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
+            s3_output._write_to_s3_resource()
             assert self._calculate_backlog_size(s3_output) == idx
 
     def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self):
@@ -205,27 +206,27 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self
 
         # Backlog not full
         for idx in range(message_backlog_size - 1):
-            s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-            self._wait_for_writing_thread(s3_output)
+            s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
+            s3_output._write_to_s3_resource()
             assert self._calculate_backlog_size(s3_output) == idx + 1
         s3_output._write_document_batch.assert_not_called()
 
         # Backlog full then cleared
-        s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-        self._wait_for_writing_thread(s3_output)
+        s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
+        s3_output._write_to_s3_resource()
         s3_output._write_document_batch.assert_called_once()
         assert self._calculate_backlog_size(s3_output) == 0
 
         # Backlog not full
         for idx in range(message_backlog_size - 1):
-            s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-            self._wait_for_writing_thread(s3_output)
+            s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
+            s3_output._write_to_s3_resource()
             assert self._calculate_backlog_size(s3_output) == idx + 1
         s3_output._write_document_batch.assert_called_once()
 
         # Backlog full then cleared
-        s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-        self._wait_for_writing_thread(s3_output)
+        s3_output._add_to_backlog({"dummy": "event"}, "write_to_s3")
+        s3_output._write_to_s3_resource()
         assert s3_output._write_document_batch.call_count == 2
         assert self._calculate_backlog_size(s3_output) == 0
 
@@ -237,7 +238,6 @@ def test_store_calls_batch_finished_callback(self):
         self.object._s3_resource = mock.MagicMock()
         self.object.input_connector = mock.MagicMock()
         self.object.store({"message": "my event message"})
-        self._wait_for_writing_thread(self.object)
         self.object.input_connector.batch_finished_callback.assert_called()
 
     def test_store_does_not_call_batch_finished_callback_if_disabled(self):
@@ -252,7 +252,8 @@ def test_store_does_not_call_batch_finished_callback_if_disabled(self):
     def test_write_to_s3_resource_replaces_dates(self):
         expected_prefix = f'base_prefix/prefix-{TimeParser.now().strftime("%y:%m:%d")}'
         self.object._write_backlog = mock.MagicMock()
-        self.object._write_to_s3_resource({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
+        self.object._add_to_backlog({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
+        self.object._write_to_s3_resource()
         resulting_prefix = next(iter(self.object._message_backlog.keys()))
 
         assert expected_prefix == resulting_prefix
@@ -270,11 +271,6 @@ def test_store_failed_counts_failed_events(self):
         self.object._write_backlog = mock.MagicMock()
         super().test_store_failed_counts_failed_events()
 
-    @staticmethod
-    def _wait_for_writing_thread(s3_output):
-        if s3_output._writing_thread is not None:
-            s3_output._writing_thread.join()
-
     @staticmethod
     def _calculate_backlog_size(s3_output):
         return sum(len(values) for values in s3_output._message_backlog.values())
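Possible follow-up, not included in this patch: a test along these lines (sketched against the
existing s3 test fixture, so `self.object` and `mock` are assumed from the surrounding test
class) could pin down that store_custom never flushes the backlog and therefore never reaches
batch_finished_callback:

    def test_store_custom_does_not_call_batch_finished_callback(self):
        self.object._write_backlog = mock.MagicMock()
        self.object.input_connector = mock.MagicMock()
        self.object.store_custom({"message": "my event message"}, "custom_prefix")
        self.object._write_backlog.assert_not_called()
        self.object.input_connector.batch_finished_callback.assert_not_called()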