From 34a7884ce5cc25a7c6608fcb5fc37643795373a1 Mon Sep 17 00:00:00 2001
From: Bernhard Ryeng
Date: Thu, 23 May 2024 14:58:38 +0200
Subject: [PATCH 1/4] Remove team param from snapshot directories.

---
 src/ssb_timeseries/io.py      | 18 +++++++-----------
 tests/test_dataset_sharing.py | 28 +++++++++++++---------------
 2 files changed, 20 insertions(+), 26 deletions(-)

diff --git a/src/ssb_timeseries/io.py b/src/ssb_timeseries/io.py
index d42dd28..7c0243b 100644
--- a/src/ssb_timeseries/io.py
+++ b/src/ssb_timeseries/io.py
@@ -201,7 +201,7 @@ def read_metadata(self) -> dict:
 
     def write_metadata(self, meta: dict) -> None:
         """Write tags to the metadata file."""
-        os.makedirs(self.metadata_dir, exist_ok=True)
+        # no longer necessary: os.makedirs(self.metadata_dir, exist_ok=True)
         try:
             fs.write_json(self.metadata_fullpath, meta)
             ts_logger.info(
@@ -241,7 +241,6 @@ def last_version(self, directory: str, pattern: str = "*.parquet") -> str:
         files = fs.ls(directory, pattern=pattern)
         number_of_files = len(files)
 
-        # TODO: mypy --> error: Item "None" of "Match[str] | None" has no attribute "group" [union-attr]
         vs = sorted([int(re.search("(_v)(\d+)(.parquet)", f).group(2)) for f in files])
         ts_logger.debug(
             f"DATASET {self.set_name}: io.last_version regex identified versions {vs} in {directory}."
@@ -306,17 +305,12 @@ def iso_no_colon(dt: datetime) -> str:
         )
         return out
 
-    def sharing_directory(self, bucket: str, team: str = "") -> PathStr:
+    def sharing_directory(self, bucket: str) -> PathStr:
         """Get name of sharing directory based on dataset parameters and configuration.
 
         Creates the directory if it does not exist.
         """
-        fix_test_cases_before_taking_this_approach = False
-        if team and fix_test_cases_before_taking_this_approach:
-            # allowing this breaks tests! --> TODO: adapt test cases
-            directory = os.path.join(bucket, team, self.set_name)
-        else:
-            directory = os.path.join(bucket, self.set_name)
+        directory = os.path.join(bucket, self.set_name)
 
         ts_logger.warning(f"DATASET.IO.SHARING_DIRECTORY: {directory}")
         fs.mkdir(directory)
@@ -364,13 +358,15 @@ def snapshot(
         ts_logger.warning(f"Sharing configs: {sharing}")
         for s in sharing:
             ts_logger.debug(f"Sharing: {s}")
+            if "team" not in s.keys():
+                s["team"] = "no team specified"
             fs.cp(
                 data_publish_path,
-                self.sharing_directory(bucket=s["path"], team=s["team"]),
+                self.sharing_directory(bucket=s["path"]),
             )
             fs.cp(
                 meta_publish_path,
-                self.sharing_directory(bucket=s["path"], team=s["team"]),
+                self.sharing_directory(bucket=s["path"]),
             )
             ts_logger.warning(
                 f"DATASET {self.set_name}: sharing with {s['team']}, snapshot copied to {s['path']}."
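A minimal sketch of the sharing behaviour after this change, for reference: entries in the sharing config may now omit the "team" key, and the snapshot is copied into a directory derived from the bucket and the set name only. The helper names, paths and set name below are made up for illustration; only the defaulting of the missing "team" key mirrors the diff above.

import os
import shutil


def sharing_directory(bucket: str, set_name: str) -> str:
    # After this patch the sharing directory is bucket/set_name; the team is
    # no longer part of the path.
    directory = os.path.join(bucket, set_name)
    os.makedirs(directory, exist_ok=True)
    return directory


def share_snapshot(snapshot_file: str, set_name: str, sharing: list[dict]) -> None:
    for s in sharing:
        if "team" not in s:
            s["team"] = "no team specified"  # default is only used for the log line below
        target = sharing_directory(s["path"], set_name)
        shutil.copy(snapshot_file, target)
        print(f"sharing with {s['team']}, snapshot copied to {target}")


# Hypothetical usage:
# share_snapshot("snapshot_v1.parquet", "sample_set", [{"path": "/tmp/shared/s123"}])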
diff --git a/tests/test_dataset_sharing.py b/tests/test_dataset_sharing.py
index 3ef663f..cc1bce4 100644
--- a/tests/test_dataset_sharing.py
+++ b/tests/test_dataset_sharing.py
@@ -1,7 +1,5 @@
 import logging
 
-import pytest
-
 from ssb_timeseries import fs
 from ssb_timeseries.dataset import Dataset
 from ssb_timeseries.dates import date_utc
@@ -18,7 +16,6 @@
 PRODUCT = "sample-data-product"
 
 
-@pytest.mark.skipif(False, reason="Don't skip.")
 @log_start_stop
 def test_snapshot_simple_set_has_higher_snapshot_file_count_after(caplog):
     caplog.set_level(logging.DEBUG)
@@ -37,15 +34,16 @@ def test_snapshot_simple_set_has_higher_snapshot_file_count_after(caplog):
     stage_path = x.io.snapshot_directory(
         product=x.product, process_stage=x.process_stage
     )
-    path_123 = x.io.dir(BUCKET, x.product, "shared", "s123")
-    path_234 = x.io.dir(BUCKET, x.product, "shared", "s234")
+    shared_base_path = x.io.dir(BUCKET, x.product, "shared", "all")
+    path_123 = shared_base_path
+    path_234 = shared_base_path
     x.sharing = [
         {
-            "team": "s123",
+            # should work even if the target team is not specified
             "path": path_123,
         },
         {
-            "team": "s234",
+            "team": "",
             "path": path_234,
         },
     ]
@@ -80,7 +78,7 @@ def log(path, before, after):
 
 
 @log_start_stop
-def test_snapshot_estimate_has_higher_file_count_after(caplog):
+def test_snapshot_estimate_specified_has_higher_file_count_after(caplog):
     caplog.set_level(logging.DEBUG)
 
     x = Dataset(
@@ -97,22 +95,22 @@ def test_snapshot_estimate_has_higher_file_count_after(caplog):
     stage_path = x.io.snapshot_directory(
         product=x.product, process_stage=x.process_stage
     )
-    path_123 = x.io.dir(BUCKET, x.product, "shared", "s123")
-    path_234 = x.io.dir(BUCKET, x.product, "shared", "s234")
+    shared_base_path = x.io.dir(BUCKET, x.product, "shared")
+    team_path_123 = x.io.dir(shared_base_path, "s123")
+    team_path_234 = x.io.dir(shared_base_path, "s234")
     x.sharing = [
         {
             "team": "s123",
-            "path": path_123,
+            "path": team_path_123,
         },
         {
             "team": "s234",
-            "path": path_234,
+            "path": team_path_234,
         },
     ]
 
-    path_123 = x.io.dir(path_123, x.name)
-    path_234 = x.io.dir(path_234, x.name)
-
+    path_123 = x.io.dir(team_path_123, x.name)
+    path_234 = x.io.dir(team_path_234, x.name)
     x.save()
     ts_logger.debug(f"SNAPSHOT conf.bucket {BUCKET}")
     ts_logger.debug(f"SNAPSHOT to {path_123}")

From 819f90a6c5369bdfc3b524a70ac650f48b197a00 Mon Sep 17 00:00:00 2001
From: Bernhard Ryeng
Date: Tue, 28 May 2024 02:04:14 +0200
Subject: [PATCH 2/4] Use @dataclass to simplify configs. Reset configs after tests.
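A hedged sketch of how the dataclass-based Config introduced by this patch is meant to be used, going only by the fields and methods visible in the diff that follows; the paths are made up:

from ssb_timeseries.config import Config

cfg = Config(
    bucket="/home/user",
    timeseries_root="/home/user/series_data",
    log_file="/home/user/logs/timeseries.log",
)
print(cfg["bucket"])  # dict-style lookup via __getitem__
print(cfg.to_json())  # JSON string built from dataclasses.asdict
cfg.save("/home/user/timeseries_config.json")  # writes the JSON file

Per the diff, save() also exports TIMESERIES_CONFIG so that subsequent runs pick up the saved file.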
---
 src/ssb_timeseries/config.py     | 231 ++++++++++---------------------
 src/ssb_timeseries/fs.py         |  14 +-
 src/ssb_timeseries/properties.py |   3 -
 tests/conftest.py                |  83 +++++++----
 tests/test_config.py             | 133 +++++++++++-------
 tests/test_fs.py                 |   8 ++
 6 files changed, 227 insertions(+), 245 deletions(-)

diff --git a/src/ssb_timeseries/config.py b/src/ssb_timeseries/config.py
index 89751b6..54eb554 100644
--- a/src/ssb_timeseries/config.py
+++ b/src/ssb_timeseries/config.py
@@ -1,6 +1,7 @@
 import json
 import os
 import sys
+from dataclasses import asdict
 from dataclasses import dataclass
 from pathlib import Path
 
@@ -9,7 +10,7 @@
 from ssb_timeseries import fs
 from ssb_timeseries.types import PathStr
 
-# mypy: disable-error-code="assignment, arg-type"
+# mypy: disable-error-code="assignment, arg-type, override,call-arg,has-type"
 
 
 GCS = "gs://ssb-prod-dapla-felles-data-delt/poc-tidsserier"
@@ -17,26 +18,41 @@
 HOME = str(Path.home())
 LOGFILE = "timeseries.log"
 
-DEFAULT_BUCKET = HOME
-DEFAULT_TIMESERIES_LOCATION = os.path.join(HOME, "series_data")
-DEFAULT_CONFIG_LOCATION = os.path.join(HOME, "timeseries_config.json")
-DEFAULT_LOG_FILE_LOCATION: str = os.path.join(HOME, "logs", LOGFILE)
-CONFIGURATION_FILE: str = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
+DEFAULTS = {
+    "configuration_file": os.path.join(HOME, "timeseries_config.json"),
+    "timeseries_root": os.path.join(HOME, "series_data"),
+    "log_file": os.path.join(HOME, "logs", LOGFILE),
+    "bucket": HOME,
+}
+CONFIGURATION_FILE: str = os.getenv("TIMESERIES_CONFIG", DEFAULTS["configuration_file"])
 
 
-@dataclass(slots=True)
-class Cfg:
+@dataclass(slots=False)
+class Config:
     """Configuration class."""
 
     configuration_file: str = CONFIGURATION_FILE
-    repository: str = DEFAULT_TIMESERIES_LOCATION
-    log_file: str = DEFAULT_LOG_FILE_LOCATION
-    bucket: str = DEFAULT_BUCKET
-    product: str = ""
+    timeseries_root: str = DEFAULTS["timeseries_root"]
+    log_file: str = DEFAULTS["log_file"]
+    bucket: str = DEFAULTS["bucket"]
+
+    def __getitem__(self, item: str) -> str:
+        """Get the value of a configuration."""
+        d = asdict(self)
+        return str(d[item])
 
-    def __str__(self) -> str:
+    def __eq__(self, other: Self) -> bool:
+        """Equality test."""
+        return asdict(self) == other.__dict__()
+
+    def to_json(self, original_implementation: bool = False) -> str:
         """Return timeseries configurations as JSON string."""
-        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
+        if original_implementation:
+            return json.dumps(
+                self, default=lambda o: o.__dict__(), sort_keys=True, indent=4
+            )
+        else:
+            return json.dumps(asdict(self), sort_keys=True, indent=4)
 
     def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
         """Saves configurations to JSON file and set environment variable TIMESERIES_CONFIG to the location of the file.
@@ -44,7 +60,7 @@ def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
         Args:
             path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
         """
-        fs.write_json(content=str(self), path=path)
+        fs.write_json(content=self.to_json(), path=path)
         if HOME == JOVYAN:
             # For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
             cmd = f"export TIMESERIES_CONFIG={CONFIGURATION_FILE}"
@@ -56,131 +72,24 @@ def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
     @classmethod
     def load(cls, path: PathStr) -> Self:
         """Read the properties from a JSON file into a Config object."""
-        if path:
+        if fs.exists(path):
             json_file = json.loads(fs.read_json(path))
             return cls(
                 configuration_file=str(path),
                 bucket=json_file.get("bucket"),
-                repository=json_file.get("timeseries_root"),
+                timeseries_root=json_file.get("timeseries_root"),
                 product=json_file.get("product"),
                 log_file=json_file.get("log_file"),
             )
         else:
-            raise ValueError("cfg_from_file was called with an empty or invalid path.")
-
-
-class Config:
-    """Timeseries configurations: bucket, product, timeseries_root, log_file."""
-
-    def __init__(self, configuration_file: str = "", **kwargs: str) -> None:
-        """Create or retrieve configurations.
-
-        If called with no parameters, Config attempts to read from the file specified by the environment variable TIMSERIES_CONFIG. If that does not succeed, applies defaults.
-
-        Args:
-            configuration_file (str): Tries to read this before falling back to environment variable. Defaults to "".
-            kwargs (str): Configuration options:
-
-        Kwargs:
-            - bucket - The "production bucket" location. Sharing and snapshots typically go in the sub directories hee, depending on configs.
-            - product - Optional sub directory for "production bucket".
-            - timeseries_root - Series data are stored in tree underneath. Defaults to '$HOME/series_data/'
-            - log_file - Exactly that. Defaults to '$HOME/series_data/'
-        """
-        if fs.exists(configuration_file):
-            # self = Cfg.load(configuration_file)  # NOSONAR # TODO: switch to Cfg class to simplify code
-            self.configuration_file = configuration_file
-            os.environ["TIMESERIES_CONFIG"] = configuration_file
-        elif configuration_file:
-            if fs.exists(CONFIGURATION_FILE):
-                self.load(CONFIGURATION_FILE)
-                self.save(configuration_file)
-            else:
-                self.__set_default_config()
-
-        elif fs.exists(CONFIGURATION_FILE):
-            self.load(CONFIGURATION_FILE)
-            self.configuration_file = CONFIGURATION_FILE
-
-        if kwargs:
-            log_file = kwargs.get("log_file", "")
-            if log_file:
-                self.log_file = log_file
-            elif not self.log_file:
-                self.log_file = DEFAULT_LOG_FILE_LOCATION
-
-            timeseries_root = kwargs.get("timeseries_root", "")
-            if timeseries_root:
-                self.timeseries_root = timeseries_root
-            elif not self.timeseries_root:
-                self.timeseries_root = DEFAULT_TIMESERIES_LOCATION
-
-            bucket = kwargs.get("bucket", "")
-            if bucket:
-                self.bucket = bucket
-            elif not self.bucket:
-                self.bucket = DEFAULT_BUCKET
-
-            product = kwargs.get("product", "")
-            if product:
-                self.product = product
-
-        if not hasattr(self, "log_file"):
-            self.__set_default_config()
-
-        self.save()
-
-    @property
-    def file_system_type(self) -> str:
-        """Returns 'gcs' if Config.timeseries_root is on Google Cloud Storage, otherwise 'local'."""
-        if self.timeseries_root.startswith("gs://"):
-            return "gcs"
-        else:
-            return "local"
-
-    def to_json(self) -> str:
-        """Return timeseries configurations as JSON string."""
-        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
-
-    def __str__(self) -> str:
-        """Human readable string representation of configuration object: JSON string."""
-        return self.to_json()
-
-    def load(self, path: PathStr) -> None:
-        """Read the properties from a JSON file into a Config object."""
-        if path:
-            read_from_file = json.loads(fs.read_json(path))
-
-            self.bucket = read_from_file.get("bucket")
-            self.timeseries_root = read_from_file.get("timeseries_root")
-            self.product = read_from_file.get("product", "")
-            self.log_file = read_from_file.get("log_file", "")
-        else:
-            raise ValueError("Config.load() was called with an empty path.")
-
-    def save(self, path: PathStr = CONFIGURATION_FILE) -> None:
-        """Saves configurations to JSON file and set environment variable TIMESERIES_CONFIG to the location of the file.
-
-        Args:
-            path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
-        """
-        fs.write_json(content=self.to_json(), path=path)
-        if HOME == JOVYAN:
-            # For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
-            cmd = f"export TIMESERIES_CONFIG={CONFIGURATION_FILE}"
-            os.system(cmd)
-            # os.system(f"echo '{cmd}' >> ~/.bashrc")
-        else:
-            os.environ["TIMESERIES_CONFIG"] = path
+            raise FileNotFoundError(
+                "Cfg.load() was called with an empty or invalid path."
+            )
 
-    def __set_default_config(self) -> None:
-        self.bucket = DEFAULT_BUCKET
-        self.configuration_file = DEFAULT_CONFIG_LOCATION
-        self.log_file = DEFAULT_LOG_FILE_LOCATION
-        self.product = ""
-        self.timeseries_root = DEFAULT_TIMESERIES_LOCATION
-        fs.touch(self.log_file)
+    def __dict__(self) -> dict[str, str]:
+        """Return timeseries configurations as dict."""
+        return asdict(self)
 
 
 CONFIG = Config(configuration_file=CONFIGURATION_FILE)
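Before the final hunk, a rough sketch of the dispatch idea that the reworked main() below implements: a recognised preset name fills in the configuration values directly, and anything else must be a path to an existing configuration file. This is illustrative only; the helper, preset values and bucket below are invented, not the module's code.

import os


def resolve(identifier: str) -> dict:
    """Illustrative only: named preset -> explicit values, otherwise expect a config file."""
    match identifier:
        case "home":
            return {"bucket": os.path.expanduser("~")}
        case "gcs":
            return {"bucket": "gs://example-bucket"}  # placeholder, not the real bucket
        case _ if os.path.isfile(identifier):
            return {"configuration_file": identifier}
        case _:
            raise ValueError(f"Unrecognised named configuration preset '{identifier}'.")


print(resolve("home"))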
@@ -205,54 +114,58 @@ def main(*args: str | PathStr) -> None:
         ValueError: If args is not 'home' | 'gcs' | 'jovyan'.
     """
 
-    TIMESERIES_CONFIG = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
-    if not TIMESERIES_CONFIG:
-        print(
-            "Environvent variable TIMESERIES_CONFIG is empty. Using default: {DEFAULT_CONFIG_LOCATION}."
-        )
-        os.environ["TIMESERIES_CONFIG"] = DEFAULT_CONFIG_LOCATION
-        TIMESERIES_CONFIG = DEFAULT_CONFIG_LOCATION
-
     if args:
-        named_config = args[0]
+        config_identifier: PathStr = args[0]
     else:
-        named_config = sys.argv[1]
+        config_identifier = sys.argv[1]
 
     print(
-        f"Update configuration file TIMESERIES_CONFIG: {TIMESERIES_CONFIG}, with named presets: '{named_config}'."
+        f"Update configuration file TIMESERIES_CONFIG: {CONFIGURATION_FILE}, with named presets: '{config_identifier}'."
     )
-    match named_config:
+    match config_identifier:
         case "home":
+            identifier_is_named_option = True
             bucket = HOME
-            timeseries_root = os.path.join(HOME, "series_data")
-            log_file = DEFAULT_LOG_FILE_LOCATION
+            timeseries_root = fs.path(HOME, "series_data")
+            log_file = DEFAULTS["log_file"]
         case "gcs":
+            identifier_is_named_option = True
             bucket = GCS
-            timeseries_root = os.path.join(GCS, "series_data")
-            log_file = os.path.join(HOME, "logs", LOGFILE)
+            timeseries_root = fs.path(GCS, "series_data")
+            log_file = fs.path(HOME, "logs", LOGFILE)
         case "jovyan":
+            identifier_is_named_option = True
             bucket = JOVYAN
-            timeseries_root = os.path.join(JOVYAN, "series_data")
-            log_file = os.path.join(JOVYAN, "logs", LOGFILE)
+            timeseries_root = fs.path(JOVYAN, "series_data")
+            log_file = fs.path(JOVYAN, "logs", LOGFILE)
         case _:
-            raise ValueError(
-                f"Unrecognised named configuration preset '{named_config}'."
-            )
+            identifier_is_named_option = False
+            identifier_is_existing_file = fs.exists(config_identifier)
+            bucket = None
+
+    if identifier_is_named_option:
+        cfg = Config(
+            configuration_file=CONFIGURATION_FILE,
+            bucket=bucket,
+            timeseries_root=timeseries_root,
+            log_file=log_file,
+        )
+    elif identifier_is_existing_file:
+        cfg = Config(configuration_file=config_identifier)
+    else:
+        raise ValueError(
+            f"Unrecognised named configuration preset '{config_identifier}'."
+        )
 
-    cfg = Config(
-        configuration_file=TIMESERIES_CONFIG,
-        bucket=bucket,
-        timeseries_root=timeseries_root,
-        log_file=log_file,
-    )
-    cfg.save(TIMESERIES_CONFIG)
+    cfg.save(CONFIGURATION_FILE)
     print(cfg)
     print(os.getenv("TIMESERIES_CONFIG"))
 
 
 if __name__ == "__main__":
-    # Execute when called directly, ie not via import statements.
+    """Execute when called directly, ie not via import statements."""
     # ??? `poetry run timeseries-config