Fix duplicated lines in io #12

Merged: 3 commits, May 14, 2024
9 changes: 3 additions & 6 deletions src/ssb_timeseries/config.py
@@ -90,13 +90,13 @@ def file_system_type(self) -> str:
else:
return "local"

def toJSON(self) -> str:
def to_json(self) -> str:
"""Return timeseries configurations as JSON string."""
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

def __str__(self) -> str:
"""Human readable string representation of configuration object: JSON string."""
return self.toJSON()
return self.to_json()

def load(self, path: PathStr) -> None:
"""Read the properties from a JSON file into a Config object."""
@@ -116,7 +116,7 @@ def save(self, path: PathStr = TIMESERIES_CONFIG) -> None:
Args:
path (PathStr): Full path of the JSON file to save to. Defaults to the value of the environment variable TIMESERIES_CONFIG.
"""
fs.write_json(content=self.toJSON(), path=path)
fs.write_json(content=self.to_json(), path=path)
if HOME == JOVYAN:
# For some reason `os.environ["TIMESERIES_CONFIG"] = path` does not work:
cmd = f"export TIMESERIES_CONFIG={TIMESERIES_CONFIG}"
@@ -153,9 +153,6 @@ def main(*args: str | PathStr) -> None:
ValueError: If args is not 'home' | 'gcs' | 'jovyan'.

"""
# for a, arg in enumerate(sys.argv[1:]):
# print(f"{a} - {arg}")

TIMESERIES_CONFIG = os.getenv("TIMESERIES_CONFIG", DEFAULT_CONFIG_LOCATION)
if not TIMESERIES_CONFIG:
print(
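For context, `to_json` serializes the configuration object's `__dict__`. A minimal, standalone sketch of the same pattern (the class and attribute names below are invented for illustration, not the real Config schema):

import json

class ExampleConfig:
    """Hypothetical stand-in for Config, showing only the to_json pattern."""

    def __init__(self) -> None:
        self.bucket = "gs://example-bucket"  # invented attributes
        self.timeseries_root = "/data/timeseries"

    def to_json(self) -> str:
        # Same approach as Config.to_json: dump __dict__, sorted keys, indented.
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def __str__(self) -> str:
        return self.to_json()

print(ExampleConfig())  # prints the indented JSON representation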
98 changes: 43 additions & 55 deletions src/ssb_timeseries/dataset.py
@@ -203,14 +203,6 @@ def snapshot(self, as_of_tz: datetime = None) -> None:
Args:
as_of_tz (datetime): Optional. Provide a timezone-sensitive as_of date in order to create another version. The default is None, which will save with Dataset.as_of_utc (UTC dates under the hood).
"""
# def snapshot_name(self) -> str:
# # <kort-beskrivelse>_p<periode-fra-og-med>_p<perode-til-og- med>_v<versjon>.<filtype>
# date_from = np.min(self.data[self.datetime_columns()])
# date_to = np.max(self.data[self.datetime_columns()])
# version = self.io.last_version + 1
# out = f"{self.name}_p{date_from}_p{date_to}_v{version}.parquet"
# return out

date_from = self.data[self.datetime_columns()].min().min()
date_to = self.data[self.datetime_columns()].max().max()
ts_logger.debug(
@@ -507,8 +499,8 @@ def resample(
self,
freq: str,
func: F | str,
*args: Any, # ---noqa: ANN002
**kwargs: Any, # --noqa: ANN003
*args: Any,
**kwargs: Any,
) -> Self:
"""Alter frequency of dataset data."""
# TODO: have a closer look at dates returned for last period when upsampling
@@ -666,8 +658,6 @@ def math(
out_data[self.numeric_columns()] = func(
out_data[self.numeric_columns()], other
)

# return out_data
else:
raise ValueError(
f"Incompatible shapes for element-wise {func.__name__}"
@@ -677,8 +667,6 @@
else:
raise ValueError("Unsupported operand type")

# TODO: return (new) Dataset object instead!
# return out_data
if other_as_of:
out_as_of = max(self.as_of_utc, other_as_of)
else:
@@ -789,63 +777,45 @@ def aggregate(
self,
attribute: str,
taxonomy: Taxonomy | int | PathStr,
aggregate_type: str | list[str] = "sum",
aggregate_function: str | list[str] = "sum",
) -> Self:
"""Aggregate dataset by taxonomy.

Args:
attribute: The attribute to aggregate by.
taxonomy (Taxonomy | int | PathStr): The values for `attribute`. A taxonomy object as returned by Taxonomy(klass_id_or_path), or the id or path to retrieve one.
aggregate_type (str | list[str]): Optional function name (or list) of the function names to apply (mean | count | sum | ...). Defaults to `sum`.
aggregate_function (str | list[str]): Optional function name (or list) of the function names to apply (mean | count | sum | ...). Defaults to `sum`.

Returns:
Self: A dataset object with the aggregated data.
If the taxonomy object has a hierarchical structure, aggregate series are calculated for parent nodes at all levels.
If the taxonomy is a flat list, only a single 'total' aggregate series is calculated.

Raises:
NotImplementedError: If the aggregation method is not implemented yet.
"""
if not isinstance(taxonomy, Taxonomy):
    taxonomy = Taxonomy(taxonomy)

# TODO: alter to handle list of functions, eg ["mean", "10 percentile", "25 percentile", "median", "75 percentile", "90 percentile"]
if isinstance(aggregate_type, str):
match aggregate_type.lower():
case "mean" | "average":
raise NotImplementedError(
"Aggregation method 'mean' is not implemented yet."
)
case "percentile":
raise NotImplementedError(
"Aggregation method 'percentile' is not implemented yet."
)
case "count":
raise NotImplementedError(
"Aggregation method 'count' is not implemented yet."
)
case "sum" | _:
df = self.data.copy().drop(columns=self.numeric_columns())
for node in taxonomy.parent_nodes():
leaf_node_subset = self.filter(
tags={attribute: taxonomy.leaf_nodes()}, output="df"
).drop(columns=self.datetime_columns())
df[node.name] = leaf_node_subset.sum(axis=1)
ts_logger.debug(
f"DATASET.aggregate(): For node '{node.name}', column {aggregate_type} for input df:\n{leaf_node_subset}\nreturned:\n{df}"
)
new_col_name = node.name
df = df.rename(columns={node: new_col_name})
else:
raise NotImplementedError(
"Multiple aggregation methods is planned, but not yet implemented."
)
return self.copy(f"{self.name}.{aggregate_type}", data=df)
if isinstance(aggregate_function, str):
aggregate_function = [aggregate_function]

# unfinished business
# mypy: disable-error-code="no-untyped-def"
df = self.data.copy().drop(columns=self.numeric_columns())
for node in taxonomy.parent_nodes():
leaf_node_subset = self.filter(
tags={attribute: taxonomy.leaf_nodes()}, output="df"
).drop(columns=self.datetime_columns())

for m in aggregate_function:
new_col_name = f"{m}({node.name})"
df[new_col_name] = calculate_aggregate(leaf_node_subset, m)
ts_logger.debug(
f"DATASET.aggregate(): For node '{node.name}', column {m} for input df:\n{leaf_node_subset}\nreturned:\n{df}"
)

return self.copy(f"{self.name}.{aggregate_function}", data=df)

# reindexing is a remnant of an abandoned approach to avoid calculating on datetime columns?
# --> can be deleted if not used / or nice to have?
@no_type_check
def reindex(
self,
@@ -864,6 +834,26 @@ def reindex(
self.data = self.data.set_index(self.datetime_columns(), *args)


def calculate_aggregate(df: pd.DataFrame, method: str) -> pd.Series | Any:
"""Helper function to calculate aggregate over dataframe columns."""
match method.lower():
case "mean" | "average":
out = df.mean(axis=1)
case "min" | "minimum":
out = df.min(axis=1)
case "max" | "maximum":
out = df.max(axis=1)
case "count":
out = df.count(axis=1)
case "sum":
out = df.sum(axis=1)
case "percentile" | _:
raise NotImplementedError(
f"Aggregation method '{method}' is not implemented (yet)."
)
return out
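The new aggregate() loop pairs each parent node with each requested function and names the columns '<function>(<node name>)'. A minimal runnable sketch of how the loop and calculate_aggregate fit together (the leaf columns and the parent node 'A' are invented):

import pandas as pd

def calculate_aggregate(df: pd.DataFrame, method: str) -> pd.Series:
    # Trimmed copy of the dispatch above; two methods are enough for the demo.
    match method.lower():
        case "mean" | "average":
            return df.mean(axis=1)
        case "sum":
            return df.sum(axis=1)
        case _:
            raise NotImplementedError(f"Aggregation method '{method}' is not implemented (yet).")

# Invented leaf series under a single parent node 'A':
leaf_node_subset = pd.DataFrame({"A1": [1.0, 2.0], "A2": [3.0, 4.0]})

out = pd.DataFrame()
for m in ["sum", "mean"]:
    # Same column naming convention as aggregate(): '<function>(<node name>)'.
    out[f"{m}(A)"] = calculate_aggregate(leaf_node_subset, m)

print(out)  # sum(A): 4.0, 6.0; mean(A): 2.0, 3.0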


def search(
pattern: str = "*", as_of_tz: datetime = None
) -> list[io.SearchResult] | Dataset | list[None]:
@@ -872,12 +862,10 @@
ts_logger.debug(f"DATASET.search returned:\n{found} ")

if len(found) == 1:
# raise NotImplementedError("TODO: extract name and type from result.")
return Dataset(
name=found[0].name,
data_type=properties.seriestype_from_str(found[0].type_directory),
as_of_tz=as_of_tz,
)
else:
# elif len(found) > 1:
return found
47 changes: 3 additions & 44 deletions src/ssb_timeseries/io.py
@@ -24,6 +24,7 @@
from ssb_timeseries import fs
from ssb_timeseries import properties
from ssb_timeseries.dates import Interval
from ssb_timeseries.dates import utc_iso_no_colon
from ssb_timeseries.logging import ts_logger
from ssb_timeseries.types import PathStr

@@ -69,30 +70,9 @@ def __init__(

if as_of_utc is None:
pass
# ecxception if not
# exception if not
else:
# rounded_utc = as_of_utc
self.as_of_utc: datetime = as_of_utc.isoformat().replace(":", "")

# def __new__(
# cls,
# set_name: str,
# set_type: properties.SeriesType,
# as_of_utc: datetime,
# process_stage: str = "statistikk",
# sharing: dict = {},
# type_name="local",
# *args,
# **kwargs,
# ):

# subclass_map = {
# subclass.type_name: subclass for subclass in cls.__subclasses__()
# }
# subclass = subclass_map[type_name]
# instance = super(FileSystem, subclass).__new__(subclass)
# instance.init_fs()
# return instance
self.as_of_utc: datetime = utc_iso_no_colon(as_of_utc)
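The imported utc_iso_no_colon helper is presumably equivalent to the inline expression it replaces; a sketch under that assumption:

from datetime import datetime, timezone

def utc_iso_no_colon(ts: datetime) -> str:
    # Assumed behaviour, matching the replaced line above:
    # ISO 8601 with the colons stripped, so the value is safe in file names.
    return ts.isoformat().replace(":", "")

print(utc_iso_no_colon(datetime(2024, 5, 14, 12, 30, tzinfo=timezone.utc)))
# -> 2024-05-14T123000+0000

(As in the original line, the value assigned to self.as_of_utc is the formatted string, despite the datetime annotation.)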

@property
def root(self) -> str:
@@ -404,26 +384,6 @@ def snapshot(
f"DATASET {self.set_name}: sharing with {s['team']}, snapshot copied to {s['path']}."
)

@classmethod
def search(
cls, pattern: str | PathStr = "", as_of: datetime | None = None
) -> list[SearchResult]:
"""Search for files in under timeseries root."""
if pattern:
pattern = f"*{pattern}*"
else:
pattern = "*"

search_str = os.path.join(CONFIG.timeseries_root, "*", pattern)
dirs = glob.glob(search_str)
ts_logger.debug(f"DATASET.IO.SEARCH: {search_str} dirs{dirs}")
search_results = [
d.replace(CONFIG.timeseries_root, "root").split(os.path.sep) for d in dirs
]
ts_logger.debug(f"DATASET.IO.SEARCH: search_results{search_results}")

return [SearchResult(f[2], f[1]) for f in search_results]

@classmethod
def dir(cls, *args: str, **kwargs: bool) -> str:
"""Check that target directory is under BUCKET. If so, create it if it does not exist."""
@@ -444,7 +404,6 @@ def dir(cls, *args: str, **kwargs: bool) -> str:
def find_datasets(
pattern: str | PathStr = "", as_of: datetime | None = None
) -> list[SearchResult]:
# ) -> list[str | PathStr]:
"""Search for files in under timeseries root."""
if pattern:
pattern = f"*{pattern}*"
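The surviving find_datasets builds its glob pattern the same way the removed duplicate search() did. A standalone sketch of that lookup (the root path and pattern are invented; the real code uses CONFIG.timeseries_root):

import glob
import os

timeseries_root = "/data/timeseries"  # invented; CONFIG.timeseries_root in the real code
pattern = "elec"                      # caller-supplied fragment, wrapped in wildcards

search_str = os.path.join(timeseries_root, "*", f"*{pattern}*")
dirs = glob.glob(search_str)
# Each hit splits into ('root', <type_directory>, <set_name>),
# which is what SearchResult(f[2], f[1]) picks apart above.
results = [d.replace(timeseries_root, "root").split(os.path.sep) for d in dirs]
print(results)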
5 changes: 2 additions & 3 deletions tests/test_config.py
@@ -97,6 +97,5 @@ def test_read_config_from_missing_json_file(remember_config) -> None:
@pytest.mark.skipif(HOME != "/home/bernhard", reason="None of your business.")
def test_fail(remember_config, caplog):
caplog.set_level(logging.DEBUG)
print("print to std out")
ts_logger.warning("ts_logger.warning: std out")
assert True
ts_logger.debug("ts_logger.warning: std out")
pass
94 changes: 90 additions & 4 deletions tests/test_meta_tagging.py
@@ -285,6 +285,13 @@ def test_updated_tags_propagates_to_column_names_accordingly() -> None:
raise AssertionError()


@pytest.mark.skip(reason="Not ready yet.")
def test_aggregate_sum_for_flat_list_taxonomy(
caplog,
) -> None:
pass


@log_start_stop
def test_aggregate_sums_for_hierarchical_taxonomy(
conftest,
@@ -320,13 +327,92 @@ def test_aggregate_sums_for_hierarchical_taxonomy(
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(klass157.parent_nodes())
assert sorted(y.numeric_columns()) == sorted(
[n.name for n in klass157.parent_nodes()]
[f"sum({n.name})" for n in klass157.parent_nodes()]
)
# raise AssertionError("In order to see DEBUG logs while testing.")


@pytest.mark.skip(reason="Not ready yet.")
def test_aggregate_sum_for_flat_list_taxonomy(
@log_start_stop
def test_aggregate_mean_for_hierarchical_taxonomy(
conftest,
caplog,
) -> None:
pass
caplog.set_level(logging.DEBUG)
klass157 = Taxonomy(157)
klass157_leaves = [n.name for n in klass157.structure.root.leaves]

set_name = conftest.function_name()
set_tags = {
"Country": "Norway",
}
series_tags = {"A": klass157_leaves, "B": ["q"], "C": ["z"]}
tag_values: list[list[str]] = [value for value in series_tags.values()]

x = Dataset(
name=set_name,
data_type=SeriesType.estimate(),
as_of_tz=date_utc("2022-01-01"),
tags=set_tags,
series_tags=series_tags,
data=create_df(
*tag_values, start_date="2022-01-01", end_date="2022-04-03", freq="MS"
),
name_pattern=["A", "B", "C"],
)

assert len(x.numeric_columns()) == len(klass157_leaves)

y = x.aggregate("A", klass157, "mean")
assert isinstance(y, Dataset)
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(klass157.parent_nodes())
assert sorted(y.numeric_columns()) == sorted(
[f"mean({n.name})" for n in klass157.parent_nodes()]
)
# raise AssertionError("In order to see DEBUG logs while testing.")


@log_start_stop
def test_aggregate_multiple_methods_for_hierarchical_taxonomy(
conftest,
caplog,
) -> None:
caplog.set_level(logging.DEBUG)
klass157 = Taxonomy(157)
klass157_leaves = [n.name for n in klass157.structure.root.leaves]

set_name = conftest.function_name()
set_tags = {
"Country": "Norway",
}
series_tags = {"A": klass157_leaves, "B": ["q"], "C": ["z"]}
tag_values: list[list[str]] = [value for value in series_tags.values()]

x = Dataset(
name=set_name,
data_type=SeriesType.estimate(),
as_of_tz=date_utc("2022-01-01"),
tags=set_tags,
series_tags=series_tags,
data=create_df(
*tag_values, start_date="2022-01-01", end_date="2022-04-03", freq="MS"
),
name_pattern=["A", "B", "C"],
)

assert len(x.numeric_columns()) == len(klass157_leaves)
multiple_functions = ["count", "min", "max"]
y = x.aggregate(
attribute="A",
taxonomy=klass157,
aggregate_function=multiple_functions,
)
assert isinstance(y, Dataset)
# ts_logger.debug(f"calculated: \n{y.data.info()}\n{y.data}")
assert len(y.numeric_columns()) == len(klass157.parent_nodes()) * len(multiple_functions)
# assert sorted(y.numeric_columns()) == sorted(
# [n.name for n in klass157.parent_nodes()]
# )
# raise AssertionError("In order to see DEBUG logs while testing.")