diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e2f81246a..5f1edc0d0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -73,17 +73,11 @@ jobs:
           pip install --upgrade pip wheel
           pip install .[dev]

-      - name: Perform unit tests without http
+      - name: Perform unit tests
         env:
           PYTEST_ADDOPTS: "--color=yes"
         run: |
-          pytest -vv tests/unit -k "not test_http"
-
-      - name: Perform unit tests for http
-        env:
-          PYTEST_ADDOPTS: "--color=yes"
-        run: |
-          pytest -vv tests/unit -k "test_http"
+          pytest -vv tests/unit

       - name: Perform acceptance tests
         env:
diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py
index ca4238a6b..b4d4f9dda 100644
--- a/logprep/connector/http/input.py
+++ b/logprep/connector/http/input.py
@@ -30,9 +30,9 @@ from logging import Logger
 import logging
 import re
-from typing import Mapping, Tuple, Union, Callable
 import msgspec
 import uvicorn
+from typing import Mapping, Tuple, Union, Callable
 from attrs import define, field, validators
 import falcon.asgi
 from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed  # pylint: disable=no-name-in-module
@@ -109,11 +109,17 @@ async def func_wrapper(*args, **kwargs):
     return func_wrapper


+class HTTPEvent(msgspec.Struct):
+    """Event data and metadata from the HTTP input"""
+
+    metadata: dict = {}
+    event: dict = {}
+
+    def to_dict(self):
+        return {f: getattr(self, f) for f in self.__struct_fields__}
+
+
 class HttpEndpoint(ABC):
     """interface for http endpoints"""

-    messages: queue.Queue
-
     def __init__(self, messages: queue.Queue) -> None:
         self.messages = messages
@@ -128,7 +134,15 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ
         """json endpoint method"""
         data = await req.stream.read()
         data = data.decode("utf8")
-        self.messages.put(self._decoder.decode(data), block=False)
+        if data:
+            event = HTTPEvent(
+                metadata={
+                    "url": req.url,
+                    "remote_addr": req.remote_addr,
+                    "user_agent": req.user_agent,
+                },
+                event=self._decoder.decode(data))
+            self.messages.put(event.to_dict(), block=False)


 class JSONLHttpEndpoint(HttpEndpoint):
@@ -141,11 +155,18 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ
         """jsonl endpoint method"""
         data = await req.stream.read()
         data = data.decode("utf8")
+        metadata = {
+            "url": req.url,
+            "remote_addr": req.remote_addr,
+            "user_agent": req.user_agent}
         for line in data.splitlines():
             line = line.strip()
             if line:
-                event = self._decoder.decode(line)
-                self.messages.put(event, block=False)
+                event = HTTPEvent(
+                    metadata=metadata,
+                    event=self._decoder.decode(line)
+                )
+                self.messages.put(event.to_dict(), block=False)


 class PlaintextHttpEndpoint(HttpEndpoint):
@@ -160,7 +181,22 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ


 class ThreadingHTTPServer:
-    """Threading Wrapper around Uvicorn Server"""
+    """Singleton wrapper class around a Uvicorn thread that controls the
+    lifecycle of the Uvicorn HTTP server. At runtime this singleton object
+    is stateful, so some of its attributes have to be checked during
+    __init__ when multiple consecutive reconfigurations happen.
+
+    Parameters
+    ----------
+    connector_config: Input.Config
+        Holds the full connector config for config change checks
+    endpoints_config: dict
+        Endpoint paths as keys and initialized endpoint objects as
+        values
+    log_level: str
+        Log level to be set for the uvicorn server
+    """
+
     _instance = None
     _lock = threading.Lock()
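For reference, a minimal standalone sketch of how the new `HTTPEvent` struct wraps a decoded payload before it is queued. The request values below are illustrative, not taken from a real falcon request:

```python
import msgspec


class HTTPEvent(msgspec.Struct):
    """Event data and metadata from the HTTP input"""

    metadata: dict = {}
    event: dict = {}

    def to_dict(self):
        # msgspec structs expose their field names via __struct_fields__
        return {f: getattr(self, f) for f in self.__struct_fields__}


# roughly what a call to the JSON endpoint puts on the message queue
event = HTTPEvent(
    metadata={
        "url": "http://localhost:9000/json",
        "remote_addr": "127.0.0.1",
        "user_agent": "curl/8.4.0",
    },
    event={"message": "my first log message"},
)
print(event.to_dict())
# {'metadata': {'url': 'http://localhost:9000/json', ...},
#  'event': {'message': 'my first log message'}}
```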
diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py
index 5b9c7e0c7..b79e91a6f 100644
--- a/logprep/framework/pipeline_manager.py
+++ b/logprep/framework/pipeline_manager.py
@@ -13,6 +13,82 @@
 from logprep.metrics.exporter import PrometheusExporter
 from logprep.metrics.metrics import CounterMetric
 from logprep.util.configuration import Configuration
+
+import time
+import logging
+import logging.handlers
+import threading
+from queue import Empty
+
+
+class SingleThreadQueueListener(logging.handlers.QueueListener):
+    """A subclass of QueueListener that uses a single thread for all queues.
+
+    See https://github.com/python/cpython/blob/main/Lib/logging/handlers.py
+    for the implementation of QueueListener.
+    """
+
+    monitor_thread = None
+    listeners = []
+    sleep_time = 0.1
+
+    @classmethod
+    def _start(cls):
+        """Start a single thread, only if none is started."""
+        if cls.monitor_thread is None or not cls.monitor_thread.is_alive():
+            cls.monitor_thread = t = threading.Thread(
+                target=cls._monitor_all, name='logging_monitor')
+            t.daemon = True
+            t.start()
+        return cls.monitor_thread
+
+    @classmethod
+    def _join(cls):
+        """Wait for the monitor thread to stop.
+        Only call this after stopping all listeners.
+        """
+        if cls.monitor_thread is not None and cls.monitor_thread.is_alive():
+            cls.monitor_thread.join()
+        cls.monitor_thread = None
+
+    @classmethod
+    def _monitor_all(cls):
+        """A monitor function for all the registered listeners.
+        Does not block when obtaining messages from a queue, so that
+        every listener gets a chance to be drained; that is why the
+        loop has to sleep on every cycle.
+
+        If a sentinel is sent, the listener is unregistered.
+        When all listeners are unregistered, the thread stops.
+        """
+        noop = lambda: None
+        while cls.listeners:
+            time.sleep(cls.sleep_time)  # does not block all threads
+            # iterate over a copy so listeners can be removed while iterating
+            for listener in cls.listeners.copy():
+                try:
+                    # get all messages currently in this queue without blocking
+                    task_done = getattr(listener.queue, 'task_done', noop)
+                    while True:
+                        record = listener.dequeue(False)
+                        if record is listener._sentinel:
+                            cls.listeners.remove(listener)
+                        else:
+                            listener.handle(record)
+                        task_done()
+                except Empty:
+                    continue
+
+    def start(self):
+        """Override the default implementation: register this listener
+        and start the shared monitor thread if none is running yet.
+        """
+        SingleThreadQueueListener.listeners.append(self)
+        SingleThreadQueueListener._start()
+
+    def stop(self):
+        """Enqueue the sentinel but do not stop the monitor thread."""
+        self.enqueue_sentinel()


 class PipelineManager:
@@ -50,7 +126,7 @@ def __init__(self, configuration: Configuration):
         self.metrics = self.Metrics(labels={"component": "manager"})
         self._logger = logging.getLogger("Logprep PipelineManager")
         self.log_queue = multiprocessing.Queue(-1)
-        self._queue_listener = logging.handlers.QueueListener(self.log_queue)
+        self._queue_listener = SingleThreadQueueListener(self.log_queue)
         self._queue_listener.start()

         self._pipelines: list[multiprocessing.Process] = []
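A minimal usage sketch of the shared listener, assuming the `SingleThreadQueueListener` defined above is in scope; the handler and queue here are illustrative, while `PipelineManager.__init__` wires it up the same way with its multiprocessing log queue:

```python
import logging
import logging.handlers
import multiprocessing

log_queue = multiprocessing.Queue(-1)

# each listener registers itself with the class-wide monitor thread,
# so several managers share one logging thread instead of one each
listener = SingleThreadQueueListener(log_queue, logging.StreamHandler())
listener.start()

# producers (e.g. pipeline processes) log through a QueueHandler
logger = logging.getLogger("pipeline")
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.warning("routed through the shared monitor thread")

# enqueues the sentinel; the monitor thread keeps running until
# every registered listener has been unregistered
listener.stop()
```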
+ """ + SingleThreadQueueListener.listeners.append(self) + # Start if not already + SingleThreadQueueListener._start() + + def stop(self): + """Enqueues the sentinel but does not stop the thread.""" + self.enqueue_sentinel() class PipelineManager: @@ -50,7 +126,7 @@ def __init__(self, configuration: Configuration): self.metrics = self.Metrics(labels={"component": "manager"}) self._logger = logging.getLogger("Logprep PipelineManager") self.log_queue = multiprocessing.Queue(-1) - self._queue_listener = logging.handlers.QueueListener(self.log_queue) + self._queue_listener = SingleThreadQueueListener(self.log_queue) self._queue_listener.start() self._pipelines: list[multiprocessing.Process] = [] diff --git a/quickstart/exampledata/config/http_pipeline.yml b/quickstart/exampledata/config/http_pipeline.yml index 82051f14e..a288ad5fc 100644 --- a/quickstart/exampledata/config/http_pipeline.yml +++ b/quickstart/exampledata/config/http_pipeline.yml @@ -12,6 +12,7 @@ input: host: 0.0.0.0 port: 9000 endpoints: + /json: json /lab/123/(first|second|third)/js.*: jsonl /lab/123/(ABC|DEF)/pl.*: plaintext output: