From 3d1658f689cf30ce0cc2ae59196fea98b0fcb056 Mon Sep 17 00:00:00 2001
From: Marco Herzog
Date: Tue, 16 Apr 2024 13:03:37 +0200
Subject: [PATCH] Move s3 resource creation into setup method.

* Make `_s3_resource` a cached property.
* Move s3 session and resource creation into the `setup()` method so that
  the task for `flush_timeout` can be scheduled properly.
---
 logprep/connector/s3/output.py         | 153 ++++++++++++-------
 tests/unit/connector/test_s3_output.py |   7 +-
 2 files changed, 78 insertions(+), 82 deletions(-)

diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py
index e7136a25b..18976e400 100644
--- a/logprep/connector/s3/output.py
+++ b/logprep/connector/s3/output.py
@@ -48,7 +48,6 @@
 from uuid import uuid4
 
 import boto3
-import msgspec
 from attr import define, field
 from attrs import validators
 from botocore.exceptions import (
@@ -58,8 +57,8 @@
     EndpointConnectionError,
 )
 
-from logprep.abc.output import Output, FatalOutputError
-from logprep.metrics.metrics import Metric, CounterMetric
+from logprep.abc.output import FatalOutputError, Output
+from logprep.metrics.metrics import CounterMetric, Metric
 from logprep.util.helper import get_dotted_field_value
 from logprep.util.time import TimeParser
 
@@ -128,25 +127,19 @@ class Metrics(Output.Metrics):
         )
         """Number of events that were successfully written to s3"""
 
-    __slots__ = ["_message_backlog", "_index_cache"]
+    __slots__ = ["_message_backlog", "_base_prefix"]
 
     _message_backlog: DefaultDict
 
-    _s3_resource: Optional["boto3.resources.factory.s3.ServiceResource"]
-
-    _encoder: msgspec.json.Encoder = msgspec.json.Encoder()
-
     _base_prefix: str
 
     def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
         super().__init__(name, configuration, logger)
         self._message_backlog = defaultdict(list)
         self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
-        self._s3_resource = None
-        self._setup_s3_resource()
-        self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout)
 
-    def _setup_s3_resource(self):
+    @cached_property
+    def _s3_resource(self):
         session = boto3.Session(
             aws_access_key_id=self._config.aws_access_key_id,
             aws_secret_access_key=self._config.aws_secret_access_key,
@@ -156,7 +149,7 @@
             connect_timeout=self._config.connect_timeout,
             retries={"max_attempts": self._config.max_retries},
         )
-        self._s3_resource = session.resource(
+        return session.resource(
             "s3",
             endpoint_url=f"{self._config.endpoint_url}",
             verify=self._config.ca_cert,
@@ -164,11 +157,6 @@
             config=config,
         )
 
-    @property
-    def s3_resource(self):
-        """Return s3 resource"""
-        return self._s3_resource
-
     @property
     def _backlog_size(self):
         return sum(map(len, self._message_backlog.values()))
@@ -189,6 +177,72 @@ def describe(self) -> str:
         base_description = super().describe()
         return f"{base_description} - S3 Output: {self._config.endpoint_url}"
 
+    def setup(self):
+        super().setup()
+        self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout)
+        _ = self._s3_resource
+
+    def store(self, document: dict):
+        """Store a document into s3 bucket.
+
+        Parameters
+        ----------
+        document : dict
+            Document to store.
+ """ + self.metrics.number_of_processed_events += 1 + + prefix_value = get_dotted_field_value(document, self._config.prefix_field) + if prefix_value is None: + document = self._build_no_prefix_document( + document, f"Prefix field '{self._config.prefix_field}' empty or missing in document" + ) + prefix_value = self._config.default_prefix + self._add_to_backlog(document, prefix_value) + self._write_to_s3_resource() + + def store_custom(self, document: dict, target: str): + """Store document into backlog to be written into s3 bucket using the target prefix. + + Only add to backlog instead of writing the batch and calling batch_finished_callback, + since store_custom can be called before the event has been fully processed. + Setting the offset or comiting before fully processing an event can lead to data loss if + Logprep terminates. + + Parameters + ---------- + document : dict + Document to be stored into the target prefix. + target : str + Prefix for the document. + + """ + self.metrics.number_of_processed_events += 1 + self._add_to_backlog(document, target) + + def store_failed(self, error_message: str, document_received: dict, document_processed: dict): + """Write errors into s3 bucket using error prefix for documents that failed processing. + + Parameters + ---------- + error_message : str + Error message to write into document. + document_received : dict + Document as it was before processing. + document_processed : dict + Document after processing until an error occurred. + + """ + self.metrics.number_of_failed_events += 1 + error_document = { + "error": error_message, + "original": document_received, + "processed": document_processed, + "@timestamp": TimeParser.now().isoformat(), + } + self._add_to_backlog(error_document, self._config.error_prefix) + self._write_to_s3_resource() + def _add_dates(self, prefix): date_format_matches = self._replace_pattern.findall(prefix) if date_format_matches: @@ -247,29 +301,10 @@ def _write_document_batch(self, document_batch: dict, identifier: str): def _write_to_s3(self, document_batch: dict, identifier: str): self._logger.debug(f'Writing "{identifier}" to s3 bucket "{self._config.bucket}"') - s3_obj = self.s3_resource.Object(self._config.bucket, identifier) + s3_obj = self._s3_resource.Object(self._config.bucket, identifier) s3_obj.put(Body=self._encoder.encode(document_batch), ContentType="application/json") self.metrics.number_of_successful_writes += len(document_batch) - def store(self, document: dict): - """Store a document into s3 bucket. - - Parameters - ---------- - document : dict - Document to store. - """ - self.metrics.number_of_processed_events += 1 - - prefix_value = get_dotted_field_value(document, self._config.prefix_field) - if prefix_value is None: - document = self._build_no_prefix_document( - document, f"Prefix field '{self._config.prefix_field}' empty or missing in document" - ) - prefix_value = self._config.default_prefix - self._add_to_backlog(document, prefix_value) - self._write_to_s3_resource() - @staticmethod def _build_no_prefix_document(message_document: dict, reason: str): document = { @@ -281,45 +316,3 @@ def _build_no_prefix_document(message_document: dict, reason: str): except TypeError: document["message"] = str(message_document) return document - - def store_custom(self, document: dict, target: str): - """Store document into backlog to be written into s3 bucket using the target prefix. 
-
-        Only add to backlog instead of writing the batch and calling batch_finished_callback,
-        since store_custom can be called before the event has been fully processed.
-        Setting the offset or comiting before fully processing an event can lead to data loss if
-        Logprep terminates.
-
-        Parameters
-        ----------
-        document : dict
-            Document to be stored into the target prefix.
-        target : str
-            Prefix for the document.
-
-        """
-        self.metrics.number_of_processed_events += 1
-        self._add_to_backlog(document, target)
-
-    def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
-        """Write errors into s3 bucket using error prefix for documents that failed processing.
-
-        Parameters
-        ----------
-        error_message : str
-            Error message to write into document.
-        document_received : dict
-            Document as it was before processing.
-        document_processed : dict
-            Document after processing until an error occurred.
-
-        """
-        self.metrics.number_of_failed_events += 1
-        error_document = {
-            "error": error_message,
-            "original": document_received,
-            "processed": document_processed,
-            "@timestamp": TimeParser.now().isoformat(),
-        }
-        self._add_to_backlog(error_document, self._config.error_prefix)
-        self._write_to_s3_resource()
diff --git a/tests/unit/connector/test_s3_output.py b/tests/unit/connector/test_s3_output.py
index 88a6acc6c..0dbf2d450 100644
--- a/tests/unit/connector/test_s3_output.py
+++ b/tests/unit/connector/test_s3_output.py
@@ -270,8 +270,11 @@ def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self
     def test_store_failed_counts_failed_events(self):
         self.object._write_backlog = mock.MagicMock()
         super().test_store_failed_counts_failed_events()
-        self.object._scheduler.jobs = []  # If I don't do this,
-        # I poison the scheduler for all subsequent tests (wtf)
+
+    def test_setup_registers_flush_timeout_task(self):
+        job_count = len(self.object._scheduler.jobs)
+        self.object.setup()
+        assert len(self.object._scheduler.jobs) == job_count + 1
 
     @staticmethod
     def _calculate_backlog_size(s3_output):
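
The pattern this patch applies, lazy client construction behind `functools.cached_property` plus task registration deferred to `setup()`, can be sketched in isolation. The class below is a hypothetical stand-in, not logprep's actual `S3Output`, and the `schedule` package is assumed here in place of logprep's `_schedule_task` helper; only the `boto3` and `cached_property` usage mirrors the patch itself.

    from functools import cached_property

    import boto3
    import schedule  # assumption: stand-in for logprep's internal scheduler helper


    class LazyS3Writer:
        """Hypothetical class illustrating the patch's initialization pattern."""

        def __init__(self, flush_timeout: int):
            # __init__ stays cheap: no network clients, no scheduled jobs yet.
            self._flush_timeout = flush_timeout
            self._backlog = []

        @cached_property
        def _s3_resource(self):
            # Built once on first attribute access, then cached on the instance,
            # so constructing the object does not require S3 connectivity.
            return boto3.Session().resource("s3")

        def setup(self):
            # The flush task is registered only when the surrounding framework
            # calls setup(), so scheduling starts at a well-defined point.
            schedule.every(self._flush_timeout).seconds.do(self._write_backlog)
            # Touching the property forces resource creation now, so connection
            # problems surface during setup rather than on the first write.
            _ = self._s3_resource

        def _write_backlog(self):
            if not self._backlog:
                return
            # A real implementation would serialize and upload the batch here,
            # e.g. via self._s3_resource.Object(bucket, key).put(...).
            self._backlog.clear()

Because `__init__` no longer touches the network or the scheduler, instances stay cheap to construct, and the behavior is straightforward to test: as the new unit test does, one can assert that calling `setup()` grows the scheduler's job list by exactly one.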