diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..2198efb
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+build:
+	docker build --tag dep/s1 .
+
+run:
+	docker run -it --rm \
+		-e PC_SDK_SUBSCRIPTION_KEY=${PC_SDK_SUBSCRIPTION_KEY} \
+		-e AZURE_STORAGE_ACCOUNT="deppcpublicstorage" \
+		-e AZURE_STORAGE_SAS_TOKEN=${AZURE_STORAGE_SAS_TOKEN} \
+		dep/s1 \
+		python src/run_task.py \
+		--region-code "64,20" \
+		--datetime 2023 \
+		--version "0.0.0t0" \
+		--output-resolution 100 \
+		--output-bucket files.auspatious.com \
+		--overwrite
diff --git a/Test_S1_Mosaic.ipynb b/Test_S1_Mosaic.ipynb
index 91aa445..5c8e34e 100644
--- a/Test_S1_Mosaic.ipynb
+++ b/Test_S1_Mosaic.ipynb
@@ -8,12 +8,14 @@
    "source": [
     "from datacube.utils.dask import start_local_dask\n",
     "from dep_tools.namers import LocalPath\n",
-    "from dep_tools.writers import LocalDsWriter\n",
+    "from dep_tools.writers import LocalDsCogWriter\n",
     "\n",
-    "from src.run_task import get_grid, S1Processor, Sentinel1Loader\n",
+    "from src.run_task import get_grid, S1Processor\n",
     "\n",
-    "from pystac import Item\n",
-    "from odc.stac import load"
+    "from dep_tools.searchers import PystacSearcher\n",
+    "from dep_tools.loaders import OdcLoader\n",
+    "\n",
+    "from planetary_computer import sign_url"
    ]
   },
   {
@@ -52,8 +54,8 @@
     "\n",
     "# And get the study site\n",
     "grid = get_grid()\n",
-    "cell = grid.loc[[(region_code)]]\n",
-    "cell.explore()"
+    "area = grid.loc[[(region_code)]]\n",
+    "area.explore()"
    ]
   },
   {
@@ -62,23 +64,17 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Set up a data loader\n",
-    "loader = Sentinel1Loader(\n",
-    "    epsg=3832,\n",
+    "# Find some items\n",
+    "searcher = PystacSearcher(\n",
+    "    catalog=\"https://planetarycomputer.microsoft.com/api/stac/v1/\",\n",
+    "    collections=[\"sentinel-1-rtc\"],\n",
     "    datetime=datetime,\n",
-    "    dask_chunksize=dict(time=1, x=4096, y=4096),\n",
-    "    odc_load_kwargs=dict(\n",
-    "        fail_on_error=False,\n",
-    "        resolution=100,\n",
-    "        bands=[\"vv\", \"vh\"],\n",
-    "        groupby=\"solar_day\",\n",
-    "    ),\n",
-    "    load_as_dataset=True,\n",
+    "    query={\"sat:orbit_state\": {\"eq\": \"descending\"}}\n",
     ")\n",
     "\n",
-    "# Run the load process, which is lazy-loaded\n",
-    "input_data = loader.load(cell)\n",
-    "input_data"
+    "items = searcher.search(area)\n",
+    "\n",
+    "print(f\"Found {len(items)} items\")"
    ]
   },
   {
@@ -87,12 +83,20 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Set up a data processor\n",
-    "processor = S1Processor()\n",
+    "# Set up a data loader\n",
+    "loader = OdcLoader(\n",
+    "    crs=3832,\n",
+    "    resolution=100,\n",
+    "    bands=[\"vv\", \"vh\"],\n",
+    "    groupby=\"solar_day\",\n",
+    "    chunks=dict(time=1, x=4096, y=4096),\n",
+    "    fail_on_error=False,\n",
+    "    patch_url=sign_url\n",
+    ")\n",
     "\n",
-    "# Plan the processing.\n",
-    "output_data = processor.process(input_data)\n",
-    "output_data"
+    "# Run the load process, which is lazy-loaded\n",
+    "input_data = loader.load(items, area)\n",
+    "input_data"
    ]
   },
   {
@@ -101,7 +105,12 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "output_data[\"count\"].plot.imshow(size=10)"
+    "# Set up a data processor\n",
+    "processor = S1Processor()\n",
+    "\n",
+    "# Plan the processing.\n",
+    "output_data = processor.process(input_data)\n",
+    "output_data"
    ]
   },
   {
@@ -110,7 +119,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "output_data[[\"median_vv\", \"median_vh\", \"median_vv_vh\"]].to_array().plot.imshow(size=10, vmin=0, vmax=0.1)"
+    "output_data[\"count\"].plot.imshow(size=10)"
    ]
   },
   {
@@ -119,7 +128,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
"output_data[[\"mean_vv\", \"mean_vh\", \"mean_vv_vh\"]].to_array().plot.imshow(size=10, vmin=0, vmax=0.2)" + "output_data[\"mean_vv_vh\"] = (output_data[\"mean_vv\"] / output_data[\"mean_vh\"])\n", + "output_data[[\"mean_vv\", \"mean_vh\", \"mean_vv_vh\"]].to_array().plot.imshow(size=10, vmin=0, vmax=0.1)" ] }, { @@ -151,13 +161,22 @@ "metadata": {}, "outputs": [], "source": [ - "from odc.stac import load\n", - "from pystac import Item\n", + "# Testing the AWS writer\n", "\n", - "item = Item.from_file(\"https://deppcpublicstorage.blob.core.windows.net/output/dep_geomad_test/0-0/test/2023-01/dep_geomad_test_test_2023-01.stac-item.json\")\n", + "from dep_tools.writers import AwsDsCogWriter\n", + "from dep_tools.namers import DepItemPath\n", "\n", - "data = load([item], chunks={})\n", - "data" + "itempath = DepItemPath(\"geomad\", \"test\", \"0.0\", datetime)\n", + "\n", + "writer = AwsDsCogWriter(\n", + " itempath=itempath,\n", + " overwrite=False,\n", + " convert_to_int16=False,\n", + " extra_attrs=dict(dep_version=\"0.0\"),\n", + " bucket=\"files.auspatious.com\"\n", + ")\n", + "\n", + "writer.write(output_data, \"test\")" ] }, { @@ -166,7 +185,13 @@ "metadata": {}, "outputs": [], "source": [ - "data.red.isel(time=0).plot.imshow(size=8, robust=True)" + "from odc.stac import load\n", + "from pystac import Item\n", + "\n", + "item = Item.from_file(\"https://deppcpublicstorage.blob.core.windows.net/output/dep_geomad_test/0-0/test/2023-01/dep_geomad_test_test_2023-01.stac-item.json\")\n", + "\n", + "data = load([item], chunks={})\n", + "data" ] }, { @@ -216,16 +241,8 @@ "metadata": {}, "outputs": [], "source": [ - "d = load([item])" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "d[\"count\"].plot()" + "from pystac import Item\n", + "item = Item.from_file(\"https://s3.ap-southeast-2.amazonaws.com/files.auspatious.com/dep_geomad_test/0-0/test/2023/dep_geomad_test_test_2023.stac-item.json\")" ] }, { diff --git a/requirements.txt b/requirements.txt index 457b625..7f558ee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ +boto3 typer -git+https://github.com/digitalearthpacific/dep-tools.git +git+https://github.com/digitalearthpacific/dep-tools.git@add-s3-writer + --no-binary gdal --no-binary rasterio diff --git a/src/run_task.py b/src/run_task.py index 7785907..fe95000 100644 --- a/src/run_task.py +++ b/src/run_task.py @@ -1,38 +1,25 @@ -from logging import INFO, Formatter, Logger, StreamHandler, getLogger - import geopandas as gpd import typer from dask.distributed import Client from dep_tools.azure import blob_exists from dep_tools.exceptions import EmptyCollectionError -from dep_tools.loaders import OdcLoaderMixin, StackXrLoader +from dep_tools.loaders import OdcLoader + from dep_tools.namers import DepItemPath from dep_tools.processors import Processor +from dep_tools.searchers import PystacSearcher from dep_tools.stac_utils import set_stac_properties +from dep_tools.utils import get_logger + +import boto3 # from dep_tools.task import SimpleLoggingAreaTask -from dep_tools.task import SimpleLoggingAreaTask -from dep_tools.utils import search_across_180 -from dep_tools.writers import AzureDsWriter +from dep_tools.writers import AzureDsWriter, AwsDsCogWriter +from planetary_computer import sign_url from typing_extensions import Annotated from xarray import DataArray, Dataset, merge - -def get_logger(region_code: str) -> Logger: - """Set up a simple logger""" - console = StreamHandler() - time_format = "%Y-%m-%d 
%H:%M:%S" - console.setFormatter( - Formatter( - fmt=f"%(asctime)s %(levelname)s ({region_code}): %(message)s", - datefmt=time_format, - ) - ) - - log = getLogger("S1M") - log.addHandler(console) - log.setLevel(INFO) - return log +from dep_tools.aws import object_exists def get_grid() -> gpd.GeoDataFrame: @@ -45,47 +32,18 @@ def get_grid() -> gpd.GeoDataFrame: ) -class Sentinel1LoaderMixin(object): - def __init__( - self, - only_search_descending: bool = True, - **kwargs, - ): - super().__init__(**kwargs) - self.only_search_descending = only_search_descending - - def _get_items(self, area): - query = {} - if self.only_search_descending: - query["sat:orbit_state"] = {"eq": "descending"} - # Do the search - item_collection = search_across_180( - area, collections=["sentinel-1-rtc"], datetime=self.datetime, query=query - ) - - if len(item_collection) == 0: - raise EmptyCollectionError() - - return item_collection - - -class Sentinel1Loader(Sentinel1LoaderMixin, OdcLoaderMixin, StackXrLoader): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - class S1Processor(Processor): def __init__( self, send_area_to_processor: bool = False, - load_data_before_writing: bool = True, + load_data: bool = False, drop_vars: list[str] = [], ) -> None: super().__init__( send_area_to_processor, ) self.drop_vars = drop_vars - self.load_data_before_writing = load_data_before_writing + self.load_data = load_data def process(self, input_data: DataArray) -> Dataset: arrays = [] @@ -109,7 +67,7 @@ def process(self, input_data: DataArray) -> Dataset: output = set_stac_properties(input_data, data) - if self.load_data_before_writing: + if self.load_data: output = output.compute() return output @@ -120,6 +78,8 @@ def main( datetime: Annotated[str, typer.Option()], version: Annotated[str, typer.Option()], dataset_id: str = "mosaic", + output_bucket: str = None, + output_resolution: int = 10, memory_limit_per_worker: str = "50GB", n_workers: int = 2, threads_per_worker: int = 32, @@ -130,8 +90,8 @@ def main( grid = get_grid() area = grid.loc[[region_code]] - log = get_logger(region_code) - log.info(f"Starting processing for {region_code}") + log = get_logger(region_code, "Sentinel-1-Mosaic") + log.info(f"Starting processing version {version} for {datetime}") itempath = DepItemPath( base_product, dataset_id, version, datetime, zero_pad_numbers=True @@ -139,45 +99,66 @@ def main( stac_document = itempath.stac_path(region_code) # If we don't want to overwrite, and the destination file already exists, skip it - if not overwrite and blob_exists(stac_document): - log.info(f"Item already exists at {stac_document}") - # This is an exit with success - raise typer.Exit() - - loader = Sentinel1Loader( - epsg=3832, + if not overwrite: + already_done = False + if output_bucket is None: + # The Azure case + already_done = blob_exists(stac_document) + else: + # The AWS case + already_done = object_exists(output_bucket, stac_document) + + if already_done: + log.info(f"Item already exists at {stac_document}") + # This is an exit with success + raise typer.Exit() + + # A searcher to find the data + searcher = PystacSearcher( + catalog="https://planetarycomputer.microsoft.com/api/stac/v1/", + collections=["sentinel-1-rtc"], datetime=datetime, - dask_chunksize=dict(time=1, x=xy_chunk_size, y=xy_chunk_size), - load_as_dataset=True, - odc_load_kwargs=dict( - fail_on_error=False, - resolution=10, - groupby="solar_day", - bands=["vv", "vh"], - ), - nodata_value=-32768, + query={"sat:orbit_state": {"eq": "descending"}}, ) - 
log.info("Configuring processor") - processor = S1Processor() - - log.info("Configuring writer") - writer = AzureDsWriter( - itempath=itempath, + # A loader to load them + loader = OdcLoader( + crs=3832, + resolution=output_resolution, + bands=["vv", "vh"], + groupby="solar_day", + chunks=dict(time=1, x=xy_chunk_size, y=xy_chunk_size), + fail_on_error=False, + patch_url=sign_url, overwrite=overwrite, - convert_to_int16=False, - extra_attrs=dict(dep_version=version), - write_multithreaded=True, ) - runner = SimpleLoggingAreaTask( - id=region_code, - area=area, - loader=loader, - processor=processor, - writer=writer, - logger=log, - ) + # A processor to process them + processor = S1Processor() + + # And a writer to bind them + if output_bucket is None: + log.info("Writing with Azure writer") + writer = AzureDsWriter( + itempath=itempath, + overwrite=overwrite, + convert_to_int16=False, + extra_attrs=dict(dep_version=version), + write_multithreaded=True, + load_before_write=True, + ) + else: + log.info("Writing with AWS writer") + client = boto3.client("s3") + writer = AwsDsCogWriter( + itempath=itempath, + overwrite=overwrite, + convert_to_int16=False, + extra_attrs=dict(dep_version=version), + write_multithreaded=True, + bucket="files.auspatious.com", + client=client, + ) with Client( n_workers=n_workers, @@ -185,8 +166,24 @@ def main( memory_limit=memory_limit_per_worker, ): try: - paths = runner.run() - log.info(f"Completed writing to {paths[-1]}") + # Run the task + items = searcher.search(area) + log.info(f"Found {len(items)} items") + + data = loader.load(items, area) + log.info(f"Found {len(data.time)} timesteps to load") + + output_data = processor.process(data) + log.info( + f"Processed data to shape {[output_data.sizes[d] for d in ['x', 'y']]}" + ) + + paths = writer.write(output_data, region_code) + if paths is not None: + log.info(f"Completed writing to {paths[-1]}") + else: + log.warning("No paths returned from writer") + except EmptyCollectionError: log.warning("No data found for this tile.") except Exception as e: