diff --git a/CHANGELOG.md b/CHANGELOG.md index c4a34c054..22eb0b29e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ ### Breaking ### Features ### Improvements +### Bugfix + +## 13.1.1 +### Improvements * adds ability to bypass the processing of events if there is no pipeline. This is useful for pure connector deployments. * adds experimental feature to bypass the rule tree by setting `LOGPREP_BYPASS_RULE_TREE` environment variable @@ -11,6 +15,7 @@ ### Bugfix * fixes a bug in the `http_output` used by the http generator, where the timeout parameter does only set the read_timeout not the write_timeout +* fixes a bug in the `http_input` not handling decode errors ## 13.1.0 ### Features diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index c60b527c6..96e26b112 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -91,6 +91,7 @@ from attrs import define, field, validators from falcon import ( # pylint: disable=no-name-in-module HTTP_200, + HTTPBadRequest, HTTPMethodNotAllowed, HTTPTooManyRequests, HTTPUnauthorized, @@ -140,8 +141,12 @@ async def func_wrapper(*args, **kwargs): return else: raise HTTPMethodNotAllowed(["POST"]) - except queue.Full as exc: - raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc + except (HTTPMethodNotAllowed, HTTPUnauthorized): # keep falcon's own 405/401 instead of masking them as 400 + raise + except queue.Full as error: + raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from error + except Exception as error: # pylint: disable=broad-except + raise HTTPBadRequest(str(error)) from error return func_wrapper return func_wrapper @@ -272,7 +277,8 @@ class JSONLHttpEndpoint(HttpEndpoint): async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """jsonl endpoint method""" self.collect_metrics() - events = self._decoder.decode_lines(await self.get_data(req)) + data = await self.get_data(req) + events = self._decoder.decode_lines(data) for event in 
events: self.messages.put(event | kwargs["metadata"], block=False, batch_size=len(events)) diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index 09379815c..b2e9c88b7 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -4,6 +4,7 @@ import gzip import json import multiprocessing +import queue import random import re from copy import deepcopy @@ -116,16 +117,10 @@ def test_get_method_returns_200_with_authentication(self): def test_get_error_code_too_many_requests(self): data = {"message": "my log message"} + self.object.messages.put = mock.MagicMock() + self.object.messages.put.side_effect = queue.Full() session = requests.Session() - for _ in range(100): - resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5) - assert self.object.messages.qsize() == 100 - resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5) - assert self.object.messages._maxsize == 100 - assert resp.status_code == 429 - for _ in range(100): - resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5) - resp = requests.get(url=f"{self.target}/json", json=data, timeout=0.5) + resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5) assert resp.status_code == 429 def test_json_endpoint_accepts_post_request(self): @@ -322,26 +317,71 @@ def test_get_next_with_hmac_of_raw_message(self): connector_next_msg, _ = connector.get_next(1) assert connector_next_msg == expected_event, "Output event with hmac is not as expected" - def test_endpoint_has_basic_auth(self, credentials_file_path): + def test_endpoint_returns_401_if_authorization_not_provided(self, credentials_file_path): mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path} + data = {"message": "my log message"} with mock.patch.dict("os.environ", mock_env): new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - resp = 
requests.post(url=f"{self.target}/auth-json-file", timeout=0.5) + resp = requests.post( + url=f"{self.target}/auth-json-file", timeout=0.5, data=json.dumps(data) + ) assert resp.status_code == 401 + + def test_endpoint_returns_401_on_wrong_authorization(self, credentials_file_path): + mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path} + data = {"message": "my log message"} + with mock.patch.dict("os.environ", mock_env): + new_connector = Factory.create({"test connector": self.CONFIG}) + new_connector.pipeline_index = 1 + new_connector.setup() basic = HTTPBasicAuth("wrong", "credentials") - resp = requests.post(url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5) + resp = requests.post( + url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data + ) assert resp.status_code == 401 + + def test_endpoint_returns_200_on_correct_authorization_with_password_from_file( + self, credentials_file_path + ): + mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path} + data = {"message": "my log message"} + with mock.patch.dict("os.environ", mock_env): + new_connector = Factory.create({"test connector": self.CONFIG}) + new_connector.pipeline_index = 1 + new_connector.setup() basic = HTTPBasicAuth("user", "file_password") - resp = requests.post(url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5) + resp = requests.post( + url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data + ) assert resp.status_code == 200 + + def test_endpoint_returns_200_on_correct_authorization_with_password_within_credentials_file( + self, credentials_file_path + ): + mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path} + data = {"message": "my log message"} + with mock.patch.dict("os.environ", mock_env): + new_connector = Factory.create({"test connector": self.CONFIG}) + new_connector.pipeline_index = 1 + new_connector.setup() basic = HTTPBasicAuth("user", "secret_password") - resp = 
requests.post(url=f"{self.target}/auth-json-secret", auth=basic, timeout=0.5) + resp = requests.post( + url=f"{self.target}/auth-json-secret", auth=basic, timeout=0.5, json=data + ) assert resp.status_code == 200 + + def test_endpoint_returns_200_on_correct_authorization_for_subpath(self, credentials_file_path): + mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path} + data = {"message": "my log message"} + with mock.patch.dict("os.environ", mock_env): + new_connector = Factory.create({"test connector": self.CONFIG}) + new_connector.pipeline_index = 1 + new_connector.setup() basic = HTTPBasicAuth("user", "password") resp = requests.post( - url=f"{self.target}/auth-json-secret/AB/json", auth=basic, timeout=0.5 + url=f"{self.target}/auth-json-secret/AB/json", auth=basic, timeout=0.5, json=data ) assert resp.status_code == 200 @@ -408,3 +448,9 @@ def test_endpoint_handles_gzip_compression(self, endpoint): timeout=0.5, ) assert resp.status_code == 200 + + @pytest.mark.parametrize("endpoint", ["json", "jsonl"]) + def test_raises_http_bad_request_on_decode_error(self, endpoint): + data = "this is not a valid json nor jsonl" + resp = requests.post(url=f"{self.target}/{endpoint}", data=data, timeout=0.5) + assert resp.status_code == 400