Skip to content

Commit

Permalink
Add flush timeout to s3 connector
Browse files Browse the repository at this point in the history
  • Loading branch information
saegel committed Apr 8, 2024
1 parent 0008958 commit d125974
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
4 changes: 4 additions & 0 deletions logprep/connector/s3/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class Config(Output.Config):
)
"""The input callback is called after the maximum backlog size has been reached
if this is set to True (optional)"""
flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60)
"""(Optional) Timout after :code:`message_backlog` is flushed if
:code:`message_backlog_size` is not reached."""

@define(kw_only=True)
class Metrics(Output.Metrics):
Expand Down Expand Up @@ -141,6 +144,7 @@ def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
self._s3_resource = None
self._setup_s3_resource()
self._schedule_task(task=self._write_backlog, seconds=self._config.flush_timeout)

def _setup_s3_resource(self):
session = boto3.Session(
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/connector/test_s3_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self
def test_store_failed_counts_failed_events(self):
self.object._write_backlog = mock.MagicMock()
super().test_store_failed_counts_failed_events()
self.object._scheduler.jobs = [] # If I don't do this,
# I poison the scheduler for all subsequent tests (wtf)

@staticmethod
def _calculate_backlog_size(s3_output):
Expand Down

0 comments on commit d125974

Please sign in to comment.