Skip to content

Commit

Permalink
add native db api cursor fetching to exposed dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Aug 7, 2024
1 parent 46e0226 commit 7cf69a7
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 129 deletions.
44 changes: 41 additions & 3 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
Generic,
Generator,
TYPE_CHECKING,
Protocol,
Tuple,
)
from typing_extensions import Annotated
import datetime # noqa: 251
Expand Down Expand Up @@ -568,8 +570,44 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
return True


class SupportsDataAccess(ABC):
"""Add support for accessing data as arrow tables or pandas dataframes"""
class SupportsDataAccess(Protocol):
    """Structural protocol for reading query/table results as pandas data frames,
    arrow tables, or raw DBAPI-style row tuples."""

    def df(self, chunk_size: Optional[int] = None, **kwargs: Any) -> Optional[DataFrame]:
        """Fetches the results as a pandas data frame; for large queries the results may be chunked.

        Implementations may use native data frame generation for a particular destination,
        e.g. `QueryJob.to_dataframe` on BigQuery or `DuckDBPyConnection.df` on duckdb.

        Args:
            chunk_size (Optional[int]): Chunk the results into data frames of this many rows.
                Defaults to None (fetch everything in one frame).
            **kwargs (Any): Additional parameters passed to the native data frame generation function.

        Returns:
            Optional[DataFrame]: A data frame with query results. If chunk_size is set, None is
                returned once there is no more data in the results.
        """
        ...

    def arrow(self, *, chunk_size: Optional[int] = None) -> Optional[ArrowTable]:
        # Same contract as `df`, but yields an arrow table; None once a chunked fetch is exhausted.
        ...

    def iter_df(self, chunk_size: int = 1000) -> Generator[DataFrame, None, None]:
        # Iterates over the whole result set in data frames of `chunk_size` rows.
        ...

    def iter_arrow(self, chunk_size: int = 1000) -> Generator[ArrowTable, None, None]:
        # Iterates over the whole result set in arrow tables of `chunk_size` rows.
        ...

    def fetchall(self) -> List[Tuple[Any, ...]]:
        # DBAPI-style fetch of all remaining rows as tuples.
        ...

    def fetchmany(self, chunk_size: int = ...) -> List[Tuple[Any, ...]]:
        # DBAPI-style fetch of up to `chunk_size` rows; empty list once exhausted.
        ...

    def iter_fetchmany(
        self, chunk_size: int = ...
    ) -> Generator[List[Tuple[Any, ...]], Any, Any]:
        # Yields batches of row tuples until the result set is exhausted.
        ...

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # DBAPI-style fetch of a single row; None once exhausted.
        ...


class SupportsRelationshipAccess(ABC):
"""Add support for accessing a cursor for a given relationship or query"""

@abstractmethod
def cursor_for_relation(
Expand All @@ -578,7 +616,7 @@ def cursor_for_relation(
table: str = None,
sql: str = None,
prepare_tables: List[str] = None,
) -> ContextManager[Any]: ...
) -> ContextManager[SupportsDataAccess]: ...


# TODO: type Destination properly
Expand Down
58 changes: 35 additions & 23 deletions dlt/dataset.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import cast, Any, TYPE_CHECKING, Generator, List, ContextManager
from typing import cast, Any, TYPE_CHECKING, Generator, List, Tuple, Optional

from contextlib import contextmanager


from dlt.common.destination.reference import SupportsDataAccess
from dlt.common.destination.reference import SupportsRelationshipAccess, SupportsDataAccess

from dlt.common.typing import DataFrame, ArrowTable

Expand All @@ -21,7 +20,7 @@ def __init__(
self.table = table

@contextmanager
def _client(self) -> Generator[SupportsDataAccess, Any, Any]:
def _client(self) -> Generator[SupportsRelationshipAccess, Any, Any]:
from dlt.destinations.job_client_impl import SqlJobClientBase
from dlt.destinations.fs_client import FSClientBase

Expand All @@ -42,7 +41,8 @@ def _client(self) -> Generator[SupportsDataAccess, Any, Any]:
)

@contextmanager
def _cursor_for_relation(self) -> Generator[Any, Any, Any]:
def cursor(self) -> Generator[SupportsDataAccess, Any, Any]:
"""Gets a DBApiCursor for the current relation"""
with self._client() as client:
with client.cursor_for_relation(
sql=self.sql, table=self.table, prepare_tables=self.prepare_tables
Expand All @@ -52,51 +52,61 @@ def _cursor_for_relation(self) -> Generator[Any, Any, Any]:
def df(
self,
*,
chunk_size: int = 1000,
chunk_size: int = None,
) -> DataFrame:
"""Get first batch of table as dataframe"""
return next(
self.iter_df(
chunk_size=chunk_size,
)
)
with self.cursor() as cursor:
return cursor.df(chunk_size=chunk_size)

def arrow(
self,
*,
chunk_size: int = 1000,
chunk_size: int = None,
) -> ArrowTable:
"""Get first batch of table as arrow table"""
return next(
self.iter_arrow(
chunk_size=chunk_size,
)
)
with self.cursor() as cursor:
return cursor.arrow(chunk_size=chunk_size)

def iter_df(
self,
*,
chunk_size: int = 1000,
chunk_size: int = None,
) -> Generator[DataFrame, None, None]:
"""iterates over the whole table in dataframes of the given chunk_size, chunk_size of -1 will return the full table in the first batch"""
# if no table is given, take the bound table
with self._cursor_for_relation() as cursor:
with self.cursor() as cursor:
yield from cursor.iter_df(
chunk_size=chunk_size,
)

def iter_arrow(
self,
*,
chunk_size: int = 1000,
chunk_size: int = None,
) -> Generator[ArrowTable, None, None]:
"""iterates over the whole table in arrow tables of the given chunk_size, chunk_size of -1 will return the full table in the first batch"""
# if no table is given, take the bound table
with self._cursor_for_relation() as cursor:
with self.cursor() as cursor:
yield from cursor.iter_arrow(
chunk_size=chunk_size,
)

def fetchall(self) -> List[Tuple[Any, ...]]:
    """Fetches all rows of this relation as a list of tuples (DBAPI style)."""
    with self.cursor() as cursor:
        return cursor.fetchall()

def fetchmany(self, chunk_size: Optional[int] = None) -> List[Tuple[Any, ...]]:
    """Fetches up to `chunk_size` rows of this relation as a list of tuples (DBAPI style).

    NOTE(review): chunk_size is forwarded as-is to the underlying cursor — presumably the
    cursor implementation substitutes a default when None; confirm against the cursor impl.
    """
    with self.cursor() as cursor:
        return cursor.fetchmany(chunk_size)

def iter_fetchmany(self, chunk_size: Optional[int] = None) -> Generator[List[Tuple[Any, ...]], Any, Any]:
    """Iterates over this relation in batches of `chunk_size` row tuples.

    The cursor stays open for the lifetime of the generator; batches are produced
    by the underlying cursor's `iter_fetchmany`.
    """
    with self.cursor() as cursor:
        yield from cursor.iter_fetchmany(
            chunk_size=chunk_size,
        )

def fetchone(self) -> Optional[Tuple[Any, ...]]:
    """Fetches the first row of this relation as a tuple, or None if there are no rows (DBAPI style)."""
    with self.cursor() as cursor:
        return cursor.fetchone()


class Dataset:
"""Access to dataframes and arrowtables in the destination dataset"""
Expand All @@ -110,7 +120,9 @@ def sql(self, sql: str, prepare_tables: List[str] = None) -> Relation:
return Relation(pipeline=self.pipeline, sql=sql, prepare_tables=prepare_tables)

def __getitem__(self, table: str) -> Relation:
    """Access a table of the dataset via dict notation, e.g. `dataset["my_table"]`."""
    return Relation(pipeline=self.pipeline, table=table)

def __getattr__(self, table: str) -> Relation:
    """Access a table of the dataset via attribute notation, e.g. `dataset.my_table`.

    NOTE(review): any unknown attribute resolves to a Relation, so misspelled attribute
    names will not raise AttributeError here — the error surfaces later at query time.
    """
    return Relation(pipeline=self.pipeline, table=table)
4 changes: 2 additions & 2 deletions dlt/destinations/fs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from fsspec import AbstractFileSystem

from dlt.common.typing import DuckDBPyConnection
from dlt.common.destination.reference import SupportsDataAccess
from dlt.common.destination.reference import SupportsRelationshipAccess


class FSClientBase(SupportsDataAccess, ABC):
class FSClientBase(SupportsRelationshipAccess, ABC):
fs_client: AbstractFileSystem

@property
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DatabricksCursorImpl(DBApiCursorImpl):
"""Use native data frame support if available"""

native_cursor: DatabricksSqlCursor # type: ignore[assignment]
vector_size: ClassVar[int] = 2048
default_chunk_size: ClassVar[int] = 2048 # vector size is 2048

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is None:
Expand Down
26 changes: 13 additions & 13 deletions dlt/destinations/impl/duckdb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ class DuckDBDBApiCursorImpl(DBApiCursorImpl):
"""Use native duckdb data frame support if available"""

native_cursor: duckdb.DuckDBPyConnection # type: ignore
vector_size: ClassVar[int] = 2048
default_chunk_size: ClassVar[int] = 2048 # vector size is 2048

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is None:
return self.native_cursor.df(**kwargs)
else:
multiple = chunk_size // self.vector_size + (
0 if self.vector_size % chunk_size == 0 else 1
)
df = self.native_cursor.fetch_df_chunk(multiple, **kwargs)
if df.shape[0] == 0:
return None
else:
return df
# def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
# if chunk_size is None:
# return self.native_cursor.df(**kwargs)
# else:
# multiple = chunk_size // self.vector_size + (
# 0 if self.vector_size % chunk_size == 0 else 1
# )
# df = self.native_cursor.fetch_df_chunk(multiple, **kwargs)
# if df.shape[0] == 0:
# return None
# else:
# return df


class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
Expand Down
51 changes: 32 additions & 19 deletions dlt/destinations/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
DBTransaction,
ArrowTable,
)
from dlt.common.destination.reference import SupportsDataAccess
from dlt.common.destination.reference import SupportsRelationshipAccess


from dlt.destinations.typing import DBApi, TNativeConn, DBApiCursor, DataFrame, DBTransaction
Expand All @@ -52,7 +52,7 @@ class TJobQueryTags(TypedDict):
pipeline_name: str


class SqlClientBase(SupportsDataAccess, ABC, Generic[TNativeConn]):
class SqlClientBase(SupportsRelationshipAccess, ABC, Generic[TNativeConn]):
dbapi: ClassVar[DBApi] = None

database_name: Optional[str]
Expand Down Expand Up @@ -300,6 +300,9 @@ def cursor_for_relation(
class DBApiCursorImpl(DBApiCursor):
"""A DBApi Cursor wrapper with dataframes reading functionality"""

# default size of an iter chunk if none is given
default_chunk_size: ClassVar[int] = 1000

def __init__(self, curr: DBApiCursor) -> None:
    """Wraps a destination-native DBAPI cursor to add data frame / arrow reading helpers."""
    # the wrapped native cursor; unknown attribute access is expected to be delegated to it
    self.native_cursor = curr

Expand All @@ -321,32 +324,42 @@ def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]:
May use native pandas/arrow reader if available. Depending on
the native implementation chunk size may vary.
"""
from dlt.common.libs.pandas_sql import _wrap_result
return next(self.iter_df(chunk_size=chunk_size))

columns = self._get_columns()
if chunk_size is None:
return _wrap_result(self.native_cursor.fetchall(), columns, **kwargs)
else:
df = _wrap_result(self.native_cursor.fetchmany(chunk_size), columns, **kwargs)
# if no rows return None
if df.shape[0] == 0:
return None
else:
return df

def iter_df(self, chunk_size: int = 1000) -> Generator[DataFrame, None, None]:
from dlt.common.libs.pandas_sql import _wrap_result
def arrow(self, chunk_size: int = None, **kwargs: Any) -> Optional[ArrowTable]:
"""Fetches results as data frame in full or in specified chunks.
# iterate over results in batch size chunks
columns = self._get_columns()
May use native pandas/arrow reader if available. Depending on
the native implementation chunk size may vary.
"""
return next(self.iter_arrow(chunk_size=chunk_size))

def iter_fetchmany(self, chunk_size: int = None) -> Generator[List[Tuple[Any, ...]], Any, Any]:
    """Yields batches of row tuples from the cursor until the result set is exhausted.

    Falls back to `default_chunk_size` when no (or a falsy) chunk_size is given.
    """
    batch_size = chunk_size or self.default_chunk_size
    while batch := self.fetchmany(batch_size):
        yield batch

def iter_df(self, chunk_size: int = None) -> Generator[DataFrame, None, None]:
    """Yields query results as pandas data frames.

    Without a chunk_size a single frame holding the full result set is yielded;
    otherwise frames of `chunk_size` rows are produced until exhaustion.
    """
    from dlt.common.libs.pandas_sql import _wrap_result

    column_names = self._get_columns()

    if chunk_size:
        # stream the result set in batches of chunk_size rows
        for batch in self.iter_fetchmany(chunk_size=chunk_size):
            yield _wrap_result(batch, column_names)
    else:
        # no chunking requested: everything in one frame
        yield _wrap_result(self.fetchall(), column_names)

def iter_arrow(self, chunk_size: int = 1000) -> Generator[ArrowTable, None, None]:
def iter_arrow(self, chunk_size: int = None) -> Generator[ArrowTable, None, None]:
    """Default implementation: converts each data frame produced by `iter_df` to an arrow table."""
    # TODO: review efficiency of the pandas -> arrow round trip
    yield from (
        ArrowTable.from_pandas(frame) for frame in self.iter_df(chunk_size=chunk_size)
    )


Expand Down
26 changes: 2 additions & 24 deletions dlt/destinations/typing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, AnyStr, List, Type, Optional, Protocol, Tuple, TypeVar, Generator

from dlt.common.typing import DataFrame, ArrowTable
from dlt.common.destination.reference import SupportsDataAccess

# native connection
TNativeConn = TypeVar("TNativeConn", bound=Any)
Expand All @@ -18,7 +19,7 @@ class DBApi(Protocol):
paramstyle: str


class DBApiCursor(Protocol):
class DBApiCursor(SupportsDataAccess):
"""Protocol for DBAPI cursor"""

description: Tuple[Any, ...]
Expand All @@ -27,27 +28,4 @@ class DBApiCursor(Protocol):
"""Cursor implementation native to current destination"""

def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ...
def fetchall(self) -> List[Tuple[Any, ...]]: ...
def fetchmany(self, size: int = ...) -> List[Tuple[Any, ...]]: ...
def fetchone(self) -> Optional[Tuple[Any, ...]]: ...
def close(self) -> None: ...

def df(self, chunk_size: int = None, **kwargs: None) -> Optional[DataFrame]:
"""Fetches the results as data frame. For large queries the results may be chunked
Fetches the results into a data frame. The default implementation uses helpers in `pandas.io.sql` to generate Pandas data frame.
This function will try to use native data frame generation for particular destination. For `BigQuery`: `QueryJob.to_dataframe` is used.
For `duckdb`: `DuckDBPyConnection.df'
Args:
chunk_size (int, optional): Will chunk the results into several data frames. Defaults to None
**kwargs (Any): Additional parameters which will be passed to native data frame generation function.
Returns:
Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results
"""
...

def iter_df(self, chunk_size: int = 1000) -> Generator[DataFrame, None, None]: ...

def iter_arrow(self, chunk_size: int = 1000) -> Generator[ArrowTable, None, None]: ...
Loading

0 comments on commit 7cf69a7

Please sign in to comment.