Skip to content

Commit

Permalink
add schema query / profile content-type for openeo job status response
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 22, 2024
1 parent 9eba01f commit 9b17af6
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 19 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ markers =
oap_part2: mark test as 'OGC API - Processes - Part 2: Deploy, Replace, Undeploy (DRU)' functionalities
oap_part3: mark test as 'OGC API - Processes - Part 3: Workflows and Chaining' functionalities
oap_part4: mark test as 'OGC API - Processes - Part 4: Job Management' functionalities
openeo: mark test as evaluating 'openEO' functionalities
filterwarnings =
ignore:No file specified for WPS-1 providers registration:RuntimeWarning
ignore:.*configuration setting.*weaver\.cwl_processes_dir.*:RuntimeWarning
Expand Down
2 changes: 2 additions & 0 deletions tests/wps_restapi/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,7 @@ def test_job_update_response_process_disallowed(self):
}

@pytest.mark.oap_part4
@pytest.mark.openeo
def test_job_status_alt_openeo_accept_response(self):
"""
Validate retrieval of :term:`Job` status response with alternate value mapping by ``Accept`` header.
Expand Down Expand Up @@ -1967,6 +1968,7 @@ def test_job_status_alt_openeo_accept_response(self):
assert resp.json["status"] == Status.QUEUED

@pytest.mark.oap_part4
@pytest.mark.openeo
def test_job_status_alt_openeo_profile_response(self):
"""
Validate retrieval of :term:`Job` status response with alternate value mapping by ``profile`` query parameter.
Expand Down
11 changes: 11 additions & 0 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ class OpenSearchField(Constants):
JobInputsOutputsSchemaAnyOGCType = Union[JobInputsOutputsSchemaType_OGC, JobInputsOutputsSchemaType_OGC_STRICT]
JobInputsOutputsSchemaAnyOLDType = Union[JobInputsOutputsSchemaType_OLD, JobInputsOutputsSchemaType_OLD_STRICT]
JobInputsOutputsSchemaType = Union[JobInputsOutputsSchemaAnyOGCType, JobInputsOutputsSchemaAnyOLDType]
JobStatusSchemaType_OGC = Literal["OGC", "ogc"]
JobStatusSchemaType_OpenEO = Literal["OPENEO", "openeo", "openEO", "OpenEO"]
JobStatusSchemaType = Union[JobStatusSchemaType_OGC, JobStatusSchemaType_OpenEO]


class ProcessSchema(Constants):
Expand All @@ -386,6 +389,14 @@ class JobInputsOutputsSchema(Constants):
OLD = "old" # type: JobInputsOutputsSchemaType_OLD


class JobStatusSchema(Constants):
"""
Schema selector to represent a :term:`Job` status response.
"""
OGC = "ogc" # type: JobStatusSchemaType_OGC
OPENEO = "openeo" # type: JobStatusSchemaType_OpenEO


if TYPE_CHECKING:
# pylint: disable=invalid-name
CWL_RequirementNames = Literal[
Expand Down
12 changes: 10 additions & 2 deletions weaver/processes/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,17 +1911,25 @@ def convert_input_values_schema(inputs, schema):


@overload
def convert_output_params_schema(inputs, schema):
def convert_output_params_schema(outputs, schema):
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaAnyOGCType) -> Optional[ExecutionOutputsMap]
...


@overload
def convert_output_params_schema(inputs, schema):
def convert_output_params_schema(outputs, schema):
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaAnyOLDType) -> Optional[ExecutionOutputsList]
...


# FIXME: workaround typing duplicate
# (https://youtrack.jetbrains.com/issue/PY-76786/Typing-literal-with-overload-fails-to-consider-non-overloaded-type)
@overload
def convert_output_params_schema(outputs, schema):
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaType) -> Optional[ExecutionOutputs]
...


def convert_output_params_schema(outputs, schema):
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaType) -> Optional[ExecutionOutputs]
"""
Expand Down
4 changes: 2 additions & 2 deletions weaver/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class Status(Constants):
Status.CREATED, # Part 4: Job Management
Status.ACCEPTED,
Status.RUNNING,
Status.SUCCEEDED, # old (keep it because it matches existing ADES/EMS and other providers)
Status.SUCCEEDED, # new
Status.FAILED,
Status.SUCCESSFUL, # new
Status.SUCCESSFUL, # old (keep it because it matches existing ADES/EMS and other providers)
Status.DISMISSED # new
]),
StatusCompliant.PYWPS: frozenset([
Expand Down
21 changes: 13 additions & 8 deletions weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from moto.batch.utils import JobStatus
from typing import TYPE_CHECKING

from box import Box
Expand Down Expand Up @@ -25,7 +26,7 @@
guess_target_format,
repr_json
)
from weaver.processes.constants import JobInputsOutputsSchema
from weaver.processes.constants import JobInputsOutputsSchema, JobStatusSchema
from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema
from weaver.processes.execution import (
submit_job,
Expand All @@ -35,7 +36,7 @@
)
from weaver.processes.utils import get_process
from weaver.processes.wps_package import mask_process_inputs
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, StatusCompliant, map_status
from weaver.store.base import StoreJobs
from weaver.utils import get_header, get_settings, make_link_header
from weaver.wps_restapi import swagger_definitions as sd
Expand All @@ -45,7 +46,8 @@
get_job_list_links,
get_job_results_response,
get_results,
get_schema_query,
get_job_io_schema_query,
get_job_status_schema,
raise_job_bad_status_locked,
raise_job_bad_status_success,
raise_job_dismissed,
Expand Down Expand Up @@ -320,8 +322,11 @@ def get_job_status(request):
Retrieve the status of a job.
"""
job = get_job(request)
job_status = job.json(request)
return HTTPOk(json=job_status)
job_body = job.json(request)
schema = get_job_status_schema(request)
if schema == JobStatusSchema.OPENEO:
job_body["status"] = map_status(job_body["status"], StatusCompliant.OPENEO)
return HTTPOk(json=job_body)


@sd.provider_job_service.patch(
Expand Down Expand Up @@ -485,14 +490,14 @@ def get_job_inputs(request):
Retrieve the inputs values and outputs definitions of a job.
"""
job = get_job(request)
schema = get_schema_query(request.params.get("schema"), strict=False, default=JobInputsOutputsSchema.OGC)
schema = get_job_io_schema_query(request.params.get("schema"), strict=False, default=JobInputsOutputsSchema.OGC)
job_inputs = job.inputs
job_outputs = job.outputs
if job.is_local:
process = get_process(job.process, request=request)
job_inputs = mask_process_inputs(process.package, job_inputs)
job_inputs = convert_input_values_schema(job_inputs, schema)
job_outputs = convert_output_params_schema(job_outputs, schema) # type: ignore
job_outputs = convert_output_params_schema(job_outputs, schema)
job_prefer = rebuild_prefer_header(job)
job_mode, _, _ = parse_prefer_header_execute_mode({"Prefer": job_prefer}, return_auto=True)
job_headers = {
Expand Down Expand Up @@ -543,7 +548,7 @@ def get_job_outputs(request):
job = get_job(request)
raise_job_dismissed(job, request)
raise_job_bad_status_success(job, request)
schema = get_schema_query(request.params.get("schema"), default=JobInputsOutputsSchema.OGC)
schema = get_job_io_schema_query(request.params.get("schema"), default=JobInputsOutputsSchema.OGC)
results, _ = get_results(job, request, schema=schema, link_references=False)
outputs = {"outputs": results}
outputs.update({"links": job.links(request, self_link="outputs")})
Expand Down
46 changes: 40 additions & 6 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import shutil
from copy import deepcopy
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, cast, overload

import colander
from celery.utils.log import get_task_logger
Expand Down Expand Up @@ -42,7 +42,7 @@
)
from weaver.formats import ContentEncoding, ContentType, get_format, repr_json
from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound
from weaver.processes.constants import JobInputsOutputsSchema
from weaver.processes.constants import JobInputsOutputsSchema, JobStatusSchema
from weaver.processes.convert import any2wps_literal_datatype, convert_output_params_schema, get_field
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
Expand All @@ -54,12 +54,14 @@
get_header,
get_href_headers,
get_path_kvp,
get_request_args,
get_sane_name,
get_secure_path,
get_settings,
get_weaver_url,
is_uuid,
make_link_header
make_link_header,
parse_kvp
)
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location
Expand All @@ -72,7 +74,7 @@

from weaver.execute import AnyExecuteResponse, AnyExecuteReturnPreference, AnyExecuteTransmissionMode
from weaver.formats import AnyContentEncoding
from weaver.processes.constants import JobInputsOutputsSchemaType
from weaver.processes.constants import JobInputsOutputsSchemaType, JobStatusSchemaType
from weaver.typedefs import (
AnyDataStream,
AnyHeadersContainer,
Expand Down Expand Up @@ -280,8 +282,17 @@ def get_job_list_links(job_total, filters, request):
return links


def get_schema_query(
schema, # type: Optional[JobInputsOutputsSchemaType]
@overload
def get_job_io_schema_query(
schema, # type: Optional[str]
strict=True, # type: bool
default=None, # type: JobInputsOutputsSchemaType
): # type: (...) -> JobInputsOutputsSchemaType
...


def get_job_io_schema_query(
schema, # type: Optional[str]
strict=True, # type: bool
default=None, # type: Optional[JobInputsOutputsSchemaType]
): # type: (...) -> Optional[JobInputsOutputsSchemaType]
Expand All @@ -305,6 +316,29 @@ def get_schema_query(
return schema_checked


def get_job_status_schema(request):
# type: (AnyRequestType) -> JobStatusSchemaType
"""
Identifies if a :term:`Job` status response schema applies for the request.
"""
params = get_request_args(request)
schema = JobStatusSchema.get(params.get("schema"))
if schema:
return schema
ctype = get_header("Content-Type", request.headers)
if not ctype:
return JobStatusSchema.OGC
params = parse_kvp(ctype)
profile = params.get("profile")
if not profile:
return JobStatusSchema.OGC
schema = cast(
"JobStatusSchemaType",
JobStatusSchema.get(profile, default=JobStatusSchema.OGC)
)
return schema


def make_result_link(
job, # type: Job
result, # type: ExecutionResultValue
Expand Down
21 changes: 20 additions & 1 deletion weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
PACKAGE_TYPE_POSSIBLE_VALUES,
WPS_LITERAL_DATA_TYPES,
JobInputsOutputsSchema,
JobStatusSchema,
ProcessSchema
)
from weaver.quotation.status import QuoteStatus
Expand Down Expand Up @@ -3264,12 +3265,26 @@ class ProcessVisibilityPutEndpoint(LocalProcessPath):
body = VisibilitySchema()


class GetJobQuery(ExtendedMappingSchema):
schema = ExtendedSchemaNode(
String(),
title="JobStatusQuerySchema",
example=JobStatusSchema.OGC,
default=JobStatusSchema.OGC,
validator=OneOfCaseInsensitive(JobStatusSchema.values()),
summary="Job status schema representation.",
description="Selects the schema employed for representation of returned job status response.",
)


class GetProviderJobEndpoint(ProviderProcessPath, JobPath):
header = RequestHeaders()
querystring = GetJobQuery()


class GetJobEndpoint(JobPath):
header = RequestHeaders()
querystring = GetJobQuery()


class ProcessInputsEndpoint(LocalProcessPath, JobPath):
Expand Down Expand Up @@ -6717,9 +6732,13 @@ class DeleteProviderJobsEndpoint(DeleteJobsEndpoint, ProviderProcessPath):
pass


class GetProcessJobQuery(LocalProcessQuery, GetJobQuery):
pass


class GetProcessJobEndpoint(LocalProcessPath):
header = RequestHeaders()
querystring = LocalProcessQuery()
querystring = GetProcessJobQuery()


class DeleteJobEndpoint(JobPath):
Expand Down

0 comments on commit 9b17af6

Please sign in to comment.