123: Support process arrays #166

Merged

7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Deprecated

- ([#123](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition`
objects are deprecated in favor of arrays of `ProcessDefinition` objects.

## [0.6.0]

### ⚠️ Breaking Change

143 changes: 81 additions & 62 deletions README.md
@@ -15,17 +15,20 @@
- [collections](#collections)
- [tasks](#tasks)
- [TaskConfig Object](#taskconfig-object)
- [Full Process Definition Example](#full-process-definition-example)
- [Full ProcessDefinition Example](#full-processdefinition-example)
- [Migration](#migration)
- [0.4.x -\> 0.5.x](#04x---05x)
- [0.5.x -\> 0.6.0](#05x---060)
- [Development](#development)
- [Contributing](#contributing)

This Python library consists of the Task class, which is used to create custom tasks based
on a "STAC In, STAC Out" approach. The Task class acts as wrapper around custom code and provides
several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI.
This Python library consists of the Task class, which is used to create custom tasks
based on a "STAC In, STAC Out" approach. The Task class acts as a wrapper around custom
code and provides several convenience methods for modifying STAC Items, creating derived
Items, and providing a CLI.

This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/cirrus-lib/tree/features/task-class) except aims to be more generic.
This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/cirrus-lib/tree/features/task-class)
but aims to be more generic.

## Quickstart for Creating New Tasks

@@ -59,25 +62,33 @@ class MyTask(Task):

## Task Input

| Field Name | Type | Description |
| ---------- | ----------------- | ------------------------- |
| type | string | Must be FeatureCollection |
| features | [Item] | A list of STAC `Item` |
| process | ProcessDefinition | A Process Definition |
Task input is often referred to as a 'payload'.

| Field Name | Type | Description |
| ---------- | ------------------------- | --------------------------------------------------- |
| type | string | Must be FeatureCollection |
| features | [Item] | An array of STAC Items |
| process | [`ProcessDefinition`] | An array of `ProcessDefinition` objects. |
| ~~process~~ | ~~`ProcessDefinition`~~ | **DEPRECATED** A `ProcessDefinition` object |
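
With `process` now documented as an array, a payload producer that still emits a bare `ProcessDefinition` can wrap it before handing the payload to a Task. The following is a minimal sketch; `wrap_process` is a hypothetical helper for illustration, not part of stac-task:

```python
import copy
from typing import Any


def wrap_process(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``payload`` whose ``process`` field is an array."""
    migrated = copy.deepcopy(payload)  # never mutate the caller's payload
    process = migrated.get("process")
    if isinstance(process, dict):
        # Deprecated bare ProcessDefinition: wrap it in a one-element array.
        migrated["process"] = [process]
    # Arrays and payloads without "process" pass through unchanged.
    return migrated


legacy = {
    "type": "FeatureCollection",
    "features": [],
    "process": {"tasks": {"task-a": {"param1": "value1"}}},
}
print(wrap_process(legacy)["process"])
# [{'tasks': {'task-a': {'param1': 'value1'}}}]
```

A one-element list is sufficient here because, per the `stactask/task.py` change in this PR, `Task.process_definition` reads only the first element of the array.
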

### ProcessDefinition Object

A STAC task can be provided additional configuration via the 'process' field in the input
ItemCollection.
A Task can be provided additional configuration via the 'process' field in the input
payload.

| Field Name | Type | Description |
| -------------- | ------------------ | ---------------------------------------------- |
| description | string | Description of the process configuration |
| upload_options | `UploadOptions` | An `UploadOptions` object |
| tasks | Map<str, Map> | Dictionary of task configurations. |
| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. |

| Field Name | Type | Description |
| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |

#### UploadOptions Object

Options used when uploading Item assets to a remote server can be specified in a
'upload_options' field in the `ProcessDefinition` object.

| Field Name | Type | Description |
| ------------- | ------------- | --------------------------------------------------------------------------------------- |
| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
@@ -88,16 +99,19 @@ ItemCollection.

##### path_template

The path_template string is a way to control the output location of uploaded assets from a STAC Item using metadata from the Item itself.
The template can contain fixed strings along with variables used for substitution.
See [the PySTAC documentation for `LayoutTemplate`](https://pystac.readthedocs.io/en/stable/api/layout.html#pystac.layout.LayoutTemplate) for a list of supported template variables and their meaning.
The 'path_template' string is a way to control the output location of uploaded assets
from a STAC Item using metadata from the Item itself. The template can contain fixed
strings along with variables used for substitution. See [the PySTAC documentation for
`LayoutTemplate`](https://pystac.readthedocs.io/en/stable/api/layout.html#pystac.layout.LayoutTemplate)
for a list of supported template variables and their meaning.
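
The README defers to PySTAC's `LayoutTemplate` for the supported variables. To show the idea without that dependency, here is a standard-library approximation built on `string.Template`. The variable set and the zero-padding of month and day are assumptions of this sketch and may not match PySTAC's output exactly; `PathTemplate` and `render_path` are hypothetical names, and the PySTAC documentation remains authoritative:

```python
import string
from datetime import datetime
from typing import Any


class PathTemplate(string.Template):
    # Allow ':' inside ${...} so extension fields such as ${mgrs:utm_zone} parse.
    braceidpattern = r"(?a:[_a-z][_a-z0-9:]*)"


def render_path(template: str, item: dict[str, Any]) -> str:
    """Fill a path template from Item metadata (stdlib stand-in for LayoutTemplate)."""
    props = item["properties"]
    # STAC datetimes are RFC 3339; older fromisoformat versions reject a trailing "Z".
    dt = datetime.fromisoformat(props["datetime"].replace("Z", "+00:00"))
    values = {
        **props,  # exposes property names such as "mgrs:utm_zone" as variables
        "id": item["id"],
        "collection": item.get("collection", ""),
        "year": f"{dt.year:04d}",
        "month": f"{dt.month:02d}",
        "day": f"{dt.day:02d}",
    }
    # substitute() raises KeyError if the template names a missing variable.
    return PathTemplate(template).substitute(values)


item = {
    "id": "LC08_example",
    "collection": "landsat-c2l2",
    "properties": {"datetime": "2023-05-07T10:00:00Z"},
}
print(render_path("s3://my-bucket/${collection}/${year}/${month}/${day}/${id}", item))
# s3://my-bucket/landsat-c2l2/2023/05/07/LC08_example
```
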

##### collections

The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items.
At the end of processing, before the final STAC Items are returned, the Task class can be used to assign
all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be
compared. The first match will cause the Item's Collection ID to be set to the provided value.
The 'collections' dictionary provides a collection ID and JSONPath pattern for matching
against STAC Items. At the end of processing, before the final STAC Items are returned,
the Task class can be used to assign all of the Items to specific collection IDs. For
each Item, the JSONPath patterns for all collections are compared, and the first match
sets the Item's Collection ID to the provided value.

For example:

@@ -107,15 +121,18 @@ For example:
}
```

In this example, the task will set any STAC Items that have an ID beginning with "LC08" to the `landsat-c2l2` collection.
In this example, the task will set any STAC Items that have an ID beginning with "LC08"
to the `landsat-c2l2` collection.

See [JSONPath Online Evaluator](https://jsonpath.com) to experiment with JSONPath and [regex101](https://regex101.com) to experiment with regex.
See [JSONPath Online Evaluator](https://jsonpath.com) to experiment with JSONPath and
[regex101](https://regex101.com) to experiment with regex.
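
The first-match rule for 'collections' can be sketched in plain Python. This sketch swaps JSONPath for a simpler technique: each dictionary value is a regular expression tested with `re.match` against the Item ID only, so it covers ID-based filters like the `LC08` example but not general JSONPath expressions. `assign_collections` is a hypothetical name:

```python
import re
from typing import Any


def assign_collections(
    items: list[dict[str, Any]], collections: dict[str, str]
) -> list[dict[str, Any]]:
    """Set each Item's collection to the first entry whose ID pattern matches.

    Dictionaries preserve insertion order, so "first" means first in the mapping.
    Items that match no pattern are left unchanged.
    """
    for item in items:
        for collection_id, id_pattern in collections.items():
            if re.match(id_pattern, item["id"]):  # anchored at the start of the ID
                item["collection"] = collection_id
                break  # first match wins; later patterns are not consulted
    return items


items = [{"id": "LC08_L2SP_044034"}, {"id": "LC09_L2SP_044034"}, {"id": "S2B_17HQD"}]
assign_collections(items, {"landsat-c2l2": r"LC08.*", "landsat-other": r"LC0\d.*"})
print([item.get("collection") for item in items])
# ['landsat-c2l2', 'landsat-other', None]
```
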

#### tasks

The tasks field is a dictionary with an optional key for each task. If present, it contains
a dictionary that is converted to a set of keywords and passed to the Task's `process` function.
The documentation for each task will provide the list of available parameters.
The 'tasks' field is a dictionary with an optional key for each task. If present, it
contains a dictionary that is converted to a set of keywords and passed to the Task's
`process` function. The documentation for each Task will provide the list of available
parameters.

```json
{
@@ -130,32 +147,32 @@ The documentation for each task will provide the list of available parameters.
}
```

In the example above a task named `task-a` would have the `param1=value1` passed as a keyword, while `task-c`
would have `param2=value2` passed. If there were a `task-b` to be run it would not be passed any keywords.
In the example above, a task named `task-a` would have `param1=value1` passed as a
keyword argument, while `task-c` would have `param2=value2` passed. If there were a
`task-b` to be run, it would not be passed any keywords.

#### TaskConfig Object

**DEPRECATED**: `tasks` should be a dictionary of parameters, with task names as keys. See [tasks](#tasks) for more information.
**DEPRECATED** The 'tasks' field _should_ be a dictionary of parameters, with task names
as keys. See [tasks](#tasks) for more information. `TaskConfig` objects are supported
for backwards compatibility.

A Task Configuration contains information for running a specific task.
| Field Name | Type | Description |
| ---------- | ------------- | ----------------------------------------------------------------------------------- |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Task `process` function |

| Field Name | Type | Description |
| ---------- | ------------- | ------------------------------------------------------------------------------------ |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |
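
Both forms of 'tasks' described above, the preferred dictionary and the deprecated list of `TaskConfig` objects, resolve to the same keyword arguments. A minimal sketch of that lookup, assuming a hypothetical helper `task_parameters` and a hypothetical task function `process`; it mirrors the documented behavior and is not stac-task's actual `Task.parameters` code:

```python
import warnings
from typing import Any


def task_parameters(process_definition: dict[str, Any], task_name: str) -> dict[str, Any]:
    """Return the keyword parameters configured for ``task_name``."""
    tasks = process_definition.get("tasks", {})
    if isinstance(tasks, list):
        # Deprecated TaskConfig form: [{"name": ..., "parameters": {...}}, ...]
        warnings.warn(
            "tasks as a list is deprecated; use a dictionary",
            DeprecationWarning,
            stacklevel=2,
        )
        for config in tasks:
            if config.get("name") == task_name:
                return config.get("parameters", {})
        return {}
    # Preferred form: {"task-name": {...parameters...}}; absent tasks get no keywords.
    return tasks.get(task_name, {})


def process(items: list[Any], param1: str = "default") -> list[Any]:
    # Hypothetical task body standing in for a Task subclass's process method.
    print(f"param1={param1}")
    return items


definition = {"tasks": {"task-a": {"param1": "value1"}, "task-c": {"param2": "value2"}}}
process([], **task_parameters(definition, "task-a"))  # prints param1=value1
process([], **task_parameters(definition, "task-b"))  # prints param1=default
```
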

## Full Process Definition Example

Process definitions are sometimes called "Payloads":
### Full ProcessDefinition Example

```json
{
"description": "My process configuration",
"collections": {
"landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
},
"upload_options": {
"path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}"
"path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}",
"collections": {
"landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
}
},
"tasks": {
"task-name": {
@@ -169,13 +186,13 @@ Process definitions are sometimes called "Payloads":

### 0.4.x -> 0.5.x

In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with
the stac-asset library. This has necessitated a change in the parameters
that the download methods accept.
In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with the
stac-asset library. This has necessitated a change in the parameters that the download
methods accept.

The primary change is that the Task methods `download_item_assets` and
`download_items_assets` (items plural) now accept fewer explicit and implicit
(kwargs) parameters.
`download_items_assets` (items plural) now accept fewer explicit and implicit (kwargs)
parameters.

Previously, the methods looked like:

@@ -225,8 +242,9 @@ async def download_item_assets(
) -> Item:
```

Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most common
parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests.
Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most
common parameter was `requester_pays`, to set the Requester Pays flag in AWS S3
requests.

Many of these parameters can be directly translated into configuration passed in a
`DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object.
@@ -239,17 +257,16 @@ Migration of these various parameters to `DownloadConfig` are as follows:
`FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False
- `overwrite`: set `overwrite`
- `save_item`: none, Item is always saved
- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either
`Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`
- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use
either `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`

### 0.5.x -> 0.6.0

Previously, the `validate` method was a _classmethod_, validating the payload
argument passed. This has now been made an instance method, which validates
the `self._payload` copy of the payload, from which the `Task` instance is
constructed. This is behaviorally the same, in that construction will fail if
validation fails, but allows implementers to utilize the instance method's
convenience functions.
Previously, the `validate` method was a _classmethod_, validating the payload argument
passed. This has now been made an instance method, which validates the `self._payload`
copy of the payload, from which the `Task` instance is constructed. This is
behaviorally the same, in that construction will fail if validation fails, but allows
implementers to utilize the instance method's convenience functions.

Previous implementations of `validate` would have been similar to this:

@@ -270,12 +287,13 @@ And will now need to be updated to this form:

## Development

Clone, install in editable mode with development requirements, and install the **pre-commit** hooks:
Clone, install in editable mode with development and test requirements, and install the
**pre-commit** hooks:

```shell
git clone https://github.com/stac-utils/stac-task
cd stac-task
pip install -e '.[dev]'
pip install -e '.[dev,test]'
pre-commit install
```

@@ -293,4 +311,5 @@ pre-commit run --all-files

## Contributing

Use Github [issues](https://github.com/stac-utils/stac-task/issues) and [pull requests](https://github.com/stac-utils/stac-task/pulls).
Use GitHub [issues](https://github.com/stac-utils/stac-task/issues) and [pull
requests](https://github.com/stac-utils/stac-task/pulls).

31 changes: 26 additions & 5 deletions stactask/task.py
@@ -75,7 +75,6 @@ def __init__(
upload: bool = True,
validate: bool = True,
):

self._payload = payload

if not skip_validation and validate:
@@ -108,15 +107,37 @@ def __init__(

@property
def process_definition(self) -> dict[str, Any]:
process = self._payload.get("process", {})
process = self._payload.get("process", [])
if isinstance(process, dict):
warnings.warn(
(
"`process` as a bare dictionary will be unsupported in a future "
"version; wrap it in a list to remove this warning"
),
DeprecationWarning,
stacklevel=2,
)
return process
else:
raise ValueError(f"process is not a dict: {type(process)}")

if not isinstance(process, list):
raise TypeError("unable to parse `process`: must be type list")

if not process:
return {}

if not isinstance(process[0], dict):
raise TypeError(
(
"unable to parse `process`: the first element of the list must be "
"a dictionary"
)
)

return process[0]

@property
def parameters(self) -> dict[str, Any]:
task_configs = self.process_definition.get("tasks", [])
task_configs = self.process_definition.get("tasks", {})
if isinstance(task_configs, list):
warnings.warn(
"task configs is list, use a dictionary instead",
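
To try the new `process` parsing rules without constructing a `Task`, the `process_definition` property from the diff above can be restated as a standalone function. The function form is an illustrative assumption; the branches, messages, and warning category follow the diff:

```python
import warnings
from typing import Any


def process_definition(payload: dict[str, Any]) -> dict[str, Any]:
    """Standalone restatement of the Task.process_definition property."""
    process = payload.get("process", [])
    if isinstance(process, dict):
        # Deprecated bare ProcessDefinition: still accepted, with a warning.
        warnings.warn(
            "`process` as a bare dictionary will be unsupported in a future "
            "version; wrap it in a list to remove this warning",
            DeprecationWarning,
            stacklevel=2,
        )
        return process
    if not isinstance(process, list):
        raise TypeError("unable to parse `process`: must be type list")
    if not process:
        return {}  # a missing or empty array means no configuration
    if not isinstance(process[0], dict):
        raise TypeError(
            "unable to parse `process`: the first element of the list must be "
            "a dictionary"
        )
    return process[0]  # only the first ProcessDefinition is used


print(process_definition({"process": [{"description": "first"}, {"description": "second"}]}))
# {'description': 'first'}
print(process_definition({}))
# {}
```

Because the property returns `process[0]`, any further array elements are ignored by it; the visible diff does not show whether other code consumes them.
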

44 changes: 23 additions & 21 deletions tests/fixtures/sentinel2-l2a-j2k-payload.json
@@ -1,30 +1,32 @@
{
"type": "FeatureCollection",
"id": "sentinel-s2-l2a/workflow-test/S2B_17HQD_20201103_0_L2A",
"process": {
"input_collections": [
"sentinel-2-l2a"
],
"workflow": "cog-archive",
"upload_options": {
"path_template": "s3://sentinel-cogs/${collection}/${mgrs:utm_zone}/${mgrs:latitude_band}/${mgrs:grid_square}/${year}/${month}/${id}",
"public_assets": "ALL",
"collections": {
"sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]"
},
"headers": {
"CacheControl": "public, max-age=31536000, immutable"
}
},
"tasks": {
"nothing-task": {
"do_nothing": true
"process": [
{
"input_collections": [
"sentinel-2-l2a"
],
"workflow": "cog-archive",
"upload_options": {
"path_template": "s3://sentinel-cogs/${collection}/${mgrs:utm_zone}/${mgrs:latitude_band}/${mgrs:grid_square}/${year}/${month}/${id}",
"public_assets": "ALL",
"collections": {
"sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]"
},
"headers": {
"CacheControl": "public, max-age=31536000, immutable"
}
},
"derived-item-task": {
"parameter": "value"
"tasks": {
"nothing-task": {
"do_nothing": true
},
"derived-item-task": {
"parameter": "value"
}
}
}
},
],
"features": [
{
"type": "Feature",