diff --git a/RELEASE.md b/RELEASE.md index 887b7ae063..2599688a4a 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -2,6 +2,7 @@ ## Major features and improvements * Implemented dict-like interface for `KedroDataCatalog`. +* Implemented lazy dataset initialization for `KedroDataCatalog`. **Note:** ``KedroDataCatalog`` is an experimental feature and is under active development. Therefore, it is possible we'll introduce breaking changes to this class, so be mindful of that if you decide to use it already. Let us know if you have any feedback about the ``KedroDataCatalog`` or ideas for new features. diff --git a/kedro/io/core.py b/kedro/io/core.py index a57baba6f5..01e85963b9 100644 --- a/kedro/io/core.py +++ b/kedro/io/core.py @@ -483,6 +483,7 @@ def parse_dataset_definition( save_version = save_version or generate_timestamp() config = copy.deepcopy(config) + # TODO: remove when removing old catalog as moved to KedroDataCatalog if "type" not in config: raise DatasetError( "'type' is missing from dataset catalog configuration." 
diff --git a/kedro/io/kedro_data_catalog.py b/kedro/io/kedro_data_catalog.py index 27dcf1a765..8bbf573d7e 100644 --- a/kedro/io/kedro_data_catalog.py +++ b/kedro/io/kedro_data_catalog.py @@ -31,6 +31,30 @@ from kedro.utils import _format_rich, _has_rich_handler +class _LazyDataset: + """A helper class to store AbstractDataset configuration and materialize dataset object.""" + + def __init__( + self, + name: str, + config: dict[str, Any], + load_version: str | None = None, + save_version: str | None = None, + ): + self.name = name + self.config = config + self.load_version = load_version + self.save_version = save_version + + def __repr__(self) -> str: + return f"{self.config.get('type', 'UnknownType')}" + + def materialize(self) -> AbstractDataset: + return AbstractDataset.from_config( + self.name, self.config, self.load_version, self.save_version + ) + + class KedroDataCatalog(CatalogProtocol): def __init__( self, @@ -73,6 +97,7 @@ def __init__( """ self._config_resolver = config_resolver or CatalogConfigResolver() self._datasets = datasets or {} + self._lazy_datasets: dict[str, _LazyDataset] = {} self._load_versions = load_versions or {} self._save_version = save_version @@ -81,8 +106,9 @@ def __init__( for ds_name, ds_config in self._config_resolver.config.items(): self._add_from_config(ds_name, ds_config) - if raw_data: - self.add_feed_dict(raw_data) + raw_data = raw_data or {} + for ds_name, data in raw_data.items(): + self[ds_name] = data # type: ignore[has-type] @property def datasets(self) -> dict[str, Any]: @@ -101,36 +127,42 @@ def config_resolver(self) -> CatalogConfigResolver: return self._config_resolver def __repr__(self) -> str: - return repr(self._datasets) + return repr(self._lazy_datasets | self._datasets) def __contains__(self, dataset_name: str) -> bool: """Check if an item is in the catalog as a materialised dataset or pattern.""" return ( dataset_name in self._datasets + or dataset_name in self._lazy_datasets or 
self._config_resolver.match_pattern(dataset_name) is not None ) def __eq__(self, other) -> bool: # type: ignore[no-untyped-def] """Compares two catalogs based on materialised datasets and datasets patterns.""" - return (self._datasets, self._config_resolver.list_patterns()) == ( + return ( + self._datasets, + self._lazy_datasets, + self._config_resolver.list_patterns(), + ) == ( other._datasets, + other._lazy_datasets, other.config_resolver.list_patterns(), ) def keys(self) -> List[str]: # noqa: UP006 """List all dataset names registered in the catalog.""" - return list(self.__iter__()) + return list(self._lazy_datasets.keys()) + list(self._datasets.keys()) def values(self) -> List[AbstractDataset]: # noqa: UP006 """List all datasets registered in the catalog.""" - return [self._datasets[key] for key in self] + return [self.get(key) for key in self] def items(self) -> List[tuple[str, AbstractDataset]]: # noqa: UP006 """List all dataset names and datasets registered in the catalog.""" - return [(key, self._datasets[key]) for key in self] + return [(key, self.get(key)) for key in self] def __iter__(self) -> Iterator[str]: - yield from self._datasets.keys() + yield from self.keys() def __getitem__(self, ds_name: str) -> AbstractDataset: """Get a dataset by name from an internal collection of datasets. @@ -187,6 +219,8 @@ def __setitem__(self, key: str, value: Any) -> None: self._logger.warning("Replacing dataset '%s'", key) if isinstance(value, AbstractDataset): self._datasets[key] = value + elif isinstance(value, _LazyDataset): + self._lazy_datasets[key] = value else: self._logger.info(f"Adding input data as a MemoryDataset - {key}") self._datasets[key] = MemoryDataset(data=value) # type: ignore[abstract] @@ -210,17 +244,21 @@ def get( Returns: An instance of AbstractDataset. 
""" - if key not in self._datasets: + if key not in self._datasets and key not in self._lazy_datasets: ds_config = self._config_resolver.resolve_pattern(key) if ds_config: self._add_from_config(key, ds_config) + lazy_dataset = self._lazy_datasets.pop(key, None) + if lazy_dataset: + self[key] = lazy_dataset.materialize() + dataset = self._datasets.get(key, None) return dataset or default def _ipython_key_completions_(self) -> list[str]: - return list(self._datasets.keys()) + return self.keys() @property def _logger(self) -> logging.Logger: @@ -335,11 +373,26 @@ def _validate_dataset_config(ds_name: str, ds_config: Any) -> None: "make sure that the key is preceded by an underscore." ) + if "type" not in ds_config: + raise DatasetError( + f"An exception occurred when parsing config for dataset '{ds_name}':\n" + "'type' is missing from dataset catalog configuration." + "\nHint: If this catalog entry is intended for variable interpolation, " + "make sure that the top level key is preceded by an underscore." + ) + def _add_from_config(self, ds_name: str, ds_config: dict[str, Any]) -> None: - # TODO: Add lazy loading feature to store the configuration but not to init actual dataset - # TODO: Initialise actual dataset when load or save + """Create a LazyDataset instance and add it to the catalog. + + Args: + ds_name: A dataset name. + ds_config: A dataset configuration. + + Raises: + DatasetError: When a dataset configuration provided is not valid. 
+ """ self._validate_dataset_config(ds_name, ds_config) - ds = AbstractDataset.from_config( + ds = _LazyDataset( ds_name, ds_config, self._load_versions.get(ds_name), @@ -398,7 +451,10 @@ def _get_dataset( return self.get_dataset(dataset_name, version, suggest) def add( - self, ds_name: str, dataset: AbstractDataset, replace: bool = False + self, + ds_name: str, + dataset: AbstractDataset | _LazyDataset, + replace: bool = False, ) -> None: # TODO: remove when removing old catalog """Adds a new ``AbstractDataset`` object to the ``KedroDataCatalog``.""" diff --git a/tests/io/test_kedro_data_catalog.py b/tests/io/test_kedro_data_catalog.py index a53717f8ba..367580ef80 100644 --- a/tests/io/test_kedro_data_catalog.py +++ b/tests/io/test_kedro_data_catalog.py @@ -178,14 +178,14 @@ def test_eq(self, multi_catalog, data_catalog): def test_datasets_on_init(self, data_catalog_from_config): """Check datasets are loaded correctly on construction""" - assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset) - assert isinstance(data_catalog_from_config.datasets["cars"], CSVDataset) + assert isinstance(data_catalog_from_config.get("boats"), CSVDataset) + assert isinstance(data_catalog_from_config.get("cars"), CSVDataset) def test_datasets_on_add(self, data_catalog_from_config): """Check datasets are updated correctly after adding""" data_catalog_from_config.add("new_dataset", CSVDataset(filepath="some_path")) - assert isinstance(data_catalog_from_config.datasets["new_dataset"], CSVDataset) - assert isinstance(data_catalog_from_config.datasets["boats"], CSVDataset) + assert isinstance(data_catalog_from_config.get("new_dataset"), CSVDataset) + assert isinstance(data_catalog_from_config.get("boats"), CSVDataset) def test_adding_datasets_not_allowed(self, data_catalog_from_config): """Check error if user tries to update the datasets attribute""" @@ -260,12 +260,16 @@ def test_init_with_raw_data(self, dummy_dataframe, dataset): ) assert "ds" in catalog assert "df" in 
catalog - assert isinstance(catalog.datasets["ds"], CSVDataset) - assert isinstance(catalog.datasets["df"], MemoryDataset) + assert isinstance(catalog["ds"], CSVDataset) + assert isinstance(catalog["df"], MemoryDataset) def test_repr(self, data_catalog): assert data_catalog.__repr__() == str(data_catalog) + def test_repr_no_type_found(self, data_catalog_from_config): + del data_catalog_from_config._lazy_datasets["boats"].config["type"] + assert data_catalog_from_config.__repr__() == str(data_catalog_from_config) + def test_missing_keys_from_load_versions(self, correct_config): """Test load versions include keys missing in the catalog""" pattern = "'load_versions' keys [version] are not found in the catalog." @@ -314,7 +318,7 @@ def test_config_invalid_module(self, correct_config): error_msg = "Class 'kedro.invalid_module_name.io.CSVDataset' not found" with pytest.raises(DatasetError, match=re.escape(error_msg)): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_config_relative_import(self, correct_config): """Check the error if the type points to a relative import""" @@ -322,7 +326,7 @@ def test_config_relative_import(self, correct_config): pattern = "'type' class path does not support relative paths" with pytest.raises(DatasetError, match=re.escape(pattern)): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_config_import_kedro_datasets(self, correct_config, mocker): """Test kedro_datasets default path to the dataset class""" @@ -344,7 +348,7 @@ def test_config_missing_class(self, correct_config): "Class 'kedro.io.CSVDatasetInvalid' not found, is this a typo?" 
) with pytest.raises(DatasetError, match=re.escape(pattern)): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_config_invalid_dataset(self, correct_config): """Check the error if the type points to invalid class""" @@ -355,7 +359,7 @@ def test_config_invalid_dataset(self, correct_config): "all dataset types must extend 'AbstractDataset'" ) with pytest.raises(DatasetError, match=re.escape(pattern)): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_config_invalid_arguments(self, correct_config): """Check the error if the dataset config contains invalid arguments""" @@ -365,7 +369,7 @@ def test_config_invalid_arguments(self, correct_config): r"the constructor of '.*CSVDataset'" ) with pytest.raises(DatasetError, match=pattern): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_config_invalid_dataset_config(self, correct_config): correct_config["catalog"]["invalid_entry"] = "some string" @@ -395,7 +399,7 @@ def test_link_credentials(self, correct_config, mocker): config = deepcopy(correct_config) del config["catalog"]["boats"] - KedroDataCatalog.from_config(**config) + KedroDataCatalog.from_config(**config).get("cars") expected_client_kwargs = correct_config["credentials"]["s3_credentials"] mock_client.filesystem.assert_called_with("s3", **expected_client_kwargs) @@ -404,7 +408,7 @@ def test_nested_credentials(self, correct_config_with_nested_creds, mocker): mock_client = mocker.patch("kedro_datasets.pandas.csv_dataset.fsspec") config = deepcopy(correct_config_with_nested_creds) del config["catalog"]["boats"] - KedroDataCatalog.from_config(**config) + KedroDataCatalog.from_config(**config).get("cars") expected_client_kwargs = { "client_kwargs": { @@ -439,7 +443,7 @@ def dummy_load(obj_path, *args, **kwargs): mocker.patch("kedro.io.core.load_obj", 
side_effect=dummy_load) with pytest.raises(DatasetError, match=pattern): - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") def test_idempotent_catalog(self, correct_config): """Test that data catalog instantiations are idempotent""" @@ -451,7 +455,7 @@ def test_error_dataset_init(self, bad_config): """Check the error when trying to instantiate erroneous dataset""" pattern = r"Failed to instantiate dataset \'bad\' of type '.*BadDataset'" with pytest.raises(DatasetError, match=pattern): - KedroDataCatalog.from_config(bad_config, None) + KedroDataCatalog.from_config(bad_config, None).get("bad") def test_validate_dataset_config(self): """Test _validate_dataset_config raises error when wrong dataset config type is passed""" @@ -583,7 +587,7 @@ def test_from_correct_config_versioned_warn( to the dataset config""" correct_config["catalog"]["boats"]["versioned"] = versioned correct_config["catalog"]["boats"]["version"] = True - KedroDataCatalog.from_config(**correct_config) + KedroDataCatalog.from_config(**correct_config).get("boats") log_record = caplog.records[0] expected_log_message = ( "'version' attribute removed from dataset configuration since it "