From 65178aa262391de67a17ffca4537763742ea8ae2 Mon Sep 17 00:00:00 2001 From: Alastair Lyall Date: Fri, 30 Aug 2024 19:26:28 +0200 Subject: [PATCH] feat(gherkin-tester): add new endpoint to test workflows (#699) Code for server-side equivalent `reana-client test`. --- reana_server/rest/workflows.py | 297 +++++++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) diff --git a/reana_server/rest/workflows.py b/reana_server/rest/workflows.py index 3657bdfc..3ad363c6 100644 --- a/reana_server/rest/workflows.py +++ b/reana_server/rest/workflows.py @@ -55,6 +55,11 @@ get_workspace_retention_rules, is_uuid_v4, ) +from reana_commons.gherkin_parser.data_fetcher import DataFetcherBase +from dataclasses import asdict +import pathlib +import reana_commons.gherkin_parser.parser as parser +from reana_db.models import Workflow try: from urllib import parse as urlparse @@ -3262,3 +3267,295 @@ def prune_workspace( except Exception as e: logging.exception(str(e)) return jsonify({"message": str(e)}), 500 + + +@blueprint.route("/workflows//test_workflow", methods=["GET"]) +@signin_required() +def test_workflow(workflow_id_or_name, user): + r"""Test a workflow. + + --- + get: + summary: Test a workflow. + description: >- + This resource tests a given workflow by using Gherkin test files found in its specification. + operationId: test_workflow + produces: + - application/json + parameters: + - name: access_token + in: query + description: The API access_token of workflow owner. + required: false + type: string + - name: workflow_id_or_name + in: path + description: Required. Analysis UUID or name. + required: true + type: string + responses: + 200: + description: >- + Request succeeded. The workflow has been tested. + schema: + type: object + properties: + workflow_id: + type: string + workflow_name: + type: string + results: + type: array + items: + type: object + properties: + scenario: + type: string + failed_testcase: + type: string + result: + type: string + error_log: + type: string + feature: + type: string + checked_at: + type: string + + examples: + application/json: + { + "message": "The workspace has successfully been tested.", + "workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac", + "workflow_name": "mytest.1", + "results": [{"scenario": "Test scenario", failed_testcase": "", "result": "passed", "error_log": "", "feature": "Test feature", "checked_at": "2024-11-24T23:59:59"}] + } + 400: + description: >- + Request failed. The incoming data specification seems malformed. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + { + "message": "Malformed request." + } + 403: + description: >- + Request failed. User is not allowed to access workflow. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + { + "message": "User 00000000-0000-0000-0000-000000000000 + is not allowed to access workflow + 256b25f4-4cfb-4684-b7a8-73872ef455a1" + } + 404: + description: >- + Request failed. User does not exist. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + { + "message": "Workflow cdcf48b1-c2f3-4693-8230-b066e088c6ac does + not exist" + } + 500: + description: >- + Request failed. Internal controller error. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + { + "message": "Internal controller error." + } + """ + + class DataFetcherDB(DataFetcherBase): + """Implementation of DataFetcherBase using server side functions.""" + + def list_files( + self, workflow, file_name=None, page=None, size=None, search=None + ): + # to prevent multiple "user" arguments being passed + _get_files = remove_decorators(get_files) + response, code = _get_files( + workflow, user, file_name=file_name, page=page, size=size, search=search + ) + return json.loads((response.get_data()).decode("utf-8"))["items"] + + def get_workflow_disk_usage(self, workflow, parameters): + """Display disk usage workflow.""" + response, code = _get_workflow_disk_usage(workflow, user, parameters) + return response.get_json() + + def get_workflow_logs(self, workflow, steps=None, page=None, size=None): + """Get logs from a workflow engine, use existing API function.""" + response, code = _get_workflow_logs( + workflow, user, steps=steps, page=page, size=size + ) + return response.get_json() + + def get_workflow_status(self, workflow): + """Get status of a previously created workflow.""" + _get_workflow_status = remove_decorators(get_workflow_status) + response, code = _get_workflow_status(workflow, user) + return response.get_json() + + def get_workflow_specification(self, workflow): + """Get specification of previously created workflow.""" + _get_workflow_specification = remove_decorators(get_workflow_specification) + response, code = _get_workflow_specification(workflow, user) + return response.get_json() + + def download_file(self, workflow, file_path): + """Download the requested file if it exists.""" + _download_file = remove_decorators(download_file) + + response, code = _download_file( + workflow_id_or_name=workflow, file_name=file_path, user=user + ) + content_type = response.headers.get("Content-Type", "") + + if "image" in content_type or "application/octet-stream" in content_type: + return response.get_data(as_text=False), file_path, False + + return ( + response.get_data(as_text=True).encode("utf-8"), + file_path, + False, + ) + + def test_workflow(workflow: Workflow): + """Test a workflow using Gherkin test files found in its specification.""" + specification = workflow.reana_specification + workspace_path = pathlib.Path(workflow.workspace_path) + if specification.get("tests"): + return [ + parser.parse_and_run_tests( + workspace_path / pathlib.Path(test_file), + workflow.name, + DataFetcherDB(), + ) + for test_file in specification["tests"]["files"] + ] + return [] + + try: + workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_)) + multiple_results = test_workflow(workflow) + response = { + "message": "The workspace has successfully been tested.", + "results": sum( + [ + [test_status_to_dict(result) for result in results[1]] + for results in multiple_results + ], + [], + ), + "workflow_id": workflow.id_, + "workflow_name": workflow.name, + } + + return jsonify(response), 200 + except HTTPError as e: + logging.exception(str(e)) + return jsonify(e.response.json()), e.response.status_code + except ValueError as e: + # In case of invalid workflow name / UUID + logging.exception(str(e)) + return jsonify({"message": str(e)}), 403 + except Exception as e: + logging.exception(str(e)) + return jsonify({"message": str(e)}), 500 + + +def remove_decorators(func): + """Remove all decorators from a function.""" + while hasattr(func, "__wrapped__"): + func = func.__wrapped__ + return func + + +def test_status_to_dict(test_status): + """Convert test status object to dictionary.""" + test_dict = asdict(test_status) + test_dict["result"] = test_status.result.name + # prevent None values from being returned in the response, to match schema + if not test_status.failed_testcase: + test_dict["failed_testcase"] = "" + if not test_status.error_log: + test_dict["error_log"] = "" + return test_dict + + +def _get_workflow_logs(workflow_id_or_name, user, **kwargs): # noqa + """Alternative implementation of get_workflow_logs, necessary for test_workflow.""" + try: + request_steps = request.json if request.is_json else None + if not workflow_id_or_name: + raise ValueError("workflow_id_or_name is not supplied") + + response, http_response = current_rwc_api_client.api.get_workflow_logs( + user=str(user.id_), + steps=request_steps or kwargs.get("steps"), + workflow_id_or_name=workflow_id_or_name, + **{k: v for k, v in kwargs.items() if k != "steps"}, + ).result() + + return jsonify(response), http_response.status_code + except HTTPError as e: + logging.error(traceback.format_exc()) + return jsonify(e.response.json()), e.response.status_code + except ValueError as e: + logging.error(traceback.format_exc()) + return jsonify({"message": str(e)}), 403 + except Exception as e: + logging.error(traceback.format_exc()) + return jsonify({"message": str(e)}), 500 + + +def _get_workflow_disk_usage(workflow_id_or_name, user, parameters): + """Alternative implementation of get_workflow_disk_usage, necessary for test_workflow.""" + try: + if not workflow_id_or_name: + raise ValueError("workflow_id_or_name is not supplied") + workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_)) + summarize = bool(parameters.get("summarize", False)) + search = parameters.get("search", None) + disk_usage_info = workflow.get_workspace_disk_usage( + summarize=summarize, search=search + ) + response = { + "workflow_id": workflow.id_, + "workflow_name": workflow.name, + "user": str(user.id_), + "disk_usage_info": disk_usage_info, + } + + return jsonify(response), 200 + except HTTPError as e: + logging.error(traceback.format_exc()) + return jsonify(e.response.json()), e.response.status_code + except ValueError as e: + logging.error(traceback.format_exc()) + return jsonify({"message": str(e)}), 403 + except Exception as e: + logging.error(traceback.format_exc()) + return jsonify({"message": str(e)}), 500