From b7a20c5ff1431ff5927273bf19fb31efce67fc75 Mon Sep 17 00:00:00 2001 From: david Date: Tue, 26 Mar 2024 14:05:28 +0100 Subject: [PATCH] add message_backlog_size and metadata add --- logprep/connector/http/input.py | 154 +++++++++++------- logprep/framework/pipeline_manager.py | 77 +-------- .../exampledata/config/http_pipeline.yml | 3 + tests/unit/connector/test_http_input.py | 10 +- 4 files changed, 101 insertions(+), 143 deletions(-) diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index b4d4f9dda..a63420b69 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -13,6 +13,9 @@ input: myhttpinput: type: http_input + message_backlog_size: 15000 + collect_meta: False + metafield_name: "@metadata" uvicorn_config: host: 0.0.0.0 port: 9000 @@ -30,10 +33,10 @@ from logging import Logger import logging import re -import msgspec -import uvicorn from typing import Mapping, Tuple, Union, Callable from attrs import define, field, validators +import msgspec +import uvicorn import falcon.asgi from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed # pylint: disable=no-name-in-module from logprep.abc.input import FatalInputError, Input @@ -48,13 +51,41 @@ HTTP_INPUT_CONFIG_KEYS = ["preprocessing", "uvicorn_config", "endpoints"] -def threadsafe_wrapper(func): - """Decorator making sure that the decorated function is thread safe""" - lock = threading.Lock() +def decorator_request_exceptions(func: Callable): + """Decorator to wrap http calls and raise exceptions""" - def func_wrapper(*args, **kwargs): - with lock: - func_wrapper = func(*args, **kwargs) + async def func_wrapper(*args, **kwargs): + try: + if args[1].method == "POST": + func_wrapper = await func(*args, **kwargs) + else: + raise HTTPMethodNotAllowed(["POST"]) + except queue.Full as exc: + raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc + return func_wrapper + + return func_wrapper + + +def decorator_add_metadata(func: 
Callable): + """Decorator to add metadata to resulting http event. + Uses attribute collect_meta of endpoint class to decide over metadata collection + Uses attribute metafield_name to define key name for metadata + """ + + async def func_wrapper(*args, **kwargs): + req = args[1] + endpoint = args[0] + if endpoint.collect_meta: + metadata = { + "url": req.url, + "remote_addr": req.remote_addr, + "user_agent": req.user_agent, + } + kwargs["metadata"] = {endpoint.metafield_name: metadata} + else: + kwargs["metadata"] = {} + func_wrapper = await func(*args, **kwargs) return func_wrapper return func_wrapper @@ -93,35 +124,23 @@ def route_compile_helper(input_re_str: str): return re.compile(input_re_str) -def decorator_request_exceptions(func: Callable): - """Decorator to wrap http calls and raise exceptions""" - - async def func_wrapper(*args, **kwargs): - try: - if args[1].method == "POST": - func_wrapper = await func(*args, **kwargs) - else: - raise HTTPMethodNotAllowed(["POST"]) - except queue.Full as exc: - raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc - return func_wrapper - - return func_wrapper - - -class HTTPEvent(msgspec.Struct): - """Eventdata and Metadata from HTTP Input""" - metadata: set[str] = set() - event: set[str] = set() - - def to_dict(self): - return {f: getattr(self, f) for f in self.__struct_fields__} - class HttpEndpoint(ABC): - """interface for http endpoints""" + """Interface for http endpoints + + Parameters + ---------- + messages: queue.Queue + Input Events are put here + collect_meta: bool + Collects Metadata on True (default) + metafield_name: str + Defines key name for metadata + """ - def __init__(self, messages: queue.Queue) -> None: + def __init__(self, messages: queue.Queue, collect_meta: bool, metafield_name: str) -> None: self.messages = messages + self.collect_meta = collect_meta + self.metafield_name = metafield_name class JSONHttpEndpoint(HttpEndpoint): @@ -130,19 +149,15 @@ class 
JSONHttpEndpoint(HttpEndpoint): _decoder = msgspec.json.Decoder() @decorator_request_exceptions - async def __call__(self, req, resp): # pylint: disable=arguments-differ + @decorator_add_metadata + async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """json endpoint method""" data = await req.stream.read() data = data.decode("utf8") + event = kwargs.get("metadata", {}) if data: - event = HTTPEvent( - metadata={ - "url":req.url, - "remote_addr":req.remote_addr, - "user_agent":req.user_agent - }, - event=self._decoder.decode(data)) - self.messages.put(event.to_dict(), block=False) + event.update(self._decoder.decode(data)) + self.messages.put(event, block=False) class JSONLHttpEndpoint(HttpEndpoint): @@ -151,22 +166,20 @@ class JSONLHttpEndpoint(HttpEndpoint): _decoder = msgspec.json.Decoder() @decorator_request_exceptions - async def __call__(self, req, resp): # pylint: disable=arguments-differ + @decorator_add_metadata + async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """jsonl endpoint method""" data = await req.stream.read() data = data.decode("utf8") - metadata={ - "url":req.url, - "remote_addr":req.remote_addr, - "user_agent":req.user_agent} for line in data.splitlines(): line = line.strip() if line: - event = HTTPEvent( - metadata=metadata, - event=self._decoder.decode(line) - ) - self.messages.put(event.to_dict(), block=False) + # copy request metadata so each line gets its own event dict; + # otherwise every queued event shares one mutated object + event = dict(kwargs.get("metadata", {})) + dec_line = self._decoder.decode(line) + event.update(dec_line) + self.messages.put(event, block=False) class PlaintextHttpEndpoint(HttpEndpoint): @@ -174,10 +187,13 @@ class PlaintextHttpEndpoint(HttpEndpoint): and put it in :code:`message` field""" @decorator_request_exceptions - async def __call__(self, req, resp): # pylint: disable=arguments-differ + @decorator_add_metadata + async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """plaintext
endpoint method""" data = await req.stream.read() - self.messages.put({"message": data.decode("utf8")}) + event = kwargs.get("metadata", {}) + event.update({"message": data.decode("utf8")}) + self.messages.put(event, block=False) class ThreadingHTTPServer: @@ -197,7 +213,6 @@ class ThreadingHTTPServer: Log level to be set for uvicorn server """ - _instance = None _lock = threading.Lock() @@ -246,7 +261,6 @@ def __init__( self.thread = threading.Thread(daemon=False, target=self.server.run) self._start() - @threadsafe_wrapper def _start(self): """Start thread with uvicorn+falcon http server and wait until it is up (started)""" @@ -254,7 +268,6 @@ def _start(self): while not self.server.started: continue - @threadsafe_wrapper def _stop(self): """Stop thread with uvicorn+falcon http server, wait for uvicorn to exit gracefully and join the thread""" @@ -265,7 +278,6 @@ def _stop(self): continue self.thread.join() - @threadsafe_wrapper def _init_log_config(self) -> dict: """Use for Uvicorn same log formatter like for Logprep""" log_config = uvicorn.config.LOGGING_CONFIG @@ -274,7 +286,6 @@ def _init_log_config(self) -> dict: log_config["handlers"]["default"]["stream"] = "ext://sys.stdout" return log_config - @threadsafe_wrapper def _override_runtime_logging(self): """Uvicorn doesn't provide API to change name and handler beforehand needs to be done during runtime""" @@ -292,7 +303,6 @@ def _init_web_application_server(self, endpoints_config: dict) -> None: for endpoint_path, endpoint in endpoints_config.items(): self.app.add_sink(endpoint, prefix=route_compile_helper(endpoint_path)) - @threadsafe_wrapper def shut_down(self): """Shutdown method to trigger http server shutdown externally""" self._stop() @@ -350,6 +360,20 @@ class Config(Input.Config): message_backlog_size: int = field( validator=validators.instance_of((int, float)), default=15000 ) + """Configures maximum size of input message queue for this connector. 
When limit is reached + the server will answer with 429 Too Many Requests. For reasonable throughput this shouldn't + be smaller than default value. + """ + + collect_meta: bool = field(validator=validators.instance_of(bool), default=True) + """Defines if metadata should be collected in format + ``{metafield_name:{"url":"", "remote_addr":"", "user_agent":""}}``: + - ``True``: Collect metadata + - ``False``: Won't collect metadata + """ + + metafield_name: str = field(validator=validators.instance_of(str), default="@metadata") + """Defines the name of the key for the collected metadata fields""" __slots__ = [] @@ -381,9 +405,13 @@ def setup(self): ) # pylint: disable=attribute-defined-outside-init endpoints_config = {} + collect_meta = self._config.collect_meta + metafield_name = self._config.metafield_name for endpoint_path, endpoint_name in self._config.endpoints.items(): endpoint_class = self._endpoint_registry.get(endpoint_name) - endpoints_config[endpoint_path] = endpoint_class(self.messages) + endpoints_config[endpoint_path] = endpoint_class( + self.messages, collect_meta, metafield_name + ) self.http_server = ThreadingHTTPServer( # pylint: disable=attribute-defined-outside-init connector_config=self._config, @@ -392,7 +420,7 @@ def setup(self): ) def _get_event(self, timeout: float) -> Tuple: - """returns the first message from the queue""" + """Returns the first message from the queue""" try: message = self.messages.get(timeout=timeout) raw_message = str(message).encode("utf8") diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index b79e91a6f..a000c0159 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -13,82 +13,7 @@ from logprep.metrics.exporter import PrometheusExporter from logprep.metrics.metrics import CounterMetric from logprep.util.configuration import Configuration -#from logprep.util.logging import QueueListener - -import time -import logging -import
logging.handlers -import threading -from queue import Empty - - -class SingleThreadQueueListener(logging.handlers.QueueListener): - """A subclass of QueueListener that uses a single thread for all queues. - - See https://github.com/python/cpython/blob/main/Lib/logging/handlers.py - for the implementation of QueueListener. - """ - monitor_thread = None - listeners = [] - sleep_time = 0.1 - - @classmethod - def _start(cls): - """Start a single thread, only if none is started.""" - if cls.monitor_thread is None or not cls.monitor_thread.is_alive(): - cls.monitor_thread = t = threading.Thread( - target=cls._monitor_all, name='logging_monitor') - t.daemon = True - t.start() - return cls.monitor_thread - - @classmethod - def _join(cls): - """Waits for the thread to stop. - Only call this after stopping all listeners. - """ - if cls.monitor_thread is not None and cls.monitor_thread.is_alive(): - cls.monitor_thread.join() - cls.monitor_thread = None - - @classmethod - def _monitor_all(cls): - """A monitor function for all the registered listeners. - Does not block when obtaining messages from the queue to give all - listeners a chance to get an item from the queue. That's why we - must sleep at every cycle. - - If a sentinel is sent, the listener is unregistered. - When all listeners are unregistered, the thread stops. - """ - noop = lambda: None - while cls.listeners: - time.sleep(cls.sleep_time) # does not block all threads - for listener in cls.listeners: - try: - # Gets all messages in this queue without blocking - task_done = getattr(listener.queue, 'task_done', noop) - while True: - record = listener.dequeue(False) - if record is listener._sentinel: - cls.listeners.remove(listener) - else: - listener.handle(record) - task_done() - except Empty: - continue - - def start(self): - """Override default implementation. - Register this listener and call class' _start() instead. 
- """ - SingleThreadQueueListener.listeners.append(self) - # Start if not already - SingleThreadQueueListener._start() - - def stop(self): - """Enqueues the sentinel but does not stop the thread.""" - self.enqueue_sentinel() +from logprep.util.logging_utils import SingleThreadQueueListener class PipelineManager: diff --git a/quickstart/exampledata/config/http_pipeline.yml b/quickstart/exampledata/config/http_pipeline.yml index a288ad5fc..82949c035 100644 --- a/quickstart/exampledata/config/http_pipeline.yml +++ b/quickstart/exampledata/config/http_pipeline.yml @@ -8,6 +8,9 @@ metrics: input: httpinput: type: http_input + message_backlog_size: 15000000 + collect_meta: True + metafield_name: "@metadata" uvicorn_config: host: 0.0.0.0 port: 9000 diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index 82cb0e062..f11f101ed 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -24,6 +24,8 @@ def setup_method(self): CONFIG: dict = { "type": "http_input", "message_backlog_size": 15000, + "collect_meta": False, + "metafield_name": "@metadata", "uvicorn_config": {"port": 9000, "host": "127.0.0.1"}, "endpoints": { "/json": "json", @@ -121,12 +123,12 @@ def test_jsonl_messages_are_put_in_queue(self): resp = requests.post(url=self.target + "/jsonl", data=data, timeout=0.5) assert resp.status_code == 200 assert self.object.messages.qsize() == 3 + event_from_queue = self.object.messages.get(timeout=1) + assert event_from_queue["message"] == "my first log message" event_from_queue = self.object.messages.get(timeout=0.001) - assert event_from_queue == {"message": "my first log message"} + assert event_from_queue["message"] == "my second log message" event_from_queue = self.object.messages.get(timeout=0.001) - assert event_from_queue == {"message": "my second log message"} - event_from_queue = self.object.messages.get(timeout=0.001) - assert event_from_queue == {"message": "my third log 
message"} + assert event_from_queue["message"] == "my third log message" def test_get_next_returns_message_from_queue(self): data = {"message": "my log message"}