Skip to content

Commit

Permalink
Optionnally provide request HTTP headers to processors
Browse files Browse the repository at this point in the history
  • Loading branch information
vprivat-ads committed Jan 20, 2025
1 parent fb642dd commit b9b06a7
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pygeoapi/api/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ def execute_process(api: API, request: APIRequest,
process_id, data_dict, execution_mode=execution_mode,
requested_outputs=requested_outputs,
subscriber=subscriber,
requested_response=requested_response)
requested_response=requested_response,
request_headers=request.headers)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})

Expand Down
6 changes: 5 additions & 1 deletion pygeoapi/process/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict):
self.name = processor_def['name']
self.metadata = process_metadata
self.supports_outputs = False
self.supports_request_headers = False

def set_job_id(self, job_id: str) -> None:
"""
Expand All @@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None:

pass

def execute(self, data: dict, outputs: Optional[dict] = None
def execute(self, data: dict, outputs: Optional[dict] = None,
request_headers: Optional[dict] = None
) -> Tuple[str, Any]:
"""
execute the process
Expand All @@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None
required outputs - defaults to all outputs.
The value of any key may be an object and include the
property `transmissionMode` - defaults to `value`.
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of MIME type and process response
(string or bytes, or dict)
"""
Expand Down
26 changes: 20 additions & 6 deletions pygeoapi/process/manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, manager_def: dict):
self.name = manager_def['name']
self.is_async = False
self.supports_subscribing = False
self.supports_request_headers = False
self.connection = manager_def.get('connection')
self.output_dir = manager_def.get('output_dir')

Expand Down Expand Up @@ -195,7 +196,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, None, JobStatus]:
"""
This private execution handler executes a process in a background
Expand All @@ -216,13 +218,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of None (i.e. initial response payload)
and JobStatus.accepted (i.e. initial job status)
"""

args = (p, job_id, data_dict, requested_outputs, subscriber,
requested_response)
requested_response, request_headers)

_process = dummy.Process(target=self._execute_handler_sync, args=args)
_process.start()
Expand All @@ -233,7 +237,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus]:
"""
Synchronous execution handler
Expand All @@ -255,16 +260,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of MIME type, response payload and status
"""

extra_execute_parameters = {}

# only pass requested_outputs if supported,
# only pass requested_outputs and request_headers if supported,
# otherwise this breaks existing processes
if p.supports_outputs:
extra_execute_parameters['outputs'] = requested_outputs
if p.supports_request_headers:
extra_execute_parameters['request_headers'] = request_headers

self._send_in_progress_notification(subscriber)

Expand Down Expand Up @@ -358,7 +367,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
Expand All @@ -377,6 +387,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:raises UnknownProcessError: if the input process_id does not
Expand Down Expand Up @@ -442,10 +454,12 @@ def execute_process(
}
self.add_job(job_metadata)

# only pass subscriber if supported, otherwise this breaks
# only pass subscriber and headers if supported, otherwise this breaks
# existing managers
if self.supports_subscribing:
extra_execute_handler_parameters['subscriber'] = subscriber
if self.supports_request_headers:
extra_execute_handler_parameters['request_headers'] = request_headers # noqa

# TODO: handler's response could also be allowed to include more HTTP
# headers
Expand Down
5 changes: 4 additions & 1 deletion pygeoapi/process/manager/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
Expand All @@ -95,6 +96,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request
:raises UnknownProcessError: if the input process_id does not
correspond to a known process
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/mongodb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

def _connect(self):
try:
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, manager_def: dict):
self.is_async = True
self.id_field = 'identifier'
self.supports_subscribing = True
self.supports_request_headers = True
self.connection = manager_def['connection']

try:
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/tinydb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, manager_def: dict):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

@contextmanager
def _db(self):
Expand Down

0 comments on commit b9b06a7

Please sign in to comment.