Skip to content

Commit

Permalink
[DataCatalog]: Lazy dataset loading (#4270)
Browse files Browse the repository at this point in the history
* Move warm-up to runner

Signed-off-by: Elena Khaustova <[email protected]>

* Implemented test for running thread runner with patterns

Signed-off-by: Elena Khaustova <[email protected]>

* Added test for new catalog

Signed-off-by: Elena Khaustova <[email protected]>

* Add line separator to file

Signed-off-by: Elena Khaustova <[email protected]>

* Replaced writing csv manually to writing with pandas

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed fixture

Signed-off-by: Elena Khaustova <[email protected]>

* Removed new catalog from test

Signed-off-by: Elena Khaustova <[email protected]>

* Made catalog type a parameter

Signed-off-by: Elena Khaustova <[email protected]>

* Removed old catalog from test

Signed-off-by: Elena Khaustova <[email protected]>

* Removed new catalog from test

Signed-off-by: Elena Khaustova <[email protected]>

* Removed data creation/loading

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed test docstring

Signed-off-by: Elena Khaustova <[email protected]>

* Removed extra loop

Signed-off-by: Elena Khaustova <[email protected]>

* Renamed variable for clarifty

Signed-off-by: Elena Khaustova <[email protected]>

* Replaced ald method in the constructor

Signed-off-by: Elena Khaustova <[email protected]>

* Implemented dataset materialization

Signed-off-by: Elena Khaustova <[email protected]>

* Added temporal repr

Signed-off-by: Elena Khaustova <[email protected]>

* Removed replacing warning when init

Signed-off-by: Elena Khaustova <[email protected]>

* Improved repr

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed bug in get()

Signed-off-by: Elena Khaustova <[email protected]>

* Moved warm-up to the top

Signed-off-by: Elena Khaustova <[email protected]>

* Moved warm-up to the top

Signed-off-by: Elena Khaustova <[email protected]>

* Moved warm-up to the top

Signed-off-by: Elena Khaustova <[email protected]>

* Updated eq and repr

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed mypy errors

Signed-off-by: Elena Khaustova <[email protected]>

* Removed brackets from repr

Signed-off-by: Elena Khaustova <[email protected]>

* Renamed non_initialized to lazy

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed bug in the constructor

Signed-off-by: Elena Khaustova <[email protected]>

* Made the iteration order fixed

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed unit tests

Signed-off-by: Elena Khaustova <[email protected]>

* Added test for repr

Signed-off-by: Elena Khaustova <[email protected]>

* Added docstrings

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

* Renamed variable for consistency

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

---------

Signed-off-by: Elena Khaustova <[email protected]>
  • Loading branch information
ElenaKhaustova authored Nov 6, 2024
1 parent 826021d commit 7b7dad9
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 30 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Major features and improvements
* Implemented dict-like interface for `KedroDataCatalog`.
* Implemented lazy dataset initializing for `KedroDataCatalog`.

**Note:** ``KedroDataCatalog`` is an experimental feature and is under active development. Therefore, it is possible we'll introduce breaking changes to this class, so be mindful of that if you decide to use it already. Let us know if you have any feedback about the ``KedroDataCatalog`` or ideas for new features.

Expand Down
1 change: 1 addition & 0 deletions kedro/io/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def parse_dataset_definition(
save_version = save_version or generate_timestamp()
config = copy.deepcopy(config)

# TODO: remove when removing old catalog as moved to KedroDataCatalog
if "type" not in config:
raise DatasetError(
"'type' is missing from dataset catalog configuration."
Expand Down
84 changes: 70 additions & 14 deletions kedro/io/kedro_data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,30 @@
from kedro.utils import _format_rich, _has_rich_handler


class _LazyDataset:
"""A helper class to store AbstractDataset configuration and materialize dataset object."""

def __init__(
self,
name: str,
config: dict[str, Any],
load_version: str | None = None,
save_version: str | None = None,
):
self.name = name
self.config = config
self.load_version = load_version
self.save_version = save_version

def __repr__(self) -> str:
return f"{self.config.get('type', 'UnknownType')}"

def materialize(self) -> AbstractDataset:
return AbstractDataset.from_config(
self.name, self.config, self.load_version, self.save_version
)


class KedroDataCatalog(CatalogProtocol):
def __init__(
self,
Expand Down Expand Up @@ -73,6 +97,7 @@ def __init__(
"""
self._config_resolver = config_resolver or CatalogConfigResolver()
self._datasets = datasets or {}
self._lazy_datasets: dict[str, _LazyDataset] = {}
self._load_versions = load_versions or {}
self._save_version = save_version

Expand All @@ -81,8 +106,9 @@ def __init__(
for ds_name, ds_config in self._config_resolver.config.items():
self._add_from_config(ds_name, ds_config)

if raw_data:
self.add_feed_dict(raw_data)
raw_data = raw_data or {}
for ds_name, data in raw_data.items():
self[ds_name] = data # type: ignore[has-type]

@property
def datasets(self) -> dict[str, Any]:
Expand All @@ -101,36 +127,42 @@ def config_resolver(self) -> CatalogConfigResolver:
return self._config_resolver

def __repr__(self) -> str:
return repr(self._datasets)
return repr(self._lazy_datasets | self._datasets)

def __contains__(self, dataset_name: str) -> bool:
"""Check if an item is in the catalog as a materialised dataset or pattern."""
return (
dataset_name in self._datasets
or dataset_name in self._lazy_datasets
or self._config_resolver.match_pattern(dataset_name) is not None
)

def __eq__(self, other) -> bool: # type: ignore[no-untyped-def]
"""Compares two catalogs based on materialised datasets and datasets patterns."""
return (self._datasets, self._config_resolver.list_patterns()) == (
return (
self._datasets,
self._lazy_datasets,
self._config_resolver.list_patterns(),
) == (
other._datasets,
other._lazy_datasets,
other.config_resolver.list_patterns(),
)

def keys(self) -> List[str]: # noqa: UP006
"""List all dataset names registered in the catalog."""
return list(self.__iter__())
return list(self._lazy_datasets.keys()) + list(self._datasets.keys())

def values(self) -> List[AbstractDataset]: # noqa: UP006
"""List all datasets registered in the catalog."""
return [self._datasets[key] for key in self]
return [self.get(key) for key in self]

def items(self) -> List[tuple[str, AbstractDataset]]: # noqa: UP006
"""List all dataset names and datasets registered in the catalog."""
return [(key, self._datasets[key]) for key in self]
return [(key, self.get(key)) for key in self]

def __iter__(self) -> Iterator[str]:
yield from self._datasets.keys()
yield from self.keys()

def __getitem__(self, ds_name: str) -> AbstractDataset:
"""Get a dataset by name from an internal collection of datasets.
Expand Down Expand Up @@ -187,6 +219,8 @@ def __setitem__(self, key: str, value: Any) -> None:
self._logger.warning("Replacing dataset '%s'", key)
if isinstance(value, AbstractDataset):
self._datasets[key] = value
elif isinstance(value, _LazyDataset):
self._lazy_datasets[key] = value
else:
self._logger.info(f"Adding input data as a MemoryDataset - {key}")
self._datasets[key] = MemoryDataset(data=value) # type: ignore[abstract]
Expand All @@ -210,17 +244,21 @@ def get(
Returns:
An instance of AbstractDataset.
"""
if key not in self._datasets:
if key not in self._datasets and key not in self._lazy_datasets:
ds_config = self._config_resolver.resolve_pattern(key)
if ds_config:
self._add_from_config(key, ds_config)

lazy_dataset = self._lazy_datasets.pop(key, None)
if lazy_dataset:
self[key] = lazy_dataset.materialize()

dataset = self._datasets.get(key, None)

return dataset or default

def _ipython_key_completions_(self) -> list[str]:
return list(self._datasets.keys())
return self.keys()

@property
def _logger(self) -> logging.Logger:
Expand Down Expand Up @@ -335,11 +373,26 @@ def _validate_dataset_config(ds_name: str, ds_config: Any) -> None:
"make sure that the key is preceded by an underscore."
)

if "type" not in ds_config:
raise DatasetError(
f"An exception occurred when parsing config for dataset '{ds_name}':\n"
"'type' is missing from dataset catalog configuration."
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the top level key is preceded by an underscore."
)

def _add_from_config(self, ds_name: str, ds_config: dict[str, Any]) -> None:
# TODO: Add lazy loading feature to store the configuration but not to init actual dataset
# TODO: Initialise actual dataset when load or save
"""Create a LazyDataset instance and add it to the catalog.
Args:
ds_name: A dataset name.
ds_config: A dataset configuration.
Raises:
DatasetError: When a dataset configuration provided is not valid.
"""
self._validate_dataset_config(ds_name, ds_config)
ds = AbstractDataset.from_config(
ds = _LazyDataset(
ds_name,
ds_config,
self._load_versions.get(ds_name),
Expand Down Expand Up @@ -398,7 +451,10 @@ def _get_dataset(
return self.get_dataset(dataset_name, version, suggest)

def add(
self, ds_name: str, dataset: AbstractDataset, replace: bool = False
self,
ds_name: str,
dataset: AbstractDataset | _LazyDataset,
replace: bool = False,
) -> None:
# TODO: remove when removing old catalog
"""Adds a new ``AbstractDataset`` object to the ``KedroDataCatalog``."""
Expand Down
36 changes: 20 additions & 16 deletions tests/io/test_kedro_data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ def test_eq(self, multi_catalog, data_catalog):

def test_datasets_on_init(self, data_catalog_from_config):
"""Check datasets are loaded correctly on construction"""
assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset)
assert isinstance(data_catalog_from_config.datasets["cars"], CSVDataset)
assert isinstance(data_catalog_from_config.get("boats"), CSVDataset)
assert isinstance(data_catalog_from_config.get("cars"), CSVDataset)

def test_datasets_on_add(self, data_catalog_from_config):
"""Check datasets are updated correctly after adding"""
data_catalog_from_config.add("new_dataset", CSVDataset(filepath="some_path"))
assert isinstance(data_catalog_from_config.datasets["new_dataset"], CSVDataset)
assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset)
assert isinstance(data_catalog_from_config.get("new_dataset"), CSVDataset)
assert isinstance(data_catalog_from_config.get("boats"), CSVDataset)

def test_adding_datasets_not_allowed(self, data_catalog_from_config):
"""Check error if user tries to update the datasets attribute"""
Expand Down Expand Up @@ -260,12 +260,16 @@ def test_init_with_raw_data(self, dummy_dataframe, dataset):
)
assert "ds" in catalog
assert "df" in catalog
assert isinstance(catalog.datasets["ds"], CSVDataset)
assert isinstance(catalog.datasets["df"], MemoryDataset)
assert isinstance(catalog["ds"], CSVDataset)
assert isinstance(catalog["df"], MemoryDataset)

def test_repr(self, data_catalog):
assert data_catalog.__repr__() == str(data_catalog)

def test_repr_no_type_found(self, data_catalog_from_config):
del data_catalog_from_config._lazy_datasets["boats"].config["type"]
assert data_catalog_from_config.__repr__() == str(data_catalog_from_config)

def test_missing_keys_from_load_versions(self, correct_config):
"""Test load versions include keys missing in the catalog"""
pattern = "'load_versions' keys [version] are not found in the catalog."
Expand Down Expand Up @@ -314,15 +318,15 @@ def test_config_invalid_module(self, correct_config):

error_msg = "Class 'kedro.invalid_module_name.io.CSVDataset' not found"
with pytest.raises(DatasetError, match=re.escape(error_msg)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_relative_import(self, correct_config):
"""Check the error if the type points to a relative import"""
correct_config["catalog"]["boats"]["type"] = ".CSVDatasetInvalid"

pattern = "'type' class path does not support relative paths"
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_import_kedro_datasets(self, correct_config, mocker):
"""Test kedro_datasets default path to the dataset class"""
Expand All @@ -344,7 +348,7 @@ def test_config_missing_class(self, correct_config):
"Class 'kedro.io.CSVDatasetInvalid' not found, is this a typo?"
)
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_dataset(self, correct_config):
"""Check the error if the type points to invalid class"""
Expand All @@ -355,7 +359,7 @@ def test_config_invalid_dataset(self, correct_config):
"all dataset types must extend 'AbstractDataset'"
)
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_arguments(self, correct_config):
"""Check the error if the dataset config contains invalid arguments"""
Expand All @@ -365,7 +369,7 @@ def test_config_invalid_arguments(self, correct_config):
r"the constructor of '.*CSVDataset'"
)
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_dataset_config(self, correct_config):
correct_config["catalog"]["invalid_entry"] = "some string"
Expand Down Expand Up @@ -395,7 +399,7 @@ def test_link_credentials(self, correct_config, mocker):
config = deepcopy(correct_config)
del config["catalog"]["boats"]

KedroDataCatalog.from_config(**config)
KedroDataCatalog.from_config(**config).get("cars")

expected_client_kwargs = correct_config["credentials"]["s3_credentials"]
mock_client.filesystem.assert_called_with("s3", **expected_client_kwargs)
Expand All @@ -404,7 +408,7 @@ def test_nested_credentials(self, correct_config_with_nested_creds, mocker):
mock_client = mocker.patch("kedro_datasets.pandas.csv_dataset.fsspec")
config = deepcopy(correct_config_with_nested_creds)
del config["catalog"]["boats"]
KedroDataCatalog.from_config(**config)
KedroDataCatalog.from_config(**config).get("cars")

expected_client_kwargs = {
"client_kwargs": {
Expand Down Expand Up @@ -439,7 +443,7 @@ def dummy_load(obj_path, *args, **kwargs):

mocker.patch("kedro.io.core.load_obj", side_effect=dummy_load)
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_idempotent_catalog(self, correct_config):
"""Test that data catalog instantiations are idempotent"""
Expand All @@ -451,7 +455,7 @@ def test_error_dataset_init(self, bad_config):
"""Check the error when trying to instantiate erroneous dataset"""
pattern = r"Failed to instantiate dataset \'bad\' of type '.*BadDataset'"
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(bad_config, None)
KedroDataCatalog.from_config(bad_config, None).get("bad")

def test_validate_dataset_config(self):
"""Test _validate_dataset_config raises error when wrong dataset config type is passed"""
Expand Down Expand Up @@ -583,7 +587,7 @@ def test_from_correct_config_versioned_warn(
to the dataset config"""
correct_config["catalog"]["boats"]["versioned"] = versioned
correct_config["catalog"]["boats"]["version"] = True
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")
log_record = caplog.records[0]
expected_log_message = (
"'version' attribute removed from dataset configuration since it "
Expand Down

0 comments on commit 7b7dad9

Please sign in to comment.