Skip to content

Commit

Permalink
add job update operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 17, 2024
1 parent c6356bd commit c587dac
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 13 deletions.
43 changes: 38 additions & 5 deletions tests/wps_restapi/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
setup_mongodb_servicestore
)
from weaver.compat import Version
from weaver.datatype import Job, Service
from weaver.datatype import Job, Process, Service
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteReturnPreference, ExecuteTransmissionMode
from weaver.formats import ContentType
from weaver.notify import decrypt_email
Expand Down Expand Up @@ -1804,16 +1804,17 @@ def test_job_update_locked(self):
)
path = f"/jobs/{new_job.id}"
body = {"inputs": {"test": 400}}
resp = self.app.patch(path, params=body, headers=self.json_headers, expect_errors=True)
resp = self.app.patch_json(path, params=body, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 423
assert resp.json["type"] == "http://www.opengis.net/def/exceptions/ogcapi-processes-4/1.0/locked"

@pytest.mark.oap_part4
def test_job_update_response(self):
new_job = self.make_job(
task_id=self.fully_qualified_test_name(), process=self.process_public.identifier, service=None,
status=Status.CREATED, progress=100, access=Visibility.PUBLIC,
status=Status.CREATED, progress=0, access=Visibility.PUBLIC,
inputs={"test": "data"}, outputs={"test": {"transmissionMode": ExecuteTransmissionMode.VALUE}},
subscribers={"successUri": "https://example.com/random"},
)

# check precondition job setup
Expand All @@ -1837,30 +1838,62 @@ def test_job_update_response(self):
body = {
"inputs": {"test": "modified", "new": 123},
"outputs": {"test": {"transmissionMode": ExecuteTransmissionMode.REFERENCE}},
"subscribers": {
"successUri": "https://example.com/success",
"failedUri": "https://example.com/failed",
},
}
headers = {
"Accept": ContentType.APP_JSON,
"Content-Type": ContentType.APP_JSON,
"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}; wait=5",
}
resp = self.app.patch(path, params=body, headers=headers)
resp = self.app.patch_json(path, params=body, headers=headers)
assert resp.status_code == 204

# validate changes applied and resolved accordingly
path = f"/jobs/{new_job.id}/inputs"
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.json["inputs"] == {"test": "modified", "new": 123}
assert resp.json["outputs"] == {"test": {"transmissionMode": ExecuteTransmissionMode.REFERENCE}}
assert resp.json["subscribers"] == {
"successUri": "https://example.com/success",
"failedUri": "https://example.com/failed",
}
assert resp.json["headers"] == {
"Accept": None,
"Accept-Language": None,
"Content-Type": None,
"Prefer": f"return={ExecuteReturnPreference.MINIMAL}",
"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}; wait=5",
"X-WPS-Output-Context": "public"
}
assert resp.json["mode"] == ExecuteMode.SYNC, "Should have been modified from 'wait' preference."
assert resp.json["response"] == ExecuteResponse.RAW, "Should have been modified from 'return' preference."

@pytest.mark.oap_part4
def test_job_update_response_process_disallowed(self):
proc_id = self.fully_qualified_test_name()
process = WpsTestProcess(identifier=proc_id)
process = Process.from_wps(process)
process["processDescriptionURL"] = f"https://localhost/processes/{proc_id}"
self.process_store.save_process(process)

new_job = self.make_job(
task_id=self.fully_qualified_test_name(), process=proc_id, service=None,
status=Status.CREATED, progress=0, access=Visibility.PUBLIC,
)

path = f"/jobs/{new_job.id}"
body = {"process": "https://localhost/processes/random"}
resp = self.app.patch_json(path, params=body, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 400
assert resp.json["cause"] == {"name": "process", "in": "body"}
assert resp.json["value"] == {
"body.process": "https://localhost/processes/random",
"job.process": f"https://localhost/processes/{proc_id}",
}

@pytest.mark.oap_part4
def test_job_status_alt_openeo_accept_response(self):
"""
Expand Down
99 changes: 98 additions & 1 deletion weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
HTTPAccepted,
HTTPBadRequest,
HTTPCreated,
HTTPException,
HTTPNotAcceptable,
HTTPUnprocessableEntity,
HTTPUnsupportedMediaType
Expand All @@ -28,6 +29,7 @@
ExecuteControlOption,
ExecuteMode,
parse_prefer_header_execute_mode,
parse_prefer_header_return,
update_preference_applied_return_header
)
from weaver.formats import AcceptLanguage, ContentType, clean_media_type_format, map_cwl_media_type, repr_json
Expand All @@ -43,6 +45,7 @@
ows2json_output_data
)
from weaver.processes.types import ProcessType
from weaver.processes.utils import get_process
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.store.base import StoreJobs, StoreProcesses
from weaver.utils import (
Expand Down Expand Up @@ -943,7 +946,101 @@ def update_job_parameters(job, request):
body = validate_job_json(request)
body = validate_job_schema(body, sd.PatchJobBodySchema)

raise NotImplementedError # FIXME: implement
value = field = loc = None
job_process = get_process(job.process)
try:
loc = "body"
if "process" in body:
# note: don't use 'get_process' for input process, as it might not even exist!
req_process_url = body["process"]
req_process_id = body["process"].rsplit("/processes/", 1)[-1]
if req_process_id != job_process.id or req_process_url != job_process.processDescriptionURL:
raise HTTPBadRequest(
json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
"type": "InvalidJobUpdate",
"title": "Invalid Job Execution Update",
"detail": "Update of the reference process for the job execution is not permitted.",
"status": HTTPBadRequest.code,
"cause": {"name": "process", "in": loc},
"value": repr_json({
"body.process": body["process"],
"job.process": job_process.processDescriptionURL,
}, force_string=False),
})
)

for node in sd.PatchJobBodySchema().children:
field = node.name
if not field or field not in body:
continue
if field in ["subscribers", "notification_email"]:
continue # will be handled simultaneously after

value = body[field] # type: ignore
if node.name in job:
setattr(job, field, value)
elif f"execution_{field}" in job:
field = f"execution_{field}"
if field == "execution_mode" and value in [ExecuteMode.ASYNC, ExecuteMode.SYNC]:
job_ctrl_exec = ExecuteControlOption.get(f"{value}-execute")
if job_ctrl_exec not in job_process.jobControlOptions:
raise HTTPBadRequest(
json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
"type": "InvalidJobUpdate",
"title": "Invalid Job Execution Update",
"detail": "Update of job execution mode is not permitted by process jobControlOptions.",
"status": HTTPBadRequest.code,
"cause": {"name": "mode", "in": loc},
"value": repr_json(
{
"process.jobControlOptions": job_process.jobControlOptions,
"job.mode": job_ctrl_exec,
}, force_string=False
),
})
)

setattr(job, field, value)

settings = get_settings(request)
subscribers = map_job_subscribers(body, settings=settings)
if not subscribers and body.get("subscribers") == {}:
subscribers = {} # asking to remove all subscribers explicitly
if subscribers is not None:
job.subscribers = subscribers

# for both 'mode' and 'response'
# if provided both in body and corresponding 'Prefer' header parameter, the body parameter takes precedence
# however, if provided only in header, allow override of the body parameter considered as "higher priority"
loc = "header"
if "mode" not in body:
mode, wait, _ = parse_prefer_header_execute_mode(request.headers, job_process.jobControlOptions)
job.execution_mode = mode
job.execution_wait = wait
if "response" in body:
job_return = parse_prefer_header_return(request.headers)
if job_return:
job.execution_return = job_return

except HTTPException:
raise
except ValueError as exc:
raise HTTPUnprocessableEntity(
json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
"type": "InvalidJobUpdate",
"title": "Invalid Job Execution Update",
"detail": "Could not update the job execution definition using specified parameters.",
"status": HTTPUnprocessableEntity.code,
"error": type(exc),
"cause": {"name": field, "in": loc},
"value": repr_json(value, force_string=False),
})
)

LOGGER.info("Updating %s", job)
db = get_db(request)
store = db.get_store(StoreJobs)
store.update_job(job)


def validate_job_json(request):
Expand Down
1 change: 1 addition & 0 deletions weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ class CWL_SchemaName(Protocol):
}, total=True)

ProcessExecution = TypedDict("ProcessExecution", {
"process": NotRequired[str],
"status": NotRequired[Literal["create"]],
"mode": NotRequired[AnyExecuteMode],
"response": NotRequired[AnyExecuteResponse],
Expand Down
44 changes: 37 additions & 7 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4210,6 +4210,13 @@ class Execute(ExecuteInputOutputs):
"value": EXAMPLES["job_execute.json"],
},
}
process = ProcessURL(
missing=drop,
description=(
"Process reference to be executed. "
"This parameter is required if the process cannot be inferred from the request endpoint."
),
)
status = JobStatusCreate(
description=(
"Status to request creation of the job without submitting it to processing queue "
Expand Down Expand Up @@ -6061,9 +6068,13 @@ class JobExecuteHeaders(ExtendedMappingSchema):


class JobInputsBody(ExecuteInputOutputs):
# note:
# following definitions do not employ 'missing=drop' to explicitly indicate the fields
# this makes it easier to consider everything that could be implied when executing the job
mode = JobExecuteModeEnum(default=ExecuteMode.AUTO)
response = JobResponseOptionsEnum(default=None)
headers = JobExecuteHeaders(missing={})
subscribers = JobExecuteSubscribers(missing={})
links = LinkList(missing=drop)


Expand Down Expand Up @@ -6584,13 +6595,25 @@ class PostProcessJobsEndpointXML(PostJobsEndpointXML, LocalProcessPath):


class PatchJobBodySchema(Execute):
description = "Execution request parameters to be updated."
description = "Execution request contents to be updated."
# all parameters that are not 'missing=drop' must be added to allow partial update
inputs = ExecuteInputValues(missing=drop, description="Input values or references to be updated.")
outputs = ExecuteOutputSpec(missing=drop, description="Output format and transmission mode to be updated.")


class PatchJobEndpoint(JobPath):
summary = "Execution request parameters to be updated."
description = (
"Execution request parameters to be updated. "
"If parameters are omitted, they will be left unmodified. "
"If provided, parameters will override existing definitions integrally. "
"Therefore, if only a partial update of certain nested elements in a mapping or list is desired, "
"all elements under the corresponding parameters must be resubmitted entirely with the applied changes. "
"In the case of certain parameters, equivalent definitions can cause conflicting definitions between "
"headers and contents "
f"(see for more details: {DOC_URL}/processes.html#execution-body and {DOC_URL}/processes.html#execution-mode). "
"To verify the resulting parameterization of any pending job, consider using the `GET /jobs/{jobId}/inputs`."
)
header = JobExecuteHeaders()
querystring = LocalProcessQuery()
body = PatchJobBodySchema()
Expand Down Expand Up @@ -6895,11 +6918,6 @@ def __deepcopy__(self, *args, **kwargs):
return GenericHTMLResponse(name=self.name, description=self.description, children=self.children)


class ErrorDetail(ExtendedMappingSchema):
code = ExtendedSchemaNode(Integer(), description="HTTP status code.", example=400)
status = ExtendedSchemaNode(String(), description="HTTP status detail.", example="400 Bad Request")


class OWSErrorCode(ExtendedSchemaNode):
schema_type = String
example = "InvalidParameterValue"
Expand All @@ -6918,6 +6936,18 @@ class OWSExceptionResponse(ExtendedMappingSchema):
description="Specific description of the error.")


class ErrorDetail(ExtendedMappingSchema):
code = ExtendedSchemaNode(Integer(), description="HTTP status code.", example=400)
status = ExtendedSchemaNode(String(), description="HTTP status detail.", example="400 Bad Request")


class ErrorSource(OneOfKeywordSchema):
_one_of = [
ExtendedSchemaNode(String(), description="Error name or description."),
ErrorDetail(description="Detailed error representation.")
]


class ErrorCause(OneOfKeywordSchema):
_one_of = [
ExtendedSchemaNode(String(), description="Error message from exception or cause of failure."),
Expand All @@ -6934,7 +6964,7 @@ class ErrorJsonResponseBodySchema(ExtendedMappingSchema):
status = ExtendedSchemaNode(Integer(), description="Error status code.", example=500, missing=drop)
cause = ErrorCause(missing=drop)
value = ErrorCause(missing=drop)
error = ErrorDetail(missing=drop)
error = ErrorSource(missing=drop)
instance = URI(missing=drop)
exception = OWSExceptionResponse(missing=drop)

Expand Down

0 comments on commit c587dac

Please sign in to comment.