add message_backlog_size and metadata add
david committed Mar 26, 2024
1 parent d76d73c commit b7a20c5
Showing 4 changed files with 101 additions and 143 deletions.
154 changes: 91 additions & 63 deletions logprep/connector/http/input.py
@@ -13,6 +13,9 @@
input:
myhttpinput:
type: http_input
message_backlog_size: 15000
collect_meta: False
metafield_name: "@metadata"
uvicorn_config:
host: 0.0.0.0
port: 9000
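
For orientation (not part of the commit): assuming an endpoint path such as /firstendpoint is configured under ``endpoints`` and the requests library is available, ingesting an event into the connector configured above could look like the sketch below. Because the example sets ``collect_meta: False``, no metadata is attached; with ``collect_meta: True`` the event would additionally carry the request metadata under the configured ``metafield_name``.

```python
# Illustrative only; the endpoint path and the use of "requests" are assumptions.
import requests

# With collect_meta: False (as configured above) the decoded body is enqueued as-is.
resp = requests.post("http://localhost:9000/firstendpoint", json={"message": "hello"})
```
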
@@ -30,10 +33,10 @@
from logging import Logger
import logging
import re
import msgspec
import uvicorn
from typing import Mapping, Tuple, Union, Callable
from attrs import define, field, validators
import msgspec
import uvicorn
import falcon.asgi
from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed # pylint: disable=no-name-in-module
from logprep.abc.input import FatalInputError, Input
@@ -48,13 +51,41 @@
HTTP_INPUT_CONFIG_KEYS = ["preprocessing", "uvicorn_config", "endpoints"]


def threadsafe_wrapper(func):
"""Decorator making sure that the decorated function is thread safe"""
lock = threading.Lock()
def decorator_request_exceptions(func: Callable):
"""Decorator to wrap http calls and raise exceptions"""

def func_wrapper(*args, **kwargs):
with lock:
func_wrapper = func(*args, **kwargs)
async def func_wrapper(*args, **kwargs):
try:
if args[1].method == "POST":
func_wrapper = await func(*args, **kwargs)
else:
raise HTTPMethodNotAllowed(["POST"])
except queue.Full as exc:
raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc
return func_wrapper

return func_wrapper


def decorator_add_metadata(func: Callable):
"""Decorator to add metadata to resulting http event.
Uses attribute collect_meta of endpoint class to decide over metadata collection
Uses attribute metafield_name to define key name for metadata
"""

async def func_wrapper(*args, **kwargs):
req = args[1]
endpoint = args[0]
if endpoint.collect_meta:
metadata = {
"url": req.url,
"remote_addr": req.remote_addr,
"user_agent": req.user_agent,
}
kwargs["metadata"] = {endpoint.metafield_name: metadata}
else:
kwargs["metadata"] = {}
func_wrapper = await func(*args, **kwargs)
return func_wrapper

return func_wrapper
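
A minimal sketch (not part of the commit) of what decorator_add_metadata injects into an endpoint call. FakeRequest and FakeEndpoint are invented stand-ins for the falcon request and the HttpEndpoint instance; the import path follows the file shown in this diff.

```python
# Minimal sketch; FakeRequest/FakeEndpoint are hypothetical stand-ins.
import asyncio

from logprep.connector.http.input import decorator_add_metadata


class FakeRequest:
    url = "http://localhost:9000/json"
    remote_addr = "127.0.0.1"
    user_agent = "curl/8.5.0"


class FakeEndpoint:
    collect_meta = True
    metafield_name = "@metadata"

    @decorator_add_metadata
    async def __call__(self, req, resp, **kwargs):
        # the decorator has already placed the metadata dict into kwargs
        return kwargs["metadata"]


print(asyncio.run(FakeEndpoint()(FakeRequest(), None)))
# {'@metadata': {'url': 'http://localhost:9000/json',
#                'remote_addr': '127.0.0.1', 'user_agent': 'curl/8.5.0'}}
```
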
@@ -93,35 +124,23 @@ def route_compile_helper(input_re_str: str):
return re.compile(input_re_str)


def decorator_request_exceptions(func: Callable):
"""Decorator to wrap http calls and raise exceptions"""

async def func_wrapper(*args, **kwargs):
try:
if args[1].method == "POST":
func_wrapper = await func(*args, **kwargs)
else:
raise HTTPMethodNotAllowed(["POST"])
except queue.Full as exc:
raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc
return func_wrapper

return func_wrapper


class HTTPEvent(msgspec.Struct):
"""Eventdata and Metadata from HTTP Input"""
metadata: set[str] = set()
event: set[str] = set()

def to_dict(self):
return {f: getattr(self, f) for f in self.__struct_fields__}

class HttpEndpoint(ABC):
"""interface for http endpoints"""
"""Interface for http endpoints
Parameters
----------
messages: queue.Queue
Input events are put here
collect_meta: bool
Collects metadata if True (default)
metafield_name: str
Defines the key name for the metadata
"""

def __init__(self, messages: queue.Queue) -> None:
def __init__(self, messages: queue.Queue, collect_meta: bool, metafield_name: str) -> None:
self.messages = messages
self.collect_meta = collect_meta
self.metafield_name = metafield_name


class JSONHttpEndpoint(HttpEndpoint):
@@ -130,19 +149,15 @@ class JSONHttpEndpoint(HttpEndpoint):
_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
async def __call__(self, req, resp): # pylint: disable=arguments-differ
@decorator_add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""json endpoint method"""
data = await req.stream.read()
data = data.decode("utf8")
event = kwargs.get("metadata", {})
if data:
event = HTTPEvent(
metadata={
"url":req.url,
"remote_addr":req.remote_addr,
"user_agent":req.user_agent
},
event=self._decoder.decode(data))
self.messages.put(event.to_dict(), block=False)
event.update(self._decoder.decode(data))
self.messages.put(event, block=False)


class JSONLHttpEndpoint(HttpEndpoint):
@@ -151,33 +166,34 @@ class JSONLHttpEndpoint(HttpEndpoint):
_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
async def __call__(self, req, resp): # pylint: disable=arguments-differ
@decorator_add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""jsonl endpoint method"""
data = await req.stream.read()
data = data.decode("utf8")
metadata={
"url":req.url,
"remote_addr":req.remote_addr,
"user_agent":req.user_agent}
for line in data.splitlines():
line = line.strip()
if line:
event = HTTPEvent(
metadata=metadata,
event=self._decoder.decode(line)
)
self.messages.put(event.to_dict(), block=False)
event = dict(kwargs.get("metadata", {}))  # copy so each line gets its own event
dec_line = self._decoder.decode(line)
event.update(dec_line)
self.messages.put(event, block=False)


class PlaintextHttpEndpoint(HttpEndpoint):
""":code:`plaintext` endpoint to get the body from request
and put it in :code:`message` field"""

@decorator_request_exceptions
async def __call__(self, req, resp): # pylint: disable=arguments-differ
@decorator_add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""plaintext endpoint method"""
data = await req.stream.read()
self.messages.put({"message": data.decode("utf8")})
event = kwargs.get("metadata", {})
event.update({"message": data.decode("utf8")})
self.messages.put(event, block=False)
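
For orientation (not part of the commit), a sketch of the rough event shapes each endpoint type puts on the message queue, assuming collect_meta is True and metafield_name is "@metadata"; all values are placeholders.

```python
# Rough event shapes per endpoint type (sketch; values are placeholders).
meta = {"@metadata": {"url": "...", "remote_addr": "...", "user_agent": "..."}}

json_event = {**meta, "message": "hello"}                  # json: decoded object merged into the event
jsonl_events = [{**meta, "a": 1}, {**meta, "b": 2}]        # jsonl: one event per non-empty line
plaintext_event = {**meta, "message": "raw request body"}  # plaintext: body stored under "message"
```
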


class ThreadingHTTPServer:
@@ -197,7 +213,6 @@ class ThreadingHTTPServer:
Log level to be set for uvicorn server
"""


_instance = None
_lock = threading.Lock()

@@ -246,15 +261,13 @@ def __init__(
self.thread = threading.Thread(daemon=False, target=self.server.run)
self._start()

@threadsafe_wrapper
def _start(self):
"""Start thread with uvicorn+falcon http server and wait
until it is up (started)"""
self.thread.start()
while not self.server.started:
continue

@threadsafe_wrapper
def _stop(self):
"""Stop thread with uvicorn+falcon http server, wait for uvicorn
to exit gracefully and join the thread"""
@@ -265,7 +278,6 @@ def _stop(self):
continue
self.thread.join()

@threadsafe_wrapper
def _init_log_config(self) -> dict:
"""Use for Uvicorn same log formatter like for Logprep"""
log_config = uvicorn.config.LOGGING_CONFIG
@@ -274,7 +286,6 @@ def _init_log_config(self) -> dict:
log_config["handlers"]["default"]["stream"] = "ext://sys.stdout"
return log_config

@threadsafe_wrapper
def _override_runtime_logging(self):
"""Uvicorn doesn't provide API to change name and handler beforehand
needs to be done during runtime"""
@@ -292,7 +303,6 @@ def _init_web_application_server(self, endpoints_config: dict) -> None:
for endpoint_path, endpoint in endpoints_config.items():
self.app.add_sink(endpoint, prefix=route_compile_helper(endpoint_path))

@threadsafe_wrapper
def shut_down(self):
"""Shutdown method to trigger http server shutdown externally"""
self._stop()
@@ -350,6 +360,20 @@ class Config(Input.Config):
message_backlog_size: int = field(
validator=validators.instance_of((int, float)), default=15000
)
"""Configures maximum size of input message queue for this connector. When limit is reached
the server will answer with 429 Too Many Requests. For reasonable throughput this shouldn't
be smaller than default value.
"""

collect_meta: bool = field(validator=validators.instance_of(bool), default=True)
"""Defines if metadata should be collected in format
``{metafield_name:{"url":"", "remote_addr":"", "user_agent":""}}``:
- ``True``: Collect metadata
- ``False``: Won't collect metadata
"""

metafield_name: str = field(validator=validators.instance_of(str), default="@metadata")
"""Defines the name of the key for the collected metadata fields"""

__slots__ = []

@@ -381,9 +405,13 @@ def setup(self):
) # pylint: disable=attribute-defined-outside-init

endpoints_config = {}
collect_meta = self._config.collect_meta
metafield_name = self._config.metafield_name
for endpoint_path, endpoint_name in self._config.endpoints.items():
endpoint_class = self._endpoint_registry.get(endpoint_name)
endpoints_config[endpoint_path] = endpoint_class(self.messages)
endpoints_config[endpoint_path] = endpoint_class(
self.messages, collect_meta, metafield_name
)
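
For orientation (not part of the commit): a sketch mirroring the loop above, assuming an endpoints option that maps the path /firstendpoint to an endpoint name the registry resolves to JSONHttpEndpoint; the registry key "json" and the path are assumptions.

```python
import queue

from logprep.connector.http.input import JSONHttpEndpoint

# Hypothetical endpoints option and registry entry; mirrors the loop in setup().
messages = queue.Queue(maxsize=15000)
endpoints = {"/firstendpoint": "json"}
endpoint_registry = {"json": JSONHttpEndpoint}

endpoints_config = {
    path: endpoint_registry[name](messages, True, "@metadata")  # collect_meta, metafield_name
    for path, name in endpoints.items()
}
```
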

self.http_server = ThreadingHTTPServer( # pylint: disable=attribute-defined-outside-init
connector_config=self._config,
@@ -392,7 +420,7 @@
)

def _get_event(self, timeout: float) -> Tuple:
"""returns the first message from the queue"""
"""Returns the first message from the queue"""
try:
message = self.messages.get(timeout=timeout)
raw_message = str(message).encode("utf8")
77 changes: 1 addition & 76 deletions logprep/framework/pipeline_manager.py
@@ -13,82 +13,7 @@
from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.metrics import CounterMetric
from logprep.util.configuration import Configuration
#from logprep.util.logging import QueueListener

import time
import logging
import logging.handlers
import threading
from queue import Empty


class SingleThreadQueueListener(logging.handlers.QueueListener):
"""A subclass of QueueListener that uses a single thread for all queues.
See https://github.com/python/cpython/blob/main/Lib/logging/handlers.py
for the implementation of QueueListener.
"""
monitor_thread = None
listeners = []
sleep_time = 0.1

@classmethod
def _start(cls):
"""Start a single thread, only if none is started."""
if cls.monitor_thread is None or not cls.monitor_thread.is_alive():
cls.monitor_thread = t = threading.Thread(
target=cls._monitor_all, name='logging_monitor')
t.daemon = True
t.start()
return cls.monitor_thread

@classmethod
def _join(cls):
"""Waits for the thread to stop.
Only call this after stopping all listeners.
"""
if cls.monitor_thread is not None and cls.monitor_thread.is_alive():
cls.monitor_thread.join()
cls.monitor_thread = None

@classmethod
def _monitor_all(cls):
"""A monitor function for all the registered listeners.
Does not block when obtaining messages from the queue to give all
listeners a chance to get an item from the queue. That's why we
must sleep at every cycle.
If a sentinel is sent, the listener is unregistered.
When all listeners are unregistered, the thread stops.
"""
noop = lambda: None
while cls.listeners:
time.sleep(cls.sleep_time) # does not block all threads
for listener in cls.listeners:
try:
# Gets all messages in this queue without blocking
task_done = getattr(listener.queue, 'task_done', noop)
while True:
record = listener.dequeue(False)
if record is listener._sentinel:
cls.listeners.remove(listener)
else:
listener.handle(record)
task_done()
except Empty:
continue

def start(self):
"""Override default implementation.
Register this listener and call class' _start() instead.
"""
SingleThreadQueueListener.listeners.append(self)
# Start if not already
SingleThreadQueueListener._start()

def stop(self):
"""Enqueues the sentinel but does not stop the thread."""
self.enqueue_sentinel()
from logprep.util.logging_utils import SingleThreadQueueListener


class PipelineManager:
Expand Down
3 changes: 3 additions & 0 deletions quickstart/exampledata/config/http_pipeline.yml
@@ -8,6 +8,9 @@ metrics:
input:
httpinput:
type: http_input
message_backlog_size: 15000000
collect_meta: True
metafield_name: "@metadata"
uvicorn_config:
host: 0.0.0.0
port: 9000