Move s3 resource creation into setup method.
* Make `_s3_resource` a cached property.
* Move S3 session and resource creation into the `setup()` method so that
  the task for `flush_timeout` can be scheduled properly (a minimal sketch of the pattern follows).
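
For illustration, a minimal self-contained sketch of the pattern this commit adopts: lazy, cached construction of the boto3 resource via `functools.cached_property`, forced from `setup()`. Class and field names mirror the diff below, but this is a simplified sketch, not the exact Logprep implementation.

```python
from functools import cached_property

import boto3
from botocore.config import Config


class LazyS3Writer:
    """Sketch: build the boto3 session/resource on first access, not in __init__."""

    def __init__(self, config):
        # `config` is a hypothetical object carrying the same fields as S3Output.Config.
        self._config = config

    @cached_property
    def _s3_resource(self):
        # Runs once on first access; the result is cached on the instance afterwards.
        session = boto3.Session(
            aws_access_key_id=self._config.aws_access_key_id,
            aws_secret_access_key=self._config.aws_secret_access_key,
        )
        return session.resource(
            "s3",
            endpoint_url=self._config.endpoint_url,
            config=Config(
                connect_timeout=self._config.connect_timeout,
                retries={"max_attempts": self._config.max_retries},
            ),
        )

    def setup(self):
        # Touching the property here forces resource creation, so a
        # misconfigured endpoint or credentials fail during setup rather
        # than on the first write.
        _ = self._s3_resource
```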
clumsy9 committed Apr 16, 2024
1 parent d125974 commit 3d1658f
Showing 2 changed files with 78 additions and 82 deletions.
153 changes: 73 additions & 80 deletions logprep/connector/s3/output.py
@@ -48,7 +48,6 @@
from uuid import uuid4

import boto3
import msgspec
from attr import define, field
from attrs import validators
from botocore.exceptions import (
@@ -58,8 +57,8 @@
EndpointConnectionError,
)

from logprep.abc.output import Output, FatalOutputError
from logprep.metrics.metrics import Metric, CounterMetric
from logprep.abc.output import FatalOutputError, Output
from logprep.metrics.metrics import CounterMetric, Metric
from logprep.util.helper import get_dotted_field_value
from logprep.util.time import TimeParser

@@ -128,25 +127,19 @@ class Metrics(Output.Metrics):
)
"""Number of events that were successfully written to s3"""

__slots__ = ["_message_backlog", "_index_cache"]
__slots__ = ["_message_backlog", "_base_prefix"]

_message_backlog: DefaultDict

_s3_resource: Optional["boto3.resources.factory.s3.ServiceResource"]

_encoder: msgspec.json.Encoder = msgspec.json.Encoder()

_base_prefix: str

def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._message_backlog = defaultdict(list)
self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
self._s3_resource = None
self._setup_s3_resource()
self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout)

def _setup_s3_resource(self):
@cached_property
def _s3_resource(self):
session = boto3.Session(
aws_access_key_id=self._config.aws_access_key_id,
aws_secret_access_key=self._config.aws_secret_access_key,
@@ -156,19 +149,14 @@ def _setup_s3_resource(self):
connect_timeout=self._config.connect_timeout,
retries={"max_attempts": self._config.max_retries},
)
self._s3_resource = session.resource(
return session.resource(
"s3",
endpoint_url=f"{self._config.endpoint_url}",
verify=self._config.ca_cert,
use_ssl=self._config.use_ssl,
config=config,
)

@property
def s3_resource(self):
"""Return s3 resource"""
return self._s3_resource

@property
def _backlog_size(self):
return sum(map(len, self._message_backlog.values()))
@@ -189,6 +177,72 @@ def describe(self) -> str:
base_description = super().describe()
return f"{base_description} - S3 Output: {self._config.endpoint_url}"

def setup(self):
super().setup()
self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout)
_ = self._s3_resource

def store(self, document: dict):
"""Store a document into s3 bucket.
Parameters
----------
document : dict
Document to store.
"""
self.metrics.number_of_processed_events += 1

prefix_value = get_dotted_field_value(document, self._config.prefix_field)
if prefix_value is None:
document = self._build_no_prefix_document(
document, f"Prefix field '{self._config.prefix_field}' empty or missing in document"
)
prefix_value = self._config.default_prefix
self._add_to_backlog(document, prefix_value)
self._write_to_s3_resource()

def store_custom(self, document: dict, target: str):
"""Store document into backlog to be written into s3 bucket using the target prefix.
Only add to backlog instead of writing the batch and calling batch_finished_callback,
since store_custom can be called before the event has been fully processed.
Setting the offset or committing before fully processing an event can lead to data loss if
Logprep terminates.
Parameters
----------
document : dict
Document to be stored into the target prefix.
target : str
Prefix for the document.
"""
self.metrics.number_of_processed_events += 1
self._add_to_backlog(document, target)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
"""Write errors into s3 bucket using error prefix for documents that failed processing.
Parameters
----------
error_message : str
Error message to write into document.
document_received : dict
Document as it was before processing.
document_processed : dict
Document after processing until an error occurred.
"""
self.metrics.number_of_failed_events += 1
error_document = {
"error": error_message,
"original": document_received,
"processed": document_processed,
"@timestamp": TimeParser.now().isoformat(),
}
self._add_to_backlog(error_document, self._config.error_prefix)
self._write_to_s3_resource()

def _add_dates(self, prefix):
date_format_matches = self._replace_pattern.findall(prefix)
if date_format_matches:
@@ -247,29 +301,10 @@ def _write_document_batch(self, document_batch: dict, identifier: str):

def _write_to_s3(self, document_batch: dict, identifier: str):
self._logger.debug(f'Writing "{identifier}" to s3 bucket "{self._config.bucket}"')
s3_obj = self.s3_resource.Object(self._config.bucket, identifier)
s3_obj = self._s3_resource.Object(self._config.bucket, identifier)
s3_obj.put(Body=self._encoder.encode(document_batch), ContentType="application/json")
self.metrics.number_of_successful_writes += len(document_batch)

def store(self, document: dict):
"""Store a document into s3 bucket.
Parameters
----------
document : dict
Document to store.
"""
self.metrics.number_of_processed_events += 1

prefix_value = get_dotted_field_value(document, self._config.prefix_field)
if prefix_value is None:
document = self._build_no_prefix_document(
document, f"Prefix field '{self._config.prefix_field}' empty or missing in document"
)
prefix_value = self._config.default_prefix
self._add_to_backlog(document, prefix_value)
self._write_to_s3_resource()

@staticmethod
def _build_no_prefix_document(message_document: dict, reason: str):
document = {
@@ -281,45 +316,3 @@ def _build_no_prefix_document(message_document: dict, reason: str):
except TypeError:
document["message"] = str(message_document)
return document

def store_custom(self, document: dict, target: str):
"""Store document into backlog to be written into s3 bucket using the target prefix.
Only add to backlog instead of writing the batch and calling batch_finished_callback,
since store_custom can be called before the event has been fully processed.
Setting the offset or committing before fully processing an event can lead to data loss if
Logprep terminates.
Parameters
----------
document : dict
Document to be stored into the target prefix.
target : str
Prefix for the document.
"""
self.metrics.number_of_processed_events += 1
self._add_to_backlog(document, target)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
"""Write errors into s3 bucket using error prefix for documents that failed processing.
Parameters
----------
error_message : str
Error message to write into document.
document_received : dict
Document as it was before processing.
document_processed : dict
Document after processing until an error occurred.
"""
self.metrics.number_of_failed_events += 1
error_document = {
"error": error_message,
"original": document_received,
"processed": document_processed,
"@timestamp": TimeParser.now().isoformat(),
}
self._add_to_backlog(error_document, self._config.error_prefix)
self._write_to_s3_resource()
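
A practical consequence of the move, shown as a hypothetical lifecycle sketch (the constructor signature matches the diff; in a running pipeline, `setup()` is invoked by the framework rather than by hand):

```python
# Hypothetical lifecycle sketch, not taken verbatim from the Logprep code base:
output = S3Output("s3_output", config, logger)  # no AWS session or resource is built here
output.setup()  # schedules the flush_timeout task and builds the cached S3 resource
output.store({"message": "hello"})  # stores reuse the cached resource once the backlog flushes
```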
7 changes: 5 additions & 2 deletions tests/unit/connector/test_s3_output.py
@@ -270,8 +270,11 @@ def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self):
def test_store_failed_counts_failed_events(self):
self.object._write_backlog = mock.MagicMock()
super().test_store_failed_counts_failed_events()
self.object._scheduler.jobs = []  # reset jobs; leftover jobs would poison the scheduler for all subsequent tests

def test_setup_registers_flush_timout_tasks(self):
job_count = len(self.object._scheduler.jobs)
self.object.setup()
assert len(self.object._scheduler.jobs) == job_count + 1

@staticmethod
def _calculate_backlog_size(s3_output):
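
On the scheduler-poisoning comment in `test_store_failed_counts_failed_events`: a pytest autouse fixture is one conventional way to guarantee that cleanup, sketched here under the assumption that `_scheduler.jobs` is a plain mutable list, as the test code suggests:

```python
import pytest


class TestS3Output:
    @pytest.fixture(autouse=True)
    def _restore_scheduler(self):
        # Hypothetical fixture: wraps every test in this class; `self.object`
        # is assumed to be the connector under test, created by the suite.
        yield
        self.object._scheduler.jobs = []
```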
