Skip to content

Commit

Permalink
add tests for too_many_request and metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
david committed Mar 26, 2024
1 parent b7a20c5 commit e9948e3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 19 deletions.
53 changes: 35 additions & 18 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@
]

# Config Parts that's checked for Config Change
HTTP_INPUT_CONFIG_KEYS = ["preprocessing", "uvicorn_config", "endpoints"]
HTTP_INPUT_CONFIG_KEYS = [
"preprocessing",
"uvicorn_config",
"endpoints",
"collect_meta",
"metafield_name",
"message_backlog_size",
]


def decorator_request_exceptions(func: Callable):
Expand Down Expand Up @@ -99,17 +106,23 @@ def has_config_changed(
For sake of simplicity JSON Strings are compared instead
of compare nested dict key-values
"""
all_dicts = {}
all_dicts["old"] = old_config
all_dicts["new"] = new_config
all_dicts["old_c"] = {}
all_dicts["new_c"] = {}
print(dir(old_config))
if not new_config:
return True
old_config_dict = {}
new_config_dict = {}
for key in check_attrs:
old_config_dict.update(getattr(old_config, key))
new_config_dict.update(getattr(new_config, key))
cur_json = json.dumps(old_config_dict, sort_keys=True)
new_json = json.dumps(new_config_dict, sort_keys=True)

if cur_json == new_json:
for version in ["new", "old"]:
for key in check_attrs:
if isinstance(getattr(old_config, key), dict):
all_dicts[version + "_c"].update(getattr(all_dicts[version], key))
else:
all_dicts[version + "_c"].update({key: getattr(all_dicts[version], key)})
old_json = json.dumps(all_dicts["old_c"], sort_keys=True)
new_json = json.dumps(all_dicts["new_c"], sort_keys=True)
if old_json == new_json:
return False
return True

Expand All @@ -125,7 +138,8 @@ def route_compile_helper(input_re_str: str):


class HttpEndpoint(ABC):
"""Interface for http endpoints
"""Interface for http endpoints.
Additional functionality is added to child classes via removable decorators.
Parameters
----------
Expand Down Expand Up @@ -171,15 +185,12 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
"""jsonl endpoint method"""
data = await req.stream.read()
data = data.decode("utf8")
event = kwargs.get("metadata", {})
for line in data.splitlines():
line = line.strip()
if line:
event = kwargs.get("metadata", {})
dec_line = self._decoder.decode(line)
event.update(dec_line)
print("Event: " + str(event))
print("Line: " + str(dec_line))
self.messages.put(event, block=False)
event.update(self._decoder.decode(line))
self.messages.put(dict(event), block=False)


class PlaintextHttpEndpoint(HttpEndpoint):
Expand Down Expand Up @@ -391,6 +402,11 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log
self.target = "http://" + self.host + ":" + str(self.port)

def setup(self):
"""setup starts the actual functionality of this connector.
By checking against pipeline_index we're assuring this connector
only runs a single time for multiple processes.
"""

super().setup()
if not hasattr(self, "pipeline_index"):
raise FatalInputError(
Expand All @@ -407,6 +423,7 @@ def setup(self):
endpoints_config = {}
collect_meta = self._config.collect_meta
metafield_name = self._config.metafield_name
# preparing dict with endpoint paths and initialized endpoints objects
for endpoint_path, endpoint_name in self._config.endpoints.items():
endpoint_class = self._endpoint_registry.get(endpoint_name)
endpoints_config[endpoint_path] = endpoint_class(
Expand Down Expand Up @@ -440,5 +457,5 @@ def shut_down(self):
"""Raises Uvicorn HTTP Server internal stop flag and waits to join"""
try:
self.http_server.shut_down()
except:
except AttributeError:
pass
36 changes: 35 additions & 1 deletion tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor
import requests
import uvicorn
import falcon
Expand All @@ -23,7 +24,7 @@ def setup_method(self):

CONFIG: dict = {
"type": "http_input",
"message_backlog_size": 15000,
"message_backlog_size": 100,
"collect_meta": False,
"metafield_name": "@metadata",
"uvicorn_config": {"port": 9000, "host": "127.0.0.1"},
Expand All @@ -50,6 +51,23 @@ def test_get_error_code_on_get(self):
resp = requests.get(url=self.target + "/json", timeout=0.5)
assert resp.status_code == 405

def test_get_error_code_too_many_requests(self):
data = {"message": "my log message"}
session = requests.Session()
session.mount(
"http://",
requests.adapters.HTTPAdapter(pool_maxsize=20, max_retries=3, pool_block=True),
)

def get_url(url):
for _ in range(100):
_ = session.post(url, json=data)

with ThreadPoolExecutor(max_workers=100) as executor:
executor.submit(get_url, self.target + "/json")
resp = requests.post(url=self.target + "/json", json=data, timeout=0.5)
assert resp.status_code == 429

def test_json_endpoint_accepts_post_request(self):
data = {"message": "my log message"}
resp = requests.post(url=self.target + "/json", json=data, timeout=0.5)
Expand Down Expand Up @@ -176,6 +194,22 @@ def test_server_starts_threaded_server(self):
requests.post(url=self.target + "/json", json=message, timeout=0.5) # nosemgrep
assert self.object.messages.qsize() == 100, "messages are put to queue"

def test_get_metadata(self):
message = {"message": "my message"}
connector_config = deepcopy(self.CONFIG)
connector_config["collect_meta"] = True
connector_config["metafield_name"] = "custom"
connector = Factory.create({"test connector": connector_config}, logger=self.logger)
connector.pipeline_index = 1
connector.setup()
target = connector.target
resp = requests.post(url=target + "/json", json=message, timeout=0.5) # nosemgrep
assert resp.status_code == 200
message = connector.messages.get(timeout=0.5)
assert message["custom"]["url"] == target + "/json"
assert message["custom"]["remote_addr"] == connector.host
assert isinstance(message["custom"]["user_agent"], str)

def test_server_multiple_config_changes(self):
message = {"message": "my message"}
connector_config = deepcopy(self.CONFIG)
Expand Down

0 comments on commit e9948e3

Please sign in to comment.