diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py index adb3b3edc..e7136a25b 100644 --- a/logprep/connector/s3/output.py +++ b/logprep/connector/s3/output.py @@ -112,6 +112,9 @@ class Config(Output.Config): ) """The input callback is called after the maximum backlog size has been reached if this is set to True (optional)""" + flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60) + """(Optional) Timout after :code:`message_backlog` is flushed if + :code:`message_backlog_size` is not reached.""" @define(kw_only=True) class Metrics(Output.Metrics): @@ -141,6 +144,7 @@ def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger): self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else "" self._s3_resource = None self._setup_s3_resource() + self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout) def _setup_s3_resource(self): session = boto3.Session( diff --git a/tests/unit/connector/test_s3_output.py b/tests/unit/connector/test_s3_output.py index 301e34582..d718cd7b3 100644 --- a/tests/unit/connector/test_s3_output.py +++ b/tests/unit/connector/test_s3_output.py @@ -270,6 +270,7 @@ def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self def test_store_failed_counts_failed_events(self): self.object._write_backlog = mock.MagicMock() super().test_store_failed_counts_failed_events() + self.object._scheduler.jobs = [] # If I don't do this I poison the scheduler for all subsequent tests (wtf) @staticmethod def _calculate_backlog_size(s3_output):