diff --git a/.github/workflows/test_destination_athena.yml b/.github/workflows/test_destination_athena.yml index 1169fab0de..03eb7f9434 100644 --- a/.github/workflows/test_destination_athena.yml +++ b/.github/workflows/test_destination_athena.yml @@ -67,7 +67,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_athena_iceberg.yml b/.github/workflows/test_destination_athena_iceberg.yml index 7ccefcc055..3412e789e3 100644 --- a/.github/workflows/test_destination_athena_iceberg.yml +++ b/.github/workflows/test_destination_athena_iceberg.yml @@ -67,7 +67,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E athena --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_bigquery.yml b/.github/workflows/test_destination_bigquery.yml index 7afc9b8a00..eb8b63f757 100644 --- a/.github/workflows/test_destination_bigquery.yml +++ b/.github/workflows/test_destination_bigquery.yml @@ -66,7 +66,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E bigquery --with providers -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E bigquery --with providers -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index 7f297db971..46464ea462 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -61,7 +61,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E clickhouse --with providers -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E clickhouse --with providers -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_databricks.yml b/.github/workflows/test_destination_databricks.yml index 1656fe27f4..c1609de863 100644 --- a/.github/workflows/test_destination_databricks.yml +++ b/.github/workflows/test_destination_databricks.yml @@ -64,7 +64,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E databricks -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E databricks -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git 
a/.github/workflows/test_destination_dremio.yml b/.github/workflows/test_destination_dremio.yml index 45c6d17db1..4bc48c54db 100644 --- a/.github/workflows/test_destination_dremio.yml +++ b/.github/workflows/test_destination_dremio.yml @@ -65,7 +65,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline,ibis - run: | poetry run pytest tests/load --ignore tests/load/sources diff --git a/.github/workflows/test_destination_motherduck.yml b/.github/workflows/test_destination_motherduck.yml index 0014b17655..db81131266 100644 --- a/.github/workflows/test_destination_motherduck.yml +++ b/.github/workflows/test_destination_motherduck.yml @@ -64,7 +64,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-motherduck - name: Install dependencies - run: poetry install --no-interaction -E motherduck -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E motherduck -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_mssql.yml b/.github/workflows/test_destination_mssql.yml index 8b899e7da2..6fdd7a5bc5 100644 --- a/.github/workflows/test_destination_mssql.yml +++ b/.github/workflows/test_destination_mssql.yml @@ -69,7 +69,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E mssql -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E mssql -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_snowflake.yml b/.github/workflows/test_destination_snowflake.yml index a720c479bd..73a2a8f6e7 100644 --- a/.github/workflows/test_destination_snowflake.yml +++ b/.github/workflows/test_destination_snowflake.yml @@ -64,7 +64,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E snowflake -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E snowflake -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline,ibis - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destination_synapse.yml b/.github/workflows/test_destination_synapse.yml index be1b493916..8f6bf1eb29 100644 --- a/.github/workflows/test_destination_synapse.yml +++ b/.github/workflows/test_destination_synapse.yml @@ -67,7 +67,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline,ibis - name: 
create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destinations.yml b/.github/workflows/test_destinations.yml index 933248d994..cfd0a3bd56 100644 --- a/.github/workflows/test_destinations.yml +++ b/.github/workflows/test_destinations.yml @@ -78,7 +78,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake + run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index 4947a46a3b..6f44e5fd5a 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -95,7 +95,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake + run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake - name: Start SFTP server run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d diff --git a/.github/workflows/test_sqlalchemy_destinations.yml b/.github/workflows/test_sqlalchemy_destinations.yml index c2572b322d..1f00373674 100644 --- a/.github/workflows/test_sqlalchemy_destinations.yml +++ b/.github/workflows/test_sqlalchemy_destinations.yml @@ -86,7 +86,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E parquet -E filesystem -E sqlalchemy -E cli --with sentry-sdk --with pipeline && poetry run pip install mysqlclient && poetry run pip install "sqlalchemy==${{ matrix.sqlalchemy }}" + run: poetry install --no-interaction -E parquet -E filesystem -E sqlalchemy -E cli --with sentry-sdk --with pipeline,ibis && poetry run pip install mysqlclient && poetry run pip install "sqlalchemy==${{ matrix.sqlalchemy }}" - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index e27f99cde7..048fe2186f 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -67,7 +67,7 @@ TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration") -TDatasetType = Literal["dbapi", "ibis"] +TDatasetType = Literal["auto", "default", "ibis"] DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}" @@ -76,7 +76,7 @@ try: from dlt.common.libs.pandas import DataFrame from dlt.common.libs.pyarrow import Table as ArrowTable - from 
dlt.common.libs.ibis import BaseBackend as IbisBackend + from dlt.helpers.ibis import BaseBackend as IbisBackend except MissingDependencyException: DataFrame = Any ArrowTable = Any @@ -535,7 +535,7 @@ def fetchone(self) -> Optional[Tuple[Any, ...]]: ... # modifying access parameters - def limit(self, limit: int) -> "SupportsReadableRelation": + def limit(self, limit: int, **kwargs: Any) -> "SupportsReadableRelation": """limit the result to 'limit' items""" ... @@ -557,6 +557,10 @@ def __getitem__(self, columns: Union[str, Sequence[str]]) -> "SupportsReadableRe """set which columns will be selected""" ... + def __getattr__(self, attr: str) -> Any: + """get an attribute of the relation""" + ... + def __copy__(self) -> "SupportsReadableRelation": """create a copy of the relation object""" ... diff --git a/dlt/destinations/dataset.py b/dlt/destinations/dataset.py deleted file mode 100644 index 27a7f5a7af..0000000000 --- a/dlt/destinations/dataset.py +++ /dev/null @@ -1,412 +0,0 @@ -from typing import Any, Generator, Sequence, Union, TYPE_CHECKING, Tuple - -from contextlib import contextmanager - -from dlt import version -from dlt.common.json import json -from dlt.common.exceptions import MissingDependencyException -from dlt.common.destination import AnyDestination -from dlt.common.destination.reference import ( - SupportsReadableRelation, - SupportsReadableDataset, - TDatasetType, - TDestinationReferenceArg, - Destination, - JobClientBase, - WithStateSync, - DestinationClientDwhConfiguration, - DestinationClientStagingConfiguration, - DestinationClientConfiguration, - DestinationClientDwhWithStagingConfiguration, -) - -from dlt.common.schema.typing import TTableSchemaColumns -from dlt.destinations.sql_client import SqlClientBase, WithSqlClient -from dlt.common.schema import Schema -from dlt.common.exceptions import DltException - -if TYPE_CHECKING: - try: - from dlt.common.libs.ibis import BaseBackend as IbisBackend - except MissingDependencyException: - IbisBackend = Any -else: - IbisBackend = Any - - -class DatasetException(DltException): - pass - - -class ReadableRelationHasQueryException(DatasetException): - def __init__(self, attempted_change: str) -> None: - msg = ( - "This readable relation was created with a provided sql query. You cannot change" - f" {attempted_change}. Please change the orignal sql query." - ) - super().__init__(msg) - - -class ReadableRelationUnknownColumnException(DatasetException): - def __init__(self, column_name: str) -> None: - msg = ( - f"The selected column {column_name} is not known in the dlt schema for this releation." 
- ) - super().__init__(msg) - - -class ReadableDBAPIRelation(SupportsReadableRelation): - def __init__( - self, - *, - readable_dataset: "ReadableDBAPIDataset", - provided_query: Any = None, - table_name: str = None, - limit: int = None, - selected_columns: Sequence[str] = None, - ) -> None: - """Create a lazy evaluated relation to for the dataset of a destination""" - - # NOTE: we can keep an assertion here, this class will not be created by the user - assert bool(table_name) != bool( - provided_query - ), "Please provide either an sql query OR a table_name" - - self._dataset = readable_dataset - - self._provided_query = provided_query - self._table_name = table_name - self._limit = limit - self._selected_columns = selected_columns - - # wire protocol functions - self.df = self._wrap_func("df") # type: ignore - self.arrow = self._wrap_func("arrow") # type: ignore - self.fetchall = self._wrap_func("fetchall") # type: ignore - self.fetchmany = self._wrap_func("fetchmany") # type: ignore - self.fetchone = self._wrap_func("fetchone") # type: ignore - - self.iter_df = self._wrap_iter("iter_df") # type: ignore - self.iter_arrow = self._wrap_iter("iter_arrow") # type: ignore - self.iter_fetch = self._wrap_iter("iter_fetch") # type: ignore - - @property - def sql_client(self) -> SqlClientBase[Any]: - return self._dataset.sql_client - - @property - def schema(self) -> Schema: - return self._dataset.schema - - @property - def query(self) -> Any: - """build the query""" - if self._provided_query: - return self._provided_query - - table_name = self.sql_client.make_qualified_table_name( - self.schema.naming.normalize_tables_path(self._table_name) - ) - - maybe_limit_clause_1 = "" - maybe_limit_clause_2 = "" - if self._limit: - maybe_limit_clause_1, maybe_limit_clause_2 = self.sql_client._limit_clause_sql( - self._limit - ) - - selector = "*" - if self._selected_columns: - selector = ",".join( - [ - self.sql_client.escape_column_name(self.schema.naming.normalize_path(c)) - for c in self._selected_columns - ] - ) - - return f"SELECT {maybe_limit_clause_1} {selector} FROM {table_name} {maybe_limit_clause_2}" - - @property - def columns_schema(self) -> TTableSchemaColumns: - return self.compute_columns_schema() - - @columns_schema.setter - def columns_schema(self, new_value: TTableSchemaColumns) -> None: - raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed") - - def compute_columns_schema(self) -> TTableSchemaColumns: - """provide schema columns for the cursor, may be filtered by selected columns""" - - columns_schema = ( - self.schema.tables.get(self._table_name, {}).get("columns", {}) if self.schema else {} - ) - - if not columns_schema: - return None - if not self._selected_columns: - return columns_schema - - filtered_columns: TTableSchemaColumns = {} - for sc in self._selected_columns: - sc = self.schema.naming.normalize_path(sc) - if sc not in columns_schema.keys(): - raise ReadableRelationUnknownColumnException(sc) - filtered_columns[sc] = columns_schema[sc] - - return filtered_columns - - @contextmanager - def cursor(self) -> Generator[SupportsReadableRelation, Any, Any]: - """Gets a DBApiCursor for the current relation""" - with self.sql_client as client: - # this hacky code is needed for mssql to disable autocommit, read iterators - # will not work otherwise. 
in the future we should be able to create a readony - # client which will do this automatically - if hasattr(self.sql_client, "_conn") and hasattr(self.sql_client._conn, "autocommit"): - self.sql_client._conn.autocommit = False - with client.execute_query(self.query) as cursor: - if columns_schema := self.columns_schema: - cursor.columns_schema = columns_schema - yield cursor - - def _wrap_iter(self, func_name: str) -> Any: - """wrap SupportsReadableRelation generators in cursor context""" - - def _wrap(*args: Any, **kwargs: Any) -> Any: - with self.cursor() as cursor: - yield from getattr(cursor, func_name)(*args, **kwargs) - - return _wrap - - def _wrap_func(self, func_name: str) -> Any: - """wrap SupportsReadableRelation functions in cursor context""" - - def _wrap(*args: Any, **kwargs: Any) -> Any: - with self.cursor() as cursor: - return getattr(cursor, func_name)(*args, **kwargs) - - return _wrap - - def __copy__(self) -> "ReadableDBAPIRelation": - return self.__class__( - readable_dataset=self._dataset, - provided_query=self._provided_query, - table_name=self._table_name, - limit=self._limit, - selected_columns=self._selected_columns, - ) - - def limit(self, limit: int) -> "ReadableDBAPIRelation": - if self._provided_query: - raise ReadableRelationHasQueryException("limit") - rel = self.__copy__() - rel._limit = limit - return rel - - def select(self, *columns: str) -> "ReadableDBAPIRelation": - if self._provided_query: - raise ReadableRelationHasQueryException("select") - rel = self.__copy__() - rel._selected_columns = columns - # NOTE: the line below will ensure that no unknown columns are selected if - # schema is known - rel.compute_columns_schema() - return rel - - def __getitem__(self, columns: Union[str, Sequence[str]]) -> "SupportsReadableRelation": - if isinstance(columns, str): - return self.select(columns) - elif isinstance(columns, Sequence): - return self.select(*columns) - else: - raise TypeError(f"Invalid argument type: {type(columns).__name__}") - - def head(self, limit: int = 5) -> "ReadableDBAPIRelation": - return self.limit(limit) - - -class ReadableDBAPIDataset(SupportsReadableDataset): - """Access to dataframes and arrowtables in the destination dataset via dbapi""" - - def __init__( - self, - destination: TDestinationReferenceArg, - dataset_name: str, - schema: Union[Schema, str, None] = None, - ) -> None: - self._destination = Destination.from_reference(destination) - self._provided_schema = schema - self._dataset_name = dataset_name - self._sql_client: SqlClientBase[Any] = None - self._schema: Schema = None - - def ibis(self) -> IbisBackend: - """return a connected ibis backend""" - from dlt.common.libs.ibis import create_ibis_backend - - self._ensure_client_and_schema() - return create_ibis_backend( - self._destination, - self._destination_client(self.schema), - ) - - @property - def schema(self) -> Schema: - self._ensure_client_and_schema() - return self._schema - - @property - def sql_client(self) -> SqlClientBase[Any]: - self._ensure_client_and_schema() - return self._sql_client - - def _destination_client(self, schema: Schema) -> JobClientBase: - return get_destination_clients( - schema, destination=self._destination, destination_dataset_name=self._dataset_name - )[0] - - def _ensure_client_and_schema(self) -> None: - """Lazy load schema and client""" - - # full schema given, nothing to do - if not self._schema and isinstance(self._provided_schema, Schema): - self._schema = self._provided_schema - - # schema name given, resolve it from destination by 
name - elif not self._schema and isinstance(self._provided_schema, str): - with self._destination_client(Schema(self._provided_schema)) as client: - if isinstance(client, WithStateSync): - stored_schema = client.get_stored_schema(self._provided_schema) - if stored_schema: - self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema)) - else: - self._schema = Schema(self._provided_schema) - - # no schema name given, load newest schema from destination - elif not self._schema: - with self._destination_client(Schema(self._dataset_name)) as client: - if isinstance(client, WithStateSync): - stored_schema = client.get_stored_schema() - if stored_schema: - self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema)) - - # default to empty schema with dataset name - if not self._schema: - self._schema = Schema(self._dataset_name) - - # here we create the client bound to the resolved schema - if not self._sql_client: - destination_client = self._destination_client(self._schema) - if isinstance(destination_client, WithSqlClient): - self._sql_client = destination_client.sql_client - else: - raise Exception( - f"Destination {destination_client.config.destination_type} does not support" - " SqlClient." - ) - - def __call__(self, query: Any) -> ReadableDBAPIRelation: - return ReadableDBAPIRelation(readable_dataset=self, provided_query=query) # type: ignore[abstract] - - def table(self, table_name: str) -> SupportsReadableRelation: - return ReadableDBAPIRelation( - readable_dataset=self, - table_name=table_name, - ) # type: ignore[abstract] - - def __getitem__(self, table_name: str) -> SupportsReadableRelation: - """access of table via dict notation""" - return self.table(table_name) - - def __getattr__(self, table_name: str) -> SupportsReadableRelation: - """access of table via property notation""" - return self.table(table_name) - - -def dataset( - destination: TDestinationReferenceArg, - dataset_name: str, - schema: Union[Schema, str, None] = None, - dataset_type: TDatasetType = "dbapi", -) -> SupportsReadableDataset: - if dataset_type == "dbapi": - return ReadableDBAPIDataset(destination, dataset_name, schema) - raise NotImplementedError(f"Dataset of type {dataset_type} not implemented") - - -# helpers -def get_destination_client_initial_config( - destination: AnyDestination, - default_schema_name: str, - dataset_name: str, - as_staging: bool = False, -) -> DestinationClientConfiguration: - client_spec = destination.spec - - # this client supports many schemas and datasets - if issubclass(client_spec, DestinationClientDwhConfiguration): - if issubclass(client_spec, DestinationClientStagingConfiguration): - spec: DestinationClientDwhConfiguration = client_spec(as_staging_destination=as_staging) - else: - spec = client_spec() - - spec._bind_dataset_name(dataset_name, default_schema_name) - return spec - - return client_spec() - - -def get_destination_clients( - schema: Schema, - destination: AnyDestination = None, - destination_dataset_name: str = None, - destination_initial_config: DestinationClientConfiguration = None, - staging: AnyDestination = None, - staging_dataset_name: str = None, - staging_initial_config: DestinationClientConfiguration = None, - # pipeline specific settings - default_schema_name: str = None, -) -> Tuple[JobClientBase, JobClientBase]: - destination = Destination.from_reference(destination) if destination else None - staging = Destination.from_reference(staging) if staging else None - - try: - # resolve staging config in order to pass it to 
destination client config - staging_client = None - if staging: - if not staging_initial_config: - # this is just initial config - without user configuration injected - staging_initial_config = get_destination_client_initial_config( - staging, - dataset_name=staging_dataset_name, - default_schema_name=default_schema_name, - as_staging=True, - ) - # create the client - that will also resolve the config - staging_client = staging.client(schema, staging_initial_config) - - if not destination_initial_config: - # config is not provided then get it with injected credentials - initial_config = get_destination_client_initial_config( - destination, - dataset_name=destination_dataset_name, - default_schema_name=default_schema_name, - ) - - # attach the staging client config to destination client config - if its type supports it - if ( - staging_client - and isinstance(initial_config, DestinationClientDwhWithStagingConfiguration) - and isinstance(staging_client.config, DestinationClientStagingConfiguration) - ): - initial_config.staging_config = staging_client.config - # create instance with initial_config properly set - client = destination.client(schema, initial_config) - return client, staging_client - except ModuleNotFoundError: - client_spec = destination.spec() - raise MissingDependencyException( - f"{client_spec.destination_type} destination", - [f"{version.DLT_PKG_NAME}[{client_spec.destination_type}]"], - "Dependencies for specific destinations are available as extras of dlt", - ) diff --git a/dlt/destinations/dataset/__init__.py b/dlt/destinations/dataset/__init__.py new file mode 100644 index 0000000000..e0eef681b8 --- /dev/null +++ b/dlt/destinations/dataset/__init__.py @@ -0,0 +1,19 @@ +from dlt.destinations.dataset.factory import ( + dataset, +) +from dlt.destinations.dataset.dataset import ( + ReadableDBAPIDataset, + get_destination_clients, +) +from dlt.destinations.dataset.utils import ( + get_destination_clients, + get_destination_client_initial_config, +) + + +__all__ = [ + "dataset", + "ReadableDBAPIDataset", + "get_destination_client_initial_config", + "get_destination_clients", +] diff --git a/dlt/destinations/dataset/dataset.py b/dlt/destinations/dataset/dataset.py new file mode 100644 index 0000000000..e443045e49 --- /dev/null +++ b/dlt/destinations/dataset/dataset.py @@ -0,0 +1,142 @@ +from typing import Any, Union, TYPE_CHECKING + +from dlt.common.json import json + +from dlt.common.exceptions import MissingDependencyException + +from dlt.common.destination.reference import ( + SupportsReadableRelation, + SupportsReadableDataset, + TDestinationReferenceArg, + Destination, + JobClientBase, + WithStateSync, +) + +from dlt.destinations.sql_client import SqlClientBase, WithSqlClient +from dlt.common.schema import Schema +from dlt.destinations.dataset.relation import ReadableDBAPIRelation +from dlt.destinations.dataset.utils import get_destination_clients +from dlt.common.destination.reference import TDatasetType + +if TYPE_CHECKING: + try: + from dlt.helpers.ibis import BaseBackend as IbisBackend + except MissingDependencyException: + IbisBackend = Any +else: + IbisBackend = Any + + +class ReadableDBAPIDataset(SupportsReadableDataset): + """Access to dataframes and arrowtables in the destination dataset via dbapi""" + + def __init__( + self, + destination: TDestinationReferenceArg, + dataset_name: str, + schema: Union[Schema, str, None] = None, + dataset_type: TDatasetType = "auto", + ) -> None: + self._destination = Destination.from_reference(destination) + 
self._provided_schema = schema + self._dataset_name = dataset_name + self._sql_client: SqlClientBase[Any] = None + self._schema: Schema = None + self._dataset_type = dataset_type + + def ibis(self) -> IbisBackend: + """return a connected ibis backend""" + from dlt.helpers.ibis import create_ibis_backend + + self._ensure_client_and_schema() + return create_ibis_backend( + self._destination, + self._destination_client(self.schema), + ) + + @property + def schema(self) -> Schema: + self._ensure_client_and_schema() + return self._schema + + @property + def sql_client(self) -> SqlClientBase[Any]: + self._ensure_client_and_schema() + return self._sql_client + + def _destination_client(self, schema: Schema) -> JobClientBase: + return get_destination_clients( + schema, destination=self._destination, destination_dataset_name=self._dataset_name + )[0] + + def _ensure_client_and_schema(self) -> None: + """Lazy load schema and client""" + + # full schema given, nothing to do + if not self._schema and isinstance(self._provided_schema, Schema): + self._schema = self._provided_schema + + # schema name given, resolve it from destination by name + elif not self._schema and isinstance(self._provided_schema, str): + with self._destination_client(Schema(self._provided_schema)) as client: + if isinstance(client, WithStateSync): + stored_schema = client.get_stored_schema(self._provided_schema) + if stored_schema: + self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema)) + else: + self._schema = Schema(self._provided_schema) + + # no schema name given, load newest schema from destination + elif not self._schema: + with self._destination_client(Schema(self._dataset_name)) as client: + if isinstance(client, WithStateSync): + stored_schema = client.get_stored_schema() + if stored_schema: + self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema)) + + # default to empty schema with dataset name + if not self._schema: + self._schema = Schema(self._dataset_name) + + # here we create the client bound to the resolved schema + if not self._sql_client: + destination_client = self._destination_client(self._schema) + if isinstance(destination_client, WithSqlClient): + self._sql_client = destination_client.sql_client + else: + raise Exception( + f"Destination {destination_client.config.destination_type} does not support" + " SqlClient." 
+ ) + + def __call__(self, query: Any) -> ReadableDBAPIRelation: + return ReadableDBAPIRelation(readable_dataset=self, provided_query=query) # type: ignore[abstract] + + def table(self, table_name: str) -> SupportsReadableRelation: + # we can create an ibis powered relation if ibis is available + if table_name in self.schema.tables and self._dataset_type in ("auto", "ibis"): + try: + from dlt.helpers.ibis import create_unbound_ibis_table + from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation + + unbound_table = create_unbound_ibis_table(self.sql_client, self.schema, table_name) + return ReadableIbisRelation(readable_dataset=self, ibis_object=unbound_table, columns_schema=self.schema.tables[table_name]["columns"]) # type: ignore[abstract] + except MissingDependencyException: + # if ibis is explicitly requested, reraise + if self._dataset_type == "ibis": + raise + + # fallback to the standard dbapi relation + return ReadableDBAPIRelation( + readable_dataset=self, + table_name=table_name, + ) # type: ignore[abstract] + + def __getitem__(self, table_name: str) -> SupportsReadableRelation: + """access of table via dict notation""" + return self.table(table_name) + + def __getattr__(self, table_name: str) -> SupportsReadableRelation: + """access of table via property notation""" + return self.table(table_name) diff --git a/dlt/destinations/dataset/exceptions.py b/dlt/destinations/dataset/exceptions.py new file mode 100644 index 0000000000..17e8f6b563 --- /dev/null +++ b/dlt/destinations/dataset/exceptions.py @@ -0,0 +1,22 @@ +from dlt.common.exceptions import DltException + + +class DatasetException(DltException): + pass + + +class ReadableRelationHasQueryException(DatasetException): + def __init__(self, attempted_change: str) -> None: + msg = ( + "This readable relation was created with a provided sql query. You cannot change" + f" {attempted_change}. Please change the original sql query." + ) + super().__init__(msg) + + +class ReadableRelationUnknownColumnException(DatasetException): + def __init__(self, column_name: str) -> None: + msg = ( + f"The selected column {column_name} is not known in the dlt schema for this relation." 
+ ) + super().__init__(msg) diff --git a/dlt/destinations/dataset/factory.py b/dlt/destinations/dataset/factory.py new file mode 100644 index 0000000000..8ea0ddf7a1 --- /dev/null +++ b/dlt/destinations/dataset/factory.py @@ -0,0 +1,22 @@ +from typing import Union + + +from dlt.common.destination import AnyDestination +from dlt.common.destination.reference import ( + SupportsReadableDataset, + TDatasetType, + TDestinationReferenceArg, +) + +from dlt.common.schema import Schema + +from dlt.destinations.dataset.dataset import ReadableDBAPIDataset + + +def dataset( + destination: TDestinationReferenceArg, + dataset_name: str, + schema: Union[Schema, str, None] = None, + dataset_type: TDatasetType = "auto", +) -> SupportsReadableDataset: + return ReadableDBAPIDataset(destination, dataset_name, schema, dataset_type) diff --git a/dlt/destinations/dataset/ibis_relation.py b/dlt/destinations/dataset/ibis_relation.py new file mode 100644 index 0000000000..632298ad56 --- /dev/null +++ b/dlt/destinations/dataset/ibis_relation.py @@ -0,0 +1,224 @@ +from typing import TYPE_CHECKING, Any, Union, Sequence + +from functools import partial + +from dlt.common.exceptions import MissingDependencyException +from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation +from dlt.common.schema.typing import TTableSchemaColumns + + +if TYPE_CHECKING: + from dlt.destinations.dataset.dataset import ReadableDBAPIDataset +else: + ReadableDBAPIDataset = Any + +try: + from dlt.helpers.ibis import Expr +except MissingDependencyException: + Expr = Any + +# map dlt destination to sqlglot dialect +DIALECT_MAP = { + "dlt.destinations.duckdb": "duckdb", # works + "dlt.destinations.motherduck": "duckdb", # works + "dlt.destinations.clickhouse": "clickhouse", # works + "dlt.destinations.databricks": "databricks", # works + "dlt.destinations.bigquery": "bigquery", # works + "dlt.destinations.postgres": "postgres", # works + "dlt.destinations.redshift": "redshift", # works + "dlt.destinations.snowflake": "snowflake", # works + "dlt.destinations.mssql": "tsql", # works + "dlt.destinations.synapse": "tsql", # works + "dlt.destinations.athena": "trino", # works + "dlt.destinations.filesystem": "duckdb", # works + "dlt.destinations.dremio": "presto", # works + # NOTE: can we discover the current dialect in sqlalchemy? 
+ "dlt.destinations.sqlalchemy": "mysql", # may work +} + +# NOTE: some dialects are not supported by ibis, but by sqlglot, these need to +# be transpiled with a intermediary step +TRANSPILE_VIA_MAP = { + "tsql": "postgres", + "databricks": "postgres", + "clickhouse": "postgres", + "redshift": "postgres", + "presto": "postgres", +} + + +class ReadableIbisRelation(BaseReadableDBAPIRelation): + def __init__( + self, + *, + readable_dataset: ReadableDBAPIDataset, + ibis_object: Any = None, + columns_schema: TTableSchemaColumns = None, + ) -> None: + """Create a lazy evaluated relation to for the dataset of a destination""" + super().__init__(readable_dataset=readable_dataset) + self._ibis_object = ibis_object + self._columns_schema = columns_schema + + @property + def query(self) -> Any: + """build the query""" + + from dlt.helpers.ibis import ibis, sqlglot + + destination_type = self._dataset._destination.destination_type + target_dialect = DIALECT_MAP[destination_type] + + # render sql directly if possible + if target_dialect not in TRANSPILE_VIA_MAP: + return ibis.to_sql(self._ibis_object, dialect=target_dialect) + + # here we need to transpile first + transpile_via = TRANSPILE_VIA_MAP[target_dialect] + sql = ibis.to_sql(self._ibis_object, dialect=transpile_via) + sql = sqlglot.transpile(sql, read=transpile_via, write=target_dialect)[0] + return sql + + @property + def columns_schema(self) -> TTableSchemaColumns: + return self.compute_columns_schema() + + @columns_schema.setter + def columns_schema(self, new_value: TTableSchemaColumns) -> None: + raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed") + + def compute_columns_schema(self) -> TTableSchemaColumns: + """provide schema columns for the cursor, may be filtered by selected columns""" + # TODO: provide column lineage tracing with sqlglot lineage + return self._columns_schema + + def _proxy_expression_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any: + """Proxy method calls to the underlying ibis expression, allowing to wrap the resulting expression in a new relation""" + + # Get the method from the expression + method = getattr(self._ibis_object, method_name) + + # unwrap args and kwargs if they are relations + args = tuple( + arg._ibis_object if isinstance(arg, ReadableIbisRelation) else arg for arg in args + ) + kwargs = { + k: v._ibis_object if isinstance(v, ReadableIbisRelation) else v + for k, v in kwargs.items() + } + + # casefold string params, we assume these are column names + args = tuple( + self.sql_client.capabilities.casefold_identifier(arg) if isinstance(arg, str) else arg + for arg in args + ) + kwargs = { + k: self.sql_client.capabilities.casefold_identifier(v) if isinstance(v, str) else v + for k, v in kwargs.items() + } + + # Call it with provided args + result = method(*args, **kwargs) + + # calculate columns schema for the result, some operations we know will not change the schema + # and select will just reduce the amount of column + columns_schema = None + if method_name == "select": + columns_schema = self._get_filtered_columns_schema(args) + elif method_name in ["filter", "limit", "order_by", "head"]: + columns_schema = self._columns_schema + + # If result is an ibis expression, wrap it in a new relation else return raw result + return self.__class__( + readable_dataset=self._dataset, ibis_object=result, columns_schema=columns_schema + ) + + def __getattr__(self, name: str) -> Any: + """Wrap all callable attributes of the expression""" + + attr = 
getattr(self._ibis_object, name, None) + + # try casefolded name for ibis columns access + if attr is None: + name = self.sql_client.capabilities.casefold_identifier(name) + attr = getattr(self._ibis_object, name, None) + + if attr is None: + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + if not callable(attr): + # NOTE: we don't need to forward columns schema for non-callable attributes, these are usually columns + return self.__class__(readable_dataset=self._dataset, ibis_object=attr) + + return partial(self._proxy_expression_method, name) + + def __getitem__(self, columns: Union[str, Sequence[str]]) -> "ReadableIbisRelation": + # casefold column-names + columns = [columns] if isinstance(columns, str) else columns + columns = [self.sql_client.capabilities.casefold_identifier(col) for col in columns] + expr = self._ibis_object[columns] + return self.__class__( + readable_dataset=self._dataset, + ibis_object=expr, + columns_schema=self._get_filtered_columns_schema(columns), + ) + + def _get_filtered_columns_schema(self, columns: Sequence[str]) -> TTableSchemaColumns: + if not self._columns_schema: + return None + try: + return {col: self._columns_schema[col] for col in columns} + except KeyError: + # NOTE: select statements can contain new columns not present in the original schema + # here we just break the column schema inheritance chain + return None + + # forward ibis methods defined on interface + def limit(self, limit: int, **kwargs: Any) -> "ReadableIbisRelation": + """limit the result to 'limit' items""" + return self._proxy_expression_method("limit", limit, **kwargs) # type: ignore + + def head(self, limit: int = 5) -> "ReadableIbisRelation": + """limit the result to 5 items by default""" + return self._proxy_expression_method("head", limit) # type: ignore + + def select(self, *columns: str) -> "ReadableIbisRelation": + """set which columns will be selected""" + return self._proxy_expression_method("select", *columns) # type: ignore + + # forward ibis comparison and math operators + def __lt__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__lt__", other) # type: ignore + + def __gt__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__gt__", other) # type: ignore + + def __ge__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__ge__", other) # type: ignore + + def __le__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__le__", other) # type: ignore + + def __eq__(self, other: Any) -> bool: + return self._proxy_expression_method("__eq__", other) # type: ignore + + def __ne__(self, other: Any) -> bool: + return self._proxy_expression_method("__ne__", other) # type: ignore + + def __and__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__and__", other) # type: ignore + + def __or__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__or__", other) # type: ignore + + def __mul__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__mul__", other) # type: ignore + + def __div__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__div__", other) # type: ignore + + def __add__(self, other: Any) -> "ReadableIbisRelation": + return self._proxy_expression_method("__add__", other) # type: ignore + + def __sub__(self, other: Any) -> "ReadableIbisRelation": + return 
self._proxy_expression_method("__sub__", other) # type: ignore diff --git a/dlt/destinations/dataset/relation.py b/dlt/destinations/dataset/relation.py new file mode 100644 index 0000000000..2cdb7640df --- /dev/null +++ b/dlt/destinations/dataset/relation.py @@ -0,0 +1,207 @@ +from typing import Any, Generator, Sequence, Union, TYPE_CHECKING + +from contextlib import contextmanager + + +from dlt.common.destination.reference import ( + SupportsReadableRelation, +) + +from dlt.destinations.dataset.exceptions import ( + ReadableRelationHasQueryException, + ReadableRelationUnknownColumnException, +) + +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.destinations.sql_client import SqlClientBase +from dlt.common.schema import Schema + +if TYPE_CHECKING: + from dlt.destinations.dataset.dataset import ReadableDBAPIDataset +else: + ReadableDBAPIDataset = Any + + +class BaseReadableDBAPIRelation(SupportsReadableRelation): + def __init__( + self, + *, + readable_dataset: "ReadableDBAPIDataset", + ) -> None: + """Create a lazy evaluated relation to for the dataset of a destination""" + + self._dataset = readable_dataset + + # wire protocol functions + self.df = self._wrap_func("df") # type: ignore + self.arrow = self._wrap_func("arrow") # type: ignore + self.fetchall = self._wrap_func("fetchall") # type: ignore + self.fetchmany = self._wrap_func("fetchmany") # type: ignore + self.fetchone = self._wrap_func("fetchone") # type: ignore + + self.iter_df = self._wrap_iter("iter_df") # type: ignore + self.iter_arrow = self._wrap_iter("iter_arrow") # type: ignore + self.iter_fetch = self._wrap_iter("iter_fetch") # type: ignore + + @property + def sql_client(self) -> SqlClientBase[Any]: + return self._dataset.sql_client + + @property + def schema(self) -> Schema: + return self._dataset.schema + + @property + def query(self) -> Any: + raise NotImplementedError("No query in ReadableDBAPIRelation") + + @contextmanager + def cursor(self) -> Generator[SupportsReadableRelation, Any, Any]: + """Gets a DBApiCursor for the current relation""" + with self.sql_client as client: + # this hacky code is needed for mssql to disable autocommit, read iterators + # will not work otherwise. 
in the future we should be able to create a readonly + client which will do this automatically + if hasattr(self.sql_client, "_conn") and hasattr(self.sql_client._conn, "autocommit"): + self.sql_client._conn.autocommit = False + with client.execute_query(self.query) as cursor: + if columns_schema := self.columns_schema: + cursor.columns_schema = columns_schema + yield cursor + + def _wrap_iter(self, func_name: str) -> Any: + """wrap SupportsReadableRelation generators in cursor context""" + + def _wrap(*args: Any, **kwargs: Any) -> Any: + with self.cursor() as cursor: + yield from getattr(cursor, func_name)(*args, **kwargs) + + return _wrap + + def _wrap_func(self, func_name: str) -> Any: + """wrap SupportsReadableRelation functions in cursor context""" + + def _wrap(*args: Any, **kwargs: Any) -> Any: + with self.cursor() as cursor: + return getattr(cursor, func_name)(*args, **kwargs) + + return _wrap + + +class ReadableDBAPIRelation(BaseReadableDBAPIRelation): + def __init__( + self, + *, + readable_dataset: "ReadableDBAPIDataset", + provided_query: Any = None, + table_name: str = None, + limit: int = None, + selected_columns: Sequence[str] = None, + ) -> None: + """Create a lazy evaluated relation for the dataset of a destination""" + + # NOTE: we can keep an assertion here, this class will not be created by the user + assert bool(table_name) != bool( + provided_query + ), "Please provide either an sql query OR a table_name" + + super().__init__(readable_dataset=readable_dataset) + + self._provided_query = provided_query + self._table_name = table_name + self._limit = limit + self._selected_columns = selected_columns + + @property + def query(self) -> Any: + """build the query""" + if self._provided_query: + return self._provided_query + + table_name = self.sql_client.make_qualified_table_name( + self.schema.naming.normalize_tables_path(self._table_name) + ) + + maybe_limit_clause_1 = "" + maybe_limit_clause_2 = "" + if self._limit: + maybe_limit_clause_1, maybe_limit_clause_2 = self.sql_client._limit_clause_sql( + self._limit + ) + + selector = "*" + if self._selected_columns: + selector = ",".join( + [ + self.sql_client.escape_column_name(self.schema.naming.normalize_path(c)) + for c in self._selected_columns + ] + ) + + return f"SELECT {maybe_limit_clause_1} {selector} FROM {table_name} {maybe_limit_clause_2}" + + @property + def columns_schema(self) -> TTableSchemaColumns: + return self.compute_columns_schema() + + @columns_schema.setter + def columns_schema(self, new_value: TTableSchemaColumns) -> None: + raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed") + + def compute_columns_schema(self) -> TTableSchemaColumns: + """provide schema columns for the cursor, may be filtered by selected columns""" + + columns_schema = ( + self.schema.tables.get(self._table_name, {}).get("columns", {}) if self.schema else {} + ) + + if not columns_schema: + return None + if not self._selected_columns: + return columns_schema + + filtered_columns: TTableSchemaColumns = {} + for sc in self._selected_columns: + sc = self.schema.naming.normalize_path(sc) + if sc not in columns_schema.keys(): + raise ReadableRelationUnknownColumnException(sc) + filtered_columns[sc] = columns_schema[sc] + + return filtered_columns + + def __copy__(self) -> "ReadableDBAPIRelation": + return self.__class__( + readable_dataset=self._dataset, + provided_query=self._provided_query, + table_name=self._table_name, + limit=self._limit, + selected_columns=self._selected_columns, + ) + + def 
limit(self, limit: int, **kwargs: Any) -> "ReadableDBAPIRelation": + if self._provided_query: + raise ReadableRelationHasQueryException("limit") + rel = self.__copy__() + rel._limit = limit + return rel + + def select(self, *columns: str) -> "ReadableDBAPIRelation": + if self._provided_query: + raise ReadableRelationHasQueryException("select") + rel = self.__copy__() + rel._selected_columns = columns + # NOTE: the line below will ensure that no unknown columns are selected if + # schema is known + rel.compute_columns_schema() + return rel + + def __getitem__(self, columns: Union[str, Sequence[str]]) -> "SupportsReadableRelation": + if isinstance(columns, str): + return self.select(columns) + elif isinstance(columns, Sequence): + return self.select(*columns) + else: + raise TypeError(f"Invalid argument type: {type(columns).__name__}") + + def head(self, limit: int = 5) -> "ReadableDBAPIRelation": + return self.limit(limit) diff --git a/dlt/destinations/dataset/utils.py b/dlt/destinations/dataset/utils.py new file mode 100644 index 0000000000..766fbc13ea --- /dev/null +++ b/dlt/destinations/dataset/utils.py @@ -0,0 +1,95 @@ +from typing import Tuple + +from dlt import version + +from dlt.common.exceptions import MissingDependencyException + +from dlt.common.destination import AnyDestination +from dlt.common.destination.reference import ( + Destination, + JobClientBase, + DestinationClientDwhConfiguration, + DestinationClientStagingConfiguration, + DestinationClientConfiguration, + DestinationClientDwhWithStagingConfiguration, +) + +from dlt.common.schema import Schema + + +# helpers +def get_destination_client_initial_config( + destination: AnyDestination, + default_schema_name: str, + dataset_name: str, + as_staging: bool = False, +) -> DestinationClientConfiguration: + client_spec = destination.spec + + # this client supports many schemas and datasets + if issubclass(client_spec, DestinationClientDwhConfiguration): + if issubclass(client_spec, DestinationClientStagingConfiguration): + spec: DestinationClientDwhConfiguration = client_spec(as_staging_destination=as_staging) + else: + spec = client_spec() + + spec._bind_dataset_name(dataset_name, default_schema_name) + return spec + + return client_spec() + + +def get_destination_clients( + schema: Schema, + destination: AnyDestination = None, + destination_dataset_name: str = None, + destination_initial_config: DestinationClientConfiguration = None, + staging: AnyDestination = None, + staging_dataset_name: str = None, + staging_initial_config: DestinationClientConfiguration = None, + # pipeline specific settings + default_schema_name: str = None, +) -> Tuple[JobClientBase, JobClientBase]: + destination = Destination.from_reference(destination) if destination else None + staging = Destination.from_reference(staging) if staging else None + + try: + # resolve staging config in order to pass it to destination client config + staging_client = None + if staging: + if not staging_initial_config: + # this is just initial config - without user configuration injected + staging_initial_config = get_destination_client_initial_config( + staging, + dataset_name=staging_dataset_name, + default_schema_name=default_schema_name, + as_staging=True, + ) + # create the client - that will also resolve the config + staging_client = staging.client(schema, staging_initial_config) + + if not destination_initial_config: + # config is not provided then get it with injected credentials + initial_config = get_destination_client_initial_config( + destination, + 
dataset_name=destination_dataset_name, + default_schema_name=default_schema_name, + ) + + # attach the staging client config to destination client config - if its type supports it + if ( + staging_client + and isinstance(initial_config, DestinationClientDwhWithStagingConfiguration) + and isinstance(staging_client.config, DestinationClientStagingConfiguration) + ): + initial_config.staging_config = staging_client.config + # create instance with initial_config properly set + client = destination.client(schema, initial_config) + return client, staging_client + except ModuleNotFoundError: + client_spec = destination.spec() + raise MissingDependencyException( + f"{client_spec.destination_type} destination", + [f"{version.DLT_PKG_NAME}[{client_spec.destination_type}]"], + "Dependencies for specific destinations are available as extras of dlt", + ) diff --git a/dlt/destinations/impl/sqlalchemy/db_api_client.py b/dlt/destinations/impl/sqlalchemy/db_api_client.py index 6f3ff065bf..27c4f2f1f9 100644 --- a/dlt/destinations/impl/sqlalchemy/db_api_client.py +++ b/dlt/destinations/impl/sqlalchemy/db_api_client.py @@ -84,7 +84,7 @@ def __init__(self, curr: sa.engine.CursorResult) -> None: def _get_columns(self) -> List[str]: try: - return list(self.native_cursor.keys()) # type: ignore[attr-defined] + return list(self.native_cursor.keys()) except ResourceClosedError: # this happens if no rows are returned return [] @@ -314,7 +314,7 @@ def execute_sql( self, sql: Union[AnyStr, sa.sql.Executable], *args: Any, **kwargs: Any ) -> Optional[Sequence[Sequence[Any]]]: with self.execute_query(sql, *args, **kwargs) as cursor: - if cursor.returns_rows: # type: ignore[attr-defined] + if cursor.returns_rows: return cursor.fetchall() return None diff --git a/dlt/common/libs/ibis.py b/dlt/helpers/ibis.py similarity index 74% rename from dlt/common/libs/ibis.py rename to dlt/helpers/ibis.py index ba6f363e66..ed4264dac7 100644 --- a/dlt/common/libs/ibis.py +++ b/dlt/helpers/ibis.py @@ -1,12 +1,14 @@ -from typing import cast +from typing import cast, Any from dlt.common.exceptions import MissingDependencyException - from dlt.common.destination.reference import TDestinationReferenceArg, Destination, JobClientBase +from dlt.common.schema import Schema +from dlt.destinations.sql_client import SqlClientBase try: import ibis # type: ignore - from ibis import BaseBackend + import sqlglot + from ibis import BaseBackend, Expr except ModuleNotFoundError: raise MissingDependencyException("dlt ibis Helpers", ["ibis"]) @@ -29,6 +31,22 @@ ] + +# Map dlt data types to ibis data types +DATA_TYPE_MAP = { + "text": "string", + "double": "float64", + "bool": "boolean", + "timestamp": "timestamp", + "bigint": "int64", + "binary": "binary", + "json": "string", # Store JSON as string in ibis + "decimal": "decimal", + "wei": "int64", # Wei is a large integer + "date": "date", + "time": "time", +} + + def create_ibis_backend( destination: TDestinationReferenceArg, client: JobClientBase ) -> BaseBackend: @@ -119,3 +137,37 @@ con = ibis.duckdb.from_connection(duck) return con + + +def create_unbound_ibis_table( + sql_client: SqlClientBase[Any], schema: Schema, table_name: str +) -> Expr: + """Create an unbound ibis table from a dlt schema""" + + if table_name not in schema.tables: + raise Exception( + f"Table {table_name} not found in schema. 
Available tables: {schema.tables.keys()}" + ) + table_schema = schema.tables[table_name] + + # Convert dlt table schema columns to ibis schema + ibis_schema = { + sql_client.capabilities.casefold_identifier(col_name): DATA_TYPE_MAP[ + col_info.get("data_type", "string") + ] + for col_name, col_info in table_schema.get("columns", {}).items() + } + + # normalize table name + table_path = sql_client.make_qualified_table_name_path(table_name, escape=False) + + catalog = None + if len(table_path) == 3: + catalog, database, table = table_path + else: + database, table = table_path + + # create unbound ibis table and return in dlt wrapper + unbound_table = ibis.table(schema=ibis_schema, name=table, database=database, catalog=catalog) + + return unbound_table diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 70d160ea67..9bd2d6911f 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1751,9 +1751,17 @@ def __getstate__(self) -> Any: return {"pipeline_name": self.pipeline_name} def _dataset( - self, schema: Union[Schema, str, None] = None, dataset_type: TDatasetType = "dbapi" + self, schema: Union[Schema, str, None] = None, dataset_type: TDatasetType = "auto" ) -> SupportsReadableDataset: - """Access helper to dataset""" + """Returns a dataset object for querying the destination data. + + Args: + schema: Schema name or Schema object to use. If None, uses the default schema if set. + dataset_type: Type of dataset interface to return. Defaults to 'auto' which will select ibis if available + otherwise it will fallback to the standard dbapi interface. + Returns: + A dataset object that supports querying the destination data. + """ if schema is None: schema = self.default_schema if self.default_schema_name else None return dataset( diff --git a/docs/website/docs/general-usage/dataset-access/dataset.md b/docs/website/docs/general-usage/dataset-access/dataset.md index 68635383c5..b2e3f03d4d 100644 --- a/docs/website/docs/general-usage/dataset-access/dataset.md +++ b/docs/website/docs/general-usage/dataset-access/dataset.md @@ -156,6 +156,64 @@ You can combine `select`, `limit`, and other methods. arrow_table = items_relation.select("col1", "col2").limit(50).arrow() ``` +## Modifying queries with ibis expressions + +If you install the amazing [ibis](https://ibis-project.org/) library, you can use ibis expressions to modify your queries. 
+ +dlt will then wrap an `ibis.UnboundTable` with a `ReadableIbisRelation` object under the hood that will allow you to modify the query of a relation using ibis expressions: + +```py +# now that ibis is installed, we can get a dataset with ibis relations +dataset = pipeline._dataset() + +# get two relations +items_relation = dataset["items"] +order_relation = dataset["orders"] + +# join them using an ibis expression +joined_relation = items_relation.join(order_relation, items_relation.id == order_relation.item_id) + +# now we can use the ibis expression to filter the data +filtered_relation = joined_relation.filter(order_relation.status == "completed") + +# we can inspect the query that will be used to read the data +print(filtered_relation.query) + +# and finally fetch the data as a pandas dataframe, the same way we would do with a normal relation +df = filtered_relation.df() + +# a few more examples + +# filter for rows where the id is in the list of ids +items_relation.filter(items_relation.id.isin([1, 2, 3])).df() + +# limit and offset +items_relation.limit(10, offset=5).arrow() + +# mutate columns by adding a new column that is always 10 times the value of the id column +items_relation.mutate(new_id=items_relation.id * 10).df() + +# sort asc and desc +import ibis +items_relation.order_by(ibis.desc("id"), ibis.asc("price")).limit(10) + +# group by and aggregate +items_relation.group_by("item_group").having(items_relation.count() >= 1000).aggregate(sum_id=items_relation.id.sum()).df() + +# subqueries, assuming a beverage_categories relation exists in the same dataset +items_relation.filter(items_relation.category.isin(beverage_categories.name)).df() +``` + +You can learn more about the available expressions on the [ibis for sql users](https://ibis-project.org/tutorials/ibis-for-sql-users) page. + +:::note +Keep in mind that you can only use methods that modify the executed query, not the methods ibis provides for fetching data. Fetching is done with the same methods defined on the regular relations explained above. If you need full native ibis integration, please read the ibis section in the advanced part further down. Additionally, not all ibis expressions may be supported by all destinations and sql dialects. +::: + ## Supported destinations All SQL and filesystem destinations supported by `dlt` can utilize this data access interface. For filesystem destinations, `dlt` [uses **DuckDB** under the hood](./sql-client.md#the-filesystem-sql-client) to create views from Parquet or JSONL files dynamically. This allows you to query data stored in files using the same interface as you would with SQL databases. If you plan on accessing data in buckets or the filesystem a lot this way, it is advised to load data as Parquet instead of JSONL, as **DuckDB** is able to only load the parts of the data actually needed for the query to work. diff --git a/poetry.lock b/poetry.lock index 6232b383c8..749979439d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. 
[[package]] name = "about-time" @@ -776,7 +776,7 @@ files = [ name = "atpublic" version = "5.0" description = "Keep all y'all's __all__'s in sync" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "atpublic-5.0-py3-none-any.whl", hash = "sha256:b651dcd886666b1042d1e38158a22a4f2c267748f4e97fde94bc492a4a28a3f3"}, @@ -1755,7 +1755,7 @@ PyYAML = ">=3.11" name = "clickhouse-connect" version = "0.7.8" description = "ClickHouse Database Core Driver for Python, Pandas, and Superset" -optional = true +optional = false python-versions = "~=3.8" files = [ {file = "clickhouse-connect-0.7.8.tar.gz", hash = "sha256:dad10ba90eabfe215dfb1fef59f2821a95c752988e66f1093ca8590a51539b8f"}, @@ -2242,7 +2242,7 @@ urllib3 = ">=1.0" name = "db-dtypes" version = "1.3.0" description = "Pandas Data Types for SQL systems (BigQuery, Spanner)" -optional = true +optional = false python-versions = ">=3.7" files = [ {file = "db_dtypes-1.3.0-py2.py3-none-any.whl", hash = "sha256:7e65c59f849ccbe6f7bc4d0253edcc212a7907662906921caba3e4aadd0bc277"}, @@ -3526,7 +3526,7 @@ tqdm = ["tqdm (>=4.7.4,<5.0.0dev)"] name = "google-cloud-bigquery-storage" version = "2.27.0" description = "Google Cloud Bigquery Storage API client library" -optional = true +optional = false python-versions = ">=3.7" files = [ {file = "google_cloud_bigquery_storage-2.27.0-py2.py3-none-any.whl", hash = "sha256:3bfa8f74a61ceaffd3bfe90be5bbef440ad81c1c19ac9075188cccab34bffc2b"}, @@ -4504,63 +4504,64 @@ files = [ [[package]] name = "ibis-framework" -version = "10.0.0.dev256" +version = "9.5.0" description = "The portable Python dataframe library" -optional = true +optional = false python-versions = "<4.0,>=3.10" files = [ - {file = "ibis_framework-10.0.0.dev256-py3-none-any.whl", hash = "sha256:d6f21278e6fd78920bbe986df2c871921142635cc4f7d5d2048cae26e307a3df"}, - {file = "ibis_framework-10.0.0.dev256.tar.gz", hash = "sha256:e9f97d8177fd88f4a3578be20519c1da79a6a7ffac678b46b790bfde67405930"}, + {file = "ibis_framework-9.5.0-py3-none-any.whl", hash = "sha256:145fe30d94f111cff332580c275ce77725c5ff7086eede93af0b371649d009c0"}, + {file = "ibis_framework-9.5.0.tar.gz", hash = "sha256:1c8a29277e63ee0dfc289bc8f550164b5e3bdaec1b76b62436c37d331bb4ef84"}, ] [package.dependencies] atpublic = ">=2.3,<6" clickhouse-connect = {version = ">=0.5.23,<1", extras = ["arrow", "numpy", "pandas"], optional = true, markers = "extra == \"clickhouse\""} db-dtypes = {version = ">=0.3,<2", optional = true, markers = "extra == \"bigquery\""} -duckdb = {version = ">=0.10,<1.2", optional = true, markers = "extra == \"duckdb\""} +duckdb = {version = ">=0.8.1,<1.2", optional = true, markers = "extra == \"duckdb\""} google-cloud-bigquery = {version = ">=3,<4", optional = true, markers = "extra == \"bigquery\""} google-cloud-bigquery-storage = {version = ">=2,<3", optional = true, markers = "extra == \"bigquery\""} -numpy = {version = ">=1.23.2,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} -packaging = {version = ">=21.3,<25", optional = true, markers = "extra == \"duckdb\" or extra == \"oracle\" or extra == \"polars\" or extra == 
\"pyspark\""} -pandas = {version = ">=1.5.3,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +numpy = {version = ">=1.23.2,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"dask\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +packaging = {version = ">=21.3,<25", optional = true, markers = "extra == \"dask\" or extra == \"duckdb\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"pyspark\""} +pandas = {version = ">=1.5.3,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"dask\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} parsy = ">=2,<3" psycopg2 = {version = ">=2.8.4,<3", optional = true, markers = "extra == \"postgres\" or extra == \"risingwave\""} -pyarrow = {version = ">=10.0.1,<19", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} -pyarrow-hotfix = {version = ">=0.4,<1", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +pyarrow = {version = ">=10.0.1,<18", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"dask\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +pyarrow-hotfix = {version = 
">=0.4,<1", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"dask\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} pydata-google-auth = {version = ">=1.4.0,<2", optional = true, markers = "extra == \"bigquery\""} pyodbc = {version = ">=4.0.39,<6", optional = true, markers = "extra == \"mssql\""} python-dateutil = ">=2.8.2,<3" pytz = ">=2022.7" -rich = {version = ">=12.4.4,<14", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +rich = {version = ">=12.4.4,<14", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"dask\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"pandas\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} snowflake-connector-python = {version = ">=3.0.2,<3.3.0b1 || >3.3.0b1,<4", optional = true, markers = "extra == \"snowflake\""} -sqlglot = ">=23.4,<25.30" -toolz = ">=0.11,<2" +sqlglot = ">=23.4,<25.21" +toolz = ">=0.11,<1" typing-extensions = ">=4.3.0,<5" [package.extras] -bigquery = ["db-dtypes (>=0.3,<2)", "google-cloud-bigquery (>=3,<4)", "google-cloud-bigquery-storage (>=2,<3)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pydata-google-auth (>=1.4.0,<2)", "rich (>=12.4.4,<14)"] -clickhouse = ["clickhouse-connect[arrow,numpy,pandas] (>=0.5.23,<1)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -databricks = ["databricks-sql-connector-core (>=4,<5)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -datafusion = ["datafusion (>=0.6,<43)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +bigquery = ["db-dtypes (>=0.3,<2)", "google-cloud-bigquery (>=3,<4)", "google-cloud-bigquery-storage (>=2,<3)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pydata-google-auth (>=1.4.0,<2)", "rich (>=12.4.4,<14)"] +clickhouse = ["clickhouse-connect[arrow,numpy,pandas] (>=0.5.23,<1)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +dask = ["dask[array,dataframe] (>=2022.9.1,<2024.3.0)", "numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "regex 
(>=2021.7.6)", "rich (>=12.4.4,<14)"] +datafusion = ["datafusion (>=0.6,<41)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] decompiler = ["black (>=22.1.0,<25)"] deltalake = ["deltalake (>=0.9.0,<1)"] -druid = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pydruid (>=0.6.7,<1)", "rich (>=12.4.4,<14)"] -duckdb = ["duckdb (>=0.10,<1.2)", "numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +druid = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pydruid (>=0.6.7,<1)", "rich (>=12.4.4,<14)"] +duckdb = ["duckdb (>=0.8.1,<1.2)", "numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] examples = ["pins[gcs] (>=0.8.3,<1)"] -exasol = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pyexasol[pandas] (>=0.25.2,<1)", "rich (>=12.4.4,<14)"] -flink = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +exasol = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pyexasol[pandas] (>=0.25.2,<1)", "rich (>=12.4.4,<14)"] +flink = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] geospatial = ["geoarrow-types (>=0.2,<1)", "geopandas (>=0.6,<2)", "pyproj (>=3.3.0,<4)", "shapely (>=2,<3)"] -impala = ["impyla (>=0.17,<1)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -mssql = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pyodbc (>=4.0.39,<6)", "rich (>=12.4.4,<14)"] -mysql = ["mysqlclient (>=2.2.4,<3)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -oracle = ["numpy (>=1.23.2,<3)", "oracledb (>=1.3.1,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -polars = ["numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "polars (>=1,<2)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -postgres = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "psycopg2 (>=2.8.4,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -pyspark = ["numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pyspark (>=3.3.3,<4)", "rich (>=12.4.4,<14)"] -risingwave = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "psycopg2 (>=2.8.4,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] -snowflake = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)", "snowflake-connector-python (>=3.0.2,!=3.3.0b1,<4)"] -sqlite = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "regex (>=2021.7.6)", "rich (>=12.4.4,<14)"] -trino = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich 
(>=12.4.4,<14)", "trino (>=0.321,<1)"] +impala = ["impyla (>=0.17,<1)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +mssql = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pyodbc (>=4.0.39,<6)", "rich (>=12.4.4,<14)"] +mysql = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pymysql (>=1,<2)", "rich (>=12.4.4,<14)"] +oracle = ["numpy (>=1.23.2,<3)", "oracledb (>=1.3.1,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +pandas = ["numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "regex (>=2021.7.6)", "rich (>=12.4.4,<14)"] +polars = ["numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "polars (>=1,<2)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +postgres = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "psycopg2 (>=2.8.4,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +pyspark = ["numpy (>=1.23.2,<3)", "packaging (>=21.3,<25)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "pyspark (>=3.3.3,<4)", "rich (>=12.4.4,<14)"] +risingwave = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "psycopg2 (>=2.8.4,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +snowflake = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)", "snowflake-connector-python (>=3.0.2,!=3.3.0b1,<4)"] +sqlite = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "regex (>=2021.7.6)", "rich (>=12.4.4,<14)"] +trino = ["numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<18)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)", "trino (>=0.321,<1)"] visualization = ["graphviz (>=0.16,<1)"] [[package]] @@ -5212,7 +5213,7 @@ source = ["Cython (>=0.29.35)"] name = "lz4" version = "4.3.3" description = "LZ4 Bindings for Python" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "lz4-4.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b891880c187e96339474af2a3b2bfb11a8e4732ff5034be919aa9029484cd201"}, @@ -6645,7 +6646,7 @@ future = "*" name = "parsy" version = "2.1" description = "Easy-to-use parser combinators, for parsing in pure Python" -optional = true +optional = false python-versions = ">=3.7" files = [ {file = "parsy-2.1-py3-none-any.whl", hash = "sha256:8f18e7b11985e7802e7e3ecbd8291c6ca243d29820b1186e4c84605db4efffa0"}, @@ -7080,7 +7081,7 @@ test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] name = "psycopg2" version = "2.9.10" description = "psycopg2 - Python-PostgreSQL Database Adapter" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"}, @@ -7243,7 +7244,7 @@ test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] name = "pyarrow-hotfix" version = "0.6" description = "" -optional = true +optional = false python-versions = ">=3.5" files = [ {file = "pyarrow_hotfix-0.6-py3-none-any.whl", hash = "sha256:dcc9ae2d220dff0083be6a9aa8e0cdee5182ad358d4931fce825c545e5c89178"}, @@ -7456,7 +7457,7 @@ typing-extensions = 
">=4.6.0,<4.7.0 || >4.7.0" name = "pydata-google-auth" version = "1.9.0" description = "PyData helpers for authenticating to Google APIs" -optional = true +optional = false python-versions = ">=3.9" files = [ {file = "pydata-google-auth-1.9.0.tar.gz", hash = "sha256:2f546e88f007dfdb050087556eb46d6008e351386a7b368096797fae5df374f2"}, @@ -9265,13 +9266,13 @@ typing-extensions = "*" [[package]] name = "sqlglot" -version = "25.24.5" +version = "25.20.2" description = "An easily customizable SQL parser and transpiler" optional = false python-versions = ">=3.7" files = [ - {file = "sqlglot-25.24.5-py3-none-any.whl", hash = "sha256:f8a8870d1f5cdd2e2dc5c39a5030a0c7b0a91264fb8972caead3dac8e8438873"}, - {file = "sqlglot-25.24.5.tar.gz", hash = "sha256:6d3d604034301ca3b614d6b4148646b4033317b7a93d1801e9661495eb4b4fcf"}, + {file = "sqlglot-25.20.2-py3-none-any.whl", hash = "sha256:cdbfd7ce3f2f39f32bd7b4c23fd9e0fd261636a6b14285b914e8def25fd0a567"}, + {file = "sqlglot-25.20.2.tar.gz", hash = "sha256:169fe8308dd70d7bd40117b2221b62bdc7c4e2ea8eb07394b2a6146cdedf05ab"}, ] [package.extras] @@ -9648,13 +9649,13 @@ files = [ [[package]] name = "toolz" -version = "1.0.0" +version = "0.12.1" description = "List processing tools and functional utilities" -optional = true -python-versions = ">=3.8" +optional = false +python-versions = ">=3.7" files = [ - {file = "toolz-1.0.0-py3-none-any.whl", hash = "sha256:292c8f1c4e7516bf9086f8850935c799a874039c8bcf959d47b600e4c44a6236"}, - {file = "toolz-1.0.0.tar.gz", hash = "sha256:2c86e3d9a04798ac556793bced838816296a2f085017664e4995cb40a1047a02"}, + {file = "toolz-0.12.1-py3-none-any.whl", hash = "sha256:d22731364c07d72eea0a0ad45bafb2c2937ab6fd38a3507bf55eae8744aa7d85"}, + {file = "toolz-0.12.1.tar.gz", hash = "sha256:ecca342664893f177a13dac0e6b41cbd8ac25a358e5f215316d43e2100224f4d"}, ] [[package]] @@ -10529,7 +10530,7 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p name = "zstandard" version = "0.22.0" description = "Zstandard bindings for Python" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "zstandard-0.22.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:275df437ab03f8c033b8a2c181e51716c32d831082d93ce48002a5227ec93019"}, @@ -10618,4 +10619,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "1bf3deccd929c083b880c1a82be0983430ab49f7ade247b1c5573bb8c70d9ff5" +content-hash = "a7cd6b599326d80b5beb8d4a3d3e3b4074eda6dc53daa5c296ef8d54002c5f78" diff --git a/pyproject.toml b/pyproject.toml index f736fc65ad..0fb7f94e36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,7 +167,6 @@ pytest-mock = "^3.14.0" types-regex = "^2024.5.15.20240519" flake8-print = "^5.0.0" mimesis = "^7.0.0" -ibis-framework = { version = ">=9.0.0", markers = "python_version >= '3.10'", optional = true, extras = ["duckdb", "postgres", "bigquery", "snowflake", "mssql", "clickhouse"]} shapely = ">=2.0.6" [tool.poetry.group.sources] @@ -205,6 +204,12 @@ optional = true [tool.poetry.group.airflow.dependencies] apache-airflow = {version = "^2.8.0", markers = "python_version < '3.12'"} +[tool.poetry.group.ibis] +optional = true + +[tool.poetry.group.ibis.dependencies] +ibis-framework = { version = ">=9.0.0,<10.0.0", markers = "python_version >= '3.10'", extras = ["duckdb", "postgres", "bigquery", "snowflake", "mssql", "clickhouse"]} + [tool.poetry.group.providers] optional = true diff --git a/tests/destinations/test_readable_dbapi_dataset.py 
b/tests/destinations/test_readable_dbapi_dataset.py index 4745735371..bc58a18fa0 100644 --- a/tests/destinations/test_readable_dbapi_dataset.py +++ b/tests/destinations/test_readable_dbapi_dataset.py @@ -2,7 +2,7 @@ import dlt import pytest -from dlt.destinations.dataset import ( +from dlt.destinations.dataset.exceptions import ( ReadableRelationHasQueryException, ReadableRelationUnknownColumnException, ) @@ -12,44 +12,44 @@ def test_query_builder() -> None: dataset = dlt.pipeline(destination="duckdb", pipeline_name="pipeline")._dataset() # default query for a table - assert dataset.my_table.query.strip() == 'SELECT * FROM "pipeline_dataset"."my_table"' # type: ignore[attr-defined] + assert dataset.my_table.query.strip() == 'SELECT * FROM "pipeline_dataset"."my_table"' # head query assert ( - dataset.my_table.head().query.strip() # type: ignore[attr-defined] + dataset.my_table.head().query.strip() == 'SELECT * FROM "pipeline_dataset"."my_table" LIMIT 5' ) # limit query assert ( - dataset.my_table.limit(24).query.strip() # type: ignore[attr-defined] + dataset.my_table.limit(24).query.strip() == 'SELECT * FROM "pipeline_dataset"."my_table" LIMIT 24' ) # select columns assert ( - dataset.my_table.select("col1", "col2").query.strip() # type: ignore[attr-defined] + dataset.my_table.select("col1", "col2").query.strip() == 'SELECT "col1","col2" FROM "pipeline_dataset"."my_table"' ) # also indexer notation assert ( - dataset.my_table[["col1", "col2"]].query.strip() # type: ignore[attr-defined] + dataset.my_table[["col1", "col2"]].query.strip() == 'SELECT "col1","col2" FROM "pipeline_dataset"."my_table"' ) # identifiers are normalized assert ( - dataset["MY_TABLE"].select("CoL1", "cOl2").query.strip() # type: ignore[attr-defined] + dataset["MY_TABLE"].select("CoL1", "cOl2").query.strip() == 'SELECT "co_l1","c_ol2" FROM "pipeline_dataset"."my_table"' ) assert ( - dataset["MY__TABLE"].select("Co__L1", "cOl2").query.strip() # type: ignore[attr-defined] + dataset["MY__TABLE"].select("Co__L1", "cOl2").query.strip() == 'SELECT "co__l1","c_ol2" FROM "pipeline_dataset"."my__table"' ) # limit and select chained assert ( - dataset.my_table.select("col1", "col2").limit(24).query.strip() # type: ignore[attr-defined] + dataset.my_table.select("col1", "col2").limit(24).query.strip() == 'SELECT "col1","col2" FROM "pipeline_dataset"."my_table" LIMIT 24' ) @@ -65,18 +65,18 @@ def test_copy_and_chaining() -> None: relation2 = relation.__copy__() assert relation != relation2 - assert relation._limit == relation2._limit # type: ignore[attr-defined] - assert relation._table_name == relation2._table_name # type: ignore[attr-defined] - assert relation._provided_query == relation2._provided_query # type: ignore[attr-defined] - assert relation._selected_columns == relation2._selected_columns # type: ignore[attr-defined] + assert relation._limit == relation2._limit + assert relation._table_name == relation2._table_name + assert relation._provided_query == relation2._provided_query + assert relation._selected_columns == relation2._selected_columns # test copy while chaining limit relation3 = relation2.limit(22) assert relation2 != relation3 - assert relation2._limit != relation3._limit # type: ignore[attr-defined] + assert relation2._limit != relation3._limit # test last setting prevails chaining - assert relation.limit(23).limit(67).limit(11)._limit == 11 # type: ignore[attr-defined] + assert relation.limit(23).limit(67).limit(11)._limit == 11 def test_computed_schema_columns() -> None: diff --git 
a/tests/load/pipeline/test_duckdb.py b/tests/load/pipeline/test_duckdb.py index 98642bb263..a7aa4d36e4 100644 --- a/tests/load/pipeline/test_duckdb.py +++ b/tests/load/pipeline/test_duckdb.py @@ -283,8 +283,8 @@ def test_duckdb_credentials_separation( print(p1_dataset.p1_data.fetchall()) print(p2_dataset.p2_data.fetchall()) - assert "p1" in p1_dataset.sql_client.credentials._conn_str() # type: ignore[attr-defined] - assert "p2" in p2_dataset.sql_client.credentials._conn_str() # type: ignore[attr-defined] + assert "p1" in p1_dataset.sql_client.credentials._conn_str() + assert "p2" in p2_dataset.sql_client.credentials._conn_str() - assert p1_dataset.sql_client.credentials.bound_to_pipeline is p1 # type: ignore[attr-defined] - assert p2_dataset.sql_client.credentials.bound_to_pipeline is p2 # type: ignore[attr-defined] + assert p1_dataset.sql_client.credentials.bound_to_pipeline is p1 + assert p2_dataset.sql_client.credentials.bound_to_pipeline is p2 diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py index 1a9c8a383b..d2f5f7951e 100644 --- a/tests/load/test_read_interfaces.py +++ b/tests/load/test_read_interfaces.py @@ -1,5 +1,5 @@ -from typing import Any, cast - +from typing import Any, cast, Tuple, List +import re import pytest import dlt import os @@ -20,8 +20,10 @@ ) from dlt.destinations import filesystem from tests.utils import TEST_STORAGE_ROOT, clean_test_storage -from dlt.common.destination.reference import TDestinationReferenceArg -from dlt.destinations.dataset import ReadableDBAPIDataset, ReadableRelationUnknownColumnException +from dlt.destinations.dataset.dataset import ReadableDBAPIDataset +from dlt.destinations.dataset.exceptions import ( + ReadableRelationUnknownColumnException, +) from tests.load.utils import drop_pipeline_data EXPECTED_COLUMNS = ["id", "decimal", "other_decimal", "_dlt_load_id", "_dlt_id"] @@ -58,6 +60,7 @@ def autouse_test_storage() -> FileStorage: @pytest.fixture(scope="session") def populated_pipeline(request, autouse_test_storage) -> Any: """fixture that returns a pipeline object populated with the example data""" + destination_config = cast(DestinationTestConfiguration, request.param) if ( @@ -104,6 +107,7 @@ def items(): columns={ "id": {"data_type": "bigint"}, "double_id": {"data_type": "bigint"}, + "di_decimal": {"data_type": "decimal", "precision": 7, "scale": 3}, }, ) def double_items(): @@ -111,6 +115,7 @@ def double_items(): { "id": i, "double_id": i * 2, + "di_decimal": Decimal("10.433"), } for i in range(total_records) ] @@ -151,6 +156,24 @@ def double_items(): ) +@pytest.mark.no_load +@pytest.mark.essential +@pytest.mark.parametrize( + "populated_pipeline", + configs, + indirect=True, + ids=lambda x: x.name, +) +def test_explicit_dataset_type_selection(populated_pipeline: Pipeline): + from dlt.destinations.dataset.dataset import ReadableDBAPIRelation + from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation + + assert isinstance( + populated_pipeline._dataset(dataset_type="default").items, ReadableDBAPIRelation + ) + assert isinstance(populated_pipeline._dataset(dataset_type="ibis").items, ReadableIbisRelation) + + @pytest.mark.no_load @pytest.mark.essential @pytest.mark.parametrize( @@ -258,71 +281,6 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None: assert set(ids) == set(range(total_records)) -@pytest.mark.no_load -@pytest.mark.essential -@pytest.mark.parametrize( - "populated_pipeline", - configs, - indirect=True, - ids=lambda x: x.name, -) -def 
test_ibis_dataset_access(populated_pipeline: Pipeline) -> None: - # NOTE: we could generalize this with a context for certain deps - import subprocess - - subprocess.check_call( - ["pip", "install", "ibis-framework[duckdb,postgres,bigquery,snowflake,mssql,clickhouse]"] - ) - - from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS - - # check correct error if not supported - if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS: - with pytest.raises(NotImplementedError): - populated_pipeline._dataset().ibis() - return - - total_records = _total_records(populated_pipeline) - ibis_connection = populated_pipeline._dataset().ibis() - - map_i = lambda x: x - if populated_pipeline.destination.destination_type == "dlt.destinations.snowflake": - map_i = lambda x: x.upper() - - dataset_name = map_i(populated_pipeline.dataset_name) - table_like_statement = None - table_name_prefix = "" - addtional_tables = [] - - # clickhouse has no datasets, but table prefixes and a sentinel table - if populated_pipeline.destination.destination_type == "dlt.destinations.clickhouse": - table_like_statement = dataset_name + "." - table_name_prefix = dataset_name + "___" - dataset_name = None - addtional_tables = ["dlt_sentinel_table"] - - add_table_prefix = lambda x: table_name_prefix + x - - # just do a basic check to see wether ibis can connect - assert set(ibis_connection.list_tables(database=dataset_name, like=table_like_statement)) == { - add_table_prefix(map_i(x)) - for x in ( - [ - "_dlt_loads", - "_dlt_pipeline_state", - "_dlt_version", - "double_items", - "items", - "items__children", - ] - + addtional_tables - ) - } - - items_table = ibis_connection.table(add_table_prefix(map_i("items")), database=dataset_name) - assert items_table.count().to_pandas() == total_records - - @pytest.mark.no_load @pytest.mark.essential @pytest.mark.parametrize( @@ -332,7 +290,8 @@ def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None: ids=lambda x: x.name, ) def test_hint_preservation(populated_pipeline: Pipeline) -> None: - table_relationship = populated_pipeline._dataset().items + # NOTE: for now hints are only preserved for the default dataset + table_relationship = populated_pipeline._dataset(dataset_type="default").items # check that hints are carried over to arrow table expected_decimal_precision = 10 expected_decimal_precision_2 = 12 @@ -425,8 +384,7 @@ def test_limit_and_head(populated_pipeline: Pipeline) -> None: ids=lambda x: x.name, ) def test_column_selection(populated_pipeline: Pipeline) -> None: - table_relationship = populated_pipeline._dataset().items - + table_relationship = populated_pipeline._dataset(dataset_type="default").items columns = ["_dlt_load_id", "other_decimal"] data_frame = table_relationship.select(*columns).head().df() assert [v.lower() for v in data_frame.columns.values] == columns @@ -479,6 +437,266 @@ def test_schema_arg(populated_pipeline: Pipeline) -> None: assert "items" in dataset.schema.tables +@pytest.mark.no_load +@pytest.mark.essential +@pytest.mark.parametrize( + "populated_pipeline", + configs, + indirect=True, + ids=lambda x: x.name, +) +def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None: + # NOTE: we could generalize this with a context for certain deps + import ibis # type: ignore + + # now we should get the more powerful ibis relation + dataset = populated_pipeline._dataset() + total_records = _total_records(populated_pipeline) + + items_table = dataset["items"] + double_items_table = dataset["double_items"] + + # 
check full table access + df = items_table.df() + assert len(df.index) == total_records + + df = double_items_table.df() + assert len(df.index) == total_records + + # check limit + df = items_table.limit(5).df() + assert len(df.index) == 5 + + # check chained expression with join, column selection, order by and limit + joined_table = ( + items_table.join(double_items_table, items_table.id == double_items_table.id)[ + ["id", "double_id"] + ] + .order_by("id") + .limit(20) + ) + table = joined_table.fetchall() + assert len(table) == 20 + assert list(table[0]) == [0, 0] + assert list(table[5]) == [5, 10] + assert list(table[10]) == [10, 20] + + # check aggregate of first 20 items + agg_table = items_table.order_by("id").limit(20).aggregate(sum_id=items_table.id.sum()) + assert agg_table.fetchone()[0] == reduce(lambda a, b: a + b, range(20)) + + # check filtering + filtered_table = items_table.filter(items_table.id < 10) + assert len(filtered_table.fetchall()) == 10 + + if populated_pipeline.destination.destination_type != "dlt.destinations.duckdb": + return + + # we check a bunch of expressions without executing them to see that they produce correct sql + # also we return the keys of the discovered schema columns + def sql_from_expr(expr: Any) -> Tuple[str, List[str]]: + query = str(expr.query).replace(populated_pipeline.dataset_name, "dataset") + columns = list(expr.columns_schema.keys()) if expr.columns_schema else None + return re.sub(r"\s+", " ", query), columns + + # test all functions discussed here: https://ibis-project.org/tutorials/ibis-for-sql-users + ALL_COLUMNS = ["id", "decimal", "other_decimal", "_dlt_load_id", "_dlt_id"] + + # selecting two columns + assert sql_from_expr(items_table.select("id", "decimal")) == ( + 'SELECT "t0"."id", "t0"."decimal" FROM "dataset"."items" AS "t0"', + ["id", "decimal"], + ) + + # selecting all columns + assert sql_from_expr(items_table) == ('SELECT * FROM "dataset"."items"', ALL_COLUMNS) + + # selecting two other columns via item getter + assert sql_from_expr(items_table["id", "decimal"]) == ( + 'SELECT "t0"."id", "t0"."decimal" FROM "dataset"."items" AS "t0"', + ["id", "decimal"], + ) + + # adding a new column + new_col = (items_table.id * 2).name("new_col") + assert sql_from_expr(items_table.select("id", "decimal", new_col)) == ( + ( + 'SELECT "t0"."id", "t0"."decimal", "t0"."id" * 2 AS "new_col" FROM' + ' "dataset"."items" AS "t0"' + ), + None, + ) + + # mutating table (add a new column computed from existing columns) + assert sql_from_expr( + items_table.mutate(double_id=items_table.id * 2).select("id", "double_id") + ) == ( + 'SELECT "t0"."id", "t0"."id" * 2 AS "double_id" FROM "dataset"."items" AS "t0"', + None, + ) + + # mutating table: add a new static column + assert sql_from_expr( + items_table.mutate(new_col=ibis.literal("static_value")).select("id", "new_col") + ) == ('SELECT "t0"."id", \'static_value\' AS "new_col" FROM "dataset"."items" AS "t0"', None) + + # check filtering (preserves all columns) + assert sql_from_expr(items_table.filter(items_table.id < 10)) == ( + 'SELECT * FROM "dataset"."items" AS "t0" WHERE "t0"."id" < 10', + ALL_COLUMNS, + ) + + # filtering and selecting a single column + assert sql_from_expr(items_table.filter(items_table.id < 10).select("id")) == ( + 'SELECT "t0"."id" FROM "dataset"."items" AS "t0" WHERE "t0"."id" < 10', + ["id"], + ) + + # check filter "and" condition + assert sql_from_expr(items_table.filter(items_table.id < 10).filter(items_table.id > 5)) == ( + 'SELECT * FROM "dataset"."items" AS "t0" WHERE
"t0"."id" < 10 AND "t0"."id" > 5', + ALL_COLUMNS, + ) + + # check filter "or" condition + assert sql_from_expr(items_table.filter((items_table.id < 10) | (items_table.id > 5))) == ( + 'SELECT * FROM "dataset"."items" AS "t0" WHERE ( "t0"."id" < 10 ) OR ( "t0"."id" > 5 )', + ALL_COLUMNS, + ) + + # check group by and aggregate + assert sql_from_expr( + items_table.group_by("id") + .having(items_table.count() >= 1000) + .aggregate(sum_id=items_table.id.sum()) + ) == ( + ( + 'SELECT "t1"."id", "t1"."sum_id" FROM ( SELECT "t0"."id", SUM("t0"."id") AS "sum_id",' + ' COUNT(*) AS "CountStar(items)" FROM "dataset"."items" AS "t0" GROUP BY 1 ) AS "t1"' + ' WHERE "t1"."CountStar(items)" >= 1000' + ), + None, + ) + + # sorting and ordering + assert sql_from_expr(items_table.order_by("id", "decimal").limit(10)) == ( + ( + 'SELECT * FROM "dataset"."items" AS "t0" ORDER BY "t0"."id" ASC, "t0"."decimal" ASC' + " LIMIT 10" + ), + ALL_COLUMNS, + ) + + # sort desc and asc + assert sql_from_expr(items_table.order_by(ibis.desc("id"), ibis.asc("decimal")).limit(10)) == ( + ( + 'SELECT * FROM "dataset"."items" AS "t0" ORDER BY "t0"."id" DESC, "t0"."decimal" ASC' + " LIMIT 10" + ), + ALL_COLUMNS, + ) + + # offset and limit + assert sql_from_expr(items_table.order_by("id").limit(10, offset=5)) == ( + 'SELECT * FROM "dataset"."items" AS "t0" ORDER BY "t0"."id" ASC LIMIT 10 OFFSET 5', + ALL_COLUMNS, + ) + + # join + assert sql_from_expr( + items_table.join(double_items_table, items_table.id == double_items_table.id)[ + ["id", "double_id"] + ] + ) == ( + ( + 'SELECT "t2"."id", "t3"."double_id" FROM "dataset"."items" AS "t2" INNER JOIN' + ' "dataset"."double_items" AS "t3" ON "t2"."id" = "t3"."id"' + ), + None, + ) + + # subqueries + assert sql_from_expr( + items_table.filter(items_table.decimal.isin(double_items_table.di_decimal)) + ) == ( + ( + 'SELECT * FROM "dataset"."items" AS "t0" WHERE "t0"."decimal" IN ( SELECT' + ' "t1"."di_decimal" FROM "dataset"."double_items" AS "t1" )' + ), + ALL_COLUMNS, + ) + + # topk + assert sql_from_expr(items_table.decimal.topk(10)) == ( + ( + 'SELECT * FROM ( SELECT "t0"."decimal", COUNT(*) AS "CountStar(items)" FROM' + ' "dataset"."items" AS "t0" GROUP BY 1 ) AS "t1" ORDER BY "t1"."CountStar(items)" DESC' + " LIMIT 10" + ), + None, + ) + + +@pytest.mark.no_load +@pytest.mark.essential +@pytest.mark.parametrize( + "populated_pipeline", + configs, + indirect=True, + ids=lambda x: x.name, +) +def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None: + # NOTE: we could generalize this with a context for certain deps + + from dlt.helpers.ibis import SUPPORTED_DESTINATIONS + + # check correct error if not supported + if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS: + with pytest.raises(NotImplementedError): + populated_pipeline._dataset().ibis() + return + + total_records = _total_records(populated_pipeline) + ibis_connection = populated_pipeline._dataset().ibis() + + map_i = lambda x: x + if populated_pipeline.destination.destination_type == "dlt.destinations.snowflake": + map_i = lambda x: x.upper() + + dataset_name = map_i(populated_pipeline.dataset_name) + table_like_statement = None + table_name_prefix = "" + addtional_tables = [] + + # clickhouse has no datasets, but table prefixes and a sentinel table + if populated_pipeline.destination.destination_type == "dlt.destinations.clickhouse": + table_like_statement = dataset_name + "." 
+ table_name_prefix = dataset_name + "___" + dataset_name = None + additional_tables = ["dlt_sentinel_table"] + + add_table_prefix = lambda x: table_name_prefix + x + + # just do a basic check to see whether ibis can connect + assert set(ibis_connection.list_tables(database=dataset_name, like=table_like_statement)) == { + add_table_prefix(map_i(x)) + for x in ( + [ + "_dlt_loads", + "_dlt_pipeline_state", + "_dlt_version", + "double_items", + "items", + "items__children", + ] + + additional_tables + ) + } + + items_table = ibis_connection.table(add_table_prefix(map_i("items")), database=dataset_name) + assert items_table.count().to_pandas() == total_records + + @pytest.mark.no_load @pytest.mark.essential @pytest.mark.parametrize( @@ -546,6 +764,7 @@ def test_standalone_dataset(populated_pipeline: Pipeline) -> None: assert dataset.schema.name == "unknown_dataset" assert "items" not in dataset.schema.tables + # NOTE: this breaks the following test, it will need to be fixed somehow # create a newer schema with different name and see wether this is loaded from dlt.common.schema import Schema from dlt.common.schema import utils