Skip to content

Commit

Permalink
add shared stac-populator CLI for all populator impl + add directory …
Browse files Browse the repository at this point in the history
…crawler populator
  • Loading branch information
fmigneault-crim committed Nov 10, 2023
1 parent d22214a commit cf75217
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 25 deletions.
104 changes: 104 additions & 0 deletions STACpopulator/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import argparse
import glob
import importlib
import os
import sys
from typing import Callable, Optional

from STACpopulator import __version__

POPULATORS = {}


def make_main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.")
parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}",
help="prints the version of the library and exits")
commands = parser.add_subparsers(title="command", dest="command", description="STAC populator command to execute.")

run_cmd_parser = make_run_command_parser()
commands.add_parser(
"run",
prog=f"{parser.prog} {run_cmd_parser.prog}", parents=[run_cmd_parser],
formatter_class=run_cmd_parser.formatter_class, usage=run_cmd_parser.usage,
add_help=False, help=run_cmd_parser.description, description=run_cmd_parser.description
)

# add more commands as needed...

return parser


def make_run_command_parser() -> argparse.ArgumentParser:
"""
Groups all sub-populator CLI listed in :py:mod:`STACpopulator.implementations` as a common ``stac-populator`` CLI.
Dispatches the provided arguments to the appropriate sub-populator CLI as requested. Each sub-populator CLI must
implement functions ``make_parser`` and ``main`` to generate the arguments and dispatch them to the corresponding
caller. The ``main`` function should accept a sequence of string arguments, which can be passed to the parser
obtained from ``make_parser``.
An optional ``runner`` can also be defined in each populator module. If provided, the namespace arguments that have
already been parsed to resolve the populator to run will be used directly, avoiding parsing arguments twice.
"""
parser = argparse.ArgumentParser(prog="command", description="STACpopulator implementation runner.")
subparsers = parser.add_subparsers(title="populator", dest="populator", description="Implementation to run.")
populators_impl = "implementations"
populators_dir = os.path.join(os.path.dirname(__file__), populators_impl)
populator_mods = glob.glob(f"{populators_dir}/**/[!__init__]*.py", recursive=True) # potential candidate scripts
for populator_path in sorted(populator_mods):
populator_script = populator_path.split(populators_dir, 1)[1][1:]
populator_py_mod = os.path.splitext(populator_script)[0].replace(os.sep, ".")
populator_name, pop_mod_file = populator_py_mod.rsplit(".", 1)
populator_root = f"STACpopulator.{populators_impl}.{populator_name}"
pop_mod_file_loc = f"{populator_root}.{pop_mod_file}"
populator_module = importlib.import_module(pop_mod_file_loc, populator_root)
parser_maker: Callable[[], argparse.ArgumentParser] = getattr(populator_module, "make_parser", None)
populator_runner = getattr(populator_module, "runner", None) # optional, call main directly if not available
populator_caller = getattr(populator_module, "main", None)
if callable(parser_maker) and callable(populator_caller):
populator_parser = parser_maker()
populator_prog = f"{parser.prog} {populator_name}"
subparsers.add_parser(
populator_name,
prog=populator_prog, parents=[populator_parser], formatter_class=populator_parser.formatter_class,
add_help=False, # add help disabled otherwise conflicts with this main populator help
help=populator_parser.description, description=populator_parser.description,
usage=populator_parser.usage,
)
POPULATORS[populator_name] = {
"name": populator_name,
"caller": populator_caller,
"parser": populator_parser,
"runner": populator_runner,
}
return parser


def main(*args: str) -> Optional[int]:
parser = make_main_parser()
args = args or sys.argv[1:] # same as was parse args does, but we must provide them to subparser
ns = parser.parse_args(args=args) # if 'command' or 'populator' unknown, auto prints the help message with exit(2)
params = vars(ns)
populator_cmd = params.pop("command")
if not populator_cmd:
parser.print_help()
return 0
result = None
if populator_cmd == "run":
populator_name = params.pop("populator")
if not populator_name:
parser.print_help()
return 0
populator_args = args[2:] # skip [command] [populator]
populator_caller = POPULATORS[populator_name]["caller"]
populator_runner = POPULATORS[populator_name]["runner"]
if populator_runner:
result = populator_runner(ns)
else:
result = populator_caller(*populator_args)
return 0 if result is None else result


if __name__ == "__main__":
sys.exit(main())
45 changes: 23 additions & 22 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,18 @@
import json
import logging
from datetime import datetime
from typing import Any, List, Literal, MutableMapping, Optional
from typing import Any, List, Literal, MutableMapping, NoReturn, Optional

import pydantic_core
import pyessv
from colorlog import ColoredFormatter
from pydantic import AnyHttpUrl, ConfigDict, Field, FieldValidationInfo, field_validator
from pystac.extensions.datacube import DatacubeExtension

from STACpopulator.implementations.CMIP6_UofT.extensions import DataCubeHelper
from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon, STACItemProperties
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import STAC_item_from_metadata, collection2literal

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False
from STACpopulator.stac_utils import LOGGER, STAC_item_from_metadata, collection2literal

# CMIP6 controlled vocabulary (CV)
CV = pyessv.WCRP.CMIP6
Expand Down Expand Up @@ -169,23 +159,34 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
return json.loads(json.dumps(item.to_dict()))


if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="CMIP6 STAC populator")
def make_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="CMIP6 STAC populator")
parser.add_argument("stac_host", type=str, help="STAC API address")
parser.add_argument("thredds_catalog_URL", type=str, help="URL to the CMIP6 THREDDS catalog")
parser.add_argument("--update", action="store_true", help="Update collection and its items")
parser.add_argument("--mode", choices=["full", "single"],
help="Operation mode, processing the full dataset or only the single reference.")
return parser

args = parser.parse_args()

LOGGER.info(f"Arguments to call: {args}")
def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
LOGGER.info(f"Arguments to call: {vars(ns)}")

mode = "full"

if mode == "full":
data_loader = THREDDSLoader(args.thredds_catalog_URL)
if ns.mode == "full":
data_loader = THREDDSLoader(ns.thredds_catalog_URL)
else:
# To be implemented
data_loader = ErrorLoader(args.error_file)
data_loader = ErrorLoader()

c = CMIP6populator(args.stac_host, data_loader, args.update)
c = CMIP6populator(ns.stac_host, data_loader, ns.update)
c.ingest()


def main(*args: str) -> Optional[int]:
parser = make_parser()
ns = parser.parse_args(args)
return runner(ns)


if __name__ == "__main__":
main()
Empty file.
62 changes: 62 additions & 0 deletions STACpopulator/implementations/DirectoryLoader/crawl_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import argparse
from typing import NoReturn, Optional, MutableMapping, Any

from STACpopulator.input import STACDirectoryLoader
from STACpopulator.models import GeoJSONPolygon, STACItemProperties
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import LOGGER


class DirectoryPopulator(STACpopulatorBase):
item_properties_model = STACItemProperties
item_geometry_model = GeoJSONPolygon

def __init__(
self,
stac_host: str,
loader: STACDirectoryLoader,
update: bool,
collection: MutableMapping[str, Any],
) -> None:
self._collection_info = collection
super().__init__(stac_host, loader, update)

def load_config(self):
pass # ignore

def create_stac_collection(self) -> MutableMapping[str, Any]:
return self._collection_info

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
return item_data


def make_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Directory STAC populator")
parser.add_argument("stac_host", type=str, help="STAC API URL.")
parser.add_argument("directory", type=str, help="Path to a directory structure with STAC Collections and Items.")
parser.add_argument("--update", action="store_true", help="Update collection and its items.")
parser.add_argument(
"--prune", action="store_true",
help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure."
)
return parser


def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
LOGGER.info(f"Arguments to call: {vars(ns)}")

for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
loader = STACDirectoryLoader(collection_path, "item", False)
populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json)
populator.ingest()


def main(*args: str) -> Optional[int]:
parser = make_parser()
ns = parser.parse_args(args)
return runner(ns)


if __name__ == "__main__":
main()
63 changes: 62 additions & 1 deletion STACpopulator/input.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Iterator, MutableMapping, Optional, Tuple
from typing import Any, Iterator, Literal, MutableMapping, Optional, Tuple, Union

import pystac
import requests
Expand Down Expand Up @@ -131,6 +133,65 @@ def extract_metadata(self, ds: siphon.catalog.Dataset) -> MutableMapping[str, An
return attrs


class STACDirectoryLoader(GenericLoader):
"""
Iterates through a directory structure looking for STAC Collections or Items.
For each directory that gets crawled, if a file is named ``collection.json``, it assumed to be a STAC Collection.
All other ``.json`` files under the directory where ``collection.json`` was found are assumed to be STAC Items.
These JSON STAC Items can be either at the same directory level as the STAC Collection, or under nested directories.
Using the mode option, yielded results will be either the STAC Collections or the STAC Items.
This allows this class to be used in conjunction (2 nested loops) to find collections and their underlying items.
.. code-block:: python
for collection_path, collection_json in STACDirectoryLoader(dir_path, mode="collection"):
for item_path, item_json in STACDirectoryLoader(collection_path, mode="item"):
... # do stuff
For convenience, option ``prune`` can be used to stop crawling deeper once a STAC Collection is found.
Any collection files found further down the directory were a top-most match was found will not be yielded.
This can be useful to limit search, or to ignore nested directories using subsets of STAC Collections.
"""

def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool = False) -> None:
super().__init__()
self.path = path
self.iter = None
self.prune = prune
self.reset()
self._collection_mode = mode == "collection"
self._collection_name = "collection.json"

def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
for root, dirs, files in self.iter:
if self.prune and self._collection_mode and self._collection_name in files:
del dirs[:]
for name in files:
if self._collection_mode and self._is_collection(name):
col_path = os.path.join(root, name)
yield col_path, self._load_json(col_path)
elif not self._collection_mode and self._is_item(name):
item_path = os.path.join(root, name)
yield item_path, self._load_json(item_path)

def _is_collection(self, path: Union[os.PathLike[str], str]) -> bool:
name = os.path.split(path)[-1]
return name == self._collection_name

def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
name = os.path.split(path)[-1]
return name != self._collection_name and os.path.splitext(name)[-1] in [".json", ".geojson"]

def _load_json(self, path: Union[os.PathLike[str], str]) -> MutableMapping[str, Any]:
with open(path, mode="r", encoding="utf-8") as file:
return json.load(file)

def reset(self):
self.iter = os.walk(self.path)


class STACLoader(GenericLoader):
def __init__(self) -> None:
super().__init__()
Expand Down
4 changes: 2 additions & 2 deletions STACpopulator/stac_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from STACpopulator.models import STACItem

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
LOG_FORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOG_FORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ keywords = [
"CMIP6"
]

[project.scripts]
stac-populator = "STACpopulator.cli:main"

[project.urls]
Repository = "https://github.com/crim-ca/stac-populator"
Changelog = "https://github.com/crim-ca/stac-populator/blob/master/CHANGES.md"
Expand Down

0 comments on commit cf75217

Please sign in to comment.