Skip to content

Commit

Permalink
Merge pull request #17 from statisticsnorway/simplify-configs
Browse files Browse the repository at this point in the history
Simplify configurations
  • Loading branch information
krlono authored May 28, 2024
2 parents 1fd3b8c + ee15337 commit a61c8aa
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 298 deletions.
231 changes: 72 additions & 159 deletions src/ssb_timeseries/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import sys
from dataclasses import asdict
from dataclasses import dataclass
from pathlib import Path

Expand All @@ -9,42 +10,57 @@
from ssb_timeseries import fs
from ssb_timeseries.types import PathStr

# mypy: disable-error-code="assignment, arg-type"
# mypy: disable-error-code="assignment, arg-type, override,call-arg,has-type"


GCS = "gs://ssb-prod-dapla-felles-data-delt/poc-tidsserier"
JOVYAN = "/home/jovyan"
HOME = str(Path.home())
LOGFILE = "timeseries.log"

DEFAULT_BUCKET = HOME
DEFAULT_TIMESERIES_LOCATION = os.path.join(HOME, "series_data")
DEFAULT_CONFIG_LOCATION = os.path.join(HOME, "timeseries_config.json")
DEFAULT_LOG_FILE_LOCATION: str = os.path.join(HOME, "logs", LOGFILE)
CONFIGURATION_FILE: str = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
DEFAULTS = {
"configuration_file": os.path.join(HOME, "timeseries_config.json"),
"timeseries_root": os.path.join(HOME, "series_data"),
"log_file": os.path.join(HOME, "logs", LOGFILE),
"bucket": HOME,
}
CONFIGURATION_FILE: str = os.getenv("TIMESERIES_CONFIG", DEFAULTS["configuration_file"])


@dataclass(slots=True)
class Cfg:
@dataclass(slots=False)
class Config:
"""Configuration class."""

configuration_file: str = CONFIGURATION_FILE
repository: str = DEFAULT_TIMESERIES_LOCATION
log_file: str = DEFAULT_LOG_FILE_LOCATION
bucket: str = DEFAULT_BUCKET
product: str = ""
timeseries_root: str = DEFAULTS["timeseries_root"]
log_file: str = DEFAULTS["log_file"]
bucket: str = DEFAULTS["bucket"]

def __getitem__(self, item: str) -> str:
    """Look up a single configuration value by field name.

    The instance is converted to a dictionary with ``dataclasses.asdict``
    and the entry stored under ``item`` is returned as a string.

    Args:
        item: Name of the configuration field to read.

    Returns:
        The value of the field, converted with ``str()``.

    Raises:
        KeyError: If ``item`` is not a key of the converted dictionary.
    """
    return str(asdict(self)[item])

def __str__(self) -> str:
def __eq__(self, other: Self) -> bool:
    """Compare two configurations field by field.

    Args:
        other: The object to compare with.

    Returns:
        True when ``other`` is a configuration of the same class (or a
        subclass) and all fields are equal. ``NotImplemented`` is returned
        for any other object, so that Python can try the reflected
        comparison and finally fall back to ``False`` instead of raising.
    """
    # Guard first: calling dataclass helpers on None, dicts or strings
    # would raise instead of answering "not equal".
    if not isinstance(other, type(self)):
        return NotImplemented
    return asdict(self) == asdict(other)

def to_json(self, original_implementation: bool = False) -> str:
"""Return timeseries configurations as JSON string."""
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
if original_implementation:
return json.dumps(
self, default=lambda o: o.__dict__(), sort_keys=True, indent=4
)
else:
return json.dumps(asdict(self), sort_keys=True, indent=4)

def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
"""Saves configurations to JSON file and set environment variable TIMESERIES_CONFIG to the location of the file.
Args:
path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
"""
fs.write_json(content=str(self), path=path)
fs.write_json(content=self.to_json(), path=path)
if HOME == JOVYAN:
# For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
cmd = f"export TIMESERIES_CONFIG={CONFIGURATION_FILE}"
Expand All @@ -56,131 +72,24 @@ def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
@classmethod
def load(cls, path: PathStr) -> Self:
"""Read the properties from a JSON file into a Config object."""
if path:
if fs.exists(path):
json_file = json.loads(fs.read_json(path))

return cls(
configuration_file=str(path),
bucket=json_file.get("bucket"),
repository=json_file.get("timeseries_root"),
timeseries_root=json_file.get("timeseries_root"),
product=json_file.get("product"),
log_file=json_file.get("log_file"),
)
else:
raise ValueError("cfg_from_file was called with an empty or invalid path.")


class Config:
"""Timeseries configurations: bucket, product, timeseries_root, log_file."""

def __init__(self, configuration_file: str = "", **kwargs: str) -> None:
"""Create or retrieve configurations.
If called with no parameters, Config attempts to read from the file specified by the environment variable TIMSERIES_CONFIG. If that does not succeed, applies defaults.
Args:
configuration_file (str): Tries to read this before falling back to environment variable. Defaults to "".
kwargs (str): Configuration options:
Kwargs:
- bucket - The "production bucket" location. Sharing and snapshots typically go in the sub directories hee, depending on configs.
- product - Optional sub directory for "production bucket".
- timeseries_root - Series data are stored in tree underneath. Defaults to '$HOME/series_data/'
- log_file - Exactly that. Defaults to '$HOME/series_data/'
"""
if fs.exists(configuration_file):
# self = Cfg.load(configuration_file) # NOSONAR # TODO: switch to Cfg class to simplify code
self.configuration_file = configuration_file
os.environ["TIMESERIES_CONFIG"] = configuration_file
elif configuration_file:
if fs.exists(CONFIGURATION_FILE):
self.load(CONFIGURATION_FILE)
self.save(configuration_file)
else:
self.__set_default_config()

elif fs.exists(CONFIGURATION_FILE):
self.load(CONFIGURATION_FILE)
self.configuration_file = CONFIGURATION_FILE

if kwargs:
log_file = kwargs.get("log_file", "")
if log_file:
self.log_file = log_file
elif not self.log_file:
self.log_file = DEFAULT_LOG_FILE_LOCATION

timeseries_root = kwargs.get("timeseries_root", "")
if timeseries_root:
self.timeseries_root = timeseries_root
elif not self.timeseries_root:
self.timeseries_root = DEFAULT_TIMESERIES_LOCATION

bucket = kwargs.get("bucket", "")
if bucket:
self.bucket = bucket
elif not self.bucket:
self.bucket = DEFAULT_BUCKET

product = kwargs.get("product", "")
if product:
self.product = product

if not hasattr(self, "log_file"):
self.__set_default_config()

self.save()

@property
def file_system_type(self) -> str:
"""Returns 'gcs' if Config.timeseries_root is on Google Cloud Storage, otherwise'local'."""
if self.timeseries_root.startswith("gs://"):
return "gcs"
else:
return "local"

def to_json(self) -> str:
"""Return timeseries configurations as JSON string."""
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

def __str__(self) -> str:
"""Human readable string representation of configuration object: JSON string."""
return self.to_json()

def load(self, path: PathStr) -> None:
"""Read the properties from a JSON file into a Config object."""
if path:
read_from_file = json.loads(fs.read_json(path))

self.bucket = read_from_file.get("bucket")
self.timeseries_root = read_from_file.get("timeseries_root")
self.product = read_from_file.get("product", "")
self.log_file = read_from_file.get("log_file", "")
else:
raise ValueError("Config.load(<path>) was called with an empty path.")

def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
"""Saves configurations to JSON file and set environment variable TIMESERIES_CONFIG to the location of the file.
Args:
path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
"""
fs.write_json(content=self.to_json(), path=path)
if HOME == JOVYAN:
# For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
cmd = f"export TIMESERIES_CONFIG={CONFIGURATION_FILE}"
os.system(cmd)
# os.system(f"echo '{cmd}' >> ~/.bashrc")
else:
os.environ["TIMESERIES_CONFIG"] = path
raise FileNotFoundError(
"Cfg.load() was called with an empty or invalid path."
)

def __set_default_config(self) -> None:
self.bucket = DEFAULT_BUCKET
self.configuration_file = DEFAULT_CONFIG_LOCATION
self.log_file = DEFAULT_LOG_FILE_LOCATION
self.product = ""
self.timeseries_root = DEFAULT_TIMESERIES_LOCATION
fs.touch(self.log_file)
def __dict__(self) -> dict[str, str]:
    """Return the configuration fields as a plain dictionary.

    This is a regular method, so it has to be called
    (``config.__dict__()``) rather than read as an attribute.

    Returns:
        The result of ``dataclasses.asdict`` for this instance.
    """
    fields_as_dict = asdict(self)
    return fields_as_dict


CONFIG = Config(configuration_file=CONFIGURATION_FILE)
Expand All @@ -205,54 +114,58 @@ def main(*args: str | PathStr) -> None:
ValueError: If args is not 'home' | 'gcs' | 'jovyan'.
"""
TIMESERIES_CONFIG = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
if not TIMESERIES_CONFIG:
print(
"Environvent variable TIMESERIES_CONFIG is empty. Using default: {DEFAULT_CONFIG_LOCATION}."
)
os.environ["TIMESERIES_CONFIG"] = DEFAULT_CONFIG_LOCATION
TIMESERIES_CONFIG = DEFAULT_CONFIG_LOCATION

if args:
named_config = args[0]
config_identifier: PathStr = args[0]
else:
named_config = sys.argv[1]
config_identifier = sys.argv[1]

print(
f"Update configuration file TIMESERIES_CONFIG: {TIMESERIES_CONFIG}, with named presets: '{named_config}'."
f"Update configuration file TIMESERIES_CONFIG: {CONFIGURATION_FILE}, with named presets: '{config_identifier}'."
)
match named_config:
match config_identifier:
case "home":
identifier_is_named_option = True
bucket = HOME
timeseries_root = os.path.join(HOME, "series_data")
log_file = DEFAULT_LOG_FILE_LOCATION
timeseries_root = fs.path(HOME, "series_data")
log_file = DEFAULTS["log_file"]
case "gcs":
identifier_is_named_option = True
bucket = GCS
timeseries_root = os.path.join(GCS, "series_data")
log_file = os.path.join(HOME, "logs", LOGFILE)
timeseries_root = fs.path(GCS, "series_data")
log_file = fs.path(HOME, "logs", LOGFILE)
case "jovyan":
identifier_is_named_option = True
bucket = JOVYAN
timeseries_root = os.path.join(JOVYAN, "series_data")
log_file = os.path.join(JOVYAN, "logs", LOGFILE)
timeseries_root = fs.path(JOVYAN, "series_data")
log_file = fs.path(JOVYAN, "logs", LOGFILE)
case _:
raise ValueError(
f"Unrecognised named configuration preset '{named_config}'."
)
identifier_is_named_option = False
identifier_is_existing_file = fs.exists(config_identifier)
bucket = None

if identifier_is_named_option:
cfg = Config(
configuration_file=CONFIGURATION_FILE,
bucket=bucket,
timeseries_root=timeseries_root,
log_file=log_file,
)
elif identifier_is_existing_file:
cfg = Config(configuration_file=config_identifier)
else:
raise ValueError(
f"Unrecognised named configuration preset '{config_identifier}'."
)

cfg = Config(
configuration_file=TIMESERIES_CONFIG,
bucket=bucket,
timeseries_root=timeseries_root,
log_file=log_file,
)
cfg.save(TIMESERIES_CONFIG)
cfg.save(CONFIGURATION_FILE)
print(cfg)
print(os.getenv("TIMESERIES_CONFIG"))


if __name__ == "__main__":
# Execute when called directly, ie not via import statements.
"""Execute when called directly, ie not via import statements."""
# ??? `poetry run timeseries-config <option>` does not appear to go this route.
# --> then it is not obvious that this is a good idea.
print(f"Name of the script : {sys.argv[0]=}")
print(f"Arguments of the script : {sys.argv[1:]=}")
main(sys.argv[1])
32 changes: 30 additions & 2 deletions src/ssb_timeseries/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,38 @@ def plot(self, *args: Any, **kwargs: Any) -> Any:
Convenience wrapper around Dataframe.plot() with sensible defaults.
"""
xlabels = self.datetime_columns()[0]
df = self.data.copy()

if self.data_type.temporality == properties.Temporality.FROM_TO:
interval_handling = kwargs.pop("interval_handling", "interval").lower()
match interval_handling:
case "interval":
from_data = df
to_data = df
from_data["valid_to"] = from_data["valid_from"]
df = pd.concat(
[from_data, to_data],
axis=0,
ignore_index=True,
).sort_values(by=["valid_from", "valid_to"])
df.drop(columns=["valid_to"], inplace=True)
xlabels = "valid_from"
case "midpoint":
xlabels = "midpoint"
df["midpoint"] = df[self.datetime_columns()].median(axis=1)
df.drop(columns=["valid_from", "valid_to"], inplace=True)

case _:
raise ValueError(
"Invalid option for interval_handling. Must be 'from', 'to', 'interval' or 'midpoint'."
)
else:
xlabels = self.datetime_columns()[0]

ts_logger.debug(f"DATASET.plot(): x labels = {xlabels}")
ts_logger.debug(f"Dataset.plot({args!r}, {kwargs!r}) x-labels {xlabels}")
return self.data.plot( # type: ignore[call-overload]

return df.plot( # type: ignore[call-overload]
xlabels,
*args,
legend=len(self.data.columns) < 9,
Expand Down
14 changes: 11 additions & 3 deletions src/ssb_timeseries/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ def touch(path: PathStr) -> None:
Path(path).touch()


def path(*args: PathStr) -> str:
    """Join path segments into one path string, keeping GCS URLs intact.

    ``pathlib.Path`` collapses the double slash of a ``gs://`` scheme into a
    single slash, so the scheme is restored after joining. Only a leading
    ``gs:/`` is repaired; the same character sequence further into the path
    (for instance a directory whose name ends in ``gs:``) is left untouched.

    Args:
        *args: One or more path segments. The first one is the base, the
            remaining ones are appended to it.

    Returns:
        The joined path as a string. It starts with ``gs://`` when the
        joined path starts with the ``gs:`` scheme.

    Raises:
        IndexError: If called without any path segment.
    """
    if not args:
        raise IndexError("path() requires at least one path segment.")

    joined = str(Path(args[0]).joinpath(*args[1:]))

    scheme, collapsed = "gs://", "gs:/"
    # Repair the scheme only at the very start of the string; a blanket
    # replace would also rewrite local paths that merely contain "gs:/".
    if joined.startswith(collapsed) and not joined.startswith(scheme):
        joined = scheme + joined[len(collapsed):]
    return joined


def mkdir(path: PathStr) -> None:
"""Make directory regardless of filesystem is local or GCS."""
# not good enough .. it is hard to distinguish between dirs and files that do not exist yet
Expand All @@ -83,7 +91,7 @@ def mkdir(path: PathStr) -> None:

def mk_parent_dir(path: PathStr) -> None:
"""Ensure a parent directory exists. ... regardless of wether fielsystem is local or GCS."""
# wanted a mkdir that could work with both file and directory paths,
# wanted a mkdir that could work seamlessly with both file and directory paths,
# but it is hard to distinguish between dirs and files that do not exist yet
# --> use this to create parent directory for files, mkdir() when the last part of path is a directory
if is_local(path):
Expand Down Expand Up @@ -151,7 +159,7 @@ def mv(from_path: PathStr, to_path: PathStr) -> None:


def rm(path: PathStr) -> None:
"""Remove file from local or GCS filesystem."""
"""Remove file from local or GCS filesystem. Nonrecursive. For a recursive variant, see rmtree()."""
if is_gcs(path):
...
# TO DO: implement this (but recursive)
Expand All @@ -164,7 +172,7 @@ def rm(path: PathStr) -> None:
def rmtree(
path: str,
) -> None:
"""Remove all directory and all its files and subdirectories regardless of local or GCS filesystem."""
"""Recursively remove a directory and all its subdirectories and files regardless of local or GCS filesystem."""
if is_gcs(path):
...
# TO DO: implement this (but recursive)
Expand Down
Loading

0 comments on commit a61c8aa

Please sign in to comment.