replace use of fsspec for downloading Item Assets with stac-asset (#120)
* replace use of fsspec for downloading Item Assets with stac-asset

* changelog
Phil Varner authored May 8, 2024
1 parent 7e66cf5 commit b85ec7a
Showing 8 changed files with 251 additions and 156 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## Changed

- Replaced the use of fsspec with stac-asset for downloading Item Assets
- `--local` flag no longer turns off validation
- The `processing:software` field is no longer added to Items by default. This is
because the intention of the STAC Processing Extension is to add metadata about the
141 changes: 118 additions & 23 deletions README.md
@@ -1,3 +1,4 @@
<!-- omit from toc -->
# STAC Task (stac-task)

[![Build Status](https://github.com/stac-utils/stac-task/workflows/CI/badge.svg?branch=main)](https://github.com/stac-utils/stac-task/actions/workflows/continuous-integration.yml)
@@ -6,6 +7,20 @@
[![codecov](https://codecov.io/gh/stac-utils/stac-task/branch/main/graph/badge.svg)](https://codecov.io/gh/stac-utils/stac-task)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

- [Quickstart for Creating New Tasks](#quickstart-for-creating-new-tasks)
- [Task Input](#task-input)
- [ProcessDefinition Object](#processdefinition-object)
- [UploadOptions Object](#uploadoptions-object)
- [path\_template](#path_template)
- [collections](#collections)
- [tasks](#tasks)
- [TaskConfig Object](#taskconfig-object)
- [Full Process Definition Example](#full-process-definition-example)
- [Migration](#migration)
- [0.4.x -\> 0.5.x](#04x---05x)
- [Development](#development)
- [Contributing](#contributing)

This Python library consists of the Task class, which is used to create custom tasks based
on a "STAC In, STAC Out" approach. The Task class acts as a wrapper around custom code and provides
several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI.
@@ -17,7 +32,7 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/
```python
from typing import Any

from stactask import Task
from stactask import Task, DownloadConfig

class MyTask(Task):
name = "my-task"
@@ -30,7 +45,10 @@ class MyTask(Task):
item = self.items[0]

# download a datafile
item = self.download_item_assets(item, assets=['data'])
item = self.download_item_assets(
item,
config=DownloadConfig(include=['data'])
)

# operate on the local file to create a new asset
item = self.upload_item_assets_to_s3(item)
@@ -41,32 +59,32 @@ class MyTask(Task):

## Task Input

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| type | string | Must be FeatureCollection |
| features | [Item] | A list of STAC `Item` |
| process | ProcessDefinition | A Process Definition |
| Field Name | Type | Description |
| ---------- | ----------------- | ------------------------- |
| type | string | Must be FeatureCollection |
| features | [Item] | A list of STAC `Item` |
| process | ProcessDefinition | A Process Definition |

### ProcessDefinition Object

A STAC task can be provided additional configuration via the 'process' field in the input
ItemCollection.

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |
| Field Name | Type | Description |
| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |

#### UploadOptions Object

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |
| Field Name | Type | Description |
| ------------- | ------------- | --------------------------------------------------------------------------------------- |
| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |

##### path_template

@@ -121,10 +139,10 @@ would have `param2=value2` passed. If there were a `task-b` to be run it would n

A Task Configuration contains information for running a specific task.

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |
| Field Name | Type | Description |
| ---------- | ------------- | ------------------------------------------------------------------------------------ |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |

## Full Process Definition Example

@@ -147,6 +165,83 @@ Process definitions are sometimes called "Payloads":
}
```

## Migration

### 0.4.x -> 0.5.x

In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with
the stac-asset library. This has necessitated a change in the parameters
that the download methods accept.

The primary change is that the Task methods `download_item_assets` and
`download_items_assets` (items plural) now accept fewer explicit and implicit
(kwargs) parameters.

Previously, the methods looked like:

```python
def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:
```

but now look like:

```python
def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:
```

Similarly, the `asset_io` package methods were previously:

```python
async def download_item_assets(
    item: Item,
    assets: Optional[list[str]] = None,
    save_item: bool = True,
    overwrite: bool = False,
    path_template: str = "${collection}/${id}",
    absolute_path: bool = False,
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:
```

and are now:

```python
async def download_item_assets(
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:
```
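
Both the old and new signatures keep `path_template`, whose default `${collection}/${id}` is filled in per Item (the updated `asset_io.py` does this with pystac's `LayoutTemplate`). As a rough, stdlib-only illustration of how such a placeholder string expands, the sketch below uses Python's `string.Template` as a stand-in. It is not the library's implementation, and the collection and Item id values are made up:

```python
from string import Template

# Same placeholder syntax as the default path_template shown above.
path_template = "${collection}/${id}"

# Hypothetical Item fields, for illustration only.
item_fields = {"collection": "my-collection", "id": "my-item"}

# string.Template is only a stand-in for pystac's LayoutTemplate here.
directory = Template(path_template).substitute(item_fields)
print(directory)
```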

In addition, `kwargs` were previously used to pass configuration through to fsspec. The most common
parameter was `requester_pays`, which sets the Requester Pays flag in AWS S3 requests.

Many of these parameters can be directly translated into configuration passed in a
`DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object.

These parameters map to `DownloadConfig` as follows:

- `assets`: set `include`
- `requester_pays`: set `s3_requester_pays` = True
- `keep_original_filenames`: set `file_name_strategy` to
`FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False
- `overwrite`: set `overwrite`
- `save_item`: none, Item is always saved
- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either
`Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`
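
The mapping above can be sketched as a small translation function. This is a minimal illustration, not part of stac-task: `translate_legacy_download_kwargs` is a hypothetical helper, and the file name strategy is returned as the member name (`"FILE_NAME"` or `"KEY"`) rather than the `FileNameStrategy` enum member so that the sketch runs without stac-asset installed.

```python
from typing import Any


def translate_legacy_download_kwargs(legacy: dict[str, Any]) -> dict[str, Any]:
    """Map 0.4.x download arguments to DownloadConfig field names.

    Hypothetical helper for illustration; stac-task does not provide it.
    """
    fields: dict[str, Any] = {}
    if legacy.get("assets") is not None:
        # `assets` becomes `include`
        fields["include"] = list(legacy["assets"])
    if legacy.get("requester_pays"):
        # `requester_pays` becomes `s3_requester_pays`
        fields["s3_requester_pays"] = True
    if "keep_original_filenames" in legacy:
        # Member names of FileNameStrategy, kept as strings in this sketch.
        fields["file_name_strategy"] = (
            "FILE_NAME" if legacy["keep_original_filenames"] else "KEY"
        )
    if "overwrite" in legacy:
        fields["overwrite"] = bool(legacy["overwrite"])
    # `save_item` and `absolute_path` have no DownloadConfig equivalent
    # and are dropped.
    return fields


legacy_call = {
    "assets": ["data"],
    "requester_pays": True,
    "keep_original_filenames": False,
    "save_item": True,
}
print(translate_legacy_download_kwargs(legacy_call))
```

In real code, the strategy name would be swapped for the corresponding `FileNameStrategy` member before the fields are passed to `DownloadConfig`, in the same way the quickstart passes `include=['data']`.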

## Development

Clone, install in editable mode with development requirements, and install the **pre-commit** hooks:
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [
"python-dateutil>=2.7.0",
"boto3-utils>=0.3.2",
"fsspec>=2022.8.2",
"stac-asset>=0.3.0",
"jsonpath_ng>=1.5.3",
"requests>=2.28.1",
"s3fs>=2022.8.2",
@@ -37,6 +38,7 @@ dev = [
"pre-commit~=3.7",
"ruff~=0.4.1",
"types-setuptools~=69.0",
"boto3-stubs",
]
test = [
"pytest~=8.0",
3 changes: 2 additions & 1 deletion stactask/__init__.py
@@ -6,6 +6,7 @@
# package is not installed
pass

from .config import DownloadConfig
from .task import Task

__all__ = ["Task"]
__all__ = ["Task", "DownloadConfig"]
111 changes: 32 additions & 79 deletions stactask/asset_io.py
@@ -1,102 +1,55 @@
import asyncio
import logging
import os
from os import path as op
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import fsspec
import stac_asset
from boto3utils import s3
from fsspec import AbstractFileSystem
from pystac import Item
from pystac.layout import LayoutTemplate

from .config import DownloadConfig

logger = logging.getLogger(__name__)

# global dictionary of sessions per bucket
global_s3_client = s3()

SIMULTANEOUS_DOWNLOADS = int(os.getenv("STAC_SIMULTANEOUS_DOWNLOADS", 3))
sem = asyncio.Semaphore(SIMULTANEOUS_DOWNLOADS)


async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None:
async with sem:
logger.debug(f"{src} start")
if hasattr(fs, "_get_file"):
await fs._get_file(src, dest)
elif hasattr(fs, "get_file"):
fs.get_file(src, dest)
else:
raise NotImplementedError(
"stac-task only supports filesystems providing"
" `get_file` or `_get_file` interface"
)
logger.debug(f"{src} completed")


async def download_item_assets(
item: Item,
assets: Optional[list[str]] = None,
save_item: bool = True,
overwrite: bool = False,
path_template: str = "${collection}/${id}",
absolute_path: bool = False,
keep_original_filenames: bool = False,
**kwargs: Any,
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> Item:
_assets = item.assets.keys() if assets is None else assets

# determine path from template and item
layout = LayoutTemplate(path_template)
path = layout.substitute(item)

# make necessary directories
os.makedirs(path, exist_ok=True)
return await stac_asset.download_item(
item=item.clone(),
directory=LayoutTemplate(path_template).substitute(item),
file_name="item.json",
config=config,
keep_non_downloaded=keep_non_downloaded,
)

new_item = item.clone()

tasks = []
for a in _assets:
if a not in item.assets:
continue
href = item.assets[a].href

# local filename
url_path = urlparse(href).path
if keep_original_filenames:
basename = os.path.basename(url_path)
else:
basename = a + os.path.splitext(url_path)[1]
new_href = os.path.join(path, basename)
if absolute_path:
new_href = os.path.abspath(new_href)

# save file
if not os.path.exists(new_href) or overwrite:
fs = fsspec.core.url_to_fs(href, asynchronous=True, **kwargs)[0]
task = asyncio.create_task(download_file(fs, href, new_href))
tasks.append(task)

# update
new_item.assets[a].href = new_href

await asyncio.gather(*tasks)

# save Item metadata alongside saved assets
if save_item:
new_item.remove_links("root")
new_item.save_object(dest_href=os.path.join(path, "item.json"))

return new_item


async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> list[Item]:
tasks = []
for item in items:
tasks.append(asyncio.create_task(download_item_assets(item, **kwargs)))
new_items: list[Item] = await asyncio.gather(*tasks)
return new_items
async def download_items_assets(
items: Iterable[Item],
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> list[Item]:
return await asyncio.gather(
*[
asyncio.create_task(
download_item_assets(
item=item,
path_template=path_template,
config=config,
keep_non_downloaded=keep_non_downloaded,
)
)
for item in items
]
)


def upload_item_assets_to_s3(
@@ -107,7 +60,7 @@ def upload_item_assets_to_s3(
s3_urls: bool = False,
headers: Optional[dict[str, Any]] = None,
s3_client: Optional[s3] = None,
**kwargs: Any,
**kwargs: Any, # unused, but retain to permit unused attributes from upload_options
) -> Item:
"""Upload Item assets to an S3 bucket
Args:
8 changes: 8 additions & 0 deletions stactask/config.py
@@ -0,0 +1,8 @@
from dataclasses import dataclass

from stac_asset import Config


@dataclass
class DownloadConfig(Config): # type: ignore
pass