replace use of fsspec for downloading Item Assets with stac-asset #120

Merged: 3 commits, May 8, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## Changed

- Replaced the use of fsspec with stac-asset for downloading Item Assets
- `--local` flag no longer turns off validation
- The `processing:software` field is no longer added to Items by default. This is
because the intention of the STAC Processing Extension is to add metadata about the
141 changes: 118 additions & 23 deletions README.md
@@ -1,3 +1,4 @@
<!-- omit from toc -->
# STAC Task (stac-task)

[![Build Status](https://github.com/stac-utils/stac-task/workflows/CI/badge.svg?branch=main)](https://github.com/stac-utils/stac-task/actions/workflows/continuous-integration.yml)
@@ -6,6 +7,20 @@
[![codecov](https://codecov.io/gh/stac-utils/stac-task/branch/main/graph/badge.svg)](https://codecov.io/gh/stac-utils/stac-task)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

- [Quickstart for Creating New Tasks](#quickstart-for-creating-new-tasks)
- [Task Input](#task-input)
- [ProcessDefinition Object](#processdefinition-object)
- [UploadOptions Object](#uploadoptions-object)
- [path\_template](#path_template)
- [collections](#collections)
- [tasks](#tasks)
- [TaskConfig Object](#taskconfig-object)
- [Full Process Definition Example](#full-process-definition-example)
- [Migration](#migration)
- [0.4.x -\> 0.5.x](#04x---05x)
- [Development](#development)
- [Contributing](#contributing)

This Python library consists of the Task class, which is used to create custom tasks based
on a "STAC In, STAC Out" approach. The Task class acts as a wrapper around custom code and provides
several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI.
@@ -17,7 +32,7 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/
```python
from typing import Any

from stactask import Task
from stactask import Task, DownloadConfig

class MyTask(Task):
name = "my-task"
@@ -30,7 +45,10 @@ class MyTask(Task):
item = self.items[0]

# download a datafile
item = self.download_item_assets(item, assets=['data'])
item = self.download_item_assets(
item,
config=DownloadConfig(include=['data'])
)

# operate on the local file to create a new asset
item = self.upload_item_assets_to_s3(item)
@@ -41,32 +59,32 @@ class MyTask(Task):

## Task Input

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| type | string | Must be FeatureCollection |
| features | [Item] | A list of STAC `Item` |
| process | ProcessDefinition | A Process Definition |
| Field Name | Type | Description |
| ---------- | ----------------- | ------------------------- |
| type | string | Must be FeatureCollection |
| features | [Item] | A list of STAC `Item` |
| process | ProcessDefinition | A Process Definition |

### ProcessDefinition Object

A STAC task can be provided additional configuration via the 'process' field in the input
ItemCollection.

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |
| Field Name | Type | Description |
| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |

#### UploadOptions Object

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |
| Field Name | Type | Description |
| ------------- | ------------- | --------------------------------------------------------------------------------------- |
| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |

##### path_template

@@ -121,10 +139,10 @@ would have `param2=value2` passed. If there were a `task-b` to be run it would n

A Task Configuration contains information for running a specific task.

| Field Name | Type | Description |
| ------------- | ---- | ----------- |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |
| Field Name | Type | Description |
| ---------- | ------------- | ------------------------------------------------------------------------------------ |
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |

## Full Process Definition Example

@@ -147,6 +165,83 @@ Process definitions are sometimes called "Payloads":
}
```

## Migration

### 0.4.x -> 0.5.x

In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with
the stac-asset library. This has necessitated a change in the parameters
that the download methods accept.

The primary change is that the Task methods `download_item_assets` and
`download_items_assets` (items plural) now accept fewer explicit and implicit
(kwargs) parameters.

Previously, the methods looked like:

```python
def download_item_assets(
self,
item: Item,
path_template: str = "${collection}/${id}",
keep_original_filenames: bool = False,
**kwargs: Any,
) -> Item:
```

but now look like:

```python
def download_item_assets(
self,
item: Item,
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
) -> Item:
```

Similarly, the `asset_io` package methods were previously:

```python
async def download_item_assets(
item: Item,
assets: Optional[list[str]] = None,
save_item: bool = True,
overwrite: bool = False,
path_template: str = "${collection}/${id}",
absolute_path: bool = False,
keep_original_filenames: bool = False,
**kwargs: Any,
) -> Item:
```

and are now:

```python
async def download_item_assets(
item: Item,
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
) -> Item:
```

Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most common
parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests.

Many of these parameters can be directly translated into configuration passed in a
`DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object.

These parameters migrate to `DownloadConfig` as follows:

- `assets`: set `include`
- `requester_pays`: set `s3_requester_pays` = True
- `keep_original_filenames`: set `file_name_strategy` to
`FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False
- `overwrite`: set `overwrite`
- `save_item`: none, Item is always saved
- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either
`Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`
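
The mapping above can be sketched as a small translation helper. This is only an illustration: `translate_download_kwargs` is a hypothetical name that is not part of stac-task, and it returns plain values so that it runs without stac-asset installed. The `file_name_strategy` entry holds the name of the `FileNameStrategy` member rather than the enum itself.

```python
from typing import Any


def translate_download_kwargs(old: dict[str, Any]) -> dict[str, Any]:
    """Map 0.4.x download keyword arguments to DownloadConfig field values.

    Hypothetical helper for illustration; not part of stac-task.
    """
    new: dict[str, Any] = {}
    if old.get("assets") is not None:
        # `assets` becomes `include`
        new["include"] = list(old["assets"])
    if old.get("requester_pays"):
        # `requester_pays` becomes `s3_requester_pays`
        new["s3_requester_pays"] = True
    if "keep_original_filenames" in old:
        # Name of the FileNameStrategy member to use
        new["file_name_strategy"] = (
            "FILE_NAME" if old["keep_original_filenames"] else "KEY"
        )
    if "overwrite" in old:
        new["overwrite"] = bool(old["overwrite"])
    # `save_item` and `absolute_path` have no equivalent and are dropped
    return new


fields = translate_download_kwargs(
    {
        "assets": ["data"],
        "requester_pays": True,
        "keep_original_filenames": False,
        "save_item": True,
    }
)
```

Assuming `FileNameStrategy` is importable from `stac_asset`, the strategy name can be resolved with `FileNameStrategy[name]` before the fields are passed to `DownloadConfig`.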

## Development

Clone, install in editable mode with development requirements, and install the **pre-commit** hooks:
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [
"python-dateutil>=2.7.0",
"boto3-utils>=0.3.2",
"fsspec>=2022.8.2",
"stac-asset>=0.3.0",
"jsonpath_ng>=1.5.3",
"requests>=2.28.1",
"s3fs>=2022.8.2",
@@ -37,6 +38,7 @@ dev = [
"pre-commit~=3.7",
"ruff~=0.4.1",
"types-setuptools~=69.0",
"boto3-stubs",
]
test = [
"pytest~=8.0",
3 changes: 2 additions & 1 deletion stactask/__init__.py
@@ -6,6 +6,7 @@
# package is not installed
pass

from .config import DownloadConfig
from .task import Task

__all__ = ["Task"]
__all__ = ["Task", "DownloadConfig"]
111 changes: 32 additions & 79 deletions stactask/asset_io.py
@@ -1,102 +1,55 @@
import asyncio
import logging
import os
from os import path as op
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import fsspec
import stac_asset
from boto3utils import s3
from fsspec import AbstractFileSystem
from pystac import Item
from pystac.layout import LayoutTemplate

from .config import DownloadConfig

logger = logging.getLogger(__name__)

# global dictionary of sessions per bucket
global_s3_client = s3()

SIMULTANEOUS_DOWNLOADS = int(os.getenv("STAC_SIMULTANEOUS_DOWNLOADS", 3))
sem = asyncio.Semaphore(SIMULTANEOUS_DOWNLOADS)


async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None:
async with sem:
logger.debug(f"{src} start")
if hasattr(fs, "_get_file"):
await fs._get_file(src, dest)
elif hasattr(fs, "get_file"):
fs.get_file(src, dest)
else:
raise NotImplementedError(
"stac-task only supports filesystems providing"
" `get_file` or `_get_file` interface"
)
logger.debug(f"{src} completed")


async def download_item_assets(
item: Item,
assets: Optional[list[str]] = None,
save_item: bool = True,
overwrite: bool = False,
path_template: str = "${collection}/${id}",
absolute_path: bool = False,
keep_original_filenames: bool = False,
**kwargs: Any,
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> Item:
_assets = item.assets.keys() if assets is None else assets

# determine path from template and item
layout = LayoutTemplate(path_template)
path = layout.substitute(item)

# make necessary directories
os.makedirs(path, exist_ok=True)
return await stac_asset.download_item(
item=item.clone(),
directory=LayoutTemplate(path_template).substitute(item),
file_name="item.json",
config=config,
keep_non_downloaded=keep_non_downloaded,
)

new_item = item.clone()

tasks = []
for a in _assets:
if a not in item.assets:
continue
href = item.assets[a].href

# local filename
url_path = urlparse(href).path
if keep_original_filenames:
basename = os.path.basename(url_path)
else:
basename = a + os.path.splitext(url_path)[1]
new_href = os.path.join(path, basename)
if absolute_path:
new_href = os.path.abspath(new_href)

# save file
if not os.path.exists(new_href) or overwrite:
fs = fsspec.core.url_to_fs(href, asynchronous=True, **kwargs)[0]
task = asyncio.create_task(download_file(fs, href, new_href))
tasks.append(task)

# update
new_item.assets[a].href = new_href

await asyncio.gather(*tasks)

# save Item metadata alongside saved assets
if save_item:
new_item.remove_links("root")
new_item.save_object(dest_href=os.path.join(path, "item.json"))

return new_item


async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> list[Item]:
tasks = []
for item in items:
tasks.append(asyncio.create_task(download_item_assets(item, **kwargs)))
new_items: list[Item] = await asyncio.gather(*tasks)
return new_items
async def download_items_assets(
items: Iterable[Item],
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> list[Item]:
return await asyncio.gather(
*[
asyncio.create_task(
download_item_assets(
item=item,
path_template=path_template,
config=config,
keep_non_downloaded=keep_non_downloaded,
)
)
for item in items
]
)


def upload_item_assets_to_s3(
@@ -107,7 +60,7 @@ def upload_item_assets_to_s3(
s3_urls: bool = False,
headers: Optional[dict[str, Any]] = None,
s3_client: Optional[s3] = None,
**kwargs: Any,
**kwargs: Any, # unused, but retain to permit unused attributes from upload_options
) -> Item:
"""Upload Item assets to an S3 bucket
Args:
8 changes: 8 additions & 0 deletions stactask/config.py
@@ -0,0 +1,8 @@
from dataclasses import dataclass

from stac_asset import Config


@dataclass
class DownloadConfig(Config): # type: ignore
pass
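
Because `DownloadConfig` has an empty body, it relies on dataclass inheritance to pick up every field of its parent. A minimal sketch of that pattern follows, using a hypothetical `BaseConfig` stand-in with two illustrative fields rather than the real `stac_asset.Config`:

```python
from dataclasses import dataclass, field, fields


@dataclass
class BaseConfig:
    """Hypothetical stand-in for the parent config; fields are illustrative."""

    include: list[str] = field(default_factory=list)
    overwrite: bool = False


@dataclass
class WrapperConfig(BaseConfig):
    """Empty subclass: inherits every field and the generated __init__."""

    pass


cfg = WrapperConfig(include=["data"])
# The subclass exposes exactly the parent's fields, in declaration order
field_names = [f.name for f in fields(cfg)]
```

The subclass gives the library its own type name to export while leaving the set of options defined in one place.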