make dataset type selectable
sh-rp committed Nov 26, 2024
1 parent 48e4034 commit 1fb17e0
Showing 5 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion dlt/common/destination/reference.py
@@ -67,7 +67,7 @@
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")
TDatasetType = Literal["dbapi", "ibis"]
TDatasetType = Literal["auto", "default", "ibis"]


DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}"
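For context, a minimal sketch (not part of the commit) of how the widened literal is meant to drive relation selection; the helper name and the selection logic here are illustrative assumptions only:

```python
from typing import Literal

# mirrors the widened literal: "dbapi" is replaced by "default", and "auto" is new
TDatasetType = Literal["auto", "default", "ibis"]

def resolve_dataset_type(requested: TDatasetType) -> str:
    """Illustrative helper: map the requested type to a concrete relation flavor."""
    if requested == "default":
        return "dbapi"
    if requested == "ibis":
        return "ibis"
    # "auto": prefer ibis when the optional dependency is importable
    try:
        import ibis  # noqa: F401
        return "ibis"
    except ImportError:
        return "dbapi"

print(resolve_dataset_type("auto"))  # "ibis" if ibis-framework is installed, else "dbapi"
```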
9 changes: 7 additions & 2 deletions dlt/destinations/dataset/dataset.py
@@ -17,6 +17,7 @@
from dlt.common.schema import Schema
from dlt.destinations.dataset.relation import ReadableDBAPIRelation
from dlt.destinations.dataset.utils import get_destination_clients
from dlt.common.destination.reference import TDatasetType

if TYPE_CHECKING:
try:
@@ -35,12 +36,14 @@ def __init__(
destination: TDestinationReferenceArg,
dataset_name: str,
schema: Union[Schema, str, None] = None,
dataset_type: TDatasetType = "auto",
) -> None:
self._destination = Destination.from_reference(destination)
self._provided_schema = schema
self._dataset_name = dataset_name
self._sql_client: SqlClientBase[Any] = None
self._schema: Schema = None
self._dataset_type = dataset_type

def ibis(self) -> IbisBackend:
"""return a connected ibis backend"""
@@ -112,15 +115,17 @@ def __call__(self, query: Any) -> ReadableDBAPIRelation:

def table(self, table_name: str) -> SupportsReadableRelation:
# we can create an ibis powered relation if ibis is available
if table_name in self.schema.tables:
if table_name in self.schema.tables and self._dataset_type in ("auto", "ibis"):
try:
from dlt.common.libs.ibis import create_unbound_ibis_table
from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation

unbound_table = create_unbound_ibis_table(self.sql_client, self.schema, table_name)
return ReadableIbisRelation(readable_dataset=self, expression=unbound_table) # type: ignore[abstract]
except MissingDependencyException:
pass
# if ibis is explicitly requested, reraise
if self._dataset_type == "ibis":
raise

# fallback to the standard dbapi relation
return ReadableDBAPIRelation(
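The behavioral change in table() is that "auto" falls back silently to the dbapi relation when ibis-framework is missing, while an explicit "ibis" request re-raises. A self-contained sketch of that pattern, using ImportError and a stand-in exception in place of dlt's internals:

```python
class MissingDependencyException(Exception):
    """Stand-in for dlt's MissingDependencyException."""

def make_relation(table_name: str, dataset_type: str = "auto") -> str:
    if dataset_type in ("auto", "ibis"):
        try:
            import ibis  # noqa: F401  # the real code builds an unbound ibis table here
            return f"ibis relation for {table_name}"
        except ImportError:
            # only "auto" may fall back silently; an explicit "ibis" request surfaces the error
            if dataset_type == "ibis":
                raise MissingDependencyException("ibis-framework is not installed")
    # fallback: the plain dbapi relation
    return f"dbapi relation for {table_name}"

print(make_relation("items"))             # ibis relation if installed, else dbapi relation
print(make_relation("items", "default"))  # always the dbapi relation
```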
6 changes: 2 additions & 4 deletions dlt/destinations/dataset/factory.py
@@ -17,8 +17,6 @@ def dataset(
destination: TDestinationReferenceArg,
dataset_name: str,
schema: Union[Schema, str, None] = None,
dataset_type: TDatasetType = "dbapi",
dataset_type: TDatasetType = "auto",
) -> SupportsReadableDataset:
if dataset_type == "dbapi":
return ReadableDBAPIDataset(destination, dataset_name, schema)
raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")
return ReadableDBAPIDataset(destination, dataset_name, schema, dataset_type)
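A possible call against the updated factory signature; the duckdb destination and dataset name are placeholders, and the import path is simply the module shown in this diff:

```python
from dlt.destinations.dataset.factory import dataset

# "auto" (the new default) picks ibis-backed relations when ibis-framework is
# importable and silently falls back to plain dbapi relations otherwise
ds = dataset("duckdb", "demo_dataset", dataset_type="auto")

# forcing "ibis" makes table access raise MissingDependencyException when ibis is absent
ibis_ds = dataset("duckdb", "demo_dataset", dataset_type="ibis")
```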
12 changes: 10 additions & 2 deletions dlt/pipeline/pipeline.py
@@ -1752,9 +1752,17 @@ def __getstate__(self) -> Any:
return {"pipeline_name": self.pipeline_name}

def _dataset(
self, schema: Union[Schema, str, None] = None, dataset_type: TDatasetType = "dbapi"
self, schema: Union[Schema, str, None] = None, dataset_type: TDatasetType = "auto"
) -> SupportsReadableDataset:
"""Access helper to dataset"""
"""Returns a dataset object for querying the destination data.
Args:
schema: Schema name or Schema object to use. If None, uses the default schema if set.
dataset_type: Type of dataset interface to return. Defaults to 'auto', which selects the ibis interface
if ibis is available and otherwise falls back to the standard dbapi interface.
Returns:
A dataset object that supports querying the destination data.
"""
if schema is None:
schema = self.default_schema if self.default_schema_name else None
return dataset(
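End to end, the new argument could be exercised roughly like this (pipeline name, dataset name and sample rows are made up for illustration):

```python
import dlt

pipeline = dlt.pipeline("dataset_type_demo", destination="duckdb", dataset_name="demo_data")
pipeline.run([{"id": 1, "value": 10}, {"id": 2, "value": 20}], table_name="items")

# "auto" uses ibis-backed relations when ibis-framework is importable, dbapi relations otherwise
items = pipeline._dataset(dataset_type="auto").items
print(items.df())

# "default" always returns the plain dbapi relation, regardless of installed extras
print(pipeline._dataset(dataset_type="default").items.df())
```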
24 changes: 7 additions & 17 deletions tests/load/test_read_interfaces.py
@@ -53,13 +53,12 @@ def _expected_chunk_count(p: Pipeline) -> List[int]:
def populated_pipeline(request) -> Any:
"""fixture that returns a pipeline object populated with the example data"""

# ensure ibis is not installed
# ensure ibis is installed for these tests
import subprocess

try:
subprocess.check_call(["pip", "uninstall", "ibis-framework"])
except subprocess.CalledProcessError:
pass
subprocess.check_call(
["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"]
)

destination_config = cast(DestinationTestConfiguration, request.param)

Expand Down Expand Up @@ -272,7 +271,8 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
ids=lambda x: x.name,
)
def test_hint_preservation(populated_pipeline: Pipeline) -> None:
table_relationship = populated_pipeline._dataset().items
# NOTE: for now hints are only preserved for the default dataset
table_relationship = populated_pipeline._dataset(dataset_type="default").items
# check that hints are carried over to arrow table
expected_decimal_precision = 10
expected_decimal_precision_2 = 12
@@ -365,7 +365,7 @@ def test_limit_and_head(populated_pipeline: Pipeline) -> None:
ids=lambda x: x.name,
)
def test_column_selection(populated_pipeline: Pipeline) -> None:
table_relationship = populated_pipeline._dataset().items
table_relationship = populated_pipeline._dataset(dataset_type="default").items
columns = ["_dlt_load_id", "other_decimal"]
data_frame = table_relationship.select(*columns).head().df()
assert [v.lower() for v in data_frame.columns.values] == columns
@@ -429,11 +429,6 @@ def test_schema_arg(populated_pipeline: Pipeline) -> None:
def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None:
# NOTE: we could generalize this with a context for certain deps

# install ibis, we do not need any backends
import subprocess

subprocess.check_call(["pip", "install", "ibis-framework"])

# now we should get the more powerful ibis relation
dataset = populated_pipeline._dataset()
total_records = _total_records(populated_pipeline)
@@ -505,11 +500,6 @@ def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None:
)
def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
# NOTE: we could generalize this with a context for certain deps
import subprocess

subprocess.check_call(
["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"]
)

from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS

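A hypothetical parametrized test, not part of this commit, sketching how both explicit dataset types could be covered with the same fixture (it would still need the destination parametrization applied to the other tests in this file):

```python
import pytest

@pytest.mark.parametrize("dataset_type", ["default", "auto"])
def test_dataset_type_returns_rows(populated_pipeline, dataset_type):
    # both flavors should expose the same data through .df()
    relation = populated_pipeline._dataset(dataset_type=dataset_type).items
    assert len(relation.head().df().index) > 0
```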
