[DataCatalog]: Lazy dataset loading #4270

Merged

46 commits merged into main on Nov 6, 2024

Commits (46)
7c794f4
Move warm-up to runner
ElenaKhaustova Oct 28, 2024
208a24b
Implemented test for running thread runner with patterns
ElenaKhaustova Oct 28, 2024
7c6729a
Added test for new catalog
ElenaKhaustova Oct 28, 2024
9d5b37d
Add line separator to file
ElenaKhaustova Oct 28, 2024
c3229c0
Replaced manual CSV writing with pandas
ElenaKhaustova Oct 28, 2024
6c509d9
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Oct 28, 2024
bd878c9
Fixed fixture
ElenaKhaustova Oct 28, 2024
68010aa
Removed new catalog from test
ElenaKhaustova Oct 28, 2024
29d373f
Made catalog type a parameter
ElenaKhaustova Oct 28, 2024
e90cfd7
Removed old catalog from test
ElenaKhaustova Oct 28, 2024
3f1dbe0
Removed new catalog from test
ElenaKhaustova Oct 28, 2024
892cda4
Removed data creation/loading
ElenaKhaustova Oct 29, 2024
e7f2632
Fixed test docstring
ElenaKhaustova Oct 29, 2024
429ca13
Removed extra loop
ElenaKhaustova Oct 29, 2024
3ffd538
Renamed variable for clarity
ElenaKhaustova Oct 29, 2024
681d3f1
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Oct 29, 2024
8177ddc
Replaced old method in the constructor
ElenaKhaustova Oct 29, 2024
379f3b4
Implemented dataset materialization
ElenaKhaustova Oct 29, 2024
9335578
Added temporary repr
ElenaKhaustova Oct 29, 2024
26d0c84
Removed dataset replacement warning on init
ElenaKhaustova Oct 29, 2024
dea5630
Improved repr
ElenaKhaustova Oct 29, 2024
0da806f
Fixed bug in get()
ElenaKhaustova Oct 29, 2024
b813408
Merge branch 'fix/4250-move-warm-up-to-runner' into feature/3935-lazy-loading
ElenaKhaustova Oct 29, 2024
c63ece6
Moved warm-up to the top
ElenaKhaustova Oct 29, 2024
01f9b62
Moved warm-up to the top
ElenaKhaustova Oct 29, 2024
069dff4
Moved warm-up to the top
ElenaKhaustova Oct 29, 2024
15df0f8
Merge branch 'fix/4250-move-warm-up-to-runner' into feature/3935-lazy-loading
ElenaKhaustova Oct 29, 2024
da669f5
Updated eq and repr
ElenaKhaustova Oct 29, 2024
3999913
Fixed mypy errors
ElenaKhaustova Oct 30, 2024
c50cad4
Removed brackets from repr
ElenaKhaustova Oct 31, 2024
f2861ed
Renamed non_initialized to lazy
ElenaKhaustova Oct 31, 2024
df5807c
Fixed bug in the constructor
ElenaKhaustova Oct 31, 2024
eecf23a
Made the iteration order fixed
ElenaKhaustova Oct 31, 2024
128e79c
Fixed unit tests
ElenaKhaustova Oct 31, 2024
3b475aa
Added test for repr
ElenaKhaustova Oct 31, 2024
32dc417
Merge branch 'main' into feature/3935-lazy-loading
ElenaKhaustova Oct 31, 2024
72c106b
Merge branch 'main' into feature/3935-lazy-loading
ElenaKhaustova Nov 1, 2024
870b794
Added docstrings
ElenaKhaustova Nov 1, 2024
e031f8a
Updated release notes
ElenaKhaustova Nov 1, 2024
9d0f579
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Nov 1, 2024
5f6ef85
Updated release notes
ElenaKhaustova Nov 1, 2024
6d23b4d
Merge branch 'fix/4250-move-warm-up-to-runner' into feature/3935-lazy-loading
ElenaKhaustova Nov 1, 2024
c31c5eb
Renamed variable for consistency
ElenaKhaustova Nov 4, 2024
822e206
Updated release notes
ElenaKhaustova Nov 5, 2024
1f14ded
Merge branch 'main' into feature/3935-lazy-loading
ElenaKhaustova Nov 6, 2024
25aebce
Merge branch 'main' into feature/3935-lazy-loading
ElenaKhaustova Nov 6, 2024
4 changes: 3 additions & 1 deletion RELEASE.md
@@ -2,11 +2,13 @@

## Major features and improvements
* Implemented dict-like interface for `KedroDataCatalog`.
* Implemented lazy dataset loading for `KedroDataCatalog`.

**Note:** ``KedroDataCatalog`` is an experimental feature and is under active development. Therefore, it is possible we'll introduce breaking changes to this class, so be mindful of that if you decide to use it already. Let us know if you have any feedback about the ``KedroDataCatalog`` or ideas for new features.

## Bug fixes and other changes
* Added I/O support for Oracle Cloud Infrastructure (OCI) Object Storage filesystem
* Added I/O support for Oracle Cloud Infrastructure (OCI) Object Storage filesystem.
* Fixed `DatasetAlreadyExistsError` for `ThreadRunner` both when running a Kedro project and when using the runner on its own.

## Breaking changes to the API
## Documentation changes
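For context on the release note above, here is a minimal sketch of what lazy loading changes from the user's side. The config and file path are hypothetical, and kedro-datasets is assumed to be installed:

from kedro.io import KedroDataCatalog

config = {
    "cars": {
        "type": "pandas.CSVDataset",  # resolved to kedro_datasets.pandas.CSVDataset
        "filepath": "data/01_raw/cars.csv",  # hypothetical path
    }
}

catalog = KedroDataCatalog.from_config(config)
# No CSVDataset instance exists yet: "cars" is stored as a _LazyDataset
# holding only its configuration.
ds = catalog.get("cars")  # first access materializes the actual dataset object
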
7 changes: 1 addition & 6 deletions kedro/framework/session/session.py
@@ -24,7 +24,7 @@
validate_settings,
)
from kedro.io.core import generate_timestamp
from kedro.runner import AbstractRunner, SequentialRunner, ThreadRunner
from kedro.runner import AbstractRunner, SequentialRunner
from kedro.utils import _find_kedro_project

if TYPE_CHECKING:
@@ -395,11 +395,6 @@ def run( # noqa: PLR0913
hook_manager.hook.before_pipeline_run(
run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)

if isinstance(runner, ThreadRunner):
for ds in filtered_pipeline.datasets():
if catalog.config_resolver.match_pattern(ds):
_ = catalog._get_dataset(ds)
try:
run_result = runner.run(
filtered_pipeline, catalog, hook_manager, session_id
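For context on the block removed above: it pre-materialized pattern-matched datasets only for ThreadRunner, because concurrent worker threads resolving the same dataset pattern could each try to register the resolved dataset and raise DatasetAlreadyExistsError. That warm-up now runs for every runner inside AbstractRunner.run() (see the kedro/runner/runner.py diff below), so the session needs no runner-specific logic. A minimal sketch of the warm-up idea, with a hypothetical dataset factory entry:

from kedro.io import KedroDataCatalog

# Hypothetical factory: any name matching "{name}_csv" resolves to a CSVDataset.
config = {"{name}_csv": {"type": "pandas.CSVDataset", "filepath": "data/{name}.csv"}}
catalog = KedroDataCatalog.from_config(config)

# Materializing once, up front, means worker threads later only read the
# already-registered dataset instead of racing to register it.
_ = catalog._get_dataset("cars_csv")
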
1 change: 1 addition & 0 deletions kedro/io/core.py
@@ -483,6 +483,7 @@ def parse_dataset_definition(
save_version = save_version or generate_timestamp()
config = copy.deepcopy(config)

# TODO: remove when removing old catalog as moved to KedroDataCatalog
if "type" not in config:
raise DatasetError(
"'type' is missing from dataset catalog configuration."
84 changes: 70 additions & 14 deletions kedro/io/kedro_data_catalog.py
@@ -31,6 +31,30 @@
from kedro.utils import _format_rich, _has_rich_handler


class _LazyDataset:
"""A helper class to store AbstractDataset configuration and materialize dataset object."""

def __init__(
self,
name: str,
config: dict[str, Any],
load_version: str | None = None,
save_version: str | None = None,
):
self.name = name
self.config = config
self.load_version = load_version
self.save_version = save_version

def __repr__(self) -> str:
return f"{self.config.get('type', 'UnknownType')}"

def materialize(self) -> AbstractDataset:
return AbstractDataset.from_config(
self.name, self.config, self.load_version, self.save_version
)


class KedroDataCatalog(CatalogProtocol):
def __init__(
self,
@@ -73,6 +97,7 @@ def __init__(
"""
self._config_resolver = config_resolver or CatalogConfigResolver()
self._datasets = datasets or {}
self._lazy_datasets: dict[str, _LazyDataset] = {}
self._load_versions = load_versions or {}
self._save_version = save_version

@@ -81,8 +106,9 @@ def __init__(
for ds_name, ds_config in self._config_resolver.config.items():
self._add_from_config(ds_name, ds_config)

if raw_data:
self.add_feed_dict(raw_data)
raw_data = raw_data or {}
for ds_name, data in raw_data.items():
self[ds_name] = data # type: ignore[has-type]

@property
def datasets(self) -> dict[str, Any]:
@@ -101,36 +127,42 @@ def config_resolver(self) -> CatalogConfigResolver:
return self._config_resolver

def __repr__(self) -> str:
return repr(self._datasets)
return repr(self._lazy_datasets | self._datasets)

def __contains__(self, dataset_name: str) -> bool:
"""Check if an item is in the catalog as a materialised dataset or pattern."""
return (
dataset_name in self._datasets
or dataset_name in self._lazy_datasets
or self._config_resolver.match_pattern(dataset_name) is not None
)

def __eq__(self, other) -> bool: # type: ignore[no-untyped-def]
"""Compares two catalogs based on materialised datasets and datasets patterns."""
return (self._datasets, self._config_resolver.list_patterns()) == (
return (
self._datasets,
self._lazy_datasets,
self._config_resolver.list_patterns(),
) == (
other._datasets,
other._lazy_datasets,
other.config_resolver.list_patterns(),
)

def keys(self) -> List[str]: # noqa: UP006
"""List all dataset names registered in the catalog."""
return list(self.__iter__())
return list(self._lazy_datasets.keys()) + list(self._datasets.keys())

def values(self) -> List[AbstractDataset]: # noqa: UP006
"""List all datasets registered in the catalog."""
return [self._datasets[key] for key in self]
return [self.get(key) for key in self]

def items(self) -> List[tuple[str, AbstractDataset]]: # noqa: UP006
"""List all dataset names and datasets registered in the catalog."""
return [(key, self._datasets[key]) for key in self]
return [(key, self.get(key)) for key in self]

def __iter__(self) -> Iterator[str]:
yield from self._datasets.keys()
yield from self.keys()

def __getitem__(self, ds_name: str) -> AbstractDataset:
"""Get a dataset by name from an internal collection of datasets.
@@ -187,6 +219,8 @@ def __setitem__(self, key: str, value: Any) -> None:
self._logger.warning("Replacing dataset '%s'", key)
if isinstance(value, AbstractDataset):
self._datasets[key] = value
elif isinstance(value, _LazyDataset):
self._lazy_datasets[key] = value
else:
self._logger.info(f"Adding input data as a MemoryDataset - {key}")
self._datasets[key] = MemoryDataset(data=value) # type: ignore[abstract]
@@ -210,17 +244,21 @@ def get(
Returns:
An instance of AbstractDataset.
"""
if key not in self._datasets:
if key not in self._datasets and key not in self._lazy_datasets:
ds_config = self._config_resolver.resolve_pattern(key)
if ds_config:
self._add_from_config(key, ds_config)

lazy_dataset = self._lazy_datasets.pop(key, None)
if lazy_dataset:
self[key] = lazy_dataset.materialize()

dataset = self._datasets.get(key, None)

return dataset or default

def _ipython_key_completions_(self) -> list[str]:
return list(self._datasets.keys())
return self.keys()

@property
def _logger(self) -> logging.Logger:
@@ -335,11 +373,26 @@ def _validate_dataset_config(ds_name: str, ds_config: Any) -> None:
"make sure that the key is preceded by an underscore."
)

if "type" not in ds_config:
raise DatasetError(
f"An exception occurred when parsing config for dataset '{ds_name}':\n"
"'type' is missing from dataset catalog configuration."
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the top level key is preceded by an underscore."
)

def _add_from_config(self, ds_name: str, ds_config: dict[str, Any]) -> None:
# TODO: Add lazy loading feature to store the configuration but not to init actual dataset
# TODO: Initialise actual dataset when load or save
"""Create a LazyDataset instance and add it to the catalog.

Args:
ds_name: A dataset name.
ds_config: A dataset configuration.

Raises:
DatasetError: When a dataset configuration provided is not valid.
"""
self._validate_dataset_config(ds_name, ds_config)
ds = AbstractDataset.from_config(
ds = _LazyDataset(
ds_name,
ds_config,
self._load_versions.get(ds_name),
@@ -398,7 +451,10 @@ def _get_dataset(
return self.get_dataset(dataset_name, version, suggest)

def add(
self, ds_name: str, dataset: AbstractDataset, replace: bool = False
self,
ds_name: str,
dataset: AbstractDataset | _LazyDataset,
Review comment (Member):

Are we sure we'd like to expose _LazyDataset to the users? If yes, then maybe we need to make it a proper LazyDataset that can be instantiated even outside of the catalog. Otherwise, I'd prefer for us to hide it from this method signature.

Reply (Contributor, Author):

We don't want to expose it; it was added here only while the old add() method is kept for compatibility with the old catalog. It will be removed after the breaking change, since the setter will be used instead.

replace: bool = False,
) -> None:
# TODO: remove when removing old catalog
"""Adds a new ``AbstractDataset`` object to the ``KedroDataCatalog``."""
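To make the mechanics above concrete, a small illustrative sketch of _LazyDataset. It is a private helper, so this is for explanation only, and the config values are hypothetical:

from kedro.io.kedro_data_catalog import _LazyDataset

lazy = _LazyDataset(
    name="cars",
    config={"type": "pandas.CSVDataset", "filepath": "cars.csv"},
)
print(repr(lazy))             # "pandas.CSVDataset" -- repr falls back to the config "type"
dataset = lazy.materialize()  # AbstractDataset.from_config() is only called here

Inside the catalog, get() pops the entry from _lazy_datasets and stores the materialized dataset via __setitem__, so each dataset is materialized at most once.
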
10 changes: 8 additions & 2 deletions kedro/runner/runner.py
@@ -78,11 +78,16 @@ def run(
"""
# Check which datasets used in the pipeline are in the catalog or match
# a pattern in the catalog, not including extra dataset patterns
registered_ds = [ds for ds in pipeline.datasets() if ds in catalog]
# Run a warm-up to materialize all datasets in the catalog before run
registered_ds_no_runtime_patterns = []
for ds in pipeline.datasets():
if ds in catalog:
registered_ds_no_runtime_patterns.append(ds)
_ = catalog._get_dataset(ds)

# Check if there are any input datasets that aren't in the catalog and
# don't match a pattern in the catalog.
unsatisfied = pipeline.inputs() - set(registered_ds)
unsatisfied = pipeline.inputs() - set(registered_ds_no_runtime_patterns)

if unsatisfied:
raise ValueError(
@@ -104,6 +109,7 @@
self._logger.info(
"Asynchronous mode is enabled for loading and saving data"
)

self._run(pipeline, catalog, hook_or_null_manager, session_id) # type: ignore[arg-type]

self._logger.info("Pipeline execution completed successfully.")
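One consequence of moving the warm-up here: every dataset the pipeline uses is materialized before the first node executes, so a broken dataset configuration fails fast at the start of the run rather than partway through. A simplified restatement of the loop above, assuming pipeline and catalog objects already exist:

registered = []
for ds_name in pipeline.datasets():
    if ds_name in catalog:                 # direct entries and pattern matches
        registered.append(ds_name)
        _ = catalog._get_dataset(ds_name)  # materializes lazy datasets now

# Inputs that neither exist in the catalog nor match a pattern are an error.
unsatisfied = pipeline.inputs() - set(registered)
if unsatisfied:
    raise ValueError(f"Pipeline input(s) {unsatisfied} not found in the DataCatalog")
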
36 changes: 20 additions & 16 deletions tests/io/test_kedro_data_catalog.py
@@ -178,14 +178,14 @@ def test_eq(self, multi_catalog, data_catalog):

def test_datasets_on_init(self, data_catalog_from_config):
"""Check datasets are loaded correctly on construction"""
assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset)
assert isinstance(data_catalog_from_config.datasets["cars"], CSVDataset)
assert isinstance(data_catalog_from_config.get("boats"), CSVDataset)
assert isinstance(data_catalog_from_config.get("cars"), CSVDataset)

def test_datasets_on_add(self, data_catalog_from_config):
"""Check datasets are updated correctly after adding"""
data_catalog_from_config.add("new_dataset", CSVDataset(filepath="some_path"))
assert isinstance(data_catalog_from_config.datasets["new_dataset"], CSVDataset)
assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset)
assert isinstance(data_catalog_from_config.get("new_dataset"), CSVDataset)
assert isinstance(data_catalog_from_config.get("boats"), CSVDataset)

def test_adding_datasets_not_allowed(self, data_catalog_from_config):
"""Check error if user tries to update the datasets attribute"""
@@ -260,12 +260,16 @@ def test_init_with_raw_data(self, dummy_dataframe, dataset):
)
assert "ds" in catalog
assert "df" in catalog
assert isinstance(catalog.datasets["ds"], CSVDataset)
assert isinstance(catalog.datasets["df"], MemoryDataset)
assert isinstance(catalog["ds"], CSVDataset)
assert isinstance(catalog["df"], MemoryDataset)

def test_repr(self, data_catalog):
assert data_catalog.__repr__() == str(data_catalog)

def test_repr_no_type_found(self, data_catalog_from_config):
del data_catalog_from_config._lazy_datasets["boats"].config["type"]
assert data_catalog_from_config.__repr__() == str(data_catalog_from_config)

def test_missing_keys_from_load_versions(self, correct_config):
"""Test load versions include keys missing in the catalog"""
pattern = "'load_versions' keys [version] are not found in the catalog."
@@ -314,15 +318,15 @@ def test_config_invalid_module(self, correct_config):

error_msg = "Class 'kedro.invalid_module_name.io.CSVDataset' not found"
with pytest.raises(DatasetError, match=re.escape(error_msg)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_relative_import(self, correct_config):
"""Check the error if the type points to a relative import"""
correct_config["catalog"]["boats"]["type"] = ".CSVDatasetInvalid"

pattern = "'type' class path does not support relative paths"
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_import_kedro_datasets(self, correct_config, mocker):
"""Test kedro_datasets default path to the dataset class"""
@@ -344,7 +348,7 @@ def test_config_missing_class(self, correct_config):
"Class 'kedro.io.CSVDatasetInvalid' not found, is this a typo?"
)
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_dataset(self, correct_config):
"""Check the error if the type points to invalid class"""
Expand All @@ -355,7 +359,7 @@ def test_config_invalid_dataset(self, correct_config):
"all dataset types must extend 'AbstractDataset'"
)
with pytest.raises(DatasetError, match=re.escape(pattern)):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_arguments(self, correct_config):
"""Check the error if the dataset config contains invalid arguments"""
@@ -365,7 +369,7 @@ def test_config_invalid_arguments(self, correct_config):
r"the constructor of '.*CSVDataset'"
)
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_config_invalid_dataset_config(self, correct_config):
correct_config["catalog"]["invalid_entry"] = "some string"
@@ -395,7 +399,7 @@ def test_link_credentials(self, correct_config, mocker):
config = deepcopy(correct_config)
del config["catalog"]["boats"]

KedroDataCatalog.from_config(**config)
KedroDataCatalog.from_config(**config).get("cars")

expected_client_kwargs = correct_config["credentials"]["s3_credentials"]
mock_client.filesystem.assert_called_with("s3", **expected_client_kwargs)
@@ -404,7 +408,7 @@ def test_nested_credentials(self, correct_config_with_nested_creds, mocker):
mock_client = mocker.patch("kedro_datasets.pandas.csv_dataset.fsspec")
config = deepcopy(correct_config_with_nested_creds)
del config["catalog"]["boats"]
KedroDataCatalog.from_config(**config)
KedroDataCatalog.from_config(**config).get("cars")

expected_client_kwargs = {
"client_kwargs": {
@@ -439,7 +443,7 @@ def dummy_load(obj_path, *args, **kwargs):

mocker.patch("kedro.io.core.load_obj", side_effect=dummy_load)
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")

def test_idempotent_catalog(self, correct_config):
"""Test that data catalog instantiations are idempotent"""
Expand All @@ -451,7 +455,7 @@ def test_error_dataset_init(self, bad_config):
"""Check the error when trying to instantiate erroneous dataset"""
pattern = r"Failed to instantiate dataset \'bad\' of type '.*BadDataset'"
with pytest.raises(DatasetError, match=pattern):
KedroDataCatalog.from_config(bad_config, None)
KedroDataCatalog.from_config(bad_config, None).get("bad")

def test_validate_dataset_config(self):
"""Test _validate_dataset_config raises error when wrong dataset config type is passed"""
@@ -583,7 +587,7 @@ def test_from_correct_config_versioned_warn(
to the dataset config"""
correct_config["catalog"]["boats"]["versioned"] = versioned
correct_config["catalog"]["boats"]["version"] = True
KedroDataCatalog.from_config(**correct_config)
KedroDataCatalog.from_config(**correct_config).get("boats")
log_record = caplog.records[0]
expected_log_message = (
"'version' attribute removed from dataset configuration since it "
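The common thread in these test updates: with lazy loading, invalid dataset configuration no longer raises at KedroDataCatalog.from_config() but on first access, so the tests now append .get(...) to trigger materialization. A minimal sketch with a hypothetical invalid type:

from kedro.io import DatasetError, KedroDataCatalog

bad = {"boats": {"type": "kedro.io.CSVDatasetInvalid", "filepath": "boats.csv"}}
catalog = KedroDataCatalog.from_config(bad)  # succeeds: only the config is stored
try:
    catalog.get("boats")  # materialization happens here and raises
except DatasetError as err:
    print(err)  # e.g. "Class 'kedro.io.CSVDatasetInvalid' not found, is this a typo?"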