Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speed up exporter tests #705

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.analysis.importFormat": "absolute",
"python.testing.pytestPath": "pytest",
"editor.formatOnSave": true,
"editor.rulers": [
80,
100,
120
],
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
"source.organizeImports": "explicit",
"source.unusedImports": "always",
"source.convertImportFormat": "always"
},
"autoDocstring.docstringFormat": "numpy",
"files.exclude": {
Expand Down
6 changes: 5 additions & 1 deletion logprep/metrics/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, configuration: MetricsConfig):
self.server = None
self.healthcheck_functions = None
self._multiprocessing_prepared = False
self.app = None

def prepare_multiprocessing(self):
"""
Expand Down Expand Up @@ -99,10 +100,12 @@ def run(self, daemon=True):

def init_server(self, daemon=True) -> None:
"""Initializes the server"""
if not self.app:
self.app = make_patched_asgi_app(self.healthcheck_functions)
port = self.configuration.port
self.server = http.ThreadingHTTPServer(
self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"},
make_patched_asgi_app(self.healthcheck_functions),
self.app,
daemon=daemon,
logger_name="Exporter",
)
Expand All @@ -116,6 +119,7 @@ def restart(self):
def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None:
"""Updates the healthcheck functions"""
self.healthcheck_functions = healthcheck_functions
self.app = make_patched_asgi_app(self.healthcheck_functions)
if self.server and self.server.thread and self.server.thread.is_alive():
self.server.shut_down()
self.init_server(daemon=daemon)
Expand Down
12 changes: 7 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ keywords = [
"logdata",
]
dependencies = [
"aiohttp>=3.9.2", # CVE-2024-23334
"aiohttp>=3.9.2", # CVE-2024-23334
"attrs",
"certifi>=2023.7.22", # CVE-2023-37920
"ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11
"certifi>=2023.7.22", # CVE-2023-37920
"ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11
"colorama",
"confluent-kafka>2",
"geoip2",
"hyperscan>=0.7.0",
"jsonref",
"luqum",
"more-itertools==8.10.0",
"mysql-connector-python>=9.1.0", # CVE-2024-21272
"mysql-connector-python>=9.1.0", # CVE-2024-21272
"numpy>=1.26.0",
"opensearch-py",
"prometheus_client",
Expand All @@ -84,7 +84,7 @@ dependencies = [
"schedule",
"tldextract",
"urlextract",
"urllib3>=1.26.17", # CVE-2023-43804
"urllib3>=1.26.17", # CVE-2023-43804
"uvicorn",
"deepdiff",
"msgspec",
Expand Down Expand Up @@ -113,6 +113,8 @@ dev = [
"jinja2",
"maturin",
"cibuildwheel",
"asgiref",
"pytest-asyncio",
]

doc = [
Expand Down
9 changes: 8 additions & 1 deletion tests/acceptance/test_full_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# pylint: disable=missing-docstring
# pylint: disable=line-too-long
# pylint: disable=too-many-locals
import os
import re
import tempfile
Expand Down Expand Up @@ -115,7 +117,7 @@ def test_start_of_logprep_from_http_with_templated_url_and_config():
output = proc.stdout.readline().decode("utf8")


def test_logprep_exposes_prometheus_metrics(tmp_path):
def test_logprep_exposes_prometheus_metrics_and_healthchecks(tmp_path):
temp_dir = tempfile.gettempdir()
input_file_path = Path(os.path.join(temp_dir, "input.txt"))
input_file_path.touch()
Expand Down Expand Up @@ -246,4 +248,9 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
len(re.findall(both_calculators, metrics)) == 4
), "More or less than 4 rules were found for both calculator"

# check health endpoint
response = requests.get("http://127.0.0.1:8003/health", timeout=7)
response.raise_for_status()
assert "OK" == response.text

proc.kill()
143 changes: 83 additions & 60 deletions tests/unit/metrics/test_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
# pylint: disable=line-too-long
import asyncio
import os.path
from unittest import mock

import pytest
import requests
from prometheus_client import REGISTRY
from asgiref.testing import ApplicationCommunicator
from prometheus_client import CollectorRegistry

from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.exporter import PrometheusExporter, make_patched_asgi_app
from logprep.util import http
from logprep.util.configuration import MetricsConfig

Expand All @@ -20,7 +21,6 @@
)
class TestPrometheusExporter:
def setup_method(self):
REGISTRY.__init__()
self.metrics_config = MetricsConfig(enabled=True, port=8000)

def test_correct_setup(self):
Expand Down Expand Up @@ -106,80 +106,103 @@ def test_is_running_returns_true_when_server_thread_is_alive(self):
assert exporter.is_running


@mock.patch(
"logprep.util.http.ThreadingHTTPServer", new=mock.create_autospec(http.ThreadingHTTPServer)
)
@mock.patch(
"logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing",
new=lambda *args, **kwargs: None,
)
class TestHealthEndpoint:
"""These tests use the `asgiref.testing.ApplicationCommunicator` to test the ASGI app itself.
For more information see: https://dokk.org/documentation/django-channels/2.4.0/topics/testing/
"""

def setup_method(self):
REGISTRY.__init__()
self.metrics_config = MetricsConfig(enabled=True, port=8000)
self.registry = CollectorRegistry()
self.captured_status = None
self.captured_headers = None
# Setup ASGI scope
self.scope = {
"client": ("127.0.0.1", 32767),
"headers": [],
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"scheme": "http",
"server": ("127.0.0.1", 80),
"type": "http",
}
self.communicator = None

def teardown_method(self):
if self.communicator:
asyncio.get_event_loop().run_until_complete(self.communicator.wait())

def seed_app(self, app):
self.communicator = ApplicationCommunicator(app, self.scope)

def test_health_endpoint_returns_503_as_default_health_state(self):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == 503
exporter.server.shut_down()

def test_health_endpoint_calls_health_check_functions(self):
@pytest.mark.parametrize(
"functions, expected_status, expected_body",
[
([lambda: True], 200, b"OK"),
([lambda: True, lambda: True], 200, b"OK"),
([lambda: False], 503, b"FAIL"),
([lambda: False, lambda: False], 503, b"FAIL"),
([lambda: False, lambda: True, lambda: True], 503, b"FAIL"),
],
)
@pytest.mark.asyncio
async def test_asgi_app(self, functions, expected_status, expected_body):
app = make_patched_asgi_app(functions)
self.scope["path"] = "/health"
self.seed_app(app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == expected_status
event = await self.communicator.receive_output(timeout=1)
assert expected_body in event["body"]

@pytest.mark.asyncio
async def test_health_endpoint_calls_health_check_functions(self):
exporter = PrometheusExporter(self.metrics_config)
function_mock = mock.Mock(return_value=True)
exporter.healthcheck_functions = [function_mock]
exporter.run(daemon=False)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == 200
assert function_mock.call_count == 1

exporter.server.shut_down()

def test_health_endpoint_calls_updated_functions(self):
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
function_mock.assert_called_once()

@pytest.mark.asyncio
async def test_update_health_checks_injects_new_functions(self):
exporter = PrometheusExporter(self.metrics_config)
function_mock = mock.Mock(return_value=True)
exporter.healthcheck_functions = [function_mock]
exporter.run(daemon=False)
requests.get("http://localhost:8000/health", timeout=0.5)
exporter.server.thread = None
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
assert function_mock.call_count == 1, "initial function should be called"
new_function_mock = mock.Mock(return_value=True)
exporter.update_healthchecks([new_function_mock])
requests.get("http://localhost:8000/health", timeout=0.5)
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
assert new_function_mock.call_count == 1, "New function should be called"
assert function_mock.call_count == 1, "Old function should not be called"

exporter.server.shut_down()

@pytest.mark.parametrize(
"functions, expected",
[
([lambda: True], 200),
([lambda: True, lambda: True], 200),
([lambda: False], 503),
([lambda: False, lambda: False], 503),
([lambda: False, lambda: True, lambda: True], 503),
],
)
def test_health_check_returns_status_code(self, functions, expected):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
exporter.update_healthchecks(functions)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == expected
exporter.server.shut_down()

@pytest.mark.parametrize(
"functions, expected",
[
([lambda: True], "OK"),
([lambda: True, lambda: True], "OK"),
([lambda: False], "FAIL"),
([lambda: False, lambda: False], "FAIL"),
([lambda: False, lambda: True, lambda: True], "FAIL"),
],
)
def test_health_check_returns_body(self, functions, expected):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
exporter.update_healthchecks(functions)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.content.decode() == expected
exporter.server.shut_down()
Loading