From 34b65ecfe1e7c12b5e8c07c507a2c7282dc14a05 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Tue, 12 Nov 2024 12:03:25 +0100 Subject: [PATCH 1/4] speed up exporter tests by introducing asgiref async testing --- .vscode/settings.json | 5 +- pyproject.toml | 12 +++-- tests/unit/metrics/test_exporter.py | 83 +++++++++++++++++++---------- 3 files changed, 66 insertions(+), 34 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 4681437a4..3d4b83d9b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,7 @@ "python.testing.pytestEnabled": true, "python.testing.unittestEnabled": false, "python.analysis.importFormat": "absolute", + "python.testing.pytestPath": "pytest", "editor.formatOnSave": true, "editor.rulers": [ 80, @@ -9,7 +10,9 @@ 120 ], "editor.codeActionsOnSave": { - "source.organizeImports": "explicit" + "source.organizeImports": "explicit", + "source.unusedImports": "always", + "source.convertImportFormat": "always" }, "autoDocstring.docstringFormat": "numpy", "files.exclude": { diff --git a/pyproject.toml b/pyproject.toml index 85d38a036..61f54fed8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,10 +56,10 @@ keywords = [ "logdata", ] dependencies = [ - "aiohttp>=3.9.2", # CVE-2024-23334 + "aiohttp>=3.9.2", # CVE-2024-23334 "attrs", - "certifi>=2023.7.22", # CVE-2023-37920 - "ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11 + "certifi>=2023.7.22", # CVE-2023-37920 + "ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11 "colorama", "confluent-kafka>2", "geoip2", @@ -67,7 +67,7 @@ dependencies = [ "jsonref", "luqum", "more-itertools==8.10.0", - "mysql-connector-python>=9.1.0", # CVE-2024-21272 + "mysql-connector-python>=9.1.0", # CVE-2024-21272 "numpy>=1.26.0", "opensearch-py", "prometheus_client", @@ -84,7 +84,7 @@ dependencies = [ "schedule", "tldextract", "urlextract", - "urllib3>=1.26.17", # CVE-2023-43804 + "urllib3>=1.26.17", # CVE-2023-43804 "uvicorn", "deepdiff", "msgspec", @@ -113,6 +113,8 @@ dev = [ "jinja2", "maturin", "cibuildwheel", + "asgiref", + "pytest-asyncio", ] doc = [ diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 4ff498888..67d86ee72 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -2,14 +2,16 @@ # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init # pylint: disable=line-too-long +import asyncio import os.path from unittest import mock import pytest import requests -from prometheus_client import REGISTRY +from asgiref.testing import ApplicationCommunicator +from prometheus_client import REGISTRY, CollectorRegistry -from logprep.metrics.exporter import PrometheusExporter +from logprep.metrics.exporter import PrometheusExporter, make_patched_asgi_app from logprep.util import http from logprep.util.configuration import MetricsConfig @@ -148,38 +150,63 @@ def test_health_endpoint_calls_updated_functions(self): exporter.server.shut_down() - @pytest.mark.parametrize( - "functions, expected", - [ - ([lambda: True], 200), - ([lambda: True, lambda: True], 200), - ([lambda: False], 503), - ([lambda: False, lambda: False], 503), - ([lambda: False, lambda: True, lambda: True], 503), - ], - ) - def test_health_check_returns_status_code(self, functions, expected): + def test_health_check_returns_body_and_status_code(self): exporter = PrometheusExporter(self.metrics_config) exporter.run(daemon=False) - exporter.update_healthchecks(functions) + exporter.update_healthchecks([lambda: True]) resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == expected + assert resp.status_code == 200 + assert resp.content.decode() == "OK" exporter.server.shut_down() + +class TestAsgiApp: + """These tests uses the `asgiref.testing.ApplicationCommunicator` to test the ASGI app itself + For more information see: https://dokk.org/documentation/django-channels/2.4.0/topics/testing/ + """ + + def setup_method(self): + self.registry = CollectorRegistry() + self.captured_status = None + self.captured_headers = None + # Setup ASGI scope + self.scope = { + "client": ("127.0.0.1", 32767), + "headers": [], + "http_version": "1.0", + "method": "GET", + "path": "/", + "query_string": b"", + "scheme": "http", + "server": ("127.0.0.1", 80), + "type": "http", + } + self.communicator = None + + def teardown_method(self): + if self.communicator: + asyncio.get_event_loop().run_until_complete(self.communicator.wait()) + + def seed_app(self, app): + self.communicator = ApplicationCommunicator(app, self.scope) + @pytest.mark.parametrize( - "functions, expected", + "functions, expected_status, expected_body", [ - ([lambda: True], "OK"), - ([lambda: True, lambda: True], "OK"), - ([lambda: False], "FAIL"), - ([lambda: False, lambda: False], "FAIL"), - ([lambda: False, lambda: True, lambda: True], "FAIL"), + ([lambda: True], 200, b"OK"), + ([lambda: True, lambda: True], 200, b"OK"), + ([lambda: False], 503, b"FAIL"), + ([lambda: False, lambda: False], 503, b"FAIL"), + ([lambda: False, lambda: True, lambda: True], 503, b"FAIL"), ], ) - def test_health_check_returns_body(self, functions, expected): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - exporter.update_healthchecks(functions) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.content.decode() == expected - exporter.server.shut_down() + @pytest.mark.asyncio + async def test_asgi_app(self, functions, expected_status, expected_body): + app = make_patched_asgi_app(functions) + self.scope["path"] = "/health" + self.seed_app(app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == expected_status + event = await self.communicator.receive_output(timeout=1) + assert expected_body in event["body"] From 52a8a33a9eb6f2fbe0913ba69305a04bd729d331 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Tue, 12 Nov 2024 13:51:40 +0100 Subject: [PATCH 2/4] refactor health endpoint tests * move http endpoint test as smoke test to the acceptance tests * use asgiref testing to test only the asgi app in unittests --- logprep/metrics/exporter.py | 6 +- tests/acceptance/test_full_configuration.py | 7 +- tests/unit/metrics/test_exporter.py | 95 ++++++++++----------- 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/logprep/metrics/exporter.py b/logprep/metrics/exporter.py index 411a47c7e..69c38aa04 100644 --- a/logprep/metrics/exporter.py +++ b/logprep/metrics/exporter.py @@ -52,6 +52,7 @@ def __init__(self, configuration: MetricsConfig): self.server = None self.healthcheck_functions = None self._multiprocessing_prepared = False + self.app = None def prepare_multiprocessing(self): """ @@ -99,10 +100,12 @@ def run(self, daemon=True): def init_server(self, daemon=True) -> None: """Initializes the server""" + if not self.app: + self.app = make_patched_asgi_app(self.healthcheck_functions) port = self.configuration.port self.server = http.ThreadingHTTPServer( self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"}, - make_patched_asgi_app(self.healthcheck_functions), + self.app, daemon=daemon, logger_name="Exporter", ) @@ -116,6 +119,7 @@ def restart(self): def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None: """Updates the healthcheck functions""" self.healthcheck_functions = healthcheck_functions + self.app = make_patched_asgi_app(self.healthcheck_functions) if self.server and self.server.thread and self.server.thread.is_alive(): self.server.shut_down() self.init_server(daemon=daemon) diff --git a/tests/acceptance/test_full_configuration.py b/tests/acceptance/test_full_configuration.py index 5ccf14a09..54e2ccf1e 100644 --- a/tests/acceptance/test_full_configuration.py +++ b/tests/acceptance/test_full_configuration.py @@ -115,7 +115,7 @@ def test_start_of_logprep_from_http_with_templated_url_and_config(): output = proc.stdout.readline().decode("utf8") -def test_logprep_exposes_prometheus_metrics(tmp_path): +def test_logprep_exposes_prometheus_metrics_and_healthchecks(tmp_path): temp_dir = tempfile.gettempdir() input_file_path = Path(os.path.join(temp_dir, "input.txt")) input_file_path.touch() @@ -246,4 +246,9 @@ def test_logprep_exposes_prometheus_metrics(tmp_path): len(re.findall(both_calculators, metrics)) == 4 ), "More or less than 4 rules were found for both calculator" + # check health endpoint + response = requests.get("http://127.0.0.1:8003/health", timeout=7) + response.raise_for_status() + assert "OK" == response.text + proc.kill() diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 67d86ee72..1aa4541ec 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -7,7 +7,6 @@ from unittest import mock import pytest -import requests from asgiref.testing import ApplicationCommunicator from prometheus_client import REGISTRY, CollectorRegistry @@ -108,64 +107,20 @@ def test_is_running_returns_true_when_server_thread_is_alive(self): assert exporter.is_running +@mock.patch( + "logprep.util.http.ThreadingHTTPServer", new=mock.create_autospec(http.ThreadingHTTPServer) +) @mock.patch( "logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing", new=lambda *args, **kwargs: None, ) class TestHealthEndpoint: - def setup_method(self): - REGISTRY.__init__() - self.metrics_config = MetricsConfig(enabled=True, port=8000) - - def test_health_endpoint_returns_503_as_default_health_state(self): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == 503 - exporter.server.shut_down() - - def test_health_endpoint_calls_health_check_functions(self): - exporter = PrometheusExporter(self.metrics_config) - function_mock = mock.Mock(return_value=True) - exporter.healthcheck_functions = [function_mock] - exporter.run(daemon=False) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == 200 - assert function_mock.call_count == 1 - - exporter.server.shut_down() - - def test_health_endpoint_calls_updated_functions(self): - exporter = PrometheusExporter(self.metrics_config) - function_mock = mock.Mock(return_value=True) - exporter.healthcheck_functions = [function_mock] - exporter.run(daemon=False) - requests.get("http://localhost:8000/health", timeout=0.5) - assert function_mock.call_count == 1, "initial function should be called" - new_function_mock = mock.Mock(return_value=True) - exporter.update_healthchecks([new_function_mock]) - requests.get("http://localhost:8000/health", timeout=0.5) - assert new_function_mock.call_count == 1, "New function should be called" - assert function_mock.call_count == 1, "Old function should not be called" - - exporter.server.shut_down() - - def test_health_check_returns_body_and_status_code(self): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - exporter.update_healthchecks([lambda: True]) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == 200 - assert resp.content.decode() == "OK" - exporter.server.shut_down() - - -class TestAsgiApp: """These tests uses the `asgiref.testing.ApplicationCommunicator` to test the ASGI app itself For more information see: https://dokk.org/documentation/django-channels/2.4.0/topics/testing/ """ def setup_method(self): + self.metrics_config = MetricsConfig(enabled=True, port=8000) self.registry = CollectorRegistry() self.captured_status = None self.captured_headers = None @@ -210,3 +165,45 @@ async def test_asgi_app(self, functions, expected_status, expected_body): assert event["status"] == expected_status event = await self.communicator.receive_output(timeout=1) assert expected_body in event["body"] + + @pytest.mark.asyncio + async def test_health_endpoint_calls_health_check_functions(self): + exporter = PrometheusExporter(self.metrics_config) + function_mock = mock.Mock(return_value=True) + exporter.healthcheck_functions = [function_mock] + exporter.run(daemon=False) + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] + function_mock.assert_called_once() + + @pytest.mark.asyncio + async def test_update_health_checks_injects_new_functions(self): + exporter = PrometheusExporter(self.metrics_config) + function_mock = mock.Mock(return_value=True) + exporter.healthcheck_functions = [function_mock] + exporter.run(daemon=False) + exporter.server.thread = None + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] + assert function_mock.call_count == 1, "initial function should be called" + new_function_mock = mock.Mock(return_value=True) + exporter.update_healthchecks([new_function_mock]) + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] + assert new_function_mock.call_count == 1, "New function should be called" + assert function_mock.call_count == 1, "Old function should not be called" From 8fde055fc4993227d48f193f07e07eebd5bde8e3 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Tue, 12 Nov 2024 13:57:38 +0100 Subject: [PATCH 3/4] remove redundant init call --- tests/unit/metrics/test_exporter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 1aa4541ec..cb0a5f455 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -8,7 +8,7 @@ import pytest from asgiref.testing import ApplicationCommunicator -from prometheus_client import REGISTRY, CollectorRegistry +from prometheus_client import CollectorRegistry from logprep.metrics.exporter import PrometheusExporter, make_patched_asgi_app from logprep.util import http @@ -21,7 +21,6 @@ ) class TestPrometheusExporter: def setup_method(self): - REGISTRY.__init__() self.metrics_config = MetricsConfig(enabled=True, port=8000) def test_correct_setup(self): From 600e9668df566eea39187da803f1eafb4962c668 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Tue, 12 Nov 2024 14:01:39 +0100 Subject: [PATCH 4/4] fix linting --- tests/acceptance/test_full_configuration.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/acceptance/test_full_configuration.py b/tests/acceptance/test_full_configuration.py index 54e2ccf1e..aee28e09f 100644 --- a/tests/acceptance/test_full_configuration.py +++ b/tests/acceptance/test_full_configuration.py @@ -1,4 +1,6 @@ # pylint: disable=missing-docstring +# pylint: disable=line-too-long +# pylint: disable=too-many-locals import os import re import tempfile