Skip to content

Commit

Permalink
WIP Replace request with falcon 3
Browse files Browse the repository at this point in the history
  • Loading branch information
MoessnerFabian(Group) committed Nov 12, 2024
1 parent e1d4a87 commit cce9173
Showing 1 changed file with 46 additions and 62 deletions.
108 changes: 46 additions & 62 deletions tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import uvicorn
from requests.auth import HTTPBasicAuth

import falcon
from falcon import testing

from logprep.abc.input import FatalInputError
Expand Down Expand Up @@ -62,7 +61,7 @@ def setup_method(self):
self.object.setup()
self.app = self.object.http_server.server.config.app
self.target = self.object.target
self.falconClient = testing.TestClient(self.app)
self.client = testing.TestClient(self.app)

CONFIG: dict = {
"type": "http_input",
Expand Down Expand Up @@ -114,84 +113,83 @@ def test_not_first_pipeline(self):
assert connector.http_server is None

def test_get_method_returns_200(self):
resp = self.falconClient.simulate_get('/json')
# Todo: Notiz timeout nicht nötig bei falcon, da kein reales Netz
resp = self.client.simulate_get('/json')
assert resp.status_code == 200

def test_get_method_returns_200_with_authentication(self):
resp = self.falconClient.simulate_get('/auth-json-secret')
resp = self.client.simulate_get('/auth-json-secret')
assert resp.status_code == 200

def test_get_method_returns_429_if_queue_is_full(self):
self.object.messages.full = mock.MagicMock()
self.object.messages.full.return_value = True
resp = self.falconClient.simulate_get('/json')
resp = self.client.simulate_get('/json')
assert resp.status_code == 429

def test_get_error_code_too_many_requests(self):
data = {"message": "my log message"}
self.object.messages.put = mock.MagicMock()
self.object.messages.put.side_effect = queue.Full()
resp = self.falconClient.simulate_post('/json', json=data)
resp = self.client.simulate_post('/json', json=data)
assert resp.status_code == 429

def test_json_endpoint_accepts_post_request(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/json', json=data)
resp = self.client.simulate_post('/json', json=data)
assert resp.status_code == 200

def test_json_endpoint_match_wildcard_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/json', json=data)
resp = self.client.simulate_post('/json', json=data)
assert resp.status_code == 200

def test_json_endpoint_not_match_wildcard_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/api/wildcard_path/json/another_path', json=data)
resp = self.client.simulate_post('/api/wildcard_path/json/another_path', json=data)
assert resp.status_code == 404

data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/json', json=data)
resp = self.client.simulate_post('/json', json=data)
assert resp.status_code == 200

event_from_queue = self.object.messages.get(timeout=0.001)
assert event_from_queue == data

def test_plaintext_endpoint_accepts_post_request(self):
data = "my log message"
resp = self.falconClient.simulate_post('/plaintext', json=data)
resp = self.client.simulate_post('/plaintext', json=data)
assert resp.status_code == 200

def test_plaintext_message_is_put_in_queue(self):
data = "my log message"
resp = self.falconClient.simulate_post('/plaintext', json=data)
resp = self.client.simulate_post('/plaintext', body=data)
assert resp.status_code == 200
event_from_queue = self.object.messages.get(timeout=0.001)
assert event_from_queue.get("message") == data

def test_jsonl_endpoint_match_regex_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/first/jsonl', json=data)
resp = self.client.simulate_post('/first/jsonl', json=data)
assert resp.status_code == 200

def test_jsonl_endpoint_not_match_regex_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/firs/jsonl', json=data)
resp = self.client.simulate_post('/firs/jsonl', json=data)
assert resp.status_code == 404

def test_jsonl_endpoint_not_match_before_start_regex(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/api/first/jsonl', json=data)
resp = self.client.simulate_post('/api/first/jsonl', json=data)
assert resp.status_code == 404

def test_jsonl_endpoint_match_wildcard_regex_mix_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/third/jsonl/another_path/last_path', json=data)
resp = self.client.simulate_post('/third/jsonl/another_path/last_path', json=data)
assert resp.status_code == 200

def test_jsonl_endpoint_not_match_wildcard_regex_mix_route(self):
data = {"message": "my log message"}
resp = self.falconClient.simulate_post('/api/third/jsonl/another_path', json=data)
resp = self.client.simulate_post('/api/third/jsonl/another_path', json=data)
assert resp.status_code == 404

def test_jsonl_messages_are_put_in_queue(self):
Expand All @@ -200,7 +198,7 @@ def test_jsonl_messages_are_put_in_queue(self):
{"message": "my second log message"}
{"message": "my third log message"}
"""
resp = self.falconClient.simulate_post('/jsonl', body=data)
resp = self.client.simulate_post('/jsonl', body=data)
assert resp.status_code == 200
assert self.object.messages.qsize() == 3
event_from_queue = self.object.messages.get(timeout=1)
Expand All @@ -212,7 +210,7 @@ def test_jsonl_messages_are_put_in_queue(self):

def test_get_next_returns_message_from_queue(self):
data = {"message": "my log message"}
self.falconClient.simulate_post('/json', json=data)
self.client.simulate_post('/json', json=data)
assert self.object.get_next(0.001) == data

def test_get_next_returns_first_in_first_out(self):
Expand All @@ -236,9 +234,9 @@ def test_get_next_returns_first_in_first_out_for_mixed_endpoints(self):
for message in data:
endpoint, post_data = message.values()
if endpoint == "json":
self.falconClient.simulate_post('/json', json=post_data)
self.client.simulate_post('/json', json=post_data)
if endpoint == "plaintext":
self.falconClient.simulate_post('/plaintext', body=post_data)
self.client.simulate_post('/plaintext', body=post_data)
assert self.object.get_next(0.001) == data[0].get("data")
assert self.object.get_next(0.001) == {"message": data[1].get("data")}
assert self.object.get_next(0.001) == data[2].get("data")
Expand All @@ -253,7 +251,7 @@ def test_server_starts_threaded_server(self):
message = {"message": "my message"}
for i in range(100):
message["message"] = f"message number {i}"
self.falconClient.simulate_post('/json', json=message)
self.client.simulate_post('/json', json=message)
assert self.object.messages.qsize() == 100, "messages are put to queue"

def test_get_metadata(self):
Expand All @@ -265,43 +263,40 @@ def test_get_metadata(self):
connector.pipeline_index = 1
connector.setup()
target = connector.target
# Todo: comment entfernen
#resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = self.falconClient.simulate_post('/json', json=message)
# Todo: gibt es hier eine Lösung?
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
#resp = self.falconClient.simulate_post('/json', json=message)
assert resp.status_code == 200
message = connector.messages.get(timeout=0.5)
assert message["custom"]["url"] == target + "/json"
assert re.search(r"\d+\.\d+\.\d+\.\d+", message["custom"]["remote_addr"])
assert isinstance(message["custom"]["user_agent"], str)

def test_server_multiple_config_changes(self):
# Todo: abklären hier request zu lassen -> comments entfernen
message = {"message": "my message"}
connector_config = deepcopy(self.CONFIG)
connector_config["uvicorn_config"]["port"] = 9001
connector = Factory.create({"test connector": connector_config})
connector.pipeline_index = 1
connector.setup()
target = connector.target
# Todo: comment entfernen
#resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = self.falconClient.simulate_post('/json', json=message)
#resp = self.client.simulate_post('/json', json=message)
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
assert resp.status_code == 200
# Todo: was mit target / ports?
target = target.replace(":9001", ":9000")
try:
# Todo: comment entfernen
# resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = self.falconClient.simulate_post('/json', json=message)
#resp = self.client.simulate_post('/json', json=message)
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
except requests.exceptions.ConnectionError as e:
assert e.response is None
connector_config = deepcopy(self.CONFIG)
connector = Factory.create({"test connector": connector_config})
connector.pipeline_index = 1
connector.setup()
# Todo: comment entfernen
#target = connector.target
# resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
resp = self.falconClient.simulate_post('/json', json=message)
target = connector.target
resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) # nosemgrep
# resp = self.client.simulate_post('/json', json=message)
assert resp.status_code == 200

def test_get_next_with_hmac_of_raw_message(self):
Expand All @@ -323,7 +318,7 @@ def test_get_next_with_hmac_of_raw_message(self):
test_event = "the content"
# Todo: comment entfernen
# requests.post(url=f"{self.target}/plaintext", data=test_event, timeout=0.5) # nosemgrep
self.falconClient.simulate_post('/plaintext', body=test_event)
self.client.simulate_post('/plaintext', body=test_event)

expected_event = {
"message": "the content",
Expand All @@ -342,10 +337,11 @@ def test_endpoint_returns_401_if_authorization_not_provided(self, credentials_fi
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
# resp = requests.post(
# url=f"{self.target}/auth-json-file", timeout=0.5, data=json.dumps(data)
# )
resp = self.falconClient.simulate_post('/auth-json-file', body=json.dumps(data))
resp = requests.post(
url=f"{self.target}/auth-json-file", timeout=0.5, data=json.dumps(data)
)
# Todo: gibt es eine Lösung?
#resp = self.falconClient.simulate_post('/auth-json-file', body=json.dumps(data))
assert resp.status_code == 401

def test_endpoint_returns_401_on_wrong_authorization(self, credentials_file_path):
Expand Down Expand Up @@ -417,20 +413,17 @@ def test_messages_is_multiprocessing_queue(self):

def test_all_endpoints_share_the_same_queue(self):
data = {"message": "my log message"}
# requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
self.falconClient.simulate_post('/json', json=data)
self.client.simulate_post('/json', json=data)
assert self.object.messages.qsize() == 1
data = "my log message"
#requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5)
self.falconClient.simulate_post('/plaintext', json=data)
self.client.simulate_post('/plaintext', json=data)
assert self.object.messages.qsize() == 2
data = """
{"message": "my first log message"}
{"message": "my second log message"}
{"message": "my third log message"}
"""
#requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5)
self.falconClient.simulate_post('/jsonl', body=data)
self.client.simulate_post('/jsonl', body=data)
assert self.object.messages.qsize() == 5

def test_sets_target_to_https_schema_if_ssl_options(self):
Expand All @@ -457,38 +450,28 @@ def test_enpoints_count_requests(self):
self.object.setup()
random_number = random.randint(1, 100)
for number in range(random_number):
#requests.post(
# url=f"{self.target}/json", json={"message": f"my message{number}"}, timeout=0.5
#)
self.falconClient.simulate_post('/json', json={"message": f"my message{number}"})
self.client.simulate_post('/json', json={"message": f"my message{number}"})
assert self.object.metrics.number_of_http_requests == random_number

@pytest.mark.parametrize("endpoint", ["json", "plaintext", "jsonl"])
def test_endpoint_handles_gzip_compression(self, endpoint):
data = {"message": "my log message"}
data = gzip.compress(json.dumps(data).encode())
headers = {"Content-Encoding": "gzip"}
# resp = requests.post(
# url=f"{self.target}/{endpoint}",
# data=data,
# headers=headers,
# timeout=0.5,
# )
resp = self.falconClient.simulate_post(f"/{endpoint}", body=data, headers=headers)
resp = self.client.simulate_post(f"/{endpoint}", body=data, headers=headers)
assert resp.status_code == 200

@pytest.mark.parametrize("endpoint", ["json", "jsonl"])
def test_raises_http_bad_request_on_decode_error(self, endpoint):
data = "this is not a valid json nor jsonl"
#resp = requests.post(url=f"{self.target}/{endpoint}", data=data, timeout=0.5)
resp = self.falconClient.simulate_post(f"/{endpoint}", body=data)
resp = self.client.simulate_post(f"/{endpoint}", body=data)
assert resp.status_code == 400

@responses.activate
def test_health_endpoint_is_ready_if_all_endpoints_are_successful(self):
for endpoint in self.object.health_endpoints:
# hier weiter was ist mit den ports?
responses.get(f"http://127.0.0.1:9000{endpoint}", status=200)

assert self.object.health(), "Health endpoint should be ready"

@responses.activate
Expand Down Expand Up @@ -527,6 +510,7 @@ def test_health_endpoint_is_not_ready_if_one_endpoint_has_read_timeout(self):
def test_health_check_logs_error(self):
endpoint = self.object.health_endpoints[0]
responses.get(f"http://127.0.0.1:9000{endpoint}", body=requests.Timeout("bad"))

with mock.patch("logging.Logger.error") as mock_logger:
assert not self.object.health(), "Health endpoint should not be ready"
mock_logger.assert_called()
Expand Down

0 comments on commit cce9173

Please sign in to comment.