feat(backends): support creation from a DB-API con (#9603)
Co-authored-by: Phillip Cloud <[email protected]>
deepyaman and cpcloud authored Jul 22, 2024
1 parent 00a776e commit fc4d1e3
Showing 21 changed files with 431 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ibis-backends.yml
@@ -126,7 +126,7 @@ jobs:
extras:
- sqlite
- name: datafusion
title: Datafusion
title: DataFusion
extras:
- datafusion
- name: polars
9 changes: 6 additions & 3 deletions gen_redirects.py
@@ -10,7 +10,8 @@
"/backends/{version}/BigQuery/": "/backends/bigquery/",
"/backends/{version}/Clickhouse/": "/backends/clickhouse/",
"/backends/{version}/Dask/": "/backends/dask/",
"/backends/{version}/Datafusion/": "/backends/datafusion/",
"/backends/{version}/DataFusion/": "/backends/datafusion/",
"/backends/{version}/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/backends/{version}/Druid/": "/backends/druid/",
"/backends/{version}/DuckDB/": "/backends/duckdb/",
"/backends/{version}/Impala/": "/backends/impala/",
@@ -30,7 +31,8 @@
"/docs/{version}/backends/BigQuery/": "/backends/bigquery/",
"/docs/{version}/backends/Clickhouse/": "/backends/clickhouse/",
"/docs/{version}/backends/Dask/": "/backends/dask/",
"/docs/{version}/backends/Datafusion/": "/backends/datafusion/",
"/docs/{version}/backends/DataFusion/": "/backends/datafusion/",
"/docs/{version}/backends/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/docs/{version}/backends/Druid/": "/backends/druid/",
"/docs/{version}/backends/DuckDB/": "/backends/duckdb/",
"/docs/{version}/backends/Impala/": "/backends/impala/",
@@ -73,7 +75,8 @@
"/backends/BigQuery/": "/backends/bigquery/",
"/backends/Clickhouse/": "/backends/clickhouse/",
"/backends/Dask/": "/backends/dask/",
"/backends/Datafusion/": "/backends/datafusion/",
"/backends/DataFusion/": "/backends/datafusion/",
"/backends/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/backends/Druid/": "/backends/druid/",
"/backends/DuckDB/": "/backends/duckdb/",
"/backends/Impala/": "/backends/impala/",
6 changes: 5 additions & 1 deletion ibis/backends/__init__.py
@@ -767,6 +767,7 @@ class BaseBackend(abc.ABC, _FileIOHandler):
def __init__(self, *args, **kwargs):
self._con_args: tuple[Any] = args
self._con_kwargs: dict[str, Any] = kwargs
self._can_reconnect: bool = True
# expression cache
self._query_cache = RefCountedCache(
populate=self._load_into_cache,
@@ -856,7 +857,10 @@ def _convert_kwargs(kwargs: MutableMapping) -> None:
# TODO(kszucs): should call self.connect(*self._con_args, **self._con_kwargs)
def reconnect(self) -> None:
"""Reconnect to the database already configured with connect."""
self.do_connect(*self._con_args, **self._con_kwargs)
if self._can_reconnect:
self.do_connect(*self._con_args, **self._con_kwargs)
else:
raise exc.IbisError(f"Cannot reconnect to unconfigured {self.name} backend")

def do_connect(self, *args, **kwargs) -> None:
"""Connect to database specified by `args` and `kwargs`."""
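For illustration, a minimal sketch of the new guard, using the DuckDB `from_connection` classmethod added further down in this diff; the in-memory connection is only a placeholder:

import duckdb

from ibis.backends.duckdb import Backend as DuckDBBackend

backend = DuckDBBackend.from_connection(duckdb.connect())  # wraps an existing connection
backend.reconnect()  # raises IbisError, because _can_reconnect is False for wrapped connections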
31 changes: 31 additions & 0 deletions ibis/backends/bigquery/__init__.py
@@ -478,6 +478,37 @@ def do_connect(

self.partition_column = partition_column

@util.experimental
@classmethod
def from_connection(
cls,
client: bq.Client,
partition_column: str | None = "PARTITIONTIME",
storage_client: bqstorage.BigQueryReadClient | None = None,
dataset_id: str = "",
) -> Backend:
"""Create a BigQuery `Backend` from an existing ``Client``.
Parameters
----------
client
A `Client` from the `google.cloud.bigquery` package.
partition_column
Identifier to use instead of default `_PARTITIONTIME` partition
column. Defaults to `'PARTITIONTIME'`.
storage_client
A `BigQueryReadClient` from the `google.cloud.bigquery_storage_v1`
package.
dataset_id
A dataset id that lives inside of the project attached to `client`.
"""
return ibis.bigquery.connect(
client=client,
partition_column=partition_column,
storage_client=storage_client,
dataset_id=dataset_id,
)

def disconnect(self) -> None:
self.client.close()

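A usage sketch for the BigQuery case; the project and dataset names are placeholders, and the import path follows the file shown above:

from google.cloud import bigquery

from ibis.backends.bigquery import Backend as BigQueryBackend

client = bigquery.Client(project="my-project")  # hypothetical project id
con = BigQueryBackend.from_connection(client, dataset_id="my_dataset")  # delegates to ibis.bigquery.connect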
15 changes: 15 additions & 0 deletions ibis/backends/clickhouse/__init__.py
@@ -163,6 +163,21 @@ def do_connect(
**kwargs,
)

@util.experimental
@classmethod
def from_connection(cls, con: cc.driver.Client) -> Backend:
"""Create an Ibis client from an existing ClickHouse Connect Client instance.
Parameters
----------
con
An existing ClickHouse Connect Client instance.
"""
new_backend = cls()
new_backend._can_reconnect = False
new_backend.con = con
return new_backend

@property
def version(self) -> str:
return self.con.server_version
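A sketch under assumed connection details (the host is a placeholder); any ClickHouse Connect `Client` works:

import clickhouse_connect

from ibis.backends.clickhouse import Backend as ClickHouseBackend

client = clickhouse_connect.get_client(host="localhost")  # assumed host
con = ClickHouseBackend.from_connection(client)  # reuses the client; reconnect() is disabled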
29 changes: 22 additions & 7 deletions ibis/backends/datafusion/__init__.py
@@ -14,11 +14,13 @@
import sqlglot as sg
import sqlglot.expressions as sge

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import DataFusionCompiler
@@ -77,12 +79,13 @@ def version(self):
def do_connect(
self, config: Mapping[str, str | Path] | SessionContext | None = None
) -> None:
"""Create a Datafusion backend for use with Ibis.
"""Create a DataFusion `Backend` for use with Ibis.
Parameters
----------
config
Mapping of table names to files.
Mapping of table names to files or a `SessionContext`
instance.
Examples
--------
@@ -112,6 +115,18 @@ def do_connect(
for name, path in config.items():
self.register(path, table_name=name)

@util.experimental
@classmethod
def from_connection(cls, con: SessionContext) -> Backend:
"""Create a DataFusion `Backend` from an existing `SessionContext` instance.
Parameters
----------
con
A `SessionContext` instance.
"""
return ibis.datafusion.connect(con)

def disconnect(self) -> None:
pass

@@ -329,7 +344,7 @@ def register(
table_name
The name of the table
kwargs
Datafusion-specific keyword arguments
DataFusion-specific keyword arguments
Examples
--------
@@ -423,7 +438,7 @@ def read_csv(
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to Datafusion loading function.
Additional keyword arguments passed to DataFusion loading function.
Returns
-------
@@ -451,7 +466,7 @@ def read_parquet(
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to Datafusion loading function.
Additional keyword arguments passed to DataFusion loading function.
Returns
-------
@@ -576,7 +591,7 @@ def create_table(
temp: bool = False,
overwrite: bool = False,
):
"""Create a table in Datafusion.
"""Create a table in DataFusion.
Parameters
----------
@@ -697,7 +712,7 @@ def truncate_table(
def _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
"""Workaround inability to overwrite tables in dataframe API.
Datafusion has helper methods for loading in-memory data, but these methods
DataFusion has helper methods for loading in-memory data, but these methods
don't allow overwriting tables.
The SQL interface allows creating tables from existing tables, so we register
the data as a table using the dataframe API, then run a
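A sketch of wrapping an existing DataFusion session; the empty `SessionContext` is purely illustrative:

from datafusion import SessionContext

from ibis.backends.datafusion import Backend as DataFusionBackend

ctx = SessionContext()  # an existing DataFusion session
con = DataFusionBackend.from_connection(ctx)  # equivalent to ibis.datafusion.connect(ctx)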
16 changes: 16 additions & 0 deletions ibis/backends/druid/__init__.py
@@ -12,6 +12,7 @@

import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis import util
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import DruidCompiler
from ibis.backends.sql.compilers.base import STAR
@@ -81,6 +82,21 @@ def do_connect(self, **kwargs: Any) -> None:
header = kwargs.pop("header", True)
self.con = pydruid.db.connect(**kwargs, header=header)

@util.experimental
@classmethod
def from_connection(cls, con: pydruid.db.api.Connection) -> Backend:
"""Create an Ibis client from an existing connection to a Druid database.
Parameters
----------
con
An existing connection to a Druid database.
"""
new_backend = cls()
new_backend._can_reconnect = False
new_backend.con = con
return new_backend

@contextlib.contextmanager
def _safe_raw_sql(self, query, *args, **kwargs):
with contextlib.suppress(AttributeError):
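A sketch with placeholder coordinates for a local Druid SQL endpoint:

import pydruid.db

from ibis.backends.druid import Backend as DruidBackend

raw = pydruid.db.connect(host="localhost", port=8082, path="/druid/v2/sql/")  # assumed endpoint
con = DruidBackend.from_connection(raw)  # reuses the connection; reconnect() is disabled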
25 changes: 25 additions & 0 deletions ibis/backends/duckdb/__init__.py
@@ -473,6 +473,31 @@ def do_connect(

self.con = duckdb.connect(str(database), config=config, read_only=read_only)

self._post_connect(extensions)

@util.experimental
@classmethod
def from_connection(
cls,
con: duckdb.DuckDBPyConnection,
extensions: Sequence[str] | None = None,
) -> Backend:
"""Create an Ibis client from an existing connection to a DuckDB database.
Parameters
----------
con
An existing connection to a DuckDB database.
extensions
A list of duckdb extensions to install/load upon connection.
"""
new_backend = cls(extensions=extensions)
new_backend._can_reconnect = False
new_backend.con = con
new_backend._post_connect(extensions)
return new_backend

def _post_connect(self, extensions: Sequence[str] | None = None) -> None:
# Load any pre-specified extensions
if extensions is not None:
self._load_extensions(extensions)
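A DuckDB sketch; the database file name and extension list are placeholders:

import duckdb

from ibis.backends.duckdb import Backend as DuckDBBackend

raw = duckdb.connect("analytics.ddb")  # hypothetical database file
con = DuckDBBackend.from_connection(raw, extensions=["httpfs"])  # extensions are loaded in _post_connect
print(con.list_tables())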
26 changes: 26 additions & 0 deletions ibis/backends/exasol/__init__.py
@@ -97,6 +97,32 @@ def do_connect(
quote_ident=True,
**kwargs,
)
self._post_connect(timezone)

@util.experimental
@classmethod
def from_connection(
cls, con: pyexasol.ExaConnection, timezone: str | None = None
) -> Backend:
"""Create an Ibis client from an existing connection to an Exasol database.
Parameters
----------
con
An existing connection to an Exasol database.
timezone
The session timezone.
"""
if timezone is None:
timezone = (con.execute("SELECT SESSIONTIMEZONE").fetchone() or ("UTC",))[0]

new_backend = cls(timezone=timezone)
new_backend._can_reconnect = False
new_backend.con = con
new_backend._post_connect(timezone)
return new_backend

def _post_connect(self, timezone: str = "UTC") -> None:
with self.begin() as con:
con.execute(f"ALTER SESSION SET TIME_ZONE = {timezone!r}")

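An Exasol sketch with placeholder credentials; when no timezone is passed, the session timezone is read from the existing connection as shown above:

import pyexasol

from ibis.backends.exasol import Backend as ExasolBackend

raw = pyexasol.connect(dsn="localhost:8563", user="sys", password="exasol")  # hypothetical credentials
con = ExasolBackend.from_connection(raw)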
13 changes: 13 additions & 0 deletions ibis/backends/flink/__init__.py
@@ -11,6 +11,7 @@
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase, NoUrl
from ibis.backends.flink.ddl import (
CreateDatabase,
@@ -71,6 +72,18 @@ def do_connect(self, table_env: TableEnvironment) -> None:
"""
self._table_env = table_env

@util.experimental
@classmethod
def from_connection(cls, table_env: TableEnvironment) -> Backend:
"""Create a Flink `Backend` from an existing table environment.
Parameters
----------
table_env
A table environment.
"""
return ibis.flink.connect(table_env)

def disconnect(self) -> None:
pass

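A Flink sketch using a streaming `TableEnvironment`; any existing table environment can be passed:

from pyflink.table import EnvironmentSettings, TableEnvironment

from ibis.backends.flink import Backend as FlinkBackend

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = FlinkBackend.from_connection(table_env)  # equivalent to ibis.flink.connect(table_env)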
20 changes: 20 additions & 0 deletions ibis/backends/impala/__init__.py
@@ -45,6 +45,7 @@
from pathlib import Path
from urllib.parse import ParseResult

import impala.hiveserver2 as hs2
import pandas as pd
import polars as pl
import pyarrow as pa
@@ -183,6 +184,25 @@ def do_connect(
cur.ping()

self.con = con
self._post_connect()

@util.experimental
@classmethod
def from_connection(cls, con: hs2.HiveServer2Connection) -> Backend:
"""Create an Impala `Backend` from an existing HS2 connection.
Parameters
----------
con
An existing connection to HiveServer2 (HS2).
"""
new_backend = cls()
new_backend._can_reconnect = False
new_backend.con = con
new_backend._post_connect()
return new_backend

def _post_connect(self) -> None:
self.options = {}

@cached_property
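An Impala sketch with a placeholder host; `impala.dbapi.connect` returns the HiveServer2 connection the classmethod expects:

import impala.dbapi

from ibis.backends.impala import Backend as ImpalaBackend

raw = impala.dbapi.connect(host="impala-host", port=21050)  # assumed host and port
con = ImpalaBackend.from_connection(raw)  # session options are initialized via _post_connect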