Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gherkin-tester): add new endpoint to test workflows (#699) #699

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 297 additions & 0 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
get_workspace_retention_rules,
is_uuid_v4,
)
from reana_commons.gherkin_parser.data_fetcher import DataFetcherBase
from dataclasses import asdict
import pathlib
import reana_commons.gherkin_parser.parser as parser
from reana_db.models import Workflow

try:
from urllib import parse as urlparse
Expand Down Expand Up @@ -3262,3 +3267,295 @@ def prune_workspace(
except Exception as e:
logging.exception(str(e))
return jsonify({"message": str(e)}), 500


@blueprint.route("/workflows/<workflow_id_or_name>/test_workflow", methods=["GET"])
@signin_required()
def test_workflow(workflow_id_or_name, user):
r"""Test a workflow.

---
get:
summary: Test a workflow.
description: >-
This resource tests a given workflow by using Gherkin test files found in its specification.
operationId: test_workflow
produces:
- application/json
parameters:
- name: access_token
in: query
description: The API access_token of workflow owner.
required: false
type: string
- name: workflow_id_or_name
in: path
description: Required. Analysis UUID or name.
required: true
type: string
responses:
200:
description: >-
Request succeeded. The workflow has been tested.
schema:
type: object
properties:
workflow_id:
type: string
workflow_name:
type: string
results:
type: array
items:
type: object
properties:
scenario:
type: string
failed_testcase:
type: string
result:
type: string
error_log:
type: string
feature:
type: string
checked_at:
type: string

examples:
application/json:
{
"message": "The workspace has successfully been tested.",
"workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac",
"workflow_name": "mytest.1",
"results": [{"scenario": "Test scenario", failed_testcase": "", "result": "passed", "error_log": "", "feature": "Test feature", "checked_at": "2024-11-24T23:59:59"}]
}
400:
description: >-
Request failed. The incoming data specification seems malformed.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{
"message": "Malformed request."
}
403:
description: >-
Request failed. User is not allowed to access workflow.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000
is not allowed to access workflow
256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
404:
description: >-
Request failed. User does not exist.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{
"message": "Workflow cdcf48b1-c2f3-4693-8230-b066e088c6ac does
not exist"
}
500:
description: >-
Request failed. Internal controller error.
schema:
type: object
properties:
message:
type: string
examples:
application/json:
{
"message": "Internal controller error."
}
"""

class DataFetcherDB(DataFetcherBase):
"""Implementation of DataFetcherBase using server side functions."""

def list_files(
self, workflow, file_name=None, page=None, size=None, search=None
):
# to prevent multiple "user" arguments being passed
_get_files = remove_decorators(get_files)
response, code = _get_files(
workflow, user, file_name=file_name, page=page, size=size, search=search
)
return json.loads((response.get_data()).decode("utf-8"))["items"]

def get_workflow_disk_usage(self, workflow, parameters):
"""Display disk usage workflow."""
response, code = _get_workflow_disk_usage(workflow, user, parameters)
return response.get_json()

def get_workflow_logs(self, workflow, steps=None, page=None, size=None):
"""Get logs from a workflow engine, use existing API function."""
response, code = _get_workflow_logs(
workflow, user, steps=steps, page=page, size=size
)
return response.get_json()

def get_workflow_status(self, workflow):
"""Get status of a previously created workflow."""
_get_workflow_status = remove_decorators(get_workflow_status)
response, code = _get_workflow_status(workflow, user)
return response.get_json()

def get_workflow_specification(self, workflow):
"""Get specification of previously created workflow."""
_get_workflow_specification = remove_decorators(get_workflow_specification)
response, code = _get_workflow_specification(workflow, user)
return response.get_json()

def download_file(self, workflow, file_path):
"""Download the requested file if it exists."""
_download_file = remove_decorators(download_file)

response, code = _download_file(
workflow_id_or_name=workflow, file_name=file_path, user=user
)
content_type = response.headers.get("Content-Type", "")

if "image" in content_type or "application/octet-stream" in content_type:
return response.get_data(as_text=False), file_path, False

return (
response.get_data(as_text=True).encode("utf-8"),
file_path,
False,
)

def test_workflow(workflow: Workflow):
"""Test a workflow using Gherkin test files found in its specification."""
specification = workflow.reana_specification
workspace_path = pathlib.Path(workflow.workspace_path)
if specification.get("tests"):
return [
parser.parse_and_run_tests(
workspace_path / pathlib.Path(test_file),
workflow.name,
DataFetcherDB(),
)
for test_file in specification["tests"]["files"]
]
return []

try:
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
multiple_results = test_workflow(workflow)
response = {
"message": "The workspace has successfully been tested.",
"results": sum(
[
[test_status_to_dict(result) for result in results[1]]
for results in multiple_results
],
[],
),
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
}

return jsonify(response), 200
except HTTPError as e:
logging.exception(str(e))
return jsonify(e.response.json()), e.response.status_code
except ValueError as e:
# In case of invalid workflow name / UUID
logging.exception(str(e))
return jsonify({"message": str(e)}), 403
except Exception as e:
logging.exception(str(e))
return jsonify({"message": str(e)}), 500


def remove_decorators(func):
"""Remove all decorators from a function."""
while hasattr(func, "__wrapped__"):
func = func.__wrapped__
return func


def test_status_to_dict(test_status):
"""Convert test status object to dictionary."""
test_dict = asdict(test_status)
test_dict["result"] = test_status.result.name
# prevent None values from being returned in the response, to match schema
if not test_status.failed_testcase:
test_dict["failed_testcase"] = ""
if not test_status.error_log:
test_dict["error_log"] = ""
return test_dict


def _get_workflow_logs(workflow_id_or_name, user, **kwargs): # noqa
"""Alternative implementation of get_workflow_logs, necessary for test_workflow."""
try:
request_steps = request.json if request.is_json else None
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")

response, http_response = current_rwc_api_client.api.get_workflow_logs(
user=str(user.id_),
steps=request_steps or kwargs.get("steps"),
workflow_id_or_name=workflow_id_or_name,
**{k: v for k, v in kwargs.items() if k != "steps"},
).result()

return jsonify(response), http_response.status_code
except HTTPError as e:
logging.error(traceback.format_exc())
return jsonify(e.response.json()), e.response.status_code
except ValueError as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 403
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500


def _get_workflow_disk_usage(workflow_id_or_name, user, parameters):
"""Alternative implementation of get_workflow_disk_usage, necessary for test_workflow."""
try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
summarize = bool(parameters.get("summarize", False))
search = parameters.get("search", None)
disk_usage_info = workflow.get_workspace_disk_usage(
summarize=summarize, search=search
)
response = {
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
"user": str(user.id_),
"disk_usage_info": disk_usage_info,
}

return jsonify(response), 200
except HTTPError as e:
logging.error(traceback.format_exc())
return jsonify(e.response.json()), e.response.status_code
except ValueError as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 403
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500
Loading