diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index a63420b69..5a920aa02 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -48,7 +48,14 @@ ] # Config Parts that's checked for Config Change -HTTP_INPUT_CONFIG_KEYS = ["preprocessing", "uvicorn_config", "endpoints"] +HTTP_INPUT_CONFIG_KEYS = [ + "preprocessing", + "uvicorn_config", + "endpoints", + "collect_meta", + "metafield_name", + "message_backlog_size", +] def decorator_request_exceptions(func: Callable): @@ -99,17 +106,23 @@ def has_config_changed( For sake of simplicity JSON Strings are compared instead of compare nested dict key-values """ + all_dicts = {} + all_dicts["old"] = old_config + all_dicts["new"] = new_config + all_dicts["old_c"] = {} + all_dicts["new_c"] = {} + print(dir(old_config)) if not new_config: return True - old_config_dict = {} - new_config_dict = {} - for key in check_attrs: - old_config_dict.update(getattr(old_config, key)) - new_config_dict.update(getattr(new_config, key)) - cur_json = json.dumps(old_config_dict, sort_keys=True) - new_json = json.dumps(new_config_dict, sort_keys=True) - - if cur_json == new_json: + for version in ["new", "old"]: + for key in check_attrs: + if isinstance(getattr(old_config, key), dict): + all_dicts[version + "_c"].update(getattr(all_dicts[version], key)) + else: + all_dicts[version + "_c"].update({key: getattr(all_dicts[version], key)}) + old_json = json.dumps(all_dicts["old_c"], sort_keys=True) + new_json = json.dumps(all_dicts["new_c"], sort_keys=True) + if old_json == new_json: return False return True @@ -125,7 +138,8 @@ def route_compile_helper(input_re_str: str): class HttpEndpoint(ABC): - """Interface for http endpoints + """Interface for http endpoints. + Additional functionality is added to child classes via removable decorators. Parameters ---------- @@ -171,15 +185,12 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff """jsonl endpoint method""" data = await req.stream.read() data = data.decode("utf8") + event = kwargs.get("metadata", {}) for line in data.splitlines(): line = line.strip() if line: - event = kwargs.get("metadata", {}) - dec_line = self._decoder.decode(line) - event.update(dec_line) - print("Event: " + str(event)) - print("Line: " + str(dec_line)) - self.messages.put(event, block=False) + event.update(self._decoder.decode(line)) + self.messages.put(dict(event), block=False) class PlaintextHttpEndpoint(HttpEndpoint): @@ -391,6 +402,11 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log self.target = "http://" + self.host + ":" + str(self.port) def setup(self): + """setup starts the actual functionality of this connector. + By checking against pipeline_index we're assuring this connector + only runs a single time for multiple processes. + """ + super().setup() if not hasattr(self, "pipeline_index"): raise FatalInputError( @@ -407,6 +423,7 @@ def setup(self): endpoints_config = {} collect_meta = self._config.collect_meta metafield_name = self._config.metafield_name + # preparing dict with endpoint paths and initialized endpoints objects for endpoint_path, endpoint_name in self._config.endpoints.items(): endpoint_class = self._endpoint_registry.get(endpoint_name) endpoints_config[endpoint_path] = endpoint_class( @@ -440,5 +457,5 @@ def shut_down(self): """Raises Uvicorn HTTP Server internal stop flag and waits to join""" try: self.http_server.shut_down() - except: + except AttributeError: pass diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index f11f101ed..9c87ded35 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -2,6 +2,7 @@ # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init from copy import deepcopy +from concurrent.futures import ThreadPoolExecutor import requests import uvicorn import falcon @@ -23,7 +24,7 @@ def setup_method(self): CONFIG: dict = { "type": "http_input", - "message_backlog_size": 15000, + "message_backlog_size": 100, "collect_meta": False, "metafield_name": "@metadata", "uvicorn_config": {"port": 9000, "host": "127.0.0.1"}, @@ -50,6 +51,23 @@ def test_get_error_code_on_get(self): resp = requests.get(url=self.target + "/json", timeout=0.5) assert resp.status_code == 405 + def test_get_error_code_too_many_requests(self): + data = {"message": "my log message"} + session = requests.Session() + session.mount( + "http://", + requests.adapters.HTTPAdapter(pool_maxsize=20, max_retries=3, pool_block=True), + ) + + def get_url(url): + for _ in range(100): + _ = session.post(url, json=data) + + with ThreadPoolExecutor(max_workers=100) as executor: + executor.submit(get_url, self.target + "/json") + resp = requests.post(url=self.target + "/json", json=data, timeout=0.5) + assert resp.status_code == 429 + def test_json_endpoint_accepts_post_request(self): data = {"message": "my log message"} resp = requests.post(url=self.target + "/json", json=data, timeout=0.5) @@ -176,6 +194,22 @@ def test_server_starts_threaded_server(self): requests.post(url=self.target + "/json", json=message, timeout=0.5) # nosemgrep assert self.object.messages.qsize() == 100, "messages are put to queue" + def test_get_metadata(self): + message = {"message": "my message"} + connector_config = deepcopy(self.CONFIG) + connector_config["collect_meta"] = True + connector_config["metafield_name"] = "custom" + connector = Factory.create({"test connector": connector_config}, logger=self.logger) + connector.pipeline_index = 1 + connector.setup() + target = connector.target + resp = requests.post(url=target + "/json", json=message, timeout=0.5) # nosemgrep + assert resp.status_code == 200 + message = connector.messages.get(timeout=0.5) + assert message["custom"]["url"] == target + "/json" + assert message["custom"]["remote_addr"] == connector.host + assert isinstance(message["custom"]["user_agent"], str) + def test_server_multiple_config_changes(self): message = {"message": "my message"} connector_config = deepcopy(self.CONFIG)