From 71f6f9639b73d8c1251c34583a6769a8ba4668dc Mon Sep 17 00:00:00 2001 From: Nazim Azeli Date: Tue, 15 Oct 2024 10:50:40 -0400 Subject: [PATCH] Adding get job output route --- weaver/wps_restapi/jobs/jobs.py | 78 +++++++++++++-------------- weaver/wps_restapi/jobs/utils.py | 90 +++++++++++++++++++++++++------- 2 files changed, 105 insertions(+), 63 deletions(-) diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index b64b68dd5..a674b18bc 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -1,4 +1,3 @@ -import os from typing import TYPE_CHECKING from box import Box @@ -15,15 +14,14 @@ from weaver.processes.wps_package import mask_process_inputs from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory from weaver.store.base import StoreJobs -from weaver.transform.transform import Transform -from weaver.utils import get_settings -from weaver.wps.utils import get_wps_output_dir +from weaver.utils import get_any_value, get_settings from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.jobs.utils import ( dismiss_job_task, get_all_possible_formats_links, get_job, get_job_list_links, + get_job_output_transmission, get_job_possible_output_formats, get_job_results_response, get_results, @@ -404,39 +402,33 @@ def get_job_outputs(request): @sd.provider_result_value_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], - renderer=OutputFormat.JSON, schema=sd.ProviderResultValueEndpoint(), - response_schemas=sd.FileResponseHeaders + response_schemas=sd.get_prov_result_responses ) @sd.process_result_value_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], - renderer=OutputFormat.JSON, schema=sd.ProcessResultValueEndpoint(), - response_schemas=sd.FileResponseHeaders + response_schemas=sd.get_proc_result_responses ) @sd.job_result_value_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS], - renderer=OutputFormat.JSON, schema=sd.JobResultValueEndpoint(), - response_schemas=sd.FileResponseHeaders + response_schemas=sd.get_job_result_responses ) @sd.provider_output_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], - renderer=OutputFormat.JSON, schema=sd.ProviderAnyOutputEndpoint(), - response_schemas=sd.get_prov_output_responses + response_schemas=sd.get_prov_result_responses ) @sd.process_output_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], - renderer=OutputFormat.JSON, schema=sd.ProcessAnyOutputEndpoint(), - response_schemas=sd.FileResponseHeaders + response_schemas=sd.get_proc_result_responses ) @sd.job_output_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], - renderer=OutputFormat.JSON, schema=sd.JobAnyOutputEndpoint(), - response_schemas=sd.FileResponseHeaders + response_schemas=sd.get_job_result_responses ) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_output(request): @@ -444,27 +436,31 @@ def get_job_output(request): """ Retrieve the output values resulting from a job execution. """ - settings = get_settings(request) + # TODO REMOVE settings = get_settings(request) output_id = request.matchdict.get("output_id") # Get requested media-type. "*/*" if omit accept = str(request.accept) if request.accept else "*/*" job = get_job(request) - requested_media_types = get_job_possible_output_formats(job)[0]["alternatives"][0] - - # Filtered by requested output_id - result_media_type = [o["mimeType"] for o in job.results if str(o["identifier"]) == output_id][0] - - # if any format requested, we take the resulting one - frm = request.params.get("f") - if frm is not None: - accept = frm - - if accept == "*/*": - accept = result_media_type + possible_media_types = get_job_possible_output_formats(job)[0]["alternatives"][0] + results = [o for o in job.results if str(o["identifier"]) == output_id] + if results: + result = results[0] + else: + raise HTTPNotFound( + json={ + "code": "", + "description": "The requested output format is not in the possible output formats", + "cause": "Incompatible mime Types", + "error": "", + "value": "" + } + ) + result_media_type = result["mimeType"] + result_media_type = guess_target_format(request, default=result_media_type) # if format requested not in possible mediatypes... - if accept not in requested_media_types: + if accept not in possible_media_types: raise HTTPUnprocessableEntity(json={ "code": "InvalidMimeTypeRequested", "description": "The requested output format is not in the possible output formats", @@ -473,20 +469,16 @@ def get_job_output(request): "value": "" }) - # Get resulting file - reference = [o["reference"] for o in job.results if str(o["identifier"]) == output_id][0] - res_file = os.path.join(get_wps_output_dir(settings), reference) - if not os.path.exists(res_file): - raise HTTPNotFound({ - "code": "JobFileNotExists", - "description": f"{str(output_id)} - the job result does not exist (anymore)", - "cause": "Job File Not Exists", - "error": type(HTTPNotFound).__name__, - "value": "" - }) + is_reference = bool(get_any_value(result, key=True, file=True)) + output_mode, output_format = get_job_output_transmission(job, output_id, is_reference) + + # output_format = en priorite accept | output format | result_media_type + # TODO handle appeler get_job_results_single dans la pr ouvert de francis au lieu de generate result + + # res_headers, res_data = generate_or_resolve_result( + # job, result, output_id, output_id, output_mode, output_format, request) - # Return resulting file transformed if necessary - return Transform(file_path=res_file, current_media_type=result_media_type, wanted_media_type=accept).get() + return # get_job_results_single (modifier pour ajouter flag bypass ) @sd.provider_results_service.get( diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index e0198bbe4..dca9ac7a0 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -32,7 +32,7 @@ ServiceNotAccessible, ServiceNotFound ) -from weaver.execute import AnyExecuteTransmissionMode, ExecuteResponse, ExecuteTransmissionMode +from weaver.execute import ExecuteResponse, ExecuteReturnPreference, ExecuteTransmissionMode from weaver.formats import ContentType, get_extension, get_format, repr_json from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound from weaver.processes.constants import JobInputsOutputsSchema @@ -42,6 +42,7 @@ from weaver.transform import transform from weaver.typedefs import AnyDataStream, HeadersType from weaver.utils import ( + data2str, get_any_id, get_any_value, get_header, @@ -62,6 +63,7 @@ if TYPE_CHECKING: from typing import Dict, List, Optional, Tuple, Union + from weaver.execute import AnyExecuteTransmissionMode from weaver.processes.constants import JobInputsOutputsSchemaType from weaver.typedefs import ( AnyHeadersContainer, @@ -75,6 +77,7 @@ ExecutionResults, ExecutionResultValue, HeadersTupleType, + JobValueFormat, JSON, PyramidRequest, SettingsType @@ -495,7 +498,31 @@ def get_results( # pylint: disable=R1260 return outputs, headers -# TODO CHECK THIS + +def get_job_output_transmission(job, output_id, is_reference): + # type: (Job, str, bool) -> Tuple[AnyExecuteTransmissionMode, Optional[JobValueFormat]] + """ + Obtain the requested :term:`Job` output ``transmissionMode`` and ``format``. + """ + outputs = job.outputs or {} + outputs = convert_output_params_schema(outputs, JobInputsOutputsSchema.OGC) + out = outputs.get(output_id) or {} + out_mode = cast("AnyExecuteTransmissionMode", out.get("transmissionMode")) + out_fmt = cast("JobValueFormat", out.get("format")) + + # raw/representation can change the output transmission mode if they are not overriding it + # document/minimal return is not checked, since it is our default, and will resolve as such anyway + if ( + not out_mode and + job.execution_return == ExecuteReturnPreference.REPRESENTATION and + job.execution_response == ExecuteResponse.RAW + ): + return ExecuteTransmissionMode.VALUE, out_fmt + + # because mode can be omitted, resolve their default explicitly + if not out_mode: + out_mode = ExecuteTransmissionMode.REFERENCE if is_reference else ExecuteTransmissionMode.VALUE + return out_mode, out_fmt def get_job_results_response(job, container, headers=None): @@ -648,25 +675,30 @@ def generate_or_resolve_result( result_id, # type: str output_id, # type: str output_mode, # type: AnyExecuteTransmissionMode - request, # type: AnyRequestType + output_format, # type: Optional[JobValueFormat] # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) + settings, # type: SettingsType ): # type: (...) -> Tuple[HeadersType, Optional[AnyDataStream]] """ Obtains the local file path and the corresponding :term:`URL` reference for a given result, generating it as needed. + :param job: Job with results details. :param result: The specific output value or reference (could be an item index within an array of a given output). :param result_id: Specific identifier of the result, including any array index as applicable. :param output_id: Generic identifier of the output containing the result. :param output_mode: Desired output transmission mode. + :param output_format: Desired output transmission ``format``, with minimally the :term:`Media-Type`. :param settings: Application settings to resolve locations. - :return: Resolved locations. + :return: + Resolved headers and data (as applicable) for the result. + If only returned by reference, ``None`` data is returned. An empty-data contents would be an empty string. + Therefore, the explicit check of ``None`` is important to identify a by-reference result. """ - settings = get_settings(request) - key = get_any_value(result, key=True) + is_val = bool(get_any_value(result, key=True, file=False, data=True)) + is_ref = bool(get_any_value(result, key=True, file=True, data=False)) val = get_any_value(result) cid = f"{result_id}@{job.id}" url = None loc = None - typ = None res_data = None c_length = None @@ -676,34 +708,52 @@ def generate_or_resolve_result( # FIXME: Handle S3 output storage. Should multipart response even be allowed in this case? - if key == "href": + if is_ref: url = val - typ = result.get("type") or ContentType.APP_OCTET_STREAM - loc = map_wps_output_location(val, settings, exists=True, url=False) + typ = result.get("type") # expected for typical link, but also check media-type variants in case pre-converted + typ = typ or get_field(result, "mime_type", search_variations=True, default=ContentType.APP_OCTET_STREAM) + job_out_url = job.result_path(output_id=output_id) + if url.startswith(f"/{job_out_url}/"): # job "relative" path + out_url = get_wps_output_url(settings) + url = os.path.join(out_url, url[1:]) + loc = map_wps_output_location(url, settings, exists=True, url=False) + loc = get_secure_path(loc) + else: + typ = get_field(result, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN) + + # TODO if else check si le type demande est different, + # faire conversation sinon rien faire + # FIXME : clean_media_type_format + out = get_field(output_format, "mime_type", search_variations=True, default=None) + # if out = none or out == typ , mettre dans if else + file_transform = transform.Transform(file_path=loc, current_media_type=typ, wanted_media_type=out) + typ = out + file_transform.get() + loc = file_transform.output_path if not url: out_dir = get_wps_output_dir(settings) out_name = f"{result_id}.txt" job_path = job.result_path(output_id=output_id, file_name=out_name) loc = os.path.join(out_dir, job_path) + loc = get_secure_path(loc) url = map_wps_output_location(loc, settings, exists=False, url=True) - if key == "value": + if is_val and output_mode == ExecuteTransmissionMode.VALUE: res_data = io.StringIO() - c_length = res_data.write(val) - typ = ContentType.TEXT_PLAIN + # FIXME res_data = val si flag bypass est passer pour ensuite etre appeler dans + # https://github.com/crim-ca/weaver/blob/6f7aba71763201132d840caaf33e0d3e5bbdc5de/weaver/wps_restapi/jobs/utils.py#L886 + c_length = res_data.write(data2str(val)) - if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE: + if is_val and output_mode == ExecuteTransmissionMode.REFERENCE: if not os.path.isfile(loc): os.makedirs(os.path.dirname(loc), exist_ok=True) with open(loc, mode="w", encoding="utf-8") as out_file: - out_file.write(val) - - if key == "href" and output_mode == ExecuteTransmissionMode.VALUE: - res_data = io.FileIO(loc, mode="rb") + out_file.write(data2str(val)) - # faire transform selon job output / result - # appeler weaver.format guess type TODO + if is_ref and output_mode == ExecuteTransmissionMode.VALUE and typ != ContentType.APP_DIR: + res_path = loc[7:] if loc.startswith("file://") else loc + res_data = io.FileIO(res_path, mode="rb") res_headers = get_href_headers( loc,