diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c6cdb6..83ce97b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Deprecated + +- ([#123](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition` + objects are deprecated in favor of arrays of `ProcessDefinition` objects. + ## [0.6.0] ### ⚠️ Breaking Change diff --git a/README.md b/README.md index 5cee28e..b255702 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,20 @@ - [collections](#collections) - [tasks](#tasks) - [TaskConfig Object](#taskconfig-object) -- [Full Process Definition Example](#full-process-definition-example) + - [Full ProcessDefinition Example](#full-processdefinition-example) - [Migration](#migration) - [0.4.x -\> 0.5.x](#04x---05x) + - [0.5.x -\> 0.6.0](#05x---060) - [Development](#development) - [Contributing](#contributing) -This Python library consists of the Task class, which is used to create custom tasks based -on a "STAC In, STAC Out" approach. The Task class acts as wrapper around custom code and provides -several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI. +This Python library consists of the Task class, which is used to create custom tasks +based on a "STAC In, STAC Out" approach. The Task class acts as wrapper around custom +code and provides several convenience methods for modifying STAC Items, creating derived +Items, and providing a CLI. -This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/cirrus-lib/tree/features/task-class) except aims to be more generic. +This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/cirrus-lib/tree/features/task-class) +except aims to be more generic. ## Quickstart for Creating New Tasks @@ -59,25 +62,33 @@ class MyTask(Task): ## Task Input -| Field Name | Type | Description | -| ---------- | ----------------- | ------------------------- | -| type | string | Must be FeatureCollection | -| features | [Item] | A list of STAC `Item` | -| process | ProcessDefinition | A Process Definition | +Task input is often referred to as a 'payload'. + +| Field Name | Type | Description | +| ---------- | ------------------------- | --------------------------------------------------- | +| type | string | Must be FeatureCollection | +| features | [Item] | An array of STAC Items | +| process | [`ProcessDefinition`] | An array of `ProcessDefinition` objects. | +| ~~process~~ | ~~`ProcessDefinition`~~ | **DEPRECATED** A `ProcessDefinition` object | ### ProcessDefinition Object -A STAC task can be provided additional configuration via the 'process' field in the input -ItemCollection. +A Task can be provided additional configuration via the 'process' field in the input +payload. + +| Field Name | Type | Description | +| -------------- | ------------------ | ---------------------------------------------- | +| description | string | Description of the process configuration | +| upload_options | `UploadOptions` | An `UploadOptions` object | +| tasks | Map | Dictionary of task configurations. | +| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. 
| -| Field Name | Type | Description | -| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| description | string | Optional description of the process configuration | -| upload_options | UploadOptions | Options used when uploading assets to a remote server | -| tasks | Map | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. | #### UploadOptions Object +Options used when uploading Item assets to a remote server can be specified in a +'upload_options' field in the `ProcessDefinition` object. + | Field Name | Type | Description | | ------------- | ------------- | --------------------------------------------------------------------------------------- | | path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets | @@ -88,16 +99,19 @@ ItemCollection. ##### path_template -The path_template string is a way to control the output location of uploaded assets from a STAC Item using metadata from the Item itself. -The template can contain fixed strings along with variables used for substitution. -See [the PySTAC documentation for `LayoutTemplate`](https://pystac.readthedocs.io/en/stable/api/layout.html#pystac.layout.LayoutTemplate) for a list of supported template variables and their meaning. +The 'path_template' string is a way to control the output location of uploaded assets +from a STAC Item using metadata from the Item itself. The template can contain fixed +strings along with variables used for substitution. See [the PySTAC documentation for +`LayoutTemplate`](https://pystac.readthedocs.io/en/stable/api/layout.html#pystac.layout.LayoutTemplate) +for a list of supported template variables and their meaning. ##### collections -The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. -At the end of processing, before the final STAC Items are returned, the Task class can be used to assign -all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be -compared. The first match will cause the Item's Collection ID to be set to the provided value. +The 'collections' dictionary provides a collection ID and JSONPath pattern for matching +against STAC Items. At the end of processing, before the final STAC Items are returned, +the Task class can be used to assign all of the Items to specific collection IDs. For +each Item the JSONPath pattern for all collections will be compared. The first match +will cause the Item's Collection ID to be set to the provided value. For example: @@ -107,15 +121,18 @@ For example: } ``` -In this example, the task will set any STAC Items that have an ID beginning with "LC08" to the `landsat-c2l2` collection. +In this example, the task will set any STAC Items that have an ID beginning with "LC08" +to the `landsat-c2l2` collection. -See [JSONPath Online Evaluator](https://jsonpath.com) to experiment with JSONPath and [regex101](https://regex101.com) to experiment with regex. +See [JSONPath Online Evaluator](https://jsonpath.com) to experiment with JSONPath and +[regex101](https://regex101.com) to experiment with regex. #### tasks -The tasks field is a dictionary with an optional key for each task. 
If present, it contains -a dictionary that is converted to a set of keywords and passed to the Task's `process` function. -The documentation for each task will provide the list of available parameters. +The 'tasks' field is a dictionary with an optional key for each task. If present, it +contains a dictionary that is converted to a set of keywords and passed to the Task's +`process` function. The documentation for each Task will provide the list of available +parameters. ```json { @@ -130,32 +147,32 @@ The documentation for each task will provide the list of available parameters. } ``` -In the example above a task named `task-a` would have the `param1=value1` passed as a keyword, while `task-c` -would have `param2=value2` passed. If there were a `task-b` to be run it would not be passed any keywords. +In the example above, a task named `task-a` would have the `param1=value1` passed as a +keyword, while `task-c` would have `param2=value2` passed. If there were a `task-b` to +be run, it would not be passed any keywords. #### TaskConfig Object -**DEPRECATED**: `tasks` should be a dictionary of parameters, with task names as keys. See [tasks](#tasks) for more information. +**DEPRECATED** The 'tasks' field _should_ be a dictionary of parameters, with task names +as keys. See [tasks](#tasks) for more information. `TaskConfig` objects are supported +for backwards compatibility. -A Task Configuration contains information for running a specific task. +| Field Name | Type | Description | +| ---------- | ------------- | ----------------------------------------------------------------------------------- | +| name | str | **REQUIRED** Name of the task | +| parameters | Map | Dictionary of keyword parameters that will be passed to the Task `process` function | -| Field Name | Type | Description | -| ---------- | ------------- | ------------------------------------------------------------------------------------ | -| name | str | **REQUIRED** Name of the task | -| parameters | Map | Dictionary of keyword parameters that will be passed to the Tasks `process` function | -## Full Process Definition Example - -Process definitions are sometimes called "Payloads": +### Full ProcessDefinition Example ```json { "description": "My process configuration", - "collections": { - "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]" - }, "upload_options": { - "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}" + "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}", + "collections": { + "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]" + } }, "tasks": { "task-name": { @@ -169,13 +186,13 @@ Process definitions are sometimes called "Payloads": ### 0.4.x -> 0.5.x -In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with -the stac-asset library. This has necessitated a change in the parameters -that the download methods accept. +In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with the +stac-asset library. This has necessitated a change in the parameters that the download +methods accept. The primary change is that the Task methods `download_item_assets` and -`download_items_assets` (items plural) now accept fewer explicit and implicit -(kwargs) parameters. +`download_items_assets` (items plural) now accept fewer explicit and implicit (kwargs) +parameters. Previously, the methods looked like: @@ -225,8 +242,9 @@ async def download_item_assets( ) -> Item: ``` -Additionally, `kwargs` keys were set to pass configuration through to fsspec. 
The most common -parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests. +Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most +common parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 +requests. Many of these parameters can be directly translated into configuration passed in a `DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object. @@ -239,17 +257,16 @@ Migration of these various parameters to `DownloadConfig` are as follows: `FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False - `overwrite`: set `overwrite` - `save_item`: none, Item is always saved -- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either - `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()` +- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use + either `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()` ### 0.5.x -> 0.6.0 -Previously, the `validate` method was a _classmethod_, validating the payload -argument passed. This has now been made an instance method, which validates -the `self._payload` copy of the payload, from which the `Task` instance is -constructed. This is behaviorally the same, in that construction will fail if -validation fails, but allows implementers to utilize the instance method's -convenience functions. +Previously, the `validate` method was a _classmethod_, validating the payload argument +passed. This has now been made an instance method, which validates the `self._payload` +copy of the payload, from which the `Task` instance is constructed. This is +behaviorally the same, in that construction will fail if validation fails, but allows +implementers to utilize the instance method's convenience functions. Previous implementations of `validate` would have been similar to this: @@ -270,12 +287,13 @@ And will now need to be updated to this form: ## Development -Clone, install in editable mode with development requirements, and install the **pre-commit** hooks: +Clone, install in editable mode with development and test requirements, and install the +**pre-commit** hooks: ```shell git clone https://github.com/stac-utils/stac-task cd stac-task -pip install -e '.[dev]' +pip install -e '.[dev,test]' pre-commit install ``` @@ -293,4 +311,5 @@ pre-commit run --all-files ## Contributing -Use Github [issues](https://github.com/stac-utils/stac-task/issues) and [pull requests](https://github.com/stac-utils/stac-task/pulls). +Use Github [issues](https://github.com/stac-utils/stac-task/issues) and [pull +requests](https://github.com/stac-utils/stac-task/pulls). 
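The README changes above deprecate passing a bare `ProcessDefinition` object as `process`; payloads should now carry an array of `ProcessDefinition` objects. A payload produced by older tooling can be reshaped before it reaches a Task. The sketch below assumes the payload is a plain dict; `upgrade_payload` is a hypothetical helper for illustration and is not part of stac-task:

```python
from typing import Any


def upgrade_payload(payload: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical helper (not part of stac-task): wrap a legacy bare
    # `process` dict in a single-element list so the payload matches the
    # new array-of-ProcessDefinition shape and avoids the DeprecationWarning.
    process = payload.get("process")
    if isinstance(process, dict):
        payload["process"] = [process]
    return payload


# Example: a legacy payload carrying a bare ProcessDefinition object.
legacy = {
    "type": "FeatureCollection",
    "features": [],
    "process": {
        "upload_options": {"path_template": "s3://my-bucket/${collection}/${id}"}
    },
}
assert isinstance(upgrade_payload(legacy)["process"], list)
```

This is the same reshaping the fixture change below applies by hand to `tests/fixtures/sentinel2-l2a-j2k-payload.json`.
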
diff --git a/stactask/task.py b/stactask/task.py index cf365ce..f6b2b8e 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -75,7 +75,6 @@ def __init__( upload: bool = True, validate: bool = True, ): - self._payload = payload if not skip_validation and validate: @@ -108,15 +107,37 @@ def __init__( @property def process_definition(self) -> dict[str, Any]: - process = self._payload.get("process", {}) + process = self._payload.get("process", []) if isinstance(process, dict): + warnings.warn( + ( + "`process` as a bare dictionary will be unsupported in a future " + "version; wrap it in a list to remove this warning" + ), + DeprecationWarning, + stacklevel=2, + ) return process - else: - raise ValueError(f"process is not a dict: {type(process)}") + + if not isinstance(process, list): + raise TypeError("unable to parse `process`: must be type list") + + if not process: + return {} + + if not isinstance(process[0], dict): + raise TypeError( + ( + "unable to parse `process`: the first element of the list must be " + "a dictionary" + ) + ) + + return process[0] @property def parameters(self) -> dict[str, Any]: - task_configs = self.process_definition.get("tasks", []) + task_configs = self.process_definition.get("tasks", {}) if isinstance(task_configs, list): warnings.warn( "task configs is list, use a dictionary instead", diff --git a/tests/fixtures/sentinel2-l2a-j2k-payload.json b/tests/fixtures/sentinel2-l2a-j2k-payload.json index a6bf450..b760e6c 100644 --- a/tests/fixtures/sentinel2-l2a-j2k-payload.json +++ b/tests/fixtures/sentinel2-l2a-j2k-payload.json @@ -1,30 +1,32 @@ { "type": "FeatureCollection", "id": "sentinel-s2-l2a/workflow-test/S2B_17HQD_20201103_0_L2A", - "process": { - "input_collections": [ - "sentinel-2-l2a" - ], - "workflow": "cog-archive", - "upload_options": { - "path_template": "s3://sentinel-cogs/${collection}/${mgrs:utm_zone}/${mgrs:latitude_band}/${mgrs:grid_square}/${year}/${month}/${id}", - "public_assets": "ALL", - "collections": { - "sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]" - }, - "headers": { - "CacheControl": "public, max-age=31536000, immutable" - } - }, - "tasks": { - "nothing-task": { - "do_nothing": true + "process": [ + { + "input_collections": [ + "sentinel-2-l2a" + ], + "workflow": "cog-archive", + "upload_options": { + "path_template": "s3://sentinel-cogs/${collection}/${mgrs:utm_zone}/${mgrs:latitude_band}/${mgrs:grid_square}/${year}/${month}/${id}", + "public_assets": "ALL", + "collections": { + "sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]" + }, + "headers": { + "CacheControl": "public, max-age=31536000, immutable" + } }, - "derived-item-task": { - "parameter": "value" + "tasks": { + "nothing-task": { + "do_nothing": true + }, + "derived-item-task": { + "parameter": "value" + } } } - }, + ], "features": [ { "type": "Feature", diff --git a/tests/test_task.py b/tests/test_task.py index 6ee2dda..526976a 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -18,22 +18,22 @@ @pytest.fixture -def items() -> dict[str, Any]: +def payload() -> dict[str, Any]: filename = testpath / "fixtures" / "sentinel2-l2a-j2k-payload.json" with open(filename) as f: - items = json.loads(f.read()) - assert isinstance(items, dict) - return items + payload = json.loads(f.read()) + assert isinstance(payload, dict) + return payload @pytest.fixture -def nothing_task(items: dict[str, Any]) -> Task: - return NothingTask(items) +def nothing_task(payload: dict[str, Any]) -> Task: + return NothingTask(payload) @pytest.fixture -def derived_item_task(items: dict[str, Any]) -> Task: 
- return DerivedItemTask(items) +def derived_item_task(payload: dict[str, Any]) -> Task: + return DerivedItemTask(payload) def test_task_init(nothing_task: Task) -> None: @@ -43,14 +43,20 @@ def test_task_init(nothing_task: Task) -> None: assert nothing_task._save_workdir is False -def test_failed_validation(items: dict[str, Any]) -> None: +def test_failed_validation(payload: dict[str, Any]) -> None: with pytest.raises(FailedValidation, match="Extra context"): - FailValidateTask(items) + FailValidateTask(payload) + + +def test_deprecated_payload_dict(nothing_task: Task) -> None: + nothing_task._payload["process"] = nothing_task._payload["process"][0] + with pytest.warns(DeprecationWarning): + nothing_task.process_definition def test_edit_items(nothing_task: Task) -> None: nothing_task.process_definition["workflow"] = "test-task-workflow" - assert nothing_task._payload["process"]["workflow"] == "test-task-workflow" + assert nothing_task._payload["process"][0]["workflow"] == "test-task-workflow" def test_edit_items2(nothing_task: Task) -> None: @@ -58,8 +64,8 @@ def test_edit_items2(nothing_task: Task) -> None: @pytest.mark.parametrize("save_workdir", [False, True, None]) -def test_tmp_workdir(items: dict[str, Any], save_workdir: Optional[bool]) -> None: - t = NothingTask(items, save_workdir=save_workdir) +def test_tmp_workdir(payload: dict[str, Any], save_workdir: Optional[bool]) -> None: + t = NothingTask(payload, save_workdir=save_workdir) expected = save_workdir if save_workdir is not None else False assert t._save_workdir is expected workdir = t._workdir @@ -72,11 +78,11 @@ def test_tmp_workdir(items: dict[str, Any], save_workdir: Optional[bool]) -> Non @pytest.mark.parametrize("save_workdir", [False, True, None]) def test_workdir( - items: dict[str, Any], + payload: dict[str, Any], tmp_path: Path, save_workdir: Optional[bool], ) -> None: - t = NothingTask(items, workdir=tmp_path / "test_task", save_workdir=save_workdir) + t = NothingTask(payload, workdir=tmp_path / "test_task", save_workdir=save_workdir) expected = save_workdir if save_workdir is not None else True assert t._save_workdir is expected workdir = t._workdir @@ -87,12 +93,12 @@ def test_workdir( assert workdir.exists() is expected -def test_parameters(items: dict[str, Any]) -> None: - nothing_task = NothingTask(items) +def test_parameters(payload: dict[str, Any]) -> None: + nothing_task = NothingTask(payload) assert nothing_task.process_definition["workflow"] == "cog-archive" assert ( nothing_task.upload_options["path_template"] - == items["process"]["upload_options"]["path_template"] + == payload["process"][0]["upload_options"]["path_template"] ) @@ -101,7 +107,7 @@ def test_process(nothing_task: Task) -> None: assert processed_items[0]["type"] == "Feature" -def test_post_process(items: dict[str, Any]) -> None: +def test_post_process(payload: dict[str, Any]) -> None: class PostProcessTask(NothingTask): name = "post-processing-test" version = "42" @@ -111,8 +117,8 @@ def post_process_item(self, item: dict[str, Any]) -> dict[str, Any]: item["stac_extensions"].insert(0, "zzz") return super().post_process_item(item) - payload = PostProcessTask.handler(items) - for item in payload["features"]: + payload_out = PostProcessTask.handler(payload) + for item in payload_out["features"]: assert item["properties"]["foo"] == "bar" stac_extensions = item["stac_extensions"] assert item["stac_extensions"] == sorted(stac_extensions) @@ -126,9 +132,11 @@ def test_derived_item(derived_item_task: Task) -> None: assert links[0]["href"] == 
self_link["href"] -def test_task_handler(items: dict[str, Any]) -> None: - self_link = next(lk for lk in items["features"][0]["links"] if lk["rel"] == "self") - output_items = DerivedItemTask.handler(items) +def test_task_handler(payload: dict[str, Any]) -> None: + self_link = next( + lk for lk in payload["features"][0]["links"] if lk["rel"] == "self" + ) + output_items = DerivedItemTask.handler(payload) derived_link = next( lk for lk in output_items["features"][0]["links"] if lk["rel"] == "derived_from" ) @@ -188,7 +196,6 @@ def test_collection_mapping(nothing_task: Task) -> None: @mock_aws # type: ignore def test_s3_upload(nothing_task: Task) -> None: - # start S3 mocks s3_client = boto3.client("s3") s3_client.create_bucket(
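The new `process_definition` property in `stactask/task.py` accepts both shapes: a list, of which the first element is used; or a bare dict, which still works but emits a `DeprecationWarning`. An empty list yields an empty definition, and any other type raises `TypeError`. The standalone sketch below mirrors that parsing logic for illustration only; it is not the library code itself:

```python
import warnings
from typing import Any


def parse_process(process: Any) -> dict[str, Any]:
    # Mirrors the behavior added to `Task.process_definition`: a bare dict is
    # accepted but deprecated; otherwise `process` must be a list whose first
    # element is a dict, and an empty list means "no process definition".
    if isinstance(process, dict):
        warnings.warn(
            "`process` as a bare dictionary will be unsupported in a future "
            "version; wrap it in a list to remove this warning",
            DeprecationWarning,
            stacklevel=2,
        )
        return process
    if not isinstance(process, list):
        raise TypeError("unable to parse `process`: must be type list")
    if not process:
        return {}
    if not isinstance(process[0], dict):
        raise TypeError(
            "unable to parse `process`: the first element of the list must be "
            "a dictionary"
        )
    return process[0]


# The list form is preferred; the bare-dict form warns but still returns the dict.
assert parse_process([{"workflow": "cog-archive"}]) == {"workflow": "cog-archive"}
assert parse_process([]) == {}
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert parse_process({"workflow": "cog-archive"}) == {"workflow": "cog-archive"}
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```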