First commit
hectormachin committed Aug 12, 2023
1 parent 9bf950c commit 43dde49
Showing 8 changed files with 362 additions and 0 deletions.
11 changes: 11 additions & 0 deletions Dockerfile
@@ -0,0 +1,11 @@
FROM python:3.9-alpine

WORKDIR /task

COPY requirements.txt requirements.txt

RUN pip install --no-cache-dir -r requirements.txt

COPY task.py .

CMD ["python3", "./task.py", "run"]
29 changes: 29 additions & 0 deletions README.md
@@ -0,0 +1,29 @@
## Instructions

This task copies specified assets from source STAC Item(s), uploads them to S3, and updates the Item asset hrefs to point to the new location.

To run this task within Argo Workflows, follow the instructions below.

1. `cd` into this directory.

2. Create an image from the provided Dockerfile. If you are using Rancher Desktop to run your K8s cluster, you need to use `nerdctl` to build the image.

`nerdctl build --namespace k8s.io -t copyassets .`

This creates an image with the name and tag `copyassets:latest`.
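If you are using plain Docker instead, the equivalent build is:

`docker build -t copyassets .`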

3. Make sure Argo Workflows is installed on the K8s cluster (see instructions [here](https://argoproj.github.io/argo-workflows/quick-start/)).

4. Upload the `payload_workflow.json` file to object storage, such as S3. Change the `path_template` variable in `upload_options` to the path where you want to save the task's output Item assets. For example, to save the output Item assets inside the `output` folder of a bucket named `copy_results`, templated by the Item's collection and id, set `path_template` to `s3://copy_results/output/${collection}/${id}/`.
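With the sample payload in this repository (collection `naip`, item id `tx_m_2609719_se_14_060_20201217`), that template expands to `s3://copy_results/output/naip/tx_m_2609719_se_14_060_20201217/`.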

5. Make the bucket publicly accessible and note the object URL of the payload uploaded in step 4.

6. Create a secret named `minio-secret-credentials` that contains your AWS credentials. The secret must have the keys `access_key_id` and `secret_access_key`; these are the secret name and keys the workflow manifests reference for authenticating to AWS.
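For example (substituting real credential values):

`kubectl create secret generic minio-secret-credentials -n <NAMESPACE> --from-literal=access_key_id=<ACCESS KEY ID> --from-literal=secret_access_key=<SECRET ACCESS KEY>`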

7. Run the Argo workflow in the same namespace where the Argo Workflow Controller is installed using:

`argo submit -n <NAMESPACE> --watch <FULL PATH TO WORKFLOW YAML FILE>`

substituting the appropriate values where needed.
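For example, assuming the controller is installed in the `argo` namespace:

`argo submit -n argo --watch workflow_copyassets_no_template.yaml`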

You can run either the `workflow_copyassets_no_template.yaml` file or the `workflow_copyassets_with_template.yaml` file. If you run `workflow_copyassets_with_template.yaml`, you must first install the Workflow Template:

`kubectl apply -n <NAMESPACE> -f <FULL PATH TO workflow-template.yaml>`

where `<NAMESPACE>` is the namespace where the Argo Workflow Controller is installed.
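For example, assuming the controller runs in the `argo` namespace, the full sequence for the templated variant is:

`kubectl apply -n argo -f workflow-template.yaml`

`argo submit -n argo --watch workflow_copyassets_with_template.yaml`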
155 changes: 155 additions & 0 deletions payload_workflow.json
@@ -0,0 +1,155 @@
{
"id": "test",
"type": "FeatureCollection",
"features": [
{
"id": "tx_m_2609719_se_14_060_20201217",
"bbox": [
-97.690252,
26.622563,
-97.622203,
26.689923
],
"type": "Feature",
"links": [
{
"rel": "collection",
"type": "application/json",
"href": "https://planetarycomputer.microsoft.com/api/stac/v1/collections/naip"
},
{
"rel": "parent",
"type": "application/json",
"href": "https://planetarycomputer.microsoft.com/api/stac/v1/collections/naip"
},
{
"rel": "root",
"type": "application/json",
"href": "https://planetarycomputer.microsoft.com/api/stac/v1/"
},
{
"rel": "self",
"type": "application/geo+json",
"href": "https://planetarycomputer.microsoft.com/api/stac/v1/collections/naip/items/tx_m_2609719_se_14_060_20201217"
},
{
"rel": "preview",
"href": "https://planetarycomputer.microsoft.com/api/data/v1/item/map?collection=naip&item=tx_m_2609719_se_14_060_20201217",
"title": "Map of item",
"type": "text/html"
}
],
"assets": {
"image": {
"href": "https://naipeuwest.blob.core.windows.net/naip/v002/tx/2020/tx_060cm_2020/26097/m_2609719_se_14_060_20201217.tif",
"type": "image/tiff; application=geotiff; profile=cloud-optimized",
"roles": [
"data"
],
"title": "RGBIR COG tile",
"eo:bands": [
{
"name": "Red",
"common_name": "red"
},
{
"name": "Green",
"common_name": "green"
},
{
"name": "Blue",
"common_name": "blue"
},
{
"name": "NIR",
"common_name": "nir",
"description": "near-infrared"
}
]
},
"thumbnail": {
"href": "https://naipeuwest.blob.core.windows.net/naip/v002/tx/2020/tx_060cm_2020/26097/m_2609719_se_14_060_20201217.200.jpg",
"type": "image/jpeg",
"roles": [
"thumbnail"
],
"title": "Thumbnail"
}
},
"geometry": {
"type": "Polygon",
"coordinates": [
[
[
-97.623004,
26.622563
],
[
-97.622203,
26.689286
],
[
-97.68949,
26.689923
],
[
-97.690252,
26.623198
],
[
-97.623004,
26.622563
]
]
]
},
"collection": "naip",
"properties": {
"gsd": 0.6,
"datetime": "2020-12-17T00:00:00Z",
"naip:year": "2020",
"proj:bbox": [
630384,
2945370,
637080,
2952762
],
"proj:epsg": 26914,
"naip:state": "tx",
"proj:shape": [
12320,
11160
],
"proj:transform": [
0.6,
0,
630384,
0,
-0.6,
2952762,
0,
0,
1
]
},
"stac_extensions": [
"https://stac-extensions.github.io/eo/v1.0.0/schema.json",
"https://stac-extensions.github.io/projection/v1.0.0/schema.json"
],
"stac_version": "1.0.0"
}],
"process": {
"workflow": "copy-assets",
"upload_options": {
"path_template": "s3://payloadtest/data/${collection}/${id}/",
"public_assets": [],
"s3_urls": false
},
"tasks": {
"copy-assets": {
"assets": ["thumbnail"],
"drop_assets": ["image"]
}
}
}
}
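The `process` block above is what configures the task. A minimal sketch of how those parameters are read out of the payload (plain JSON access using only names present in the file; the exact stactask internals may differ):

```python
import json

# Load the payload and pull out this task's parameters.
with open("payload_workflow.json") as f:
    payload = json.load(f)

params = payload["process"]["tasks"]["copy-assets"]
print(params["assets"])       # ["thumbnail"] -> assets to copy to S3
print(params["drop_assets"])  # ["image"]     -> assets to drop first

# The upload destination is templated per Item:
print(payload["process"]["upload_options"]["path_template"])
# s3://payloadtest/data/${collection}/${id}/
```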
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
stactask==0.1.0
stac-validator
89 changes: 89 additions & 0 deletions task.py
@@ -0,0 +1,89 @@
#!/usr/bin/env python
import logging
from typing import Any, Dict, List

from boto3utils import s3
from pystac import Item
from stac_validator import stac_validator
from stactask import Task
from stactask.exceptions import InvalidInput


s3_client = s3(requester_pays=False)


class CopyAssets(Task):
    name = "copy-assets"
    description = (
        "Copies specified assets from source STAC Item(s) to S3 "
        "and updates the Item asset hrefs to point to the new location."
    )
    version = "0.1.0"

    @classmethod
    def validate(cls, payload: Dict[str, Any]) -> bool:
        if "assets" not in payload["process"]["tasks"][cls.name]:
            raise InvalidInput("'assets' to copy must be specified")
        return True

    def process(
        self, assets: List[str], drop_assets: List[str]
    ) -> List[Dict[str, Any]]:
        # process method overrides Task
        created_items = []

        payload = self._payload

        try:
            item = self.items[0]
            item_dict = item.to_dict()

            # configuration options (re-read from the payload to apply defaults)
            config = payload.get("process", {}).get("tasks", {}).get("copy-assets", {})
            assets = config.get("assets", item_dict["assets"].keys())
            drop_assets = config.get("drop_assets", [])

            # drop specified assets
            for asset in [a for a in drop_assets if a in item_dict["assets"]]:
                logging.debug(f"Dropping asset {asset}")
                item_dict["assets"].pop(asset)

            if isinstance(assets, str) and assets == "ALL":
                assets = item_dict["assets"].keys()

            item_mod = Item.from_dict(item_dict)

            try:
                # copy specified assets, carrying each updated Item forward so
                # earlier copies are not lost on later iterations
                _assets = [a for a in assets if a in item_dict["assets"]]

                item = item_mod
                for asset in _assets:
                    item = self.download_item_assets(item, assets=[asset])
                    item = self.upload_item_assets_to_s3(item, assets=[asset])

                # replace item in payload
                payload["features"][0] = item.to_dict()
            except Exception as err:
                msg = f"copy-assets: failed processing {payload['id']} ({err})"
                logging.error(msg, exc_info=True)
                raise Exception(msg) from err

        except Exception as err:
            self.logger.error(err)
            raise Exception(f"Unable to copy assets: {err}") from err

        # validate the modified Item before returning it
        stac = stac_validator.StacValidate()
        valid = stac.validate_dict(item.to_dict())

        if valid:
            created_items.append(item.to_dict())
            return created_items
        else:
            raise Exception(
                f"STAC Item validation failed. Error: {stac.message[0]['error_message']}."
            )


def handler(event: Dict[str, Any], context: Dict[str, Any] = {}) -> Dict[str, Any]:
    return CopyAssets.handler(event)


if __name__ == "__main__":
    CopyAssets.cli()
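A minimal local smoke test of the task (a sketch, assuming `payload_workflow.json` is in the working directory, AWS credentials are set in the environment, and stactask's `Task.handler` accepts an in-memory payload dict and returns the completed payload):

```python
import json

from task import handler

with open("payload_workflow.json") as f:
    event = json.load(f)

# Runs validate() and process(), then returns the updated payload.
result = handler(event)
print(json.dumps(result, indent=2))
```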
24 changes: 24 additions & 0 deletions workflow-template.yaml
@@ -0,0 +1,24 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: copy-assets-template
spec:
serviceAccountName: argo-workflow
templates:
- name: copy-task
container:
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret-credentials
key: access_key_id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret-credentials
key: secret_access_key
image: quay.io/element84/copy-assets-stac-task:latest
imagePullPolicy: IfNotPresent
command: ["python3", "./task.py", "run"]
args: ["{{workflow.parameters.input}}"]
33 changes: 33 additions & 0 deletions workflow_copyassets_no_template.yaml
@@ -0,0 +1,33 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: copy-
spec:
entrypoint: copy-assets
serviceAccountName: argo # set this so the proper permissions are assigned
arguments:
parameters:
- name: input
value: https://payloadtest.s3.us-west-2.amazonaws.com/payload_workflow.json

templates:
- name: copy-assets
inputs:
parameters:
- name: input # parameter declaration
container:
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret-credentials
key: access_key_id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret-credentials
key: secret_access_key
image: quay.io/element84/copy-assets-stac-task:latest
imagePullPolicy: IfNotPresent
command: ["python3", "./task.py", "run"]
args: ["{{inputs.parameters.input}}"]
19 changes: 19 additions & 0 deletions workflow_copyassets_with_template.yaml
@@ -0,0 +1,19 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: copy-assets-workflow-
spec:
entrypoint: copy-assets
serviceAccountName: argo # set this so the proper permissions are assigned
arguments:
parameters:
- name: input
value: https://payloadtest.s3.us-west-2.amazonaws.com/payload_workflow.json

templates:
- name: copy-assets
steps:
- - name: copy-assets
templateRef:
name: copy-assets-template
template: copy-task
