Skip to content

Commit

Permalink
fix logging, concurrency and interruption (#769)
Browse files Browse the repository at this point in the history
* ensure logging and threading

* move generate load to sender
  • Loading branch information
ekneg54 authored Feb 25, 2025
1 parent 128fbed commit cf7db36
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 114 deletions.
26 changes: 3 additions & 23 deletions logprep/abc/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import signal
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed

from logprep.abc.output import Output
from logprep.generator.http.input import Input
Expand Down Expand Up @@ -39,18 +38,14 @@ def __init__(
# implement shuffle in batcher
# implement handling in batcher for different paths
#
# revise logging (no logs in controller about threading)
# interrupt (SIGINT) in threadpoolexecutor with high event count
#
# refactor input class with focus on Single Responsibility Principle
# how to handle big amount of example events? they are loaded in memory
# test with big files
# compute message backlog size instead of defaults?

def setup(self) -> None:
logger = logging.getLogger("Generator")
"""Setup the generator"""
self.loghandler.start()
self.file_loader.start()
logger.debug("Start thread Fileloader active threads: %s", threading.active_count())
self.sender = Sender(self.file_loader.read_lines(), self.output, **self.config)
signal.signal(signal.SIGTERM, self.stop)
Expand All @@ -60,25 +55,10 @@ def setup(self) -> None:
def run(self):
"""Run the generator"""

def _generate_load(self):
with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
futures = {executor.submit(self.sender.send_batch) for _ in range(self.thread_count)}
while not self.exit_requested:
future = next(as_completed(futures))
try:
result = future.result() # Wait for a completed task
logger.debug("Result: %s", result)
logger.info("During generate load active threads: %s", threading.active_count())
logger.info("Finished processing a batch")
except StopIteration:
logger.info("Stopped Data Processing")
break

logger.debug("After generate load active threads: %s", threading.active_count())

def stop(self, signum, frame):
"""Stop the generator"""
self.exit_requested = True
self.sender.stop()
self.file_loader.close()
logger.info("Stopped Data Processing on signal %s", signum)
self.loghandler.stop()
return None
3 changes: 1 addition & 2 deletions logprep/connector/http/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ def store_custom(self, document: dict | tuple | list, target: str) -> None:
return
try:
try:
logger.debug(request_data)
response = requests.post(
url=target,
headers=self._headers,
Expand All @@ -192,7 +191,7 @@ def store_custom(self, document: dict | tuple | list, target: str) -> None:
timeout=(self.timeout, self.timeout),
data=request_data,
)
logger.debug("Servers response code is: %i", response.status_code)
# logger.debug("Servers response code is: %i", response.status_code)
self.metrics.status_codes.add_with_labels(
1,
{
Expand Down
8 changes: 5 additions & 3 deletions logprep/generator/http/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ def run(self) -> None:
self.input.reformat_dataset()
self.setup()
run_time_start = time.perf_counter()
self._generate_load()
# self.input.clean_up_tempdir()
self.sender.send_batches()
self.input.clean_up_tempdir()
run_duration = time.perf_counter() - run_time_start
stats = self.output.statistics
logger.info("Completed with following statistics: %s", stats)
logger.info("Execution time: %f seconds", run_duration)
self.stop(signal.SIGTERM, None)
if not self.exit_requested:
self.stop(signal.SIGTERM, None)
self.loghandler.stop()
88 changes: 9 additions & 79 deletions logprep/generator/http/loader.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,19 @@
import io
import logging
import shutil
import threading
from pathlib import Path
from queue import Empty, Full, Queue
from typing import Generator, List

from logprep.util.defaults import DEFAULT_MESSAGE_BACKLOG_SIZE

logger = logging.getLogger("Loader")


class EventBuffer:
    """Buffers lines of the file loader's files in a bounded queue.

    A background thread (``write``) fills the queue while a consumer drains
    it through ``read_lines``. A private sentinel object marks the end of
    the stream. ``exit_requested`` lets both sides terminate even when the
    sentinel cannot be delivered because the queue is full.
    """

    _message_backlog: Queue

    # unique marker signalling "no more events" to the reader
    _sentinel = object()

    _thread: threading.Thread

    def __init__(
        self, file_loader: "FileLoader", message_backlog_size: int = DEFAULT_MESSAGE_BACKLOG_SIZE
    ) -> None:
        """Create the queue and the (not yet started) writer thread.

        Parameters
        ----------
        file_loader:
            Object whose ``files`` attribute is iterated by ``write``.
        message_backlog_size:
            Maximum number of lines held in the queue.
        """
        self.file_loader = file_loader
        self._message_backlog = Queue(maxsize=message_backlog_size)
        self._thread = threading.Thread(target=self.write)
        self.exit_requested = False

    def _put(self, item: object) -> bool:
        """Put ``item`` into the backlog, retrying while the queue is full.

        Returns
        -------
        bool
            True if the item was enqueued, False if a stop was requested
            before it could be enqueued.
        """
        while not self.exit_requested:
            try:
                self._message_backlog.put(item, timeout=1)
                return True
            except Full:
                logger.warning("Message backlog queue is full. ")
        return False

    def write(self) -> None:
        """Read lines from the file loader's files and put them into the backlog.

        Blocks (in one second steps) while the queue is full. Returns early,
        without writing the sentinel, as soon as a stop is requested; the
        reader then terminates via ``exit_requested``.
        """
        for file in self.file_loader.files:
            with open(file, "r", encoding="utf8") as current_file:
                for line in current_file:
                    if not self._put(line.strip()):
                        return
        self._put(self._sentinel)

    def read_lines(self) -> Generator[str, None, None]:
        """Read lines from the message backlog queue.

        Yields
        ------
        str
            A line from the message backlog queue.

        The generator ends when the sentinel is read, or when the queue is
        empty and a stop has been requested.
        """
        while True:
            try:
                event = self._message_backlog.get(timeout=1)
            except Empty:
                if self.exit_requested:
                    # the sentinel may never arrive after a stop request
                    return
                continue
            if event is self._sentinel:
                return
            yield event

    def start(self) -> None:
        """Start the writer thread unless it is already running."""
        if self._thread.is_alive():
            return
        self._thread.start()

    def stop(self) -> None:
        """Request termination and wait for the writer thread to finish."""
        # Set the flag first so a writer blocked on a full queue gives up.
        self.exit_requested = True
        try:
            # Never block here: with a full queue and no consumer a blocking
            # put would deadlock the caller.
            self._message_backlog.put_nowait(self._sentinel)
        except Full:
            # The reader terminates through exit_requested once it has
            # drained the queue, so the sentinel is not required.
            pass
        # join() raises RuntimeError on a thread that was never started.
        if self._thread.is_alive():
            self._thread.join()


class FileLoader:
"""Handles file operations like reading files, shuffling, and cycling through them."""

def __init__(self, directory: str | Path, **config) -> None:
message_backlog_size = config.get("message_backlog_size", DEFAULT_MESSAGE_BACKLOG_SIZE)
self._buffer = EventBuffer(self, message_backlog_size)
self.directory = Path(directory)
self.event_count = config.get("events", 1)
self.exit_requested = False

@property
def files(self) -> List[Path]:
Expand All @@ -99,20 +30,19 @@ def files(self) -> List[Path]:

def read_lines(self) -> Generator[str, None, None]:
"""Yields the lines of all files in turn; ends after a single pass."""
yield from self._buffer.read_lines()
for file in self.files:
with open(file, "r", encoding="utf8", buffering=io.DEFAULT_BUFFER_SIZE) as current_file:
while line := current_file.readline():
yield line
if self.exit_requested:
return

def clean_up(self) -> None:
"""Deletes the temporary directory."""
if self.directory.exists():
shutil.rmtree(self.directory)

def start(self) -> None:
"""Starts the thread."""
self._buffer.start()

def close(self) -> None:
"""Stops reading and deletes the temporary directory."""
self._buffer.stop()
while self._buffer._thread.is_alive():
logger.info("Waiting for buffer thread to finish")
self.exit_requested = True
self.clean_up()
29 changes: 23 additions & 6 deletions logprep/generator/http/sender.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Iterable

from logprep.abc.output import Output

logger = logging.getLogger("Sender")


class Sender:
"""Manages the Batcher and Output classes"""

def __init__(self, input_events: Iterable, output: Output, **config):
self.config = config
self.output = output
self.thread_count = config.get("thread_count", 1)
self.input_events = iter(input_events)
self.target_url = config.get("target_url")
self._lock = threading.Lock()
self.exit_requested = False
if not self.target_url:
raise ValueError("No target_url specified")

def send_batch(self) -> None:
def send_batches(self) -> None:
"""Sends every input event to the output, prefixed with the target url."""
target_url = self.target_url
while self.input_events:
with self._lock:
batch = next(self.input_events)
# line starts with the target path of the target url
self.output.store(f"{target_url}{batch}")
event_generator = (f"{target_url}{batch}" for batch in self.input_events)
with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
while not self.exit_requested:
chunk = tuple(islice(event_generator, self.thread_count))
if not chunk:
break
for _ in executor.map(self.output.store, chunk):
logger.debug(
"During generate load active threads: %s", threading.active_count()
)
logger.debug("After generate load active threads: %s", threading.active_count())

def stop(self) -> None:
"""Stops the sender"""
self.exit_requested = True
2 changes: 1 addition & 1 deletion tests/unit/generator/http/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ def test_send_batch_calls_store(self):
mock.call("http://example.com/file2.txt"),
mock.call("http://example.com/file3.txt"),
]
self.sender.send_batch()
self.sender.send_batches()
self.mock_output.store.assert_has_calls(expected_calls, any_order=False)
assert self.mock_output.store.call_count == 3

0 comments on commit cf7db36

Please sign in to comment.