Skip to content

Commit

Permalink
Update: replace print with logging, simplify wait_job_statuses functi…
Browse files Browse the repository at this point in the history
…on and test_endpoint function, replace getting filenames with glob, fix typehints, introduce validate_uri function, more robust get_spec_path and get_examples_path functions
  • Loading branch information
GeraldIr committed Feb 7, 2024
1 parent 838e523 commit 8311ba4
Showing 1 changed file with 73 additions and 123 deletions.
196 changes: 73 additions & 123 deletions src/openeo_test_suite/lib/compliance_util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pathlib import Path
import pathlib
from typing import Union
from openapi_core import Spec
import yaml
Expand All @@ -6,6 +8,8 @@
import uuid
import requests
import time
import logging
import openeo_test_suite

from openeo_test_suite.lib.backend_under_test import (
get_backend_url,
Expand All @@ -19,104 +23,40 @@
from openapi_core import V31ResponseValidator


from typing import Union
import requests
from requests import Request, Session


def test_endpoint(
base_url: str,
endpoint_path: str,
test_name: str,
spec: Spec,
spec: "Spec",
payload: dict = None,
bearer_token: str = None,
method: str = "GET",
expected_status_codes: Union[list[int], int] = [200],
return_response: bool = False,
):
full_endpoint_url = f"{base_url}{endpoint_path}"
session = Session()
headers = {"Content-Type": "application/json"} if payload else {}

if bearer_token is not None:
if method == "POST":
requests_response = requests.post(
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)

requests_request = requests.Request(
"post",
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)
if bearer_token:
headers["Authorization"] = bearer_token

elif method == "PUT":
requests_response = requests.put(
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)

requests_request = requests.Request(
"put",
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)

elif method == "PATCH":
requests_response = requests.patch(
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)

requests_request = requests.Request(
"patch",
full_endpoint_url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"{bearer_token}",
},
)

if method == "GET":
requests_response = requests.get(
full_endpoint_url, headers={"Authorization": f"{bearer_token}"}
)
requests_request = requests.Request(
"get", full_endpoint_url, headers={"Authorization": f"{bearer_token}"}
)

elif method == "DELETE":
requests_response = requests.delete(
full_endpoint_url, headers={"Authorization": f"{bearer_token}"}
)
requests_request = requests.Request(
"delete",
full_endpoint_url,
headers={"Authorization": f"{bearer_token}"},
)
# if both payload and bearer token are None, it can only be a simple get
else:
requests_response = requests.get(full_endpoint_url)
requests_request = requests.Request("get", full_endpoint_url)
response = session.request(
method=method.upper(),
url=full_endpoint_url,
json=payload,
headers=headers,
)

openapi_request = RequestsOpenAPIRequest(requests_request)
openapi_response = RequestsOpenAPIResponse(requests_response)
openapi_request = RequestsOpenAPIRequest(
Request(method.upper(), full_endpoint_url, json=payload, headers=headers)
)
openapi_response = RequestsOpenAPIResponse(response)

try:
if check_status_code(expected_status_codes, openapi_response.status_code):
Expand All @@ -133,12 +73,12 @@ def test_endpoint(
except Exception as e:
print_test_results(e, endpoint_path=endpoint_path, test_name=test_name)
if return_response:
return check_test_results(e), requests_response
return check_test_results(e), response
else:
return check_test_results(e)
else:
if return_response:
return "", requests_response
return "", response
else:
return ""

Expand Down Expand Up @@ -184,23 +124,17 @@ def wait_job_statuses(
"""
end_time = time.time() + timeout
while time.time() < end_time:
all_jobs_finished = True
for job_id in job_ids:
if all_jobs_finished:
all_jobs_finished = (
get_batch_job_status(
base_url=base_url, bearer_token=bearer_token, job_id=job_id
)
in job_statuses
)
else:
break
if all_jobs_finished:
print("All jobs have reached their desired status.")
return True
if all(
get_batch_job_status(
base_url=base_url, bearer_token=bearer_token, job_id=job_id
)
in job_statuses
for job_id in job_ids
):
return True
time.sleep(1)
print("Waiting on jobs to reach desired status..")
print("Jobs failed to reach desired state, timeout has been reached.")
logging.log("Waiting on jobs to reach desired status..")
logging.warn("Jobs failed to reach desired state, timeout has been reached.")
return False


Expand Down Expand Up @@ -305,7 +239,11 @@ def adjust_server_in_well_known(data, endpoint):


def validate_uri(value):
return True
if not isinstance(value, str):
return False
if value.startswith("http://") or value.startswith("https://"):
return True
return False


extra_format_validators = {
Expand All @@ -328,25 +266,37 @@ def get_examples_path():
)


def get_spec_path():
return os.path.join(
os.getcwd(), "src/openeo_test_suite/tests/general/openeo-api/openapi.yaml"
def _guess_root():
project_root = Path(openeo_test_suite.__file__).parents[2]
candidates = [
project_root / "assets/openeo-api",
Path("./assets/openeo-api"),
Path("./openeo-test-suite/assets/openeo-api"),
]
for candidate in candidates:
if candidate.exists() and candidate.is_dir():
return candidate
raise ValueError(
f"Could not find valid processes test root directory (tried {candidates})"
)


def get_spec_path():
return _guess_root() / "openapi.yaml"


def load_payloads_from_directory(directory_path: str) -> list[dict]:
for filename in os.listdir(directory_path):
if filename.endswith(".json"):
file_path = os.path.join(directory_path, filename)
with open(file_path, "r") as file:
try:
# Load the JSON data from the file
data = json.load(file)
yield json.dumps(data)
except json.JSONDecodeError:
print(f"Error decoding JSON in file: {filename}")
except Exception as e:
print(f"Error reading file: {filename} - {str(e)}")
for filename in pathlib.Path.glob(directory_path, "*.json"):
file_path = os.path.join(directory_path, filename)
with open(file_path, "r") as file:
try:
# Load the JSON data from the file
data = json.load(file)
yield json.dumps(data)
except json.JSONDecodeError:
logging.error(f"Error decoding JSON in file: {filename}")
except Exception as e:
logging.error(f"Error reading file: {filename} - {str(e)}")


def set_uuid_in_job(json_data):
Expand All @@ -363,14 +313,14 @@ def set_uuid_in_job(json_data):
def delete_id_resource(
base_url: str, endpoint_path: str, bearer_token: str, ids: list[str]
):
try:
for id in ids:
for id in ids:
try:
requests.delete(
f"{base_url}/{endpoint_path}/{id}",
headers={"Authorization": f"{bearer_token}"},
)
except Exception as e:
print(e)
except Exception as e:
logging.error(f"Failed to delete resource with id {id}: {e}")


def put_process_graphs(base_url: str, bearer_token: str): # TODO id and so forth
Expand All @@ -395,7 +345,7 @@ def put_process_graphs(base_url: str, bearer_token: str): # TODO id and so fort
},
)
except Exception as e:
print(e)
print(f"Failed to create process graph: {e}")
return created_udp_ids


Expand Down

0 comments on commit 8311ba4

Please sign in to comment.