Update geomad for aws (#8)
* Redo notebook and processor

* Tweaks after testing

* Ready to test

* Add python libraries we need
alexgleith authored Jun 13, 2024
1 parent 24ef429 commit 7271372
Showing 7 changed files with 496 additions and 2,319 deletions.
2,021 changes: 17 additions & 2,004 deletions Load_GeoMAD.ipynb

Large diffs are not rendered by default.

181 changes: 55 additions & 126 deletions Test_GeoMAD_Sentinel2.ipynb
@@ -7,13 +7,16 @@
"outputs": [],
"source": [
"from datacube.utils.dask import start_local_dask\n",
"from dep_tools.loaders import Sentinel2OdcLoader\n",
"from dep_tools.searchers import PystacSearcher\n",
"from dep_tools.loaders import OdcLoader\n",
"from dep_tools.namers import LocalPath\n",
"from dep_tools.writers import LocalDsWriter\n",
"from dep_tools.writers import LocalDsCogWriter\n",
"\n",
"from pystac import Item\n",
"from odc.stac import configure_rio\n",
"\n",
"from src.run_task import GeoMADSentinel2Processor, get_grid"
"from dep_tools.grids import PACIFIC_GRID_10\n",
"\n",
"from src.utils import GeoMADAWSSentinel2Processor"
]
},
{
@@ -32,6 +35,9 @@
"metadata": {},
"outputs": [],
"source": [
"# Set up rasterio\n",
"configure_rio(cloud_defaults=True, aws={\"aws_unsigned\": True})\n",
"\n",
"# Optionally set up a local dask cluster\n",
"client = start_local_dask(threads_per_worker=64, n_workers=1, mem_safety_margin=\"2GB\")\n",
"client.dashboard_link"
@@ -48,14 +54,33 @@
"# item_id = \"65,22\" # Near the anti-meridian\n",
"# item_id = \"66,22\" # Right of the anti-meridian\n",
"\n",
"item_id = \"63,27\" # Only finds one item\n",
"\n",
"datetime = \"2019\"\n",
"datetime = \"2024\"\n",
"\n",
"# And get the study site\n",
"grid = get_grid()\n",
"cell = grid.loc[[(item_id)]]\n",
"cell.explore()"
"tile_index = tuple(int(i) for i in item_id.split(\",\"))\n",
"geobox = PACIFIC_GRID_10.tile_geobox(tile_index)\n",
"\n",
"# Load low-res if you want it faster\n",
"# geobox = geobox.zoom_out(10)\n",
"\n",
"geobox.explore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Search for data\n",
"searcher = PystacSearcher(\n",
" catalog=\"https://earth-search.aws.element84.com/v1\",\n",
" collections=[\"sentinel-2-c1-l2a\"],\n",
" datetime=datetime, \n",
")\n",
"\n",
"items = searcher.search(area=geobox)\n",
"print(f\"Found {len(items)} items\")"
]
},
{
@@ -65,34 +90,15 @@
"outputs": [],
"source": [
"# Set up a data loader\n",
"loader = Sentinel2OdcLoader(\n",
" epsg=3832,\n",
" datetime=datetime,\n",
" dask_chunksize=dict(time=1, x=4096, y=4096),\n",
" odc_load_kwargs=dict(\n",
" fail_on_error=False,\n",
" resolution=100,\n",
" # bands=[\"SCL\", \"B02\", \"B03\", \"B04\", \"B05\", \"B06\", \"B07\", \"B08\", \"B8A\", \"B11\", \"B12\"],\n",
" bands=[\"SCL\", \"B04\", \"B03\", \"B02\"],\n",
" groupby=\"solar_day\",\n",
" stac_cfg={\n",
" \"sentinel-2-l2a\": {\n",
" \"assets\": {\n",
" \"*\": {\n",
" \"nodata\": 0,\n",
" \"data_type\": \"uint16\"\n",
" }\n",
" }\n",
" }\n",
" }\n",
" ),\n",
" nodata_value=0,\n",
" keep_ints=True,\n",
" load_as_dataset=True,\n",
"loader = OdcLoader(\n",
" # bands=[\"SCL\", \"B02\", \"B03\", \"B04\", \"B05\", \"B06\", \"B07\", \"B08\", \"B8A\", \"B11\", \"B12\"],\n",
" bands=[\"cloud\", \"red\", \"blue\", \"green\"],\n",
" chunks=dict(time=1, x=3201, y=3201),\n",
" groupby=\"solar_day\"\n",
")\n",
"\n",
"# Run the load process, which is lazy-loaded\n",
"input_data = loader.load(cell)\n",
"input_data = loader.load(items, areas=geobox)\n",
"input_data"
]
},
@@ -103,19 +109,19 @@
"outputs": [],
"source": [
"# Set up a data processor\n",
"processor = GeoMADSentinel2Processor(\n",
" scale_and_offset=False,\n",
" harmonize_to_old=True,\n",
"processor = GeoMADAWSSentinel2Processor(\n",
" geomad_options=dict(\n",
" work_chunks=(601, 601),\n",
" work_chunks=(600, 600),\n",
" num_threads=10,\n",
" maxiters=100,\n",
" ),\n",
" filters=[(\"closing\", 5), (\"opening\", 5)],\n",
" keep_ints=True\n",
" mask_cloud_percentage=5,\n",
" min_timesteps=5,\n",
" drop_vars = [\"cloud\"]\n",
")\n",
"\n",
"# Plan the processing. Still lazy-loaded\n",
"# Do the processing\n",
"output_data = processor.process(input_data)\n",
"output_data"
]
@@ -126,38 +132,7 @@
"metadata": {},
"outputs": [],
"source": [
"output_data.B04.plot.imshow()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"output_data.odc.to_rgba(bands=[\"B04\", \"B03\", \"B02\"], vmin=0, vmax=3000).odc.explore()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import xarray as xr\n",
"\n",
"xr.__version__"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Actually load data and do the processing, so we have it in memory\n",
"loaded = output_data.compute()\n",
"loaded"
"output_data.odc.explore(vmin=1000, vmax=3000)"
]
},
{
@@ -166,7 +141,7 @@
"metadata": {},
"outputs": [],
"source": [
"loaded[\"count\"].plot.imshow(size=8, robust=True)"
"output_data[[\"red\", \"green\", \"blue\"]].write_cog(\"test_geomad.tif\")"
]
},
{
@@ -180,20 +155,15 @@
" local_folder=\"data\",\n",
" sensor=\"s2\",\n",
" dataset_id=\"geomad\",\n",
" version=\"0.0.0\",\n",
" version=\"0.1.0\",\n",
" time=datetime,\n",
")\n",
"\n",
"# Set up a writer and write out the files\n",
"writer = LocalDsWriter(\n",
" itempath=dep_path,\n",
" output_nodata=0,\n",
" use_odc_writer=True,\n",
" overwrite=True,\n",
" convert_to_int16=False\n",
")\n",
"print(f\"Writing to: {writer.itempath}\")\n",
"out_files = writer.write(loaded, item_id)"
"writer = LocalDsCogWriter(itempath=dep_path)\n",
"\n",
"# THIS DOESN'T WORK!\n",
"# out_files = writer.write(output_data, item_id)"
]
},
{
@@ -208,47 +178,6 @@
"item = Item.from_file(stac_path)\n",
"item.validate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Testing the Azure writer\n",
"\n",
"# from dep_tools.writers import AzureDsWriter\n",
"# from dep_tools.namers import DepItemPath\n",
"\n",
"# itempath = DepItemPath(\"geomad\", \"test\", \"0.0\", datetime)\n",
"\n",
"# writer = AzureDsWriter(\n",
"# itempath=itempath,\n",
"# overwrite=True,\n",
"# convert_to_int16=False,\n",
"# extra_attrs=dict(dep_version=\"0.0\"),\n",
"# )\n",
"\n",
"# writer.write(output_data, \"test\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# from odc.stac import load\n",
"# from pystac import Item\n",
"\n",
"# # How to make this dynamic?\n",
"# stac_url = \"https://deppcpublicstorage.blob.core.windows.net/output/dep_geomad_test/0-0/test/2023-01/dep_geomad_test_test_2023-01.stac-item.json\"\n",
"\n",
"# item = Item.from_file()\n",
"\n",
"# data = load([item], chunks={})\n",
"# data"
]
}
],
"metadata": {
@@ -267,7 +196,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
"version": "3.10.14"
}
},
"nbformat": 4,
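Taken together, the updated notebook cells describe the new AWS-based GeoMAD flow: configure rasterio for unsigned S3 access, look up a tile geobox from PACIFIC_GRID_10, search the Element84 STAC catalog, lazily load the bands with OdcLoader, and run GeoMADAWSSentinel2Processor. A minimal standalone sketch of that flow follows; it reuses the names and arguments shown in the diff, but the dep_tools and src.utils signatures are assumed rather than verified here.

```python
from odc.stac import configure_rio

from dep_tools.grids import PACIFIC_GRID_10
from dep_tools.loaders import OdcLoader
from dep_tools.searchers import PystacSearcher

from src.utils import GeoMADAWSSentinel2Processor

# Configure rasterio for anonymous reads from AWS S3
configure_rio(cloud_defaults=True, aws={"aws_unsigned": True})

# Pick a tile from the Pacific 10 m grid and get its geobox
tile_index = (63, 27)
geobox = PACIFIC_GRID_10.tile_geobox(tile_index)

# Find Sentinel-2 Collection 1 L2A items for the year on Element84's Earth Search
searcher = PystacSearcher(
    catalog="https://earth-search.aws.element84.com/v1",
    collections=["sentinel-2-c1-l2a"],
    datetime="2024",
)
items = searcher.search(area=geobox)

# Lazily load the bands needed for the GeoMAD, chunked for dask
loader = OdcLoader(
    bands=["cloud", "red", "blue", "green"],
    chunks=dict(time=1, x=3201, y=3201),
    groupby="solar_day",
)
input_data = loader.load(items, areas=geobox)

# Run the GeoMAD processor with the same options as the notebook cell
processor = GeoMADAWSSentinel2Processor(
    geomad_options=dict(work_chunks=(600, 600), num_threads=10, maxiters=100),
    mask_cloud_percentage=5,
    min_timesteps=5,
    drop_vars=["cloud"],
)
output_data = processor.process(input_data)

# Trigger the dask computation; the notebook then writes a COG and a STAC item
output_data = output_data.compute()
```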
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,6 +1,6 @@
typer
git+https://github.com/digitalearthpacific/dep-tools.git@fa12330
git+https://github.com/opendatacube/odc-algo.git@57316a8
git+https://github.com/digitalearthpacific/dep-tools.git@add-s3-writer
https://github.com/auspatious/datacube-compute/releases/download/0.0.3/datacube_compute-0.0.3-cp310-cp310-linux_x86_64.whl

--no-binary gdal
--no-binary rasterio
46 changes: 27 additions & 19 deletions src/print_tasks.py
@@ -4,57 +4,65 @@
from typing import Annotated, Optional

import typer
from dep_tools.aws import object_exists
from dep_tools.azure import blob_exists
from dep_tools.namers import DepItemPath

from run_task import get_grid
from utils import get_grid


def main(
regions: Annotated[str, typer.Option()],
datetime: Annotated[str, typer.Option()],
years: Annotated[str, typer.Option()],
version: Annotated[str, typer.Option()],
regions: Optional[str] = "ALL",
limit: Optional[str] = None,
base_product: str = "ls",
dataset_id: str = "geomad",
output_bucket: Optional[str] = None,
output_prefix: Optional[str] = None,
overwrite: Annotated[bool, typer.Option()] = False,
) -> None:
grid = get_grid()
tiles = get_grid()

region_codes = None if regions.upper() == "ALL" else regions.split(",")

if limit is not None:
limit = int(limit)

# Makes a list no matter what
years = datetime.split("-")
years = years.split("-")
if len(years) == 2:
years = range(int(years[0]), int(years[1]) + 1)
elif len(years) > 2:
ValueError(f"{datetime} is not a valid value for --datetime")
ValueError(f"{years} is not a valid value for --years")

# Filter by country codes if we have them
if region_codes is not None:
grid = grid.loc[grid.country_code.isin(region_codes)]
tiles = tiles.loc[tiles.country_code.isin(region_codes)]

tasks = [
{
"base-product": base_product,
"region-code": region[0][0],
"datetime": region[1],
}
for region in product(grid.index, years)
{"tile-id": tile, "year": year, "version": version}
for tile, year in product(list(tiles.tile_id), years)
]

# If we don't want to overwrite, then we should only run tasks that don't already exist
# i.e., they failed in the past or they're missing for some other reason
itempath = DepItemPath(
base_product, dataset_id, version, datetime, zero_pad_numbers=True
)
if not overwrite:
valid_tasks = []
for task in tasks:
stac_path = itempath.stac_path(task["region-code"])
if not blob_exists(stac_path):
itempath = DepItemPath(
base_product, "geomad", version, task["year"], zero_pad_numbers=True
)
stac_path = itempath.stac_path(task["tile-id"])

if output_prefix is not None:
stac_path = f"{output_prefix}/{stac_path}"

exists = False
if output_bucket is not None:
exists = object_exists(output_bucket, stac_path)
else:
exists = blob_exists(stac_path)
if not exists:
valid_tasks.append(task)
if len(valid_tasks) == limit:
break
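The reworked print_tasks.py expands a single year or a hyphenated range (for example "2017-2019") into a list of years before building the task matrix. A condensed sketch of that parsing is below; parse_years is a hypothetical helper name, and it raises the ValueError that the diff constructs but never raises.

```python
def parse_years(years: str) -> list[int]:
    """Expand "2019" to [2019] and "2017-2019" to [2017, 2018, 2019]."""
    parts = years.split("-")
    if len(parts) == 1:
        return [int(parts[0])]
    if len(parts) == 2:
        return list(range(int(parts[0]), int(parts[1]) + 1))
    # The diff builds this exception without raising it; raise it here
    raise ValueError(f"{years} is not a valid value for --years")
```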
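The existence check now supports both storage backends: when an output bucket is given, the STAC path is checked on S3 with object_exists; otherwise the original Azure blob_exists check is kept. A sketch of that per-task check follows; task_already_done is a hypothetical helper, and the DepItemPath, object_exists, and blob_exists signatures are assumed to match their use in the diff.

```python
from typing import Optional

from dep_tools.aws import object_exists
from dep_tools.azure import blob_exists
from dep_tools.namers import DepItemPath


def task_already_done(
    task: dict,
    base_product: str,
    version: str,
    output_bucket: Optional[str] = None,
    output_prefix: Optional[str] = None,
) -> bool:
    """Return True if the task's STAC item already exists in S3 or Azure."""
    itempath = DepItemPath(
        base_product, "geomad", version, task["year"], zero_pad_numbers=True
    )
    stac_path = itempath.stac_path(task["tile-id"])

    if output_prefix is not None:
        stac_path = f"{output_prefix}/{stac_path}"

    # Prefer the S3 check when a bucket is given, otherwise fall back to Azure
    if output_bucket is not None:
        return object_exists(output_bucket, stac_path)
    return blob_exists(stac_path)
```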
