From 1fb17e0e71f8c0af0edbc14b705b6605b883b256 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 26 Nov 2024 11:15:07 +0100 Subject: [PATCH] make dataset type selectable --- dlt/common/destination/reference.py | 2 +- dlt/destinations/dataset/dataset.py | 9 +++++++-- dlt/destinations/dataset/factory.py | 6 ++---- dlt/pipeline/pipeline.py | 12 ++++++++++-- tests/load/test_read_interfaces.py | 24 +++++++----------------- 5 files changed, 27 insertions(+), 26 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index d1024eb28c..80773da40d 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -67,7 +67,7 @@ TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration") -TDatasetType = Literal["dbapi", "ibis"] +TDatasetType = Literal["auto", "default", "ibis"] DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}" diff --git a/dlt/destinations/dataset/dataset.py b/dlt/destinations/dataset/dataset.py index 4a96ad674c..0e88da7f3a 100644 --- a/dlt/destinations/dataset/dataset.py +++ b/dlt/destinations/dataset/dataset.py @@ -17,6 +17,7 @@ from dlt.common.schema import Schema from dlt.destinations.dataset.relation import ReadableDBAPIRelation from dlt.destinations.dataset.utils import get_destination_clients +from dlt.common.destination.reference import TDatasetType if TYPE_CHECKING: try: @@ -35,12 +36,14 @@ def __init__( destination: TDestinationReferenceArg, dataset_name: str, schema: Union[Schema, str, None] = None, + dataset_type: TDatasetType = "auto", ) -> None: self._destination = Destination.from_reference(destination) self._provided_schema = schema self._dataset_name = dataset_name self._sql_client: SqlClientBase[Any] = None self._schema: Schema = None + self._dataset_type = 
dataset_type def ibis(self) -> IbisBackend: """return a connected ibis backend""" @@ -112,7 +115,7 @@ def __call__(self, query: Any) -> ReadableDBAPIRelation: def table(self, table_name: str) -> SupportsReadableRelation: # we can create an ibis powered relation if ibis is available - if table_name in self.schema.tables: + if table_name in self.schema.tables and self._dataset_type in ("auto", "ibis"): try: from dlt.common.libs.ibis import create_unbound_ibis_table from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation @@ -120,7 +123,9 @@ def table(self, table_name: str) -> SupportsReadableRelation: unbound_table = create_unbound_ibis_table(self.sql_client, self.schema, table_name) return ReadableIbisRelation(readable_dataset=self, expression=unbound_table) # type: ignore[abstract] except MissingDependencyException: - pass + # if ibis is explicitly requested, reraise + if self._dataset_type == "ibis": + raise # fallback to the standard dbapi relation return ReadableDBAPIRelation( diff --git a/dlt/destinations/dataset/factory.py b/dlt/destinations/dataset/factory.py index 3822651c8e..8ea0ddf7a1 100644 --- a/dlt/destinations/dataset/factory.py +++ b/dlt/destinations/dataset/factory.py @@ -17,8 +17,6 @@ def dataset( destination: TDestinationReferenceArg, dataset_name: str, schema: Union[Schema, str, None] = None, - dataset_type: TDatasetType = "dbapi", + dataset_type: TDatasetType = "auto", ) -> SupportsReadableDataset: - if dataset_type == "dbapi": - return ReadableDBAPIDataset(destination, dataset_name, schema) - raise NotImplementedError(f"Dataset of type {dataset_type} not implemented") + return ReadableDBAPIDataset(destination, dataset_name, schema, dataset_type) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a9f07d417e..1ec4b01fdc 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1752,9 +1752,17 @@ def __getstate__(self) -> Any: return {"pipeline_name": self.pipeline_name} def _dataset( - self, 
schema: Union[Schema, str, None] = None, dataset_type: TDatasetType = "auto" ) -> SupportsReadableDataset: - """Access helper to dataset""" + """Returns a dataset object for querying the destination data. + + Args: + schema: Schema name or Schema object to use. If None, uses the default schema if set. + dataset_type: Type of dataset interface to return. Defaults to 'auto' which will select ibis if available + otherwise it will fall back to the standard dbapi interface. + Returns: + A dataset object that supports querying the destination data. + """ if schema is None: schema = self.default_schema if self.default_schema_name else None return dataset( diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py index 91b4c6e3f2..c51d820508 100644 --- a/tests/load/test_read_interfaces.py +++ b/tests/load/test_read_interfaces.py @@ -53,13 +53,12 @@ def _expected_chunk_count(p: Pipeline) -> List[int]: def populated_pipeline(request) -> Any: """fixture that returns a pipeline object populated with the example data""" - # ensure ibis is not installed + # ensure ibis is installed for these tests import subprocess - try: - subprocess.check_call(["pip", "uninstall", "ibis-framework"]) - except subprocess.CalledProcessError: - pass + subprocess.check_call( + ["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"] + ) destination_config = cast(DestinationTestConfiguration, request.param) @@ -272,7 +271,7 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None: ids=lambda x: x.name, ) def test_hint_preservation(populated_pipeline: Pipeline) -> None: - table_relationship = populated_pipeline._dataset().items + # NOTE: for now hints are only preserved for the default dataset + table_relationship = populated_pipeline._dataset(dataset_type="default").items # check that hints are carried over to arrow table expected_decimal_precision = 10 
expected_decimal_precision_2 = 12 @@ -365,7 +365,7 @@ ids=lambda x: x.name, ) def test_column_selection(populated_pipeline: Pipeline) -> None: - table_relationship = populated_pipeline._dataset().items + table_relationship = populated_pipeline._dataset(dataset_type="default").items columns = ["_dlt_load_id", "other_decimal"] data_frame = table_relationship.select(*columns).head().df() assert [v.lower() for v in data_frame.columns.values] == columns @@ -429,11 +429,6 @@ def test_schema_arg(populated_pipeline: Pipeline) -> None: def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None: # NOTE: we could generalize this with a context for certain deps - # install ibis, we do not need any backends - import subprocess - - subprocess.check_call(["pip", "install", "ibis-framework"]) - # now we should get the more powerful ibis relation dataset = populated_pipeline._dataset() total_records = _total_records(populated_pipeline) @@ -505,11 +500,6 @@ def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None: ) def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None: # NOTE: we could generalize this with a context for certain deps - import subprocess - - subprocess.check_call( - ["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"] - ) from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS