add metadata - WIP and add singlethreadqueuelistener for testing
david committed Mar 25, 2024
1 parent 5ce5203 commit d76d73c
Showing 4 changed files with 123 additions and 16 deletions.
10 changes: 2 additions & 8 deletions .github/workflows/ci.yml
@@ -73,17 +73,11 @@ jobs:
          pip install --upgrade pip wheel
          pip install .[dev]
      - name: Perform unit tests without http
      - name: Perform unit tests
        env:
          PYTEST_ADDOPTS: "--color=yes"
        run: |
          pytest -vv tests/unit -k "not test_http"
      - name: Perform unit tests for http
        env:
          PYTEST_ADDOPTS: "--color=yes"
        run: |
          pytest -vv tests/unit -k "test_http"
          pytest -vv tests/unit
      - name: Perform acceptance tests
        env:
50 changes: 43 additions & 7 deletions logprep/connector/http/input.py
@@ -30,9 +30,9 @@
from logging import Logger
import logging
import re
from typing import Mapping, Tuple, Union, Callable
import msgspec
import uvicorn
from typing import Mapping, Tuple, Union, Callable
from attrs import define, field, validators
import falcon.asgi
from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed # pylint: disable=no-name-in-module
@@ -109,11 +109,17 @@ async def func_wrapper(*args, **kwargs):
    return func_wrapper


class HTTPEvent(msgspec.Struct):
    """Event data and metadata from HTTP input"""

    metadata: dict = {}
    event: dict = {}

    def to_dict(self):
        return {f: getattr(self, f) for f in self.__struct_fields__}
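As a quick illustration of how `to_dict` flattens the struct (a minimal sketch; the sample values are invented and `HTTPEvent` is assumed to be in scope from the code above):

```python
# Minimal sketch; assumes the HTTPEvent struct defined above is importable.
# All sample values are invented.
event = HTTPEvent(
    metadata={"url": "http://localhost:9000/json", "remote_addr": "127.0.0.1"},
    event={"message": "hello"},
)
print(event.to_dict())
# {'metadata': {'url': 'http://localhost:9000/json', 'remote_addr': '127.0.0.1'},
#  'event': {'message': 'hello'}}
```

If the installed msgspec version provides it, `msgspec.structs.asdict` performs the same struct-to-dict conversion.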

class HttpEndpoint(ABC):
    """interface for http endpoints"""

    messages: queue.Queue

    def __init__(self, messages: queue.Queue) -> None:
        self.messages = messages

@@ -128,7 +134,15 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ
"""json endpoint method"""
data = await req.stream.read()
data = data.decode("utf8")
self.messages.put(self._decoder.decode(data), block=False)
if data:
event = HTTPEvent(
metadata={
"url":req.url,
"remote_addr":req.remote_addr,
"user_agent":req.user_agent
},
event=self._decoder.decode(data))
self.messages.put(event.to_dict(), block=False)
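With this change, a consumer reading from `messages` no longer receives the bare decoded document but a dict with `metadata` and `event` keys. A hypothetical queue item for a POST to `/json` (all request values invented):

```python
# Hypothetical item on the messages queue after a POST to the json endpoint;
# every value below is invented for illustration.
queued_item = {
    "metadata": {
        "url": "http://localhost:9000/json",
        "remote_addr": "192.0.2.10",
        "user_agent": "curl/8.5.0",
    },
    "event": {"message": "login failed", "user": "alice"},
}
```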


class JSONLHttpEndpoint(HttpEndpoint):
@@ -141,11 +155,18 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ
"""jsonl endpoint method"""
data = await req.stream.read()
data = data.decode("utf8")
metadata={
"url":req.url,
"remote_addr":req.remote_addr,
"user_agent":req.user_agent}
for line in data.splitlines():
line = line.strip()
if line:
event = self._decoder.decode(line)
self.messages.put(event, block=False)
event = HTTPEvent(
metadata=metadata,
event=self._decoder.decode(line)
)
self.messages.put(event.to_dict(), block=False)
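The jsonl endpoint builds the metadata dict once and attaches it to every line of the request body. A standalone sketch of the per-line decoding, with an invented payload:

```python
# Standalone sketch of the per-line decoding used by the jsonl endpoint;
# the payload is an invented example.
import msgspec

decoder = msgspec.json.Decoder()
payload = '{"message": "first"}\n\n{"message": "second"}\n'

# One decoded event per non-empty line, mirroring the loop above.
events = [
    decoder.decode(line.strip())
    for line in payload.splitlines()
    if line.strip()
]
print(events)  # [{'message': 'first'}, {'message': 'second'}]
```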


class PlaintextHttpEndpoint(HttpEndpoint):
@@ -160,7 +181,22 @@ async def __call__(self, req, resp):  # pylint: disable=arguments-differ


class ThreadingHTTPServer:
    """Threading Wrapper around Uvicorn Server"""
    """Singleton wrapper class around the Uvicorn thread that controls the
    lifecycle of the Uvicorn HTTP server. At runtime this singleton object
    is stateful, so some of its attributes have to be checked during
    __init__ when multiple consecutive reconfigurations happen.

    Parameters
    ----------
    connector_config: Input.Config
        Holds the full connector config for config change checks
    endpoints_config: dict
        Endpoint paths as keys and initiated endpoint objects as values
    log_level: str
        Log level to be set for the uvicorn server
    """

    _instance = None
    _lock = threading.Lock()
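The new docstring describes the singleton mechanics; for illustration, a generic sketch of the lock-guarded singleton pattern it refers to (not the actual ThreadingHTTPServer implementation, which is truncated in this diff):

```python
import threading


class SingletonServer:
    """Generic sketch of a lock-guarded singleton; illustrates the pattern
    described in the docstring above, not logprep's actual implementation."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                # double-checked locking: re-test after acquiring the lock
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance


assert SingletonServer() is SingletonServer()  # always the same object
```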
78 changes: 77 additions & 1 deletion logprep/framework/pipeline_manager.py
@@ -13,6 +13,82 @@
from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.metrics import CounterMetric
from logprep.util.configuration import Configuration
#from logprep.util.logging import QueueListener

import time
import logging
import logging.handlers
import threading
from queue import Empty


class SingleThreadQueueListener(logging.handlers.QueueListener):
    """A subclass of QueueListener that uses a single thread for all queues.
    See https://github.com/python/cpython/blob/main/Lib/logging/handlers.py
    for the implementation of QueueListener.
    """

    monitor_thread = None
    listeners = []
    sleep_time = 0.1

    @classmethod
    def _start(cls):
        """Start a single thread, only if none is started."""
        if cls.monitor_thread is None or not cls.monitor_thread.is_alive():
            cls.monitor_thread = t = threading.Thread(
                target=cls._monitor_all, name='logging_monitor')
            t.daemon = True
            t.start()
        return cls.monitor_thread

    @classmethod
    def _join(cls):
        """Waits for the thread to stop.
        Only call this after stopping all listeners.
        """
        if cls.monitor_thread is not None and cls.monitor_thread.is_alive():
            cls.monitor_thread.join()
        cls.monitor_thread = None

    @classmethod
    def _monitor_all(cls):
        """A monitor function for all the registered listeners.
        Does not block when obtaining messages from the queue to give all
        listeners a chance to get an item from the queue. That's why we
        must sleep at every cycle.
        If a sentinel is sent, the listener is unregistered.
        When all listeners are unregistered, the thread stops.
        """
        noop = lambda: None
        while cls.listeners:
            time.sleep(cls.sleep_time)  # does not block all threads
            for listener in cls.listeners:
                try:
                    # Gets all messages in this queue without blocking
                    task_done = getattr(listener.queue, 'task_done', noop)
                    while True:
                        record = listener.dequeue(False)
                        if record is listener._sentinel:
                            cls.listeners.remove(listener)
                        else:
                            listener.handle(record)
                        task_done()
                except Empty:
                    continue

    def start(self):
        """Override default implementation.
        Register this listener and call class' _start() instead.
        """
        SingleThreadQueueListener.listeners.append(self)
        # Start if not already
        SingleThreadQueueListener._start()

    def stop(self):
        """Enqueues the sentinel but does not stop the thread."""
        self.enqueue_sentinel()
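A possible usage sketch for the listener above: two queues end up being served by the one shared monitor thread. Everything except `SingleThreadQueueListener` comes from the standard library; the logger name is invented.

```python
# Usage sketch; assumes the SingleThreadQueueListener class above is in scope.
import logging
import logging.handlers
import queue
import time

# Two independent queues, both drained by the single shared monitor thread.
q1, q2 = queue.Queue(), queue.Queue()
listener1 = SingleThreadQueueListener(q1, logging.StreamHandler())
listener2 = SingleThreadQueueListener(q2, logging.StreamHandler())
listener1.start()
listener2.start()  # registers with the already running monitor thread

logger = logging.getLogger("pipeline-1")  # invented logger name
logger.addHandler(logging.handlers.QueueHandler(q1))
logger.warning("handled by the shared monitor thread")

time.sleep(0.3)   # give the monitor thread a cycle to drain the queues
listener1.stop()  # enqueues the sentinel; the thread keeps serving listener2
listener2.stop()
```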


class PipelineManager:
@@ -50,7 +126,7 @@ def __init__(self, configuration: Configuration):
        self.metrics = self.Metrics(labels={"component": "manager"})
        self._logger = logging.getLogger("Logprep PipelineManager")
        self.log_queue = multiprocessing.Queue(-1)
        self._queue_listener = logging.handlers.QueueListener(self.log_queue)
        self._queue_listener = SingleThreadQueueListener(self.log_queue)
        self._queue_listener.start()

        self._pipelines: list[multiprocessing.Process] = []
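For context, the producer side of this logging queue: a worker process would typically attach a `logging.handlers.QueueHandler` to the same multiprocessing queue so its records reach the manager's listener. This is a generic sketch of that pattern, not logprep's actual Pipeline code; the worker function and logger names are invented.

```python
# Generic producer-side sketch; not logprep's actual Pipeline implementation.
import logging
import logging.handlers
import multiprocessing


def pipeline_worker(log_queue) -> None:
    # Inside the child process: route every record into the shared queue.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(logging.handlers.QueueHandler(log_queue))
    logging.getLogger("Logprep Pipeline").info("pipeline process started")


if __name__ == "__main__":
    log_queue = multiprocessing.Queue(-1)
    worker = multiprocessing.Process(target=pipeline_worker, args=(log_queue,))
    worker.start()
    worker.join()
    print(log_queue.get().getMessage())  # the forwarded LogRecord
```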
1 change: 1 addition & 0 deletions quickstart/exampledata/config/http_pipeline.yml
@@ -12,6 +12,7 @@ input:
      host: 0.0.0.0
      port: 9000
    endpoints:
      /json: json
      /lab/123/(first|second|third)/js.*: jsonl
      /lab/123/(ABC|DEF)/pl.*: plaintext
output:
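The endpoint keys in this config are regex-style path patterns mapped to an endpoint type (json, jsonl, plaintext). As an illustration of how such a mapping could be resolved against a request path (logprep's real routing code is not part of this diff, so this is only a sketch of the idea):

```python
# Illustrative resolution of a request path against the regex endpoint
# mapping from the config above; not logprep's actual routing code.
import re

endpoints = {
    r"/json": "json",
    r"/lab/123/(first|second|third)/js.*": "jsonl",
    r"/lab/123/(ABC|DEF)/pl.*": "plaintext",
}


def resolve(path: str) -> str | None:
    for pattern, endpoint_type in endpoints.items():
        if re.fullmatch(pattern, path):
            return endpoint_type
    return None


print(resolve("/lab/123/first/jsonl"))  # -> "jsonl"
print(resolve("/unknown"))              # -> None
```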
