ibis support - hand over credentials to ibis backend for a number of destinations (#2004)

* add PoC for ibis table support on readabledbapidataset

* add PoC for exposing an ibis backend for a destination

* install ibis dependency for tests

* add support for filesystem

* remove print statements

* remove ibis tables from dbapirelation

* clean up interfaces

* move backend creation and skip tests for unsupported backend

* fix dependencies and typing

* mark import not found, can't be linted on 3.8 and 3.9

* add snowflake and bigquery support

* add redshift and maybe fix linter

* fix linter

* remove unneeded dependency

* add in missing pipeline drop

* fix snowflake table access test

* add mssql support

* enable synapse

* add clickhouse support

* enable motherduck

* post rebase lock file update

* enable motherduck

* add missing ibis framework extras

* remove argument of create ibis backend

* extract destination client factories into dataset file

* fix partial loading example

* fix setting of default schema name in destination config

* fix default dataset for staging destination

* post rebase lockfile update

* always set azure transport connection
sh-rp authored Nov 23, 2024
1 parent 6e0510a commit dfde071
Showing 13 changed files with 635 additions and 109 deletions.
10 changes: 9 additions & 1 deletion dlt/common/destination/reference.py
@@ -76,12 +76,16 @@
    try:
        from dlt.common.libs.pandas import DataFrame
        from dlt.common.libs.pyarrow import Table as ArrowTable
        from dlt.common.libs.ibis import BaseBackend as IbisBackend
    except MissingDependencyException:
        DataFrame = Any
        ArrowTable = Any
        IbisBackend = Any

else:
    DataFrame = Any
    ArrowTable = Any
    IbisBackend = Any


class StorageSchemaInfo(NamedTuple):
@@ -291,7 +295,6 @@ def _make_dataset_name(self, schema_name: str) -> str:
        # if default schema is None then suffix is not added
        if self.default_schema_name is not None and schema_name != self.default_schema_name:
            return (self.dataset_name or "") + "_" + schema_name

        return self.dataset_name


@@ -575,12 +578,17 @@ def close(self) -> None: ...
class SupportsReadableDataset(Protocol):
    """A readable dataset retrieved from a destination, has support for creating readable relations for a query or table"""

    @property
    def schema(self) -> Schema: ...

    def __call__(self, query: Any) -> SupportsReadableRelation: ...

    def __getitem__(self, table: str) -> SupportsReadableRelation: ...

    def __getattr__(self, table: str) -> SupportsReadableRelation: ...

    def ibis(self) -> IbisBackend: ...


class JobClientBase(ABC):
    def __init__(
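For orientation, a minimal sketch of how the new ibis() accessor on a readable dataset is meant to be used. The destination, dataset and table names below are made up, and the dataset() factory signature is only partially visible in this diff, so treat this as an assumption rather than the documented API:

    from dlt.destinations.dataset import dataset

    # hypothetical duckdb destination with an already-loaded dataset
    ds = dataset("duckdb", "my_dataset")

    backend = ds.ibis()                # connected ibis BaseBackend for the destination
    print(backend.list_tables())       # plain ibis API from here on
    # depending on the backend, the dlt dataset (database/schema) may need to be passed explicitly
    table = backend.table("my_table")
    print(table.limit(5).to_pandas())
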
121 changes: 121 additions & 0 deletions dlt/common/libs/ibis.py
@@ -0,0 +1,121 @@
from typing import cast

from dlt.common.exceptions import MissingDependencyException

from dlt.common.destination.reference import TDestinationReferenceArg, Destination, JobClientBase

try:
    import ibis  # type: ignore
    from ibis import BaseBackend
except ModuleNotFoundError:
    raise MissingDependencyException("dlt ibis Helpers", ["ibis"])


SUPPORTED_DESTINATIONS = [
    "dlt.destinations.postgres",
    "dlt.destinations.duckdb",
    "dlt.destinations.motherduck",
    "dlt.destinations.filesystem",
    "dlt.destinations.bigquery",
    "dlt.destinations.snowflake",
    "dlt.destinations.redshift",
    "dlt.destinations.mssql",
    "dlt.destinations.synapse",
    "dlt.destinations.clickhouse",
    # NOTE: Athena could theoretically work with trino backend, but according to
    # https://github.com/ibis-project/ibis/issues/7682 connecting with aws credentials
    # does not work yet.
    # "dlt.destinations.athena",
]


def create_ibis_backend(
    destination: TDestinationReferenceArg, client: JobClientBase
) -> BaseBackend:
    """Create a given ibis backend for a destination client and dataset"""

    # check if destination is supported
    destination_type = Destination.from_reference(destination).destination_type
    if destination_type not in SUPPORTED_DESTINATIONS:
        raise NotImplementedError(f"Destination of type {destination_type} not supported by ibis.")

    if destination_type in ["dlt.destinations.motherduck", "dlt.destinations.duckdb"]:
        import duckdb
        from dlt.destinations.impl.duckdb.duck import DuckDbClient

        duck_client = cast(DuckDbClient, client)
        duck = duckdb.connect(
            database=duck_client.config.credentials._conn_str(),
            read_only=duck_client.config.credentials.read_only,
            config=duck_client.config.credentials._get_conn_config(),
        )
        con = ibis.duckdb.from_connection(duck)
    elif destination_type in [
        "dlt.destinations.postgres",
        "dlt.destinations.redshift",
    ]:
        credentials = client.config.credentials.to_native_representation()
        con = ibis.connect(credentials)
    elif destination_type == "dlt.destinations.snowflake":
        from dlt.destinations.impl.snowflake.snowflake import SnowflakeClient

        sf_client = cast(SnowflakeClient, client)
        credentials = sf_client.config.credentials.to_connector_params()
        con = ibis.snowflake.connect(**credentials)
    elif destination_type in ["dlt.destinations.mssql", "dlt.destinations.synapse"]:
        from dlt.destinations.impl.mssql.mssql import MsSqlJobClient

        mssql_client = cast(MsSqlJobClient, client)
        con = ibis.mssql.connect(
            host=mssql_client.config.credentials.host,
            port=mssql_client.config.credentials.port,
            database=mssql_client.config.credentials.database,
            user=mssql_client.config.credentials.username,
            password=mssql_client.config.credentials.password,
            driver=mssql_client.config.credentials.driver,
        )
    elif destination_type == "dlt.destinations.bigquery":
        from dlt.destinations.impl.bigquery.bigquery import BigQueryClient

        bq_client = cast(BigQueryClient, client)
        credentials = bq_client.config.credentials.to_native_credentials()
        con = ibis.bigquery.connect(
            credentials=credentials,
            project_id=bq_client.sql_client.project_id,
            location=bq_client.sql_client.location,
        )
    elif destination_type == "dlt.destinations.clickhouse":
        from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseClient

        ch_client = cast(ClickHouseClient, client)
        con = ibis.clickhouse.connect(
            host=ch_client.config.credentials.host,
            port=ch_client.config.credentials.http_port,
            database=ch_client.config.credentials.database,
            user=ch_client.config.credentials.username,
            password=ch_client.config.credentials.password,
            secure=bool(ch_client.config.credentials.secure),
            # compression=True,
        )
    elif destination_type == "dlt.destinations.filesystem":
        import duckdb
        from dlt.destinations.impl.filesystem.sql_client import (
            FilesystemClient,
            FilesystemSqlClient,
        )
        from dlt.destinations.impl.duckdb.factory import DuckDbCredentials

        # we create an in memory duckdb and create all tables on there
        duck = duckdb.connect(":memory:")
        fs_client = cast(FilesystemClient, client)
        creds = DuckDbCredentials(duck)
        sql_client = FilesystemSqlClient(
            fs_client, dataset_name=fs_client.dataset_name, credentials=creds
        )

        # NOTE: we should probably have the option for the user to only select a subset of tables here
        with sql_client as _:
            sql_client.create_views_for_all_tables()
        con = ibis.duckdb.from_connection(duck)

    return con
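
As a standalone illustration of the duckdb/motherduck branch above, the same from_connection call can wrap any existing duckdb connection. The database path below is made up, and this assumes an ibis version that ships ibis.duckdb.from_connection (ibis 9.x), which is the call used by this module:

    import duckdb
    import ibis

    # hypothetical local pipeline database; create_ibis_backend() builds this connection
    # from the destination credentials instead of a hard-coded path
    raw = duckdb.connect(database="pipeline.duckdb", read_only=True)
    con = ibis.duckdb.from_connection(raw)
    print(con.list_tables())
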
119 changes: 110 additions & 9 deletions dlt/destinations/dataset.py
@@ -1,10 +1,15 @@
from typing import Any, Generator, Optional, Sequence, Union, List
from typing import Any, Generator, Sequence, Union, TYPE_CHECKING, Tuple

from contextlib import contextmanager

from dlt import version

from dlt.common.json import json
from copy import deepcopy

from dlt.common.normalizers.naming.naming import NamingConvention
from dlt.common.exceptions import MissingDependencyException

from contextlib import contextmanager
from dlt.common.destination import AnyDestination
from dlt.common.destination.reference import (
    SupportsReadableRelation,
    SupportsReadableDataset,
@@ -14,13 +19,24 @@
    JobClientBase,
    WithStateSync,
    DestinationClientDwhConfiguration,
    DestinationClientStagingConfiguration,
    DestinationClientConfiguration,
    DestinationClientDwhWithStagingConfiguration,
)

from dlt.common.schema.typing import TTableSchemaColumns
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.common.schema import Schema
from dlt.common.exceptions import DltException

if TYPE_CHECKING:
    try:
        from dlt.common.libs.ibis import BaseBackend as IbisBackend
    except MissingDependencyException:
        IbisBackend = Any
else:
    IbisBackend = Any


class DatasetException(DltException):
    pass
@@ -228,6 +244,16 @@ def __init__(
        self._sql_client: SqlClientBase[Any] = None
        self._schema: Schema = None

    def ibis(self) -> IbisBackend:
        """return a connected ibis backend"""
        from dlt.common.libs.ibis import create_ibis_backend

        self._ensure_client_and_schema()
        return create_ibis_backend(
            self._destination,
            self._destination_client(self.schema),
        )

    @property
    def schema(self) -> Schema:
        self._ensure_client_and_schema()
@@ -239,12 +265,9 @@ def sql_client(self) -> SqlClientBase[Any]:
        return self._sql_client

    def _destination_client(self, schema: Schema) -> JobClientBase:
        client_spec = self._destination.spec()
        if isinstance(client_spec, DestinationClientDwhConfiguration):
            client_spec._bind_dataset_name(
                dataset_name=self._dataset_name, default_schema_name=schema.name
            )
        return self._destination.client(schema, client_spec)
        return get_destination_clients(
            schema, destination=self._destination, destination_dataset_name=self._dataset_name
        )[0]

    def _ensure_client_and_schema(self) -> None:
        """Lazy load schema and client"""
@@ -313,3 +336,81 @@ def dataset(
    if dataset_type == "dbapi":
        return ReadableDBAPIDataset(destination, dataset_name, schema)
    raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")


# helpers
def get_destination_client_initial_config(
    destination: AnyDestination,
    default_schema_name: str,
    dataset_name: str,
    as_staging: bool = False,
) -> DestinationClientConfiguration:
    client_spec = destination.spec

    # this client supports many schemas and datasets
    if issubclass(client_spec, DestinationClientDwhConfiguration):
        if issubclass(client_spec, DestinationClientStagingConfiguration):
            spec: DestinationClientDwhConfiguration = client_spec(as_staging_destination=as_staging)
        else:
            spec = client_spec()

        spec._bind_dataset_name(dataset_name, default_schema_name)
        return spec

    return client_spec()


def get_destination_clients(
    schema: Schema,
    destination: AnyDestination = None,
    destination_dataset_name: str = None,
    destination_initial_config: DestinationClientConfiguration = None,
    staging: AnyDestination = None,
    staging_dataset_name: str = None,
    staging_initial_config: DestinationClientConfiguration = None,
    # pipeline specific settings
    default_schema_name: str = None,
) -> Tuple[JobClientBase, JobClientBase]:
    destination = Destination.from_reference(destination) if destination else None
    staging = Destination.from_reference(staging) if staging else None

    try:
        # resolve staging config in order to pass it to destination client config
        staging_client = None
        if staging:
            if not staging_initial_config:
                # this is just initial config - without user configuration injected
                staging_initial_config = get_destination_client_initial_config(
                    staging,
                    dataset_name=staging_dataset_name,
                    default_schema_name=default_schema_name,
                    as_staging=True,
                )
            # create the client - that will also resolve the config
            staging_client = staging.client(schema, staging_initial_config)

        if not destination_initial_config:
            # config is not provided then get it with injected credentials
            initial_config = get_destination_client_initial_config(
                destination,
                dataset_name=destination_dataset_name,
                default_schema_name=default_schema_name,
            )

        # attach the staging client config to destination client config - if its type supports it
        if (
            staging_client
            and isinstance(initial_config, DestinationClientDwhWithStagingConfiguration)
            and isinstance(staging_client.config, DestinationClientStagingConfiguration)
        ):
            initial_config.staging_config = staging_client.config
        # create instance with initial_config properly set
        client = destination.client(schema, initial_config)
        return client, staging_client
    except ModuleNotFoundError:
        client_spec = destination.spec()
        raise MissingDependencyException(
            f"{client_spec.destination_type} destination",
            [f"{version.DLT_PKG_NAME}[{client_spec.destination_type}]"],
            "Dependencies for specific destinations are available as extras of dlt",
        )
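
A hedged sketch of calling the extracted helper directly; normally ReadableDBAPIDataset._destination_client() and ibis() drive it, and the schema and dataset names below are made up:

    from dlt.common.schema import Schema
    from dlt.destinations.dataset import get_destination_clients

    # resolve a destination job client (and optionally a staging client) outside of a pipeline
    client, staging_client = get_destination_clients(
        Schema("my_schema"),
        destination="duckdb",
        destination_dataset_name="my_dataset",
    )
    assert staging_client is None  # no staging destination was passed
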
6 changes: 2 additions & 4 deletions dlt/destinations/impl/duckdb/sql_client.py
@@ -52,10 +52,8 @@ def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
            yield self.native_cursor.fetch_arrow_table()
            return
        # iterate
        try:
            yield from self.native_cursor.fetch_record_batch(chunk_size)
        except StopIteration:
            pass
        for item in self.native_cursor.fetch_record_batch(chunk_size):
            yield ArrowTable.from_batches([item])


class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
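The change above swaps the StopIteration guard for a plain loop and wraps each record batch in a table. A tiny pyarrow-only sketch of that per-chunk conversion (the data is made up):

    import pyarrow as pa

    batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3]})
    table = pa.Table.from_batches([batch])  # what iter_arrow() now yields per chunk
    print(table.num_rows)
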
13 changes: 8 additions & 5 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -178,11 +178,6 @@ def create_authentication(self, persistent: bool = False, secret_name: str = Non
        elif self.fs_client.config.protocol == "memory":
            self._conn.register_filesystem(self.fs_client.fs_client)

        # the line below solves problems with certificate path lookup on linux
        # see duckdb docs
        if self.fs_client.config.protocol in ["az", "abfss"]:
            self._conn.sql("SET azure_transport_option_type = 'curl';")

    def open_connection(self) -> duckdb.DuckDBPyConnection:
        # we keep the in memory instance around, so if this prop is set, return it
        first_connection = self.credentials.has_open_connection
@@ -195,8 +190,16 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
            self._conn.sql(f"USE {self.fully_qualified_dataset_name()}")
            self.create_authentication()

        # the line below solves problems with certificate path lookup on linux
        # see duckdb docs
        if self.fs_client.config.protocol in ["az", "abfss"]:
            self._conn.sql("SET azure_transport_option_type = 'curl';")

        return self._conn

    def create_views_for_all_tables(self) -> None:
        self.create_views_for_tables({v: v for v in self.fs_client.schema.tables.keys()})

    @raise_database_error
    def create_views_for_tables(self, tables: Dict[str, str]) -> None:
        """Add the required tables as views to the duckdb in memory instance"""
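For context on the relocated azure setting (the last commit bullet, "always set azure transport connection"): it now runs on every open_connection() for the az/abfss protocols instead of only during authentication setup. A minimal sketch of the duckdb side of it, assuming the duckdb azure extension is available:

    import duckdb

    conn = duckdb.connect(":memory:")
    conn.install_extension("azure")  # needs network access the first time
    conn.load_extension("azure")
    conn.sql("SET azure_transport_option_type = 'curl';")  # fixes certificate path lookup on linux
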