Merge pull request #12 from statisticsnorway/fix-duplicated-lines-in-io
Fix duplicated lines in io
krlono authored May 14, 2024
2 parents 4e5ed83 + 9251d81 commit fa51e15
Showing 5 changed files with 141 additions and 112 deletions.
9 changes: 3 additions & 6 deletions src/ssb_timeseries/config.py
@@ -90,13 +90,13 @@ def file_system_type(self) -> str:
else:
return "local"

def toJSON(self) -> str:
def to_json(self) -> str:
"""Return timeseries configurations as JSON string."""
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

def __str__(self) -> str:
"""Human readable string representation of configuration object: JSON string."""
return self.toJSON()
return self.to_json()

def load(self, path: PathStr) -> None:
"""Read the properties from a JSON file into a Config object."""
@@ -116,7 +116,7 @@ def save(self, path: PathStr = TIMESERIES_CONFIG) -> None:
Args:
path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
"""
fs.write_json(content=self.toJSON(), path=path)
fs.write_json(content=self.to_json(), path=path)
if HOME == JOVYAN:
# For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
cmd = f"export TIMESERIES_CONFIG={TIMESERIES_CONFIG}"
@@ -153,9 +153,6 @@ def main(*args: str | PathStr) -> None:
ValueError: If args is not 'home' | 'gcs' | 'jovyan'.
"""
# for a, arg in enumerate(sys.argv[1:]):
# print(f"{a} - {arg}")

TIMESERIES_CONFIG = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
if not TIMESERIES_CONFIG:
print(
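The config.py change renames the JSON helper from toJSON to to_json. For context, a minimal self-contained sketch of the same serialisation pattern; the constructor and the timeseries_root attribute shown here are illustrative stand-ins, not the library's actual signature:

```python
import json


class Config:
    """Stand-in for ssb_timeseries.config.Config, reduced to the serialisation logic."""

    def __init__(self, timeseries_root: str = "/tmp/series") -> None:
        # Hypothetical attribute; the real Config object carries more fields.
        self.timeseries_root = timeseries_root

    def to_json(self) -> str:
        # Same approach as the library: dump the instance __dict__ as indented JSON.
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def __str__(self) -> str:
        return self.to_json()


print(Config())  # prints the pretty-printed JSON, e.g. {"timeseries_root": "/tmp/series"}
```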
98 changes: 43 additions & 55 deletions src/ssb_timeseries/dataset.py
@@ -203,14 +203,6 @@ def snapshot(self, as_of_tz: datetime = None) -> None:
Args:
as_of_tz (datetime): Optional. Provide a timezone sensitive as_of date in order to create another version. The default is None, which will save with Dataset.as_of_utc (utc dates under the hood).
"""
# def snapshot_name(self) -> str:
# # <kort-beskrivelse>_p<periode-fra-og-med>_p<perode-til-og- med>_v<versjon>.<filtype>
# date_from = np.min(self.data[self.datetime_columns()])
# date_to = np.max(self.data[self.datetime_columns()])
# version = self.io.last_version + 1
# out = f"{self.name}_p{date_from}_p{date_to}_v{version}.parquet"
# return out

date_from = self.data[self.datetime_columns()].min().min()
date_to = self.data[self.datetime_columns()].max().max()
ts_logger.debug(
@@ -507,8 +499,8 @@ def resample(
self,
freq: str,
func: F | str,
*args: Any, # ---noqa: ANN002
**kwargs: Any, # --noqa: ANN003
*args: Any,
**kwargs: Any,
) -> Self:
"""Alter frequency of dataset data."""
# TODO: have a closer look at dates returned for last period when upsampling
@@ -666,8 +658,6 @@ def math(
out_data[self.numeric_columns()] = func(
out_data[self.numeric_columns()], other
)

# return out_data
else:
raise ValueError(
f"Incompatible shapes for element-wise {func.__name__}"
@@ -677,8 +667,6 @@ def math(
else:
raise ValueError("Unsupported operand type")

# TODO: return (new) Dataset object instead!
# return out_data
if other_as_of:
out_as_of = max(self.as_of_utc, other_as_of)
else:
@@ -789,63 +777,45 @@ def aggregate(
self,
attribute: str,
taxonomy: Taxonomy | int | PathStr,
aggregate_type: str | list[str] = "sum",
aggregate_function: str | list[str] = "sum",
) -> Self:
"""Aggregate dataset by taxonomy.
Args:
attribute: The attribute to aggregate by.
taxonomy (Taxonomy | int | PathStr): The values for `attribute`. A taxonomy object as returned by Taxonomy(klass_id_or_path), or the id or path to retrieve one.
aggregate_type (str | list[str]): Optional function name (or list) of the function names to apply (mean | count | sum | ...). Defaults to `sum`.
aggregate_function (str | list[str]): Optional function name (or list) of the function names to apply (mean | count | sum | ...). Defaults to `sum`.
Returns:
Self: A dataset object with the aggregated data.
If the taxonomy object has hierarchical structure, aggregate series are calculated for parent nodes at all levels.
If the taxonomy is a flat list, only a single 'total' aggregate series is calculated.
Raises:
NotImplementedError: If the aggregation method is not implemented yet. --> TODO!
"""
if isinstance(taxonomy, Taxonomy):
pass
else:
taxonomy = Taxonomy(taxonomy)

# TODO: alter to handle list of functions, eg ["mean", "10 percentile", "25 percentile", "median", "75 percentile", "90 percentile"]
if isinstance(aggregate_type, str):
match aggregate_type.lower():
case "mean" | "average":
raise NotImplementedError(
"Aggregation method 'mean' is not implemented yet."
)
case "percentile":
raise NotImplementedError(
"Aggregation method 'percentile' is not implemented yet."
)
case "count":
raise NotImplementedError(
"Aggregation method 'count' is not implemented yet."
)
case "sum" | _:
df = self.data.copy().drop(columns=self.numeric_columns())
for node in taxonomy.parent_nodes():
leaf_node_subset = self.filter(
tags={attribute: taxonomy.leaf_nodes()}, output="df"
).drop(columns=self.datetime_columns())
df[node.name] = leaf_node_subset.sum(axis=1)
ts_logger.debug(
f"DATASET.aggregate(): For node '{node.name}', column {aggregate_type} for input df:\n{leaf_node_subset}\nreturned:\n{df}"
)
new_col_name = node.name
df = df.rename(columns={node: new_col_name})
else:
raise NotImplementedError(
"Multiple aggregation methods is planned, but not yet implemented."
)
return self.copy(f"{self.name}.{aggregate_type}", data=df)
if isinstance(aggregate_function, str):
aggregate_function = [aggregate_function]

# unfinished business
# mypy: disable-error-code="no-untyped-def"
df = self.data.copy().drop(columns=self.numeric_columns())
for node in taxonomy.parent_nodes():
leaf_node_subset = self.filter(
tags={attribute: taxonomy.leaf_nodes()}, output="df"
).drop(columns=self.datetime_columns())

for m in aggregate_function:
new_col_name = f"{m}({node.name})"
df[new_col_name] = calculate_aggregate(leaf_node_subset, m)
ts_logger.debug(
f"DATASET.aggregate(): For node '{node.name}', column {m} for input df:\n{leaf_node_subset}\nreturned:\n{df}"
)

return self.copy(f"{self.name}.{aggregate_function}", data=df)

# reindexing is a remainder of abandoned approach to avoid calculating on datatime columns?
# --> can be deleted if not used / or nice to have?
@no_type_check
def reindex(
self,
@@ -864,6 +834,26 @@ def reindex(
self.data = self.data.set_index(self.datetime_columns(), *args)


def calculate_aggregate(df: pd.DataFrame, method: str) -> pd.Series | Any:
"""Helper function to calculate aggregate over dataframe columns."""
match method.lower():
case "mean" | "average":
out = df.mean(axis=1)
case "min" | "minimum":
out = df.min(axis=1)
case "max" | "maximum":
out = df.max(axis=1)
case "count":
out = df.count(axis=1)
case "sum":
out = df.sum(axis=1)
case "percentile" | _:
raise NotImplementedError(
f"Aggregation method '{method}' is not implemented (yet)."
)
return out


def search(
pattern: str = "*", as_of_tz: datetime = None
) -> list[io.SearchResult] | Dataset | list[None]:
@@ -872,12 +862,10 @@ def search(
ts_logger.debug(f"DATASET.search returned:\n{found} ")

if len(found) == 1:
# raise NotImplementedError("TODO: extract name and type from result.")
return Dataset(
name=found[0].name,
data_type=properties.seriestype_from_str(found[0].type_directory),
as_of_tz=as_of_tz,
)
else:
# elif len(found) > 1:
return found
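The dataset.py change replaces the sum-only match block inside aggregate with a loop over one or more function names, delegating the row-wise arithmetic to the new calculate_aggregate helper and naming the output columns "<function>(<parent node>)". A minimal sketch of that behaviour on a toy frame (the column names leaf_a, leaf_b and parent are illustrative only):

```python
import pandas as pd


def calculate_aggregate(df: pd.DataFrame, method: str) -> pd.Series:
    """Row-wise aggregate over the columns of df, mirroring the helper added above."""
    match method.lower():
        case "mean" | "average":
            return df.mean(axis=1)
        case "min" | "minimum":
            return df.min(axis=1)
        case "max" | "maximum":
            return df.max(axis=1)
        case "count":
            return df.count(axis=1)
        case "sum":
            return df.sum(axis=1)
        case _:
            raise NotImplementedError(f"Aggregation method '{method}' is not implemented (yet).")


# Toy data standing in for the leaf-node series under one parent node.
leaves = pd.DataFrame({"leaf_a": [1.0, 2.0], "leaf_b": [3.0, 4.0]})
aggregated = pd.DataFrame()
for m in ["sum", "mean", "count"]:
    # Output columns follow the new naming convention: "<function>(<parent node>)".
    aggregated[f"{m}(parent)"] = calculate_aggregate(leaves, m)
print(aggregated)  # sum(parent): 4.0, 6.0 | mean(parent): 2.0, 3.0 | count(parent): 2, 2
```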
47 changes: 3 additions & 44 deletions src/ssb_timeseries/io.py
@@ -24,6 +24,7 @@
from ssb_timeseries import fs
from ssb_timeseries import properties
from ssb_timeseries.dates import Interval
from ssb_timeseries.dates import utc_iso_no_colon
from ssb_timeseries.logging import ts_logger
from ssb_timeseries.types import PathStr

@@ -69,30 +70,9 @@ def __init__(

if as_of_utc is None:
pass
# ecxception if not
# exception if not
else:
# rounded_utc = as_of_utc
self.as_of_utc: datetime = as_of_utc.isoformat().replace(":", "")

# def __new__(
# cls,
# set_name: str,
# set_type: properties.SeriesType,
# as_of_utc: datetime,
# process_stage: str = "statistikk",
# sharing: dict = {},
# type_name="local",
# *args,
# **kwargs,
# ):

# subclass_map = {
# subclass.type_name: subclass for subclass in cls.__subclasses__()
# }
# subclass = subclass_map[type_name]
# instance = super(FileSystem, subclass).__new__(subclass)
# instance.init_fs()
# return instance
self.as_of_utc: datetime = utc_iso_no_colon(as_of_utc)

@property
def root(self) -> str:
@@ -404,26 +384,6 @@ def snapshot(
f"DATASET {self.set_name}: sharing with {s['team']}, snapshot copied to {s['path']}."
)

@classmethod
def search(
cls, pattern: str | PathStr = "", as_of: datetime | None = None
) -> list[SearchResult]:
"""Search for files in under timeseries root."""
if pattern:
pattern = f"*{pattern}*"
else:
pattern = "*"

search_str = os.path.join(CONFIG.timeseries_root, "*", pattern)
dirs = glob.glob(search_str)
ts_logger.debug(f"DATASET.IO.SEARCH: {search_str} dirs{dirs}")
search_results = [
d.replace(CONFIG.timeseries_root, "root").split(os.path.sep) for d in dirs
]
ts_logger.debug(f"DATASET.IO.SEARCH: search_results{search_results}")

return [SearchResult(f[2], f[1]) for f in search_results]

@classmethod
def dir(cls, *args: str, **kwargs: bool) -> str:
"""Check that target directory is under BUCKET. If so, create it if it does not exist."""
@@ -444,7 +404,6 @@ def dir(cls, *args: str, **kwargs: bool) -> str:
def find_datasets(
pattern: str | PathStr = "", as_of: datetime | None = None
) -> list[SearchResult]:
# ) -> list[str | PathStr]:
"""Search for files in under timeseries root."""
if pattern:
pattern = f"*{pattern}*"
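The io.py change replaces the inline as_of_utc.isoformat().replace(":", "") with the imported helper utc_iso_no_colon from ssb_timeseries.dates. The helper's implementation is not shown in this diff; a plausible sketch of what it does, inferred from the line it replaces (the real helper may also normalise the timestamp to UTC first):

```python
from datetime import datetime, timezone


def utc_iso_no_colon(as_of_utc: datetime) -> str:
    # Assumed behaviour, inferred from the replaced inline expression:
    # an ISO 8601 timestamp with colons stripped, so it is safe in file and directory names.
    return as_of_utc.isoformat().replace(":", "")


print(utc_iso_no_colon(datetime(2024, 5, 14, 12, 30, tzinfo=timezone.utc)))
# -> 2024-05-14T123000+0000
```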
5 changes: 2 additions & 3 deletions tests/test_config.py
@@ -97,6 +97,5 @@ def test_read_config_from_missing_json_file(remember_config) -> None:
@pytest.mark.skipif(HOME != "/home/bernhard", reason="None of your business.")
def test_fail(remember_config, caplog):
caplog.set_level(logging.DEBUG)
print("print to std out")
ts_logger.warning("ts_logger.warning: std out")
assert True
ts_logger.debug("ts_logger.warning: std out")
pass
94 changes: 90 additions & 4 deletions tests/test_meta_tagging.py
@@ -285,6 +285,13 @@ def test_updated_tags_propagates_to_column_names_accordingly() -> None:
raise AssertionError()


@pytest.mark.skip(reason="Not ready yet.")
def test_aggregate_sum_for_flat_list_taxonomy(
caplog,
) -> None:
pass


@log_start_stop
def test_aggregate_sums_for_hierarchical_taxonomy(
conftest,
@@ -320,13 +327,92 @@ def test_aggregate_sums_for_hierarchical_taxonomy(
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(klass157.parent_nodes())
assert sorted(y.numeric_columns()) == sorted(
[n.name for n in klass157.parent_nodes()]
[f"sum({n.name})" for n in klass157.parent_nodes()]
)
# raise AssertionError("In order to see DEBUG logs while testing.")


@pytest.mark.skip(reason="Not ready yet.")
def test_aggregate_sum_for_flat_list_taxonomy(
@log_start_stop
def test_aggregate_mean_for_hierarchical_taxonomy(
conftest,
caplog,
) -> None:
pass
caplog.set_level(logging.DEBUG)
klass157 = Taxonomy(157)
klass157_leaves = [n.name for n in klass157.structure.root.leaves]

set_name = conftest.function_name()
set_tags = {
"Country": "Norway",
}
series_tags = {"A": klass157_leaves, "B": ["q"], "C": ["z"]}
tag_values: list[list[str]] = [value for value in series_tags.values()]

x = Dataset(
name=set_name,
data_type=SeriesType.estimate(),
as_of_tz=date_utc("2022-01-01"),
tags=set_tags,
series_tags=series_tags,
data=create_df(
*tag_values, start_date="2022-01-01", end_date="2022-04-03", freq="MS"
),
name_pattern=["A", "B", "C"],
)

assert len(x.numeric_columns()) == len(klass157_leaves)

y = x.aggregate("A", klass157, "mean")
assert isinstance(y, Dataset)
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(klass157.parent_nodes())
assert sorted(y.numeric_columns()) == sorted(
[f"mean({n.name})" for n in klass157.parent_nodes()]
)
# raise AssertionError("In order to see DEBUG logs while testing.")


@log_start_stop
def test_aggregate_multiple_methods_for_hierarchical_taxonomy(
conftest,
caplog,
) -> None:
caplog.set_level(logging.DEBUG)
klass157 = Taxonomy(157)
klass157_leaves = [n.name for n in klass157.structure.root.leaves]

set_name = conftest.function_name()
set_tags = {
"Country": "Norway",
}
series_tags = {"A": klass157_leaves, "B": ["q"], "C": ["z"]}
tag_values: list[list[str]] = [value for value in series_tags.values()]

x = Dataset(
name=set_name,
data_type=SeriesType.estimate(),
as_of_tz=date_utc("2022-01-01"),
tags=set_tags,
series_tags=series_tags,
data=create_df(
*tag_values, start_date="2022-01-01", end_date="2022-04-03", freq="MS"
),
name_pattern=["A", "B", "C"],
)

assert len(x.numeric_columns()) == len(klass157_leaves)
multiple_functions = ["count", "min", "max"]
y = x.aggregate(
attribute="A",
taxonomy=klass157,
aggregate_function=multiple_functions,
)
assert isinstance(y, Dataset)
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(
klass157.parent_nodes() * len(multiple_functions)
)
# assert sorted(y.numeric_columns()) == sorted(
# [n.name for n in klass157.parent_nodes()]
# )
# raise AssertionError("In order to see DEBUG logs while testing.")
