Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev unit tests speed up #698

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions logprep/filter/lucene_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
RegEx-Filter
------------

It is possible use regex expressions to match values.
To be recognized as a regular expression the filter field has to be start and end with
It is possible to use regex expressions to match values.
To be recognized as a regular expression, the filter field has to start with
:code:`/`.


Expand Down
83 changes: 38 additions & 45 deletions tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import uvicorn
from requests.auth import HTTPBasicAuth

from falcon import testing

from logprep.abc.input import FatalInputError
from logprep.connector.http.input import HttpInput
from logprep.factory import Factory
Expand Down Expand Up @@ -57,7 +59,9 @@ def setup_method(self):
super().setup_method()
self.object.pipeline_index = 1
self.object.setup()
self.app = self.object.http_server.server.config.app
self.target = self.object.target
self.client = testing.TestClient(self.app)

CONFIG: dict = {
"type": "http_input",
Expand Down Expand Up @@ -109,89 +113,83 @@ def test_not_first_pipeline(self):
assert connector.http_server is None

def test_get_method_returns_200(self):
resp = requests.get(url=f"{self.target}/json", timeout=0.5)
resp = self.client.simulate_get("/json")
assert resp.status_code == 200

def test_get_method_returns_200_with_authentication(self):
resp = requests.get(url=f"{self.target}/auth-json-secret", timeout=0.5)
resp = self.client.simulate_get("/auth-json-secret")
assert resp.status_code == 200

def test_get_method_returns_429_if_queue_is_full(self):
self.object.messages.full = mock.MagicMock()
self.object.messages.full.return_value = True
resp = requests.get(url=f"{self.target}/json", timeout=20)
resp = self.client.simulate_get("/json")
assert resp.status_code == 429

def test_get_error_code_too_many_requests(self):
data = {"message": "my log message"}
self.object.messages.put = mock.MagicMock()
self.object.messages.put.side_effect = queue.Full()
session = requests.Session()
resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
resp = self.client.simulate_post("/json", json=data)
assert resp.status_code == 429

def test_json_endpoint_accepts_post_request(self):
data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
resp = self.client.simulate_post("/json", json=data)
assert resp.status_code == 200

def test_json_endpoint_match_wildcard_route(self):
data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/api/wildcard_path/json", json=data, timeout=0.5)
resp = self.client.simulate_post("/json", json=data)
assert resp.status_code == 200

def test_json_endpoint_not_match_wildcard_route(self):
data = {"message": "my log message"}
resp = requests.post(
url=f"{self.target}/api/wildcard_path/json/another_path", json=data, timeout=0.5
)
resp = self.client.simulate_post("/api/wildcard_path/json/another_path", json=data)
assert resp.status_code == 404

data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
resp = self.client.simulate_post("/json", json=data)
assert resp.status_code == 200

event_from_queue = self.object.messages.get(timeout=0.001)
assert event_from_queue == data

def test_plaintext_endpoint_accepts_post_request(self):
data = "my log message"
resp = requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5)
resp = self.client.simulate_post("/plaintext", json=data)
assert resp.status_code == 200

def test_plaintext_message_is_put_in_queue(self):
data = "my log message"
resp = requests.post(url=f"{self.target}/plaintext", data=data, timeout=0.5)
resp = self.client.simulate_post("/plaintext", body=data)
assert resp.status_code == 200
event_from_queue = self.object.messages.get(timeout=0.001)
assert event_from_queue.get("message") == data

def test_jsonl_endpoint_match_regex_route(self):
data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/first/jsonl", json=data, timeout=0.5)
resp = self.client.simulate_post("/first/jsonl", json=data)
assert resp.status_code == 200

def test_jsonl_endpoint_not_match_regex_route(self):
data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/firs/jsonl", json=data, timeout=0.5)
resp = self.client.simulate_post("/firs/jsonl", json=data)
assert resp.status_code == 404

def test_jsonl_endpoint_not_match_before_start_regex(self):
data = {"message": "my log message"}
resp = requests.post(url=f"{self.target}/api/first/jsonl", json=data, timeout=0.5)
resp = self.client.simulate_post("/api/first/jsonl", json=data)
assert resp.status_code == 404

def test_jsonl_endpoint_match_wildcard_regex_mix_route(self):
data = {"message": "my log message"}
resp = requests.post(
url=f"{self.target}/third/jsonl/another_path/last_path", json=data, timeout=0.5
)
resp = self.client.simulate_post("/third/jsonl/another_path/last_path", json=data)
assert resp.status_code == 200

def test_jsonl_endpoint_not_match_wildcard_regex_mix_route(self):
data = {"message": "my log message"}
resp = requests.post(
url=f"{self.target}/api/third/jsonl/another_path", json=data, timeout=0.5
)
resp = self.client.simulate_post("/api/third/jsonl/another_path", json=data)
assert resp.status_code == 404

def test_jsonl_messages_are_put_in_queue(self):
Expand All @@ -200,7 +198,7 @@ def test_jsonl_messages_are_put_in_queue(self):
{"message": "my second log message"}
{"message": "my third log message"}
"""
resp = requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5)
resp = self.client.simulate_post("/jsonl", body=data)
assert resp.status_code == 200
assert self.object.messages.qsize() == 3
event_from_queue = self.object.messages.get(timeout=1)
Expand All @@ -212,7 +210,7 @@ def test_jsonl_messages_are_put_in_queue(self):

def test_get_next_returns_message_from_queue(self):
data = {"message": "my log message"}
requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
self.client.simulate_post("/json", json=data)
assert self.object.get_next(0.001) == data

def test_get_next_returns_first_in_first_out(self):
Expand All @@ -236,9 +234,9 @@ def test_get_next_returns_first_in_first_out_for_mixed_endpoints(self):
for message in data:
endpoint, post_data = message.values()
if endpoint == "json":
requests.post(url=self.target + "/json", json=post_data, timeout=0.5)
self.client.simulate_post("/json", json=post_data)
if endpoint == "plaintext":
requests.post(url=self.target + "/plaintext", data=post_data, timeout=0.5)
self.client.simulate_post("/plaintext", body=post_data)
assert self.object.get_next(0.001) == data[0].get("data")
assert self.object.get_next(0.001) == {"message": data[1].get("data")}
assert self.object.get_next(0.001) == data[2].get("data")
Expand All @@ -253,7 +251,7 @@ def test_server_starts_threaded_server(self):
message = {"message": "my message"}
for i in range(100):
message["message"] = f"message number {i}"
requests.post(url=f"{self.target}/json", json=message, timeout=0.5) # nosemgrep
self.client.simulate_post("/json", json=message)
assert self.object.messages.qsize() == 100, "messages are put to queue"

def test_get_metadata(self):
Expand All @@ -265,7 +263,7 @@ def test_get_metadata(self):
connector.pipeline_index = 1
connector.setup()
target = connector.target
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5)
assert resp.status_code == 200
message = connector.messages.get(timeout=0.5)
assert message["custom"]["url"] == target + "/json"
Expand All @@ -280,19 +278,19 @@ def test_server_multiple_config_changes(self):
connector.pipeline_index = 1
connector.setup()
target = connector.target
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5)
assert resp.status_code == 200
target = target.replace(":9001", ":9000")
try:
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5)
except requests.exceptions.ConnectionError as e:
assert e.response is None
connector_config = deepcopy(self.CONFIG)
connector = Factory.create({"test connector": connector_config})
connector.pipeline_index = 1
connector.setup()
target = connector.target
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5)
assert resp.status_code == 200

def test_get_next_with_hmac_of_raw_message(self):
Expand All @@ -312,7 +310,7 @@ def test_get_next_with_hmac_of_raw_message(self):
connector.pipeline_index = 1
connector.setup()
test_event = "the content"
requests.post(url=f"{self.target}/plaintext", data=test_event, timeout=0.5) # nosemgrep
self.client.simulate_post("/plaintext", body=test_event)

expected_event = {
"message": "the content",
Expand Down Expand Up @@ -401,17 +399,17 @@ def test_messages_is_multiprocessing_queue(self):

def test_all_endpoints_share_the_same_queue(self):
data = {"message": "my log message"}
requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
self.client.simulate_post("/json", json=data)
assert self.object.messages.qsize() == 1
data = "my log message"
requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5)
self.client.simulate_post("/plaintext", json=data)
assert self.object.messages.qsize() == 2
data = """
{"message": "my first log message"}
{"message": "my second log message"}
{"message": "my third log message"}
"""
requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5)
self.client.simulate_post("/jsonl", body=data)
assert self.object.messages.qsize() == 5

def test_sets_target_to_https_schema_if_ssl_options(self):
Expand All @@ -438,34 +436,28 @@ def test_enpoints_count_requests(self):
self.object.setup()
random_number = random.randint(1, 100)
for number in range(random_number):
requests.post(
url=f"{self.target}/json", json={"message": f"my message{number}"}, timeout=0.5
)
self.client.simulate_post("/json", json={"message": f"my message{number}"})
assert self.object.metrics.number_of_http_requests == random_number

@pytest.mark.parametrize("endpoint", ["json", "plaintext", "jsonl"])
def test_endpoint_handles_gzip_compression(self, endpoint):
data = {"message": "my log message"}
data = gzip.compress(json.dumps(data).encode())
headers = {"Content-Encoding": "gzip"}
resp = requests.post(
url=f"{self.target}/{endpoint}",
data=data,
headers=headers,
timeout=0.5,
)
resp = self.client.simulate_post(f"/{endpoint}", body=data, headers=headers)
assert resp.status_code == 200

@pytest.mark.parametrize("endpoint", ["json", "jsonl"])
def test_raises_http_bad_request_on_decode_error(self, endpoint):
data = "this is not a valid json nor jsonl"
resp = requests.post(url=f"{self.target}/{endpoint}", data=data, timeout=0.5)
resp = self.client.simulate_post(f"/{endpoint}", body=data)
assert resp.status_code == 400

@responses.activate
def test_health_endpoint_is_ready_if_all_endpoints_are_successful(self):
for endpoint in self.object.health_endpoints:
responses.get(f"http://127.0.0.1:9000{endpoint}", status=200)

assert self.object.health(), "Health endpoint should be ready"

@responses.activate
Expand Down Expand Up @@ -504,6 +496,7 @@ def test_health_endpoint_is_not_ready_if_one_endpoint_has_read_timeout(self):
def test_health_check_logs_error(self):
endpoint = self.object.health_endpoints[0]
responses.get(f"http://127.0.0.1:9000{endpoint}", body=requests.Timeout("bad"))

with mock.patch("logging.Logger.error") as mock_logger:
assert not self.object.health(), "Health endpoint should not be ready"
mock_logger.assert_called()
Expand Down
Loading