diff --git a/CHANGELOG.md b/CHANGELOG.md index c2644c0..900a95c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## Changed +- Replaced the use of fsspec with stac-asset for downloading Item Assets - `--local` flag no longer turns off validation - The `processing:software` field is no longer added to Items by default. This is because the intention of the STAC Processing Extension is to add metadata about the diff --git a/README.md b/README.md index 5670321..31525af 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + # STAC Task (stac-task) [![Build Status](https://github.com/stac-utils/stac-task/workflows/CI/badge.svg?branch=main)](https://github.com/stac-utils/stac-task/actions/workflows/continuous-integration.yml) @@ -6,6 +7,20 @@ [![codecov](https://codecov.io/gh/stac-utils/stac-task/branch/main/graph/badge.svg)](https://codecov.io/gh/stac-utils/stac-task) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +- [Quickstart for Creating New Tasks](#quickstart-for-creating-new-tasks) +- [Task Input](#task-input) + - [ProcessDefinition Object](#processdefinition-object) + - [UploadOptions Object](#uploadoptions-object) + - [path\_template](#path_template) + - [collections](#collections) + - [tasks](#tasks) + - [TaskConfig Object](#taskconfig-object) +- [Full Process Definition Example](#full-process-definition-example) +- [Migration](#migration) + - [0.4.x -\> 0.5.x](#04x---05x) +- [Development](#development) +- [Contributing](#contributing) + This Python library consists of the Task class, which is used to create custom tasks based on a "STAC In, STAC Out" approach. The Task class acts as wrapper around custom code and provides several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI. @@ -17,7 +32,7 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/ ```python from typing import Any -from stactask import Task +from stactask import Task, DownloadConfig class MyTask(Task): name = "my-task" @@ -30,7 +45,10 @@ class MyTask(Task): item = self.items[0] # download a datafile - item = self.download_item_assets(item, assets=['data']) + item = self.download_item_assets( + item, + config=DownloadConfig(include=['data']) + ) # operate on the local file to create a new asset item = self.upload_item_assets_to_s3(item) @@ -41,32 +59,32 @@ class MyTask(Task): ## Task Input -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| type | string | Must be FeatureCollection | -| features | [Item] | A list of STAC `Item` | -| process | ProcessDefinition | A Process Definition | +| Field Name | Type | Description | +| ---------- | ----------------- | ------------------------- | +| type | string | Must be FeatureCollection | +| features | [Item] | A list of STAC `Item` | +| process | ProcessDefinition | A Process Definition | ### ProcessDefinition Object A STAC task can be provided additional configuration via the 'process' field in the input ItemCollection. -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| description | string | Optional description of the process configuration | -| upload_options | UploadOptions | Options used when uploading assets to a remote server | -| tasks | Map | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. | +| Field Name | Type | Description | +| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| description | string | Optional description of the process configuration | +| upload_options | UploadOptions | Options used when uploading assets to a remote server | +| tasks | Map | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. | #### UploadOptions Object -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets | -| public_assets | [str] | A list of asset keys that should be marked as public when uploaded | -| headers | Map | A set of key, value headers to send when uploading data to s3 | -| collections | Map | A mapping of output collection name to a JSONPath pattern (for matching Items) | -| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL | +| Field Name | Type | Description | +| ------------- | ------------- | --------------------------------------------------------------------------------------- | +| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets | +| public_assets | [str] | A list of asset keys that should be marked as public when uploaded | +| headers | Map | A set of key, value headers to send when uploading data to s3 | +| collections | Map | A mapping of output collection name to a JSONPath pattern (for matching Items) | +| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL | ##### path_template @@ -121,10 +139,10 @@ would have `param2=value2` passed. If there were a `task-b` to be run it would n A Task Configuration contains information for running a specific task. -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| name | str | **REQUIRED** Name of the task | -| parameters | Map | Dictionary of keyword parameters that will be passed to the Tasks `process` function | +| Field Name | Type | Description | +| ---------- | ------------- | ------------------------------------------------------------------------------------ | +| name | str | **REQUIRED** Name of the task | +| parameters | Map | Dictionary of keyword parameters that will be passed to the Tasks `process` function | ## Full Process Definition Example @@ -147,6 +165,83 @@ Process definitions are sometimes called "Payloads": } ``` +## Migration + +### 0.4.x -> 0.5.x + +In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with +the stac-asset library. This has necessitated a change in the parameters +that the download methods accept. + +The primary change is that the Task methods `download_item_assets` and +`download_items_assets` (items plural) now accept fewer explicit and implicit +(kwargs) parameters. + +Previously, the methods looked like: + +```python + def download_item_assets( + self, + item: Item, + path_template: str = "${collection}/${id}", + keep_original_filenames: bool = False, + **kwargs: Any, + ) -> Item: +``` + +but now look like: + +```python + def download_item_assets( + self, + item: Item, + path_template: str = "${collection}/${id}", + config: Optional[DownloadConfig] = None, + ) -> Item: +``` + +Similarly, the `asset_io` package methods were previously: + +```python +async def download_item_assets( + item: Item, + assets: Optional[list[str]] = None, + save_item: bool = True, + overwrite: bool = False, + path_template: str = "${collection}/${id}", + absolute_path: bool = False, + keep_original_filenames: bool = False, + **kwargs: Any, +) -> Item: +``` + +and are now: + +```python +async def download_item_assets( + item: Item, + path_template: str = "${collection}/${id}", + config: Optional[DownloadConfig] = None, +) -> Item: +``` + +Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most common +parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests. + +Many of these parameters can be directly translated into configuration passed in a +`DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object. + +Migration of these various parameters to `DownloadConfig` are as follows: + +- `assets`: set `include` +- `requester_pays`: set `s3_requester_pays` = True +- `keep_original_filenames`: set `file_name_strategy` to + `FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False +- `overwrite`: set `overwrite` +- `save_item`: none, Item is always saved +- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either + `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()` + ## Development Clone, install in editable mode with development requirements, and install the **pre-commit** hooks: diff --git a/pyproject.toml b/pyproject.toml index c043bae..b723dc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "python-dateutil>=2.7.0", "boto3-utils>=0.3.2", "fsspec>=2022.8.2", + "stac-asset>=0.3.0", "jsonpath_ng>=1.5.3", "requests>=2.28.1", "s3fs>=2022.8.2", @@ -37,6 +38,7 @@ dev = [ "pre-commit~=3.7", "ruff~=0.4.1", "types-setuptools~=69.0", + "boto3-stubs", ] test = [ "pytest~=8.0", diff --git a/stactask/__init__.py b/stactask/__init__.py index d68fab3..c59809f 100644 --- a/stactask/__init__.py +++ b/stactask/__init__.py @@ -6,6 +6,7 @@ # package is not installed pass +from .config import DownloadConfig from .task import Task -__all__ = ["Task"] +__all__ = ["Task", "DownloadConfig"] diff --git a/stactask/asset_io.py b/stactask/asset_io.py index 9bdfaab..63629fe 100644 --- a/stactask/asset_io.py +++ b/stactask/asset_io.py @@ -1,102 +1,55 @@ import asyncio import logging -import os from os import path as op from typing import Any, Iterable, Optional, Union -from urllib.parse import urlparse -import fsspec +import stac_asset from boto3utils import s3 -from fsspec import AbstractFileSystem from pystac import Item from pystac.layout import LayoutTemplate +from .config import DownloadConfig + logger = logging.getLogger(__name__) # global dictionary of sessions per bucket global_s3_client = s3() -SIMULTANEOUS_DOWNLOADS = int(os.getenv("STAC_SIMULTANEOUS_DOWNLOADS", 3)) -sem = asyncio.Semaphore(SIMULTANEOUS_DOWNLOADS) - - -async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None: - async with sem: - logger.debug(f"{src} start") - if hasattr(fs, "_get_file"): - await fs._get_file(src, dest) - elif hasattr(fs, "get_file"): - fs.get_file(src, dest) - else: - raise NotImplementedError( - "stac-task only supports filesystems providing" - " `get_file` or `_get_file` interface" - ) - logger.debug(f"{src} completed") - async def download_item_assets( item: Item, - assets: Optional[list[str]] = None, - save_item: bool = True, - overwrite: bool = False, path_template: str = "${collection}/${id}", - absolute_path: bool = False, - keep_original_filenames: bool = False, - **kwargs: Any, + config: Optional[DownloadConfig] = None, + keep_non_downloaded: bool = True, ) -> Item: - _assets = item.assets.keys() if assets is None else assets - - # determine path from template and item - layout = LayoutTemplate(path_template) - path = layout.substitute(item) - - # make necessary directories - os.makedirs(path, exist_ok=True) + return await stac_asset.download_item( + item=item.clone(), + directory=LayoutTemplate(path_template).substitute(item), + file_name="item.json", + config=config, + keep_non_downloaded=keep_non_downloaded, + ) - new_item = item.clone() - - tasks = [] - for a in _assets: - if a not in item.assets: - continue - href = item.assets[a].href - # local filename - url_path = urlparse(href).path - if keep_original_filenames: - basename = os.path.basename(url_path) - else: - basename = a + os.path.splitext(url_path)[1] - new_href = os.path.join(path, basename) - if absolute_path: - new_href = os.path.abspath(new_href) - - # save file - if not os.path.exists(new_href) or overwrite: - fs = fsspec.core.url_to_fs(href, asynchronous=True, **kwargs)[0] - task = asyncio.create_task(download_file(fs, href, new_href)) - tasks.append(task) - - # update - new_item.assets[a].href = new_href - - await asyncio.gather(*tasks) - - # save Item metadata alongside saved assets - if save_item: - new_item.remove_links("root") - new_item.save_object(dest_href=os.path.join(path, "item.json")) - - return new_item - - -async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> list[Item]: - tasks = [] - for item in items: - tasks.append(asyncio.create_task(download_item_assets(item, **kwargs))) - new_items: list[Item] = await asyncio.gather(*tasks) - return new_items +async def download_items_assets( + items: Iterable[Item], + path_template: str = "${collection}/${id}", + config: Optional[DownloadConfig] = None, + keep_non_downloaded: bool = True, +) -> list[Item]: + return await asyncio.gather( + *[ + asyncio.create_task( + download_item_assets( + item=item, + path_template=path_template, + config=config, + keep_non_downloaded=keep_non_downloaded, + ) + ) + for item in items + ] + ) def upload_item_assets_to_s3( @@ -107,7 +60,7 @@ def upload_item_assets_to_s3( s3_urls: bool = False, headers: Optional[dict[str, Any]] = None, s3_client: Optional[s3] = None, - **kwargs: Any, + **kwargs: Any, # unused, but retain to permit unused attributes from upload_options ) -> Item: """Upload Item assets to an S3 bucket Args: diff --git a/stactask/config.py b/stactask/config.py new file mode 100644 index 0000000..8ee65ce --- /dev/null +++ b/stactask/config.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + +from stac_asset import Config + + +@dataclass +class DownloadConfig(Config): # type: ignore + pass diff --git a/stactask/task.py b/stactask/task.py index 97e02c8..ac4929a 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -22,6 +22,7 @@ download_items_assets, upload_item_assets_to_s3, ) +from .config import DownloadConfig from .exceptions import FailedValidation from .logging import TaskLoggerAdapter from .utils import find_collection as utils_find_collection @@ -240,8 +241,8 @@ def download_item_assets( self, item: Item, path_template: str = "${collection}/${id}", - keep_original_filenames: bool = False, - **kwargs: Any, + config: Optional[DownloadConfig] = None, + keep_non_downloaded: bool = True, ) -> Item: """Download provided asset keys for the given item. Assets are saved in workdir in a directory (as specified by path_template), and @@ -256,24 +257,21 @@ def download_item_assets( keep_original_filenames (Optional[bool]): Controls whether original file names should be used, or asset key + extension. """ - outdir = str(self._workdir / path_template) - loop = asyncio.get_event_loop() - item = loop.run_until_complete( + return asyncio.get_event_loop().run_until_complete( download_item_assets( item, - path_template=outdir, - keep_original_filenames=keep_original_filenames, - **kwargs, + path_template=str(self._workdir / path_template), + config=config, + keep_non_downloaded=keep_non_downloaded, ) ) - return item def download_items_assets( self, items: Iterable[Item], path_template: str = "${collection}/${id}", - keep_original_filenames: bool = False, - **kwargs: Any, + config: Optional[DownloadConfig] = None, + keep_non_downloaded: bool = True, ) -> list[Item]: """Download provided asset keys for the given items. Assets are saved in workdir in a directory (as specified by path_template), and @@ -289,17 +287,16 @@ def download_items_assets( keep_original_filenames (Optional[bool]): Controls whether original file names should be used, or asset key + extension. """ - outdir = str(self._workdir / path_template) - loop = asyncio.get_event_loop() - items = loop.run_until_complete( - download_items_assets( - items, - path_template=outdir, - keep_original_filenames=keep_original_filenames, - **kwargs, + return list( + asyncio.get_event_loop().run_until_complete( + download_items_assets( + items, + path_template=str(self._workdir / path_template), + config=config, + keep_non_downloaded=keep_non_downloaded, + ) ) ) - return list(items) def upload_item_assets_to_s3( self, diff --git a/tests/test_task_download.py b/tests/test_task_download.py index e66cb01..50f927c 100644 --- a/tests/test_task_download.py +++ b/tests/test_task_download.py @@ -1,8 +1,12 @@ import json +import os from pathlib import Path from typing import Any import pytest +import stac_asset + +from stactask.config import DownloadConfig from .tasks import NothingTask @@ -17,22 +21,47 @@ def item_collection() -> dict[str, Any]: return items -def test_download_nosuch_asset(item_collection: dict[str, Any]) -> None: +def test_download_nosuch_asset(tmp_path: Path, item_collection: dict[str, Any]) -> None: + t = NothingTask( + item_collection, + workdir=tmp_path / "test-task-download-nosuch-asset", + save_workdir=True, + ) + item = t.download_item_assets( + t.items[0], config=DownloadConfig(include=["nosuch_asset"]) + ) + + # new item has same assets hrefs as old item + assert [x.href for x in item.assets.values()] == [ + x.href for x in t.items[0].assets.values() + ] + + +def test_download_asset_dont_keep_existing( + tmp_path: Path, item_collection: dict[str, Any] +) -> None: t = NothingTask( item_collection, + workdir=tmp_path / "test-task-download-nosuch-asset", + save_workdir=True, + ) + item = t.download_item_assets( + t.items[0], + config=DownloadConfig(include=["nosuch_asset"]), + keep_non_downloaded=False, ) - item = t.download_item_assets(t.items[0], assets=["nosuch_asset"]).to_dict() - # new item same as old item - assert item["assets"] == t.items[0].to_dict()["assets"] + + # new item has no assets + assert item.assets == {} # @vcr.use_cassette(str(cassettepath / 'download_assets')) def test_download_item_asset(tmp_path: Path, item_collection: dict[str, Any]) -> None: t = NothingTask(item_collection, workdir=tmp_path / "test-task-download-item-asset") - item = t.download_item_assets(t.items[0], assets=["tileinfo_metadata"]).to_dict() - fname = item["assets"]["tileinfo_metadata"]["href"] - filename = Path(fname) - assert filename.is_file() is True + item = t.download_item_assets( + t.items[0], config=DownloadConfig(include=["tileinfo_metadata"]) + ) + assert Path(item.assets["tileinfo_metadata"].get_absolute_href()).is_file() def test_download_keep_original_filenames( @@ -43,7 +72,11 @@ def test_download_keep_original_filenames( workdir=tmp_path / "test-task-download-item-asset", ) item = t.download_item_assets( - t.items[0], assets=["tileinfo_metadata"], keep_original_filenames=True + t.items[0], + config=DownloadConfig( + include=["tileinfo_metadata"], + file_name_strategy=stac_asset.FileNameStrategy.FILE_NAME, + ), ).to_dict() fname = item["assets"]["tileinfo_metadata"]["href"] filename = Path(fname) @@ -54,19 +87,25 @@ def test_download_item_asset_local( tmp_path: Path, item_collection: dict[str, Any] ) -> None: t = NothingTask(item_collection, workdir=tmp_path / "test-task-download-item-asset") - item = t.download_item_assets(t.items[0], assets=["tileinfo_metadata"]) - fname = item.assets["tileinfo_metadata"].href - filename = Path(fname) - assert filename.is_file() is True + item = t.download_item_assets( + t.items[0], config=DownloadConfig(include=["tileinfo_metadata"]) + ) + + assert ( + Path(os.path.dirname(item.self_href)) / item.assets["tileinfo_metadata"].href + ).is_file() + # Downloaded to local, as in prev test. # With the asset hrefs updated by the prev download, we "download" again to subdir item = t.download_item_assets( - item, assets=["tileinfo_metadata"], path_template="again/${collection}/${id}" + item=item, + config=DownloadConfig(include=["tileinfo_metadata"]), + path_template="again/${collection}/${id}", ) - href = item.assets["tileinfo_metadata"].href + assert "again" in item.self_href + href = item.assets["tileinfo_metadata"].get_absolute_href() assert "again" in href - filename = Path(href) - assert filename.is_file() is True + assert Path(href).is_file() # @vcr.use_cassette(str(cassettepath / 'download_assets')) @@ -77,12 +116,12 @@ def test_download_item_assets(tmp_path: Path, item_collection: dict[str, Any]) - save_workdir=True, ) item = t.download_item_assets( - t.items[0], assets=["tileinfo_metadata", "granule_metadata"] - ).to_dict() - filename = Path(item["assets"]["tileinfo_metadata"]["href"]) - assert filename.is_file() is True - filename = Path(item["assets"]["granule_metadata"]["href"]) - assert filename.is_file() is True + t.items[0], + config=DownloadConfig(include=["tileinfo_metadata", "granule_metadata"]), + ) + + assert Path(item.assets["tileinfo_metadata"].get_absolute_href()).is_file() + assert Path(item.assets["granule_metadata"].get_absolute_href()).is_file() def test_download_items_assets(tmp_path: Path, item_collection: dict[str, Any]) -> None: @@ -92,11 +131,11 @@ def test_download_items_assets(tmp_path: Path, item_collection: dict[str, Any]) workdir=tmp_path / "test-task-download-items-assets", save_workdir=True, ) - items = [i.to_dict() for i in t.download_items_assets(t.items, assets=[asset_key])] - filename = Path(items[0]["assets"][asset_key]["href"]) - assert filename.is_file() is True - filename = Path(items[1]["assets"][asset_key]["href"]) - assert filename.is_file() is True + items = t.download_items_assets(t.items, config=DownloadConfig(include=[asset_key])) + + assert len(items) == 2 + for item in items: + assert Path(item.assets[asset_key].get_absolute_href()).is_file() # @vcr.use_cassette(str(cassettepath / 'download_assets')) @@ -108,8 +147,7 @@ def test_download_large_asset(tmp_path: Path, item_collection: dict[str, Any]) - save_workdir=True, ) item = t.download_item_assets( - t.items[0], assets=["red"], requester_pays=True - ).to_dict() - filename = Path(item["assets"]["red"]["href"]) - assert filename.is_file() is True - del t + t.items[0], config=DownloadConfig(s3_requester_pays=True, include=["red"]) + ) + + assert Path(item.assets["red"].get_absolute_href()).is_file()