Skip to content

Commit

Permalink
Adding get job output route
Browse files Browse the repository at this point in the history
  • Loading branch information
Nazim-crim committed Oct 15, 2024
1 parent fe71427 commit 71f6f96
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 63 deletions.
78 changes: 35 additions & 43 deletions weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from typing import TYPE_CHECKING

from box import Box
Expand All @@ -15,15 +14,14 @@
from weaver.processes.wps_package import mask_process_inputs
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.store.base import StoreJobs
from weaver.transform.transform import Transform
from weaver.utils import get_settings
from weaver.wps.utils import get_wps_output_dir
from weaver.utils import get_any_value, get_settings
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.jobs.utils import (
dismiss_job_task,
get_all_possible_formats_links,
get_job,
get_job_list_links,
get_job_output_transmission,
get_job_possible_output_formats,
get_job_results_response,
get_results,
Expand Down Expand Up @@ -404,67 +402,65 @@ def get_job_outputs(request):

@sd.provider_result_value_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS],
renderer=OutputFormat.JSON,
schema=sd.ProviderResultValueEndpoint(),
response_schemas=sd.FileResponseHeaders
response_schemas=sd.get_prov_result_responses
)
@sd.process_result_value_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
renderer=OutputFormat.JSON,
schema=sd.ProcessResultValueEndpoint(),
response_schemas=sd.FileResponseHeaders
response_schemas=sd.get_proc_result_responses
)
@sd.job_result_value_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS],
renderer=OutputFormat.JSON,
schema=sd.JobResultValueEndpoint(),
response_schemas=sd.FileResponseHeaders
response_schemas=sd.get_job_result_responses
)
@sd.provider_output_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
renderer=OutputFormat.JSON,
schema=sd.ProviderAnyOutputEndpoint(),
response_schemas=sd.get_prov_output_responses
response_schemas=sd.get_prov_result_responses
)
@sd.process_output_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
renderer=OutputFormat.JSON,
schema=sd.ProcessAnyOutputEndpoint(),
response_schemas=sd.FileResponseHeaders
response_schemas=sd.get_proc_result_responses
)
@sd.job_output_service.get(
tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
renderer=OutputFormat.JSON,
schema=sd.JobAnyOutputEndpoint(),
response_schemas=sd.FileResponseHeaders
response_schemas=sd.get_job_result_responses
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_output(request):
# type: (PyramidRequest) -> AnyResponseType
"""
Retrieve the output values resulting from a job execution.
"""
settings = get_settings(request)
# TODO REMOVE settings = get_settings(request)
output_id = request.matchdict.get("output_id")
# Get requested media-type. "*/*" if omit
accept = str(request.accept) if request.accept else "*/*"

job = get_job(request)
requested_media_types = get_job_possible_output_formats(job)[0]["alternatives"][0]

# Filtered by requested output_id
result_media_type = [o["mimeType"] for o in job.results if str(o["identifier"]) == output_id][0]

# if any format requested, we take the resulting one
frm = request.params.get("f")
if frm is not None:
accept = frm

if accept == "*/*":
accept = result_media_type
possible_media_types = get_job_possible_output_formats(job)[0]["alternatives"][0]
results = [o for o in job.results if str(o["identifier"]) == output_id]
if results:
result = results[0]
else:
raise HTTPNotFound(
json={
"code": "",
"description": "The requested output format is not in the possible output formats",
"cause": "Incompatible mime Types",
"error": "",
"value": ""
}
)
result_media_type = result["mimeType"]
result_media_type = guess_target_format(request, default=result_media_type)

# if format requested not in possible mediatypes...
if accept not in requested_media_types:
if accept not in possible_media_types:
raise HTTPUnprocessableEntity(json={
"code": "InvalidMimeTypeRequested",
"description": "The requested output format is not in the possible output formats",
Expand All @@ -473,20 +469,16 @@ def get_job_output(request):
"value": ""
})

# Get resulting file
reference = [o["reference"] for o in job.results if str(o["identifier"]) == output_id][0]
res_file = os.path.join(get_wps_output_dir(settings), reference)
if not os.path.exists(res_file):
raise HTTPNotFound({
"code": "JobFileNotExists",
"description": f"{str(output_id)} - the job result does not exist (anymore)",
"cause": "Job File Not Exists",
"error": type(HTTPNotFound).__name__,
"value": ""
})
is_reference = bool(get_any_value(result, key=True, file=True))
output_mode, output_format = get_job_output_transmission(job, output_id, is_reference)

# output_format = en priorite accept | output format | result_media_type
# TODO handle appeler get_job_results_single dans la pr ouvert de francis au lieu de generate result

# res_headers, res_data = generate_or_resolve_result(
# job, result, output_id, output_id, output_mode, output_format, request)

# Return resulting file transformed if necessary
return Transform(file_path=res_file, current_media_type=result_media_type, wanted_media_type=accept).get()
return # get_job_results_single (modifier pour ajouter flag bypass )


@sd.provider_results_service.get(
Expand Down
90 changes: 70 additions & 20 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ServiceNotAccessible,
ServiceNotFound
)
from weaver.execute import AnyExecuteTransmissionMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.execute import ExecuteResponse, ExecuteReturnPreference, ExecuteTransmissionMode
from weaver.formats import ContentType, get_extension, get_format, repr_json
from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound
from weaver.processes.constants import JobInputsOutputsSchema
Expand All @@ -42,6 +42,7 @@
from weaver.transform import transform
from weaver.typedefs import AnyDataStream, HeadersType
from weaver.utils import (
data2str,
get_any_id,
get_any_value,
get_header,
Expand All @@ -62,6 +63,7 @@
if TYPE_CHECKING:
from typing import Dict, List, Optional, Tuple, Union

from weaver.execute import AnyExecuteTransmissionMode
from weaver.processes.constants import JobInputsOutputsSchemaType
from weaver.typedefs import (
AnyHeadersContainer,
Expand All @@ -75,6 +77,7 @@
ExecutionResults,
ExecutionResultValue,
HeadersTupleType,
JobValueFormat,
JSON,
PyramidRequest,
SettingsType
Expand Down Expand Up @@ -495,7 +498,31 @@ def get_results( # pylint: disable=R1260

return outputs, headers

# TODO CHECK THIS

def get_job_output_transmission(job, output_id, is_reference):
# type: (Job, str, bool) -> Tuple[AnyExecuteTransmissionMode, Optional[JobValueFormat]]
"""
Obtain the requested :term:`Job` output ``transmissionMode`` and ``format``.
"""
outputs = job.outputs or {}
outputs = convert_output_params_schema(outputs, JobInputsOutputsSchema.OGC)
out = outputs.get(output_id) or {}
out_mode = cast("AnyExecuteTransmissionMode", out.get("transmissionMode"))
out_fmt = cast("JobValueFormat", out.get("format"))

# raw/representation can change the output transmission mode if they are not overriding it
# document/minimal return is not checked, since it is our default, and will resolve as such anyway
if (
not out_mode and
job.execution_return == ExecuteReturnPreference.REPRESENTATION and
job.execution_response == ExecuteResponse.RAW
):
return ExecuteTransmissionMode.VALUE, out_fmt

# because mode can be omitted, resolve their default explicitly
if not out_mode:
out_mode = ExecuteTransmissionMode.REFERENCE if is_reference else ExecuteTransmissionMode.VALUE
return out_mode, out_fmt


def get_job_results_response(job, container, headers=None):
Expand Down Expand Up @@ -648,25 +675,30 @@ def generate_or_resolve_result(
result_id, # type: str
output_id, # type: str
output_mode, # type: AnyExecuteTransmissionMode
request, # type: AnyRequestType
output_format, # type: Optional[JobValueFormat] # FIXME: implement (https://github.com/crim-ca/weaver/pull/548)
settings, # type: SettingsType
): # type: (...) -> Tuple[HeadersType, Optional[AnyDataStream]]
"""
Obtains the local file path and the corresponding :term:`URL` reference for a given result, generating it as needed.
:param job: Job with results details.
:param result: The specific output value or reference (could be an item index within an array of a given output).
:param result_id: Specific identifier of the result, including any array index as applicable.
:param output_id: Generic identifier of the output containing the result.
:param output_mode: Desired output transmission mode.
:param output_format: Desired output transmission ``format``, with minimally the :term:`Media-Type`.
:param settings: Application settings to resolve locations.
:return: Resolved locations.
:return:
Resolved headers and data (as applicable) for the result.
If only returned by reference, ``None`` data is returned. An empty-data contents would be an empty string.
Therefore, the explicit check of ``None`` is important to identify a by-reference result.
"""
settings = get_settings(request)
key = get_any_value(result, key=True)
is_val = bool(get_any_value(result, key=True, file=False, data=True))
is_ref = bool(get_any_value(result, key=True, file=True, data=False))
val = get_any_value(result)
cid = f"{result_id}@{job.id}"
url = None
loc = None
typ = None
res_data = None
c_length = None

Expand All @@ -676,34 +708,52 @@ def generate_or_resolve_result(

# FIXME: Handle S3 output storage. Should multipart response even be allowed in this case?

if key == "href":
if is_ref:
url = val
typ = result.get("type") or ContentType.APP_OCTET_STREAM
loc = map_wps_output_location(val, settings, exists=True, url=False)
typ = result.get("type") # expected for typical link, but also check media-type variants in case pre-converted
typ = typ or get_field(result, "mime_type", search_variations=True, default=ContentType.APP_OCTET_STREAM)
job_out_url = job.result_path(output_id=output_id)
if url.startswith(f"/{job_out_url}/"): # job "relative" path
out_url = get_wps_output_url(settings)
url = os.path.join(out_url, url[1:])
loc = map_wps_output_location(url, settings, exists=True, url=False)
loc = get_secure_path(loc)
else:
typ = get_field(result, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN)

# TODO if else check si le type demande est different,
# faire conversation sinon rien faire
# FIXME : clean_media_type_format
out = get_field(output_format, "mime_type", search_variations=True, default=None)
# if out = none or out == typ , mettre dans if else
file_transform = transform.Transform(file_path=loc, current_media_type=typ, wanted_media_type=out)
typ = out
file_transform.get()
loc = file_transform.output_path

if not url:
out_dir = get_wps_output_dir(settings)
out_name = f"{result_id}.txt"
job_path = job.result_path(output_id=output_id, file_name=out_name)
loc = os.path.join(out_dir, job_path)
loc = get_secure_path(loc)
url = map_wps_output_location(loc, settings, exists=False, url=True)

if key == "value":
if is_val and output_mode == ExecuteTransmissionMode.VALUE:
res_data = io.StringIO()
c_length = res_data.write(val)
typ = ContentType.TEXT_PLAIN
# FIXME res_data = val si flag bypass est passer pour ensuite etre appeler dans
# https://github.com/crim-ca/weaver/blob/6f7aba71763201132d840caaf33e0d3e5bbdc5de/weaver/wps_restapi/jobs/utils.py#L886
c_length = res_data.write(data2str(val))

if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE:
if is_val and output_mode == ExecuteTransmissionMode.REFERENCE:
if not os.path.isfile(loc):
os.makedirs(os.path.dirname(loc), exist_ok=True)
with open(loc, mode="w", encoding="utf-8") as out_file:
out_file.write(val)

if key == "href" and output_mode == ExecuteTransmissionMode.VALUE:
res_data = io.FileIO(loc, mode="rb")
out_file.write(data2str(val))

# faire transform selon job output / result
# appeler weaver.format guess type TODO
if is_ref and output_mode == ExecuteTransmissionMode.VALUE and typ != ContentType.APP_DIR:
res_path = loc[7:] if loc.startswith("file://") else loc
res_data = io.FileIO(res_path, mode="rb")

res_headers = get_href_headers(
loc,
Expand Down

0 comments on commit 71f6f96

Please sign in to comment.