Skip to content

Commit

Permalink
Merge pull request #45 from crim-ca/error_handling
Browse files Browse the repository at this point in the history
Adding a feature to intercept errors and save information to files
  • Loading branch information
dchandan authored Feb 21, 2024
2 parents 10b0716 + 00cb1d2 commit ba179dc
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 98 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ reports
STACpopulator.egg-info/
build
*.pyc

## Logs
*.jsonl
*.json
12 changes: 2 additions & 10 deletions STACpopulator/api_requests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import logging
import os
from typing import Any, Optional
from typing import Any, Optional, Union

import requests
from requests import Session
from colorlog import ColoredFormatter

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool:
Expand Down Expand Up @@ -109,6 +101,6 @@ def post_stac_item(
r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r.raise_for_status()
else:
LOGGER.info(f"Item {item_id} already exists.")
LOGGER.warn(f"Item {item_id} already exists.")
else:
r.raise_for_status()
61 changes: 44 additions & 17 deletions STACpopulator/cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import argparse
import glob
import importlib
import logging
import os
import sys
from datetime import datetime
from http import cookiejar
from typing import Callable, Optional

import requests
from http import cookiejar
from requests.auth import AuthBase, HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth
from requests.sessions import Session

from STACpopulator import __version__
from STACpopulator.logging import setup_logging

POPULATORS = {}

Expand Down Expand Up @@ -54,19 +57,24 @@ def add_request_options(parser: argparse.ArgumentParser) -> None:
Adds arguments to a parser to allow update of a request session definition used across a populator procedure.
"""
parser.add_argument(
"--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false",
help="Disable SSL verification (not recommended unless for development/test servers)."
)
parser.add_argument(
"--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use."
"--no-verify",
"--no-ssl",
"--no-ssl-verify",
dest="verify",
action="store_false",
help="Disable SSL verification (not recommended unless for development/test servers).",
)
parser.add_argument("--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use.")
parser.add_argument(
"--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False,
help="Authentication strategy to employ for the requests session."
"--auth-handler",
choices=["basic", "digest", "bearer", "proxy", "cookie"],
required=False,
help="Authentication strategy to employ for the requests session.",
)
parser.add_argument(
"--auth-identity", required=False,
help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler."
"--auth-identity",
required=False,
help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler.",
)


Expand All @@ -93,19 +101,29 @@ def apply_request_options(session: Session, namespace: argparse.Namespace) -> No

def make_main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.")
parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}",
help="prints the version of the library and exits")
parser.add_argument(
"--version",
"-V",
action="version",
version=f"%(prog)s {__version__}",
help="prints the version of the library and exits",
)
commands = parser.add_subparsers(title="command", dest="command", description="STAC populator command to execute.")

run_cmd_parser = make_run_command_parser(parser.prog)
commands.add_parser(
"run",
prog=f"{parser.prog} {run_cmd_parser.prog}", parents=[run_cmd_parser],
formatter_class=run_cmd_parser.formatter_class, usage=run_cmd_parser.usage,
add_help=False, help=run_cmd_parser.description, description=run_cmd_parser.description
prog=f"{parser.prog} {run_cmd_parser.prog}",
parents=[run_cmd_parser],
formatter_class=run_cmd_parser.formatter_class,
usage=run_cmd_parser.usage,
add_help=False,
help=run_cmd_parser.description,
description=run_cmd_parser.description,
)

# add more commands as needed...
parser.add_argument("--debug", action="store_true", help="Set logger level to debug")

return parser

Expand Down Expand Up @@ -142,9 +160,12 @@ def make_run_command_parser(parent) -> argparse.ArgumentParser:
populator_prog = f"{parent} {parser.prog} {populator_name}"
subparsers.add_parser(
populator_name,
prog=populator_prog, parents=[populator_parser], formatter_class=populator_parser.formatter_class,
prog=populator_prog,
parents=[populator_parser],
formatter_class=populator_parser.formatter_class,
add_help=False, # add help disabled otherwise conflicts with this main populator help
help=populator_parser.description, description=populator_parser.description,
help=populator_parser.description,
description=populator_parser.description,
usage=populator_parser.usage,
)
POPULATORS[populator_name] = {
Expand All @@ -168,6 +189,12 @@ def main(*args: str) -> Optional[int]:
result = None
if populator_cmd == "run":
populator_name = params.pop("populator")

# Setup the application logger:
fname = f"{populator_name}_log_{datetime.utcnow().isoformat() + 'Z'}.jsonl"
log_level = logging.DEBUG if ns.debug else logging.INFO
setup_logging(fname, log_level)

if not populator_name:
parser.print_help()
return 0
Expand Down
56 changes: 30 additions & 26 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import argparse
import json
import logging
import os
from typing import Any, MutableMapping, NoReturn, Optional, Union

from requests.sessions import Session
from pystac.extensions.datacube import DatacubeExtension
from requests.sessions import Session

from STACpopulator.cli import add_request_options, apply_request_options
from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper
from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension
from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader
from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper
from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class CMIP6populator(STACpopulatorBase):
Expand All @@ -29,6 +29,7 @@ def __init__(
update: Optional[bool] = False,
session: Optional[Session] = None,
config_file: Optional[Union[os.PathLike[str], str]] = None,
log_debug: Optional[bool] = False,
) -> None:
"""Constructor
Expand All @@ -37,14 +38,12 @@ def __init__(
:param data_loader: loader to iterate over ingestion data.
"""
super().__init__(
stac_host,
data_loader,
update=update,
session=session,
config_file=config_file,
stac_host, data_loader, update=update, session=session, config_file=config_file, log_debug=log_debug
)

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
def create_stac_item(
self, item_name: str, item_data: MutableMapping[str, Any]
) -> Union[None, MutableMapping[str, Any]]:
"""Creates the STAC item.
:param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation
Expand All @@ -58,26 +57,23 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
try:
cmip_helper = CMIP6Helper(item_data, self.item_geometry_model)
item = cmip_helper.stac_item()
except Exception:
LOGGER.error("Failed to add CMIP6 extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add CMIP6 extension") from e

# Add datacube extension
try:
dc_helper = DataCubeHelper(item_data)
dc_ext = DatacubeExtension.ext(item, add_if_missing=True)
dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables)
except Exception:
LOGGER.error("Failed to add Datacube extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add Datacube extension") from e

try:
thredds_helper = THREDDSHelper(item_data["access_urls"])
thredds_ext = THREDDSExtension.ext(item)
thredds_ext.apply(thredds_helper.services, thredds_helper.links)
except Exception:
LOGGER.error("Failed to add THREDDS references to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add THREDDS extension") from e

# print(json.dumps(item.to_dict()))
return json.loads(json.dumps(item.to_dict()))
Expand All @@ -88,13 +84,19 @@ def make_parser() -> argparse.ArgumentParser:
parser.add_argument("stac_host", type=str, help="STAC API address")
parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.")
parser.add_argument("--update", action="store_true", help="Update collection and its items")
parser.add_argument("--mode", choices=["full", "single"], default="full",
help="Operation mode, processing the full dataset or only the single reference.")
parser.add_argument(
"--config", type=str, help=(
"--mode",
choices=["full", "single"],
default="full",
help="Operation mode, processing the full dataset or only the single reference.",
)
parser.add_argument(
"--config",
type=str,
help=(
"Override configuration file for the populator. "
"By default, uses the adjacent configuration to the implementation class."
)
),
)
add_request_options(parser)
return parser
Expand All @@ -111,7 +113,9 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
# To be implemented
data_loader = ErrorLoader()

c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config)
c = CMIP6populator(
ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug
)
c.ingest()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import logging
import os.path
from typing import Any, MutableMapping, NoReturn, Optional

Expand All @@ -8,9 +9,8 @@
from STACpopulator.input import STACDirectoryLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class DirectoryPopulator(STACpopulatorBase):
Expand Down
12 changes: 3 additions & 9 deletions STACpopulator/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@
import requests
import siphon
import xncml
from colorlog import ColoredFormatter
from requests.sessions import Session
from siphon.catalog import TDSCatalog, session_manager

from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


class GenericLoader(ABC):
Expand Down Expand Up @@ -149,7 +141,9 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
if self.catalog_head.datasets.items():
for item_name, ds in self.catalog_head.datasets.items():
attrs = self.extract_metadata(ds)
yield item_name, ds.url_path, attrs
filename = ds.url_path[ds.url_path.rfind("/") :]
url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename
yield item_name, url, attrs

for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
Expand Down
Loading

0 comments on commit ba179dc

Please sign in to comment.