Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
move dataset creation into destination client and clean up interfaces / reference a bit more
Browse files Browse the repository at this point in the history
sh-rp committed Aug 8, 2024
1 parent 28ee1c6 commit 28cb282
Showing 6 changed files with 89 additions and 95 deletions.
91 changes: 47 additions & 44 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
@@ -408,6 +408,50 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
return []


class SupportsReadRelation(Protocol):
    """Add support for accessing data items on a relation (a table or query result)."""

    def df(self, chunk_size: Optional[int] = None) -> Optional[DataFrame]:
        """Fetches the results as a Pandas data frame; for large queries the results may be chunked.

        The default implementation uses helpers in `pandas.io.sql` to generate the data frame.
        Implementations will try to use native data frame generation for the particular
        destination. For `BigQuery`: `QueryJob.to_dataframe` is used. For `duckdb`:
        `DuckDBPyConnection.df`.

        Args:
            chunk_size (Optional[int]): Will chunk the results into several data frames.
                Defaults to None (all results in a single frame).

        Returns:
            Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None
                will be returned when there is no more data in the results.
        """
        ...

    def arrow(self, chunk_size: Optional[int] = None) -> Optional[ArrowTable]:
        """Fetches the results as a pyarrow table; chunking semantics mirror `df`."""
        ...

    def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]:
        """Iterates over the whole result set in data frames of `chunk_size` rows."""
        ...

    def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
        """Iterates over the whole result set in arrow tables of `chunk_size` rows."""
        ...

    def fetchall(self) -> List[Tuple[Any, ...]]:
        """Fetches all remaining rows as a list of tuples (DBAPI-style)."""
        ...

    def fetchmany(self, chunk_size: int) -> List[Tuple[Any, ...]]:
        """Fetches up to `chunk_size` rows as a list of tuples (DBAPI-style)."""
        ...

    def iter_fetchmany(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]:
        """Iterates over the whole result set in lists of up to `chunk_size` tuples."""
        ...

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        """Fetches the next row as a tuple, or None when results are exhausted (DBAPI-style)."""
        ...


class SupportsReadDataset(Protocol):
    """Add support for read access on a dataset"""

    def sql(self, sql: str, prepare_tables: Optional[List[str]] = None) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for the given SQL query.

        Args:
            sql (str): Query to execute against the dataset.
            prepare_tables (Optional[List[str]]): Tables the query depends on — presumably
                made available/registered before execution; TODO confirm with implementations.
        """
        ...

    def __getitem__(self, table: str) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for `table` via dict-style access."""
        ...

    def __getattr__(self, table: str) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for `table` via attribute-style access."""
        ...


class JobClientBase(ABC):
def __init__(
self,
@@ -526,6 +570,9 @@ def prepare_load_table(
except KeyError:
raise UnknownTableException(self.schema.name, table_name)

def dataset(self) -> SupportsReadDataset:
    """Returns a read-only dataset for this destination.

    Raises:
        NotImplementedError: this base implementation does not support read access;
            destinations that do override this method.
    """
    # NotImplementedError (a subclass of Exception) is the idiomatic signal for an
    # unsupported operation; existing callers catching Exception still work.
    raise NotImplementedError("Destination does not support SupportsReadDataset")


class WithStateSync(ABC):
@abstractmethod
@@ -570,50 +617,6 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
return True


class SupportsReadRelation(Protocol):
    """Add support for accessing data items on a relation (a table or query result)."""

    def df(self, chunk_size: Optional[int] = None) -> Optional[DataFrame]:
        """Fetches the results as a Pandas data frame; for large queries the results may be chunked.

        The default implementation uses helpers in `pandas.io.sql` to generate the data frame.
        Implementations will try to use native data frame generation for the particular
        destination. For `BigQuery`: `QueryJob.to_dataframe` is used. For `duckdb`:
        `DuckDBPyConnection.df`.

        Args:
            chunk_size (Optional[int]): Will chunk the results into several data frames.
                Defaults to None (all results in a single frame).

        Returns:
            Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None
                will be returned when there is no more data in the results.
        """
        ...

    def arrow(self, chunk_size: Optional[int] = None) -> Optional[ArrowTable]:
        """Fetches the results as a pyarrow table; chunking semantics mirror `df`."""
        ...

    def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]:
        """Iterates over the whole result set in data frames of `chunk_size` rows."""
        ...

    def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
        """Iterates over the whole result set in arrow tables of `chunk_size` rows."""
        ...

    def fetchall(self) -> List[Tuple[Any, ...]]:
        """Fetches all remaining rows as a list of tuples (DBAPI-style)."""
        ...

    def fetchmany(self, chunk_size: int) -> List[Tuple[Any, ...]]:
        """Fetches up to `chunk_size` rows as a list of tuples (DBAPI-style)."""
        ...

    def iter_fetchmany(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]:
        """Iterates over the whole result set in lists of up to `chunk_size` tuples."""
        ...

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        """Fetches the next row as a tuple, or None when results are exhausted (DBAPI-style)."""
        ...


class SupportsReadDataset(Protocol):
    """Add support for read access on a dataset"""

    def sql(self, sql: str, prepare_tables: Optional[List[str]] = None) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for the given SQL query.

        Args:
            sql (str): Query to execute against the dataset.
            prepare_tables (Optional[List[str]]): Tables the query depends on — presumably
                made available/registered before execution; TODO confirm with implementations.
        """
        ...

    def __getitem__(self, table: str) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for `table` via dict-style access."""
        ...

    def __getattr__(self, table: str) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for `table` via attribute-style access."""
        ...


class SupportsRelationshipAccess(ABC):
"""Add support for accessing a cursor for a given relationship or query"""

47 changes: 13 additions & 34 deletions dlt/destinations/dataset.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
from typing import Any, Generator, List, Tuple, Optional

from contextlib import contextmanager
from dlt.destinations.job_client_impl import SqlJobClientBase
from dlt.destinations.fs_client import FSClientBase

from dlt.common.destination.reference import (
SupportsRelationshipAccess,
SupportsReadRelation,
JobClientBase,
SupportsReadDataset,
)

@@ -18,41 +14,24 @@ class Relation(SupportsReadRelation):
def __init__(
    self,
    *,
    client: SupportsRelationshipAccess,
    table: str = None,
    sql: str = None,
    prepare_tables: List[str] = None,
) -> None:
    """Create a lazy evaluated relation for the dataset of a destination.

    Args:
        client: Destination client implementing `cursor_for_relation`.
        table: Name of a table to read; used only when `sql` is not given.
        sql: Explicit query to execute; takes precedence over `table`.
        prepare_tables: Tables to prepare before executing the query.
    """
    # NOTE(review): the scraped diff interleaved the pre-commit (`job_client` +
    # `_client` helper) and post-commit (`client`) versions; this is the
    # post-commit implementation, which delegates cursor creation to the client.
    self.client = client
    self.prepare_tables = prepare_tables
    self.sql = sql
    self.table = table

@contextmanager
def cursor(self) -> Generator[SupportsReadRelation, Any, Any]:
    """Gets a DBApiCursor for the current relation; only valid inside the context."""
    with self.client.cursor_for_relation(
        sql=self.sql, table=self.table, prepare_tables=self.prepare_tables
    ) as cursor:
        yield cursor

def df(
self,
@@ -74,7 +53,7 @@ def iter_df(
self,
chunk_size: int,
) -> Generator[DataFrame, None, None]:
"""iterates over the whole table in dataframes of the given chunk_size, chunk_size of -1 will return the full table in the first batch"""
"""iterates over the whole table in dataframes of the given chunk_size"""
with self.cursor() as cursor:
yield from cursor.iter_df(
chunk_size=chunk_size,
@@ -84,7 +63,7 @@ def iter_arrow(
self,
chunk_size: int,
) -> Generator[ArrowTable, None, None]:
"""iterates over the whole table in arrow tables of the given chunk_size, chunk_size of -1 will return the full table in the first batch"""
"""iterates over the whole table in arrow tables of the given chunk_size"""
with self.cursor() as cursor:
yield from cursor.iter_arrow(
chunk_size=chunk_size,
@@ -112,16 +91,16 @@ def fetchone(self) -> Optional[Tuple[Any, ...]]:
class Dataset(SupportsReadDataset):
    """Access to dataframes and arrowtables in the destination dataset"""

    def __init__(self, client: SupportsRelationshipAccess) -> None:
        # NOTE(review): the scraped diff carried both the old `job_client` and new
        # `client` constructor lines; this is the post-commit version, which only
        # needs an object implementing `cursor_for_relation`.
        self.client = client

    def sql(self, sql: str, prepare_tables: List[str] = None) -> SupportsReadRelation:
        """Returns a lazily evaluated relation for the given SQL query."""
        return Relation(client=self.client, sql=sql, prepare_tables=prepare_tables)

    def __getitem__(self, table: str) -> SupportsReadRelation:
        """access of table via dict notation"""
        return Relation(client=self.client, table=table)

    def __getattr__(self, table: str) -> SupportsReadRelation:
        """access of table via property notation"""
        return Relation(client=self.client, table=table)
7 changes: 6 additions & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
@@ -45,16 +45,18 @@
StorageSchemaInfo,
StateInfo,
LoadJob,
SupportsReadDataset,
)
from dlt.common.destination.exceptions import DestinationUndefinedEntity
from dlt.common.typing import DataFrame, ArrowTable, DuckDBPyConnection
from dlt.common.typing import DuckDBPyConnection
from dlt.destinations.job_impl import (
ReferenceFollowupJob,
FinalizedLoadJob,
)
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations import path_utils
from dlt.destinations.fs_client import FSClientBase
from dlt.destinations.dataset import Dataset

INIT_FILE_NAME = "init"
FILENAME_SEPARATOR = "__"
@@ -674,3 +676,6 @@ def cursor_for_relation(
# we can use the implementation of the duckdb cursor here
db.execute(sql)
yield DuckDBDBApiCursorImpl(db) # type: ignore

def dataset(self) -> SupportsReadDataset:
    """Returns a read dataset backed by this filesystem client (which provides `cursor_for_relation`)."""
    return Dataset(self)
22 changes: 21 additions & 1 deletion dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
@@ -14,9 +14,11 @@
Type,
Iterable,
Iterator,
Generator,
)
import zlib
import re
from contextlib import contextmanager

from dlt.common import pendulum, logger
from dlt.common.json import json
@@ -39,6 +41,7 @@
from dlt.common.destination.reference import (
StateInfo,
StorageSchemaInfo,
SupportsReadDataset,
WithStateSync,
DestinationClientConfiguration,
DestinationClientDwhConfiguration,
@@ -49,7 +52,10 @@
JobClientBase,
HasFollowupJobs,
CredentialsConfiguration,
SupportsRelationshipAccess,
SupportsReadRelation,
)
from dlt.destinations.dataset import Dataset

from dlt.destinations.exceptions import DatabaseUndefinedRelation
from dlt.destinations.job_impl import (
@@ -121,7 +127,7 @@ def __init__(
self._bucket_path = ReferenceFollowupJob.resolve_reference(file_path)


class SqlJobClientBase(JobClientBase, WithStateSync):
class SqlJobClientBase(SupportsRelationshipAccess, JobClientBase, WithStateSync):
INFO_TABLES_QUERY_THRESHOLD: ClassVar[int] = 1000
"""Fallback to querying all tables in the information schema if checking more than threshold"""

@@ -678,6 +684,20 @@ def _set_query_tags_for_job(self, load_id: str, table: TTableSchema) -> None:
}
)

@contextmanager
def cursor_for_relation(
    self, *, table: str = None, sql: str = None, prepare_tables: List[str] = None
) -> Generator[SupportsReadRelation, Any, Any]:
    """Opens a cursor over `table` or the explicit `sql` query.

    Args:
        table: Table to select from; used only when `sql` is not given.
        sql: Explicit query; takes precedence over `table`.
        prepare_tables: Unused here — presumably only needed by non-SQL
            destinations; TODO confirm against the filesystem implementation.

    Yields:
        SupportsReadRelation: a DBAPI-style cursor, valid only inside the context
        (the sql_client connection is closed when the context exits).
    """
    with self.sql_client as sql_client:
        if not sql:
            # default to a full-table read over the destination-qualified name
            table = sql_client.make_qualified_table_name(table)
            sql = f"SELECT * FROM {table}"
        with sql_client.execute_query(sql) as cursor:
            yield cursor

def dataset(self) -> SupportsReadDataset:
    """Returns a read dataset backed by this SQL job client (which provides `cursor_for_relation`)."""
    return Dataset(self)


class SqlJobClientWithStaging(SqlJobClientBase, WithStagingDataset):
in_staging_mode: bool = False
13 changes: 1 addition & 12 deletions dlt/destinations/sql_client.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@
DBTransaction,
ArrowTable,
)
from dlt.common.destination.reference import SupportsRelationshipAccess


from dlt.destinations.typing import DBApi, TNativeConn, DBApiCursor, DataFrame, DBTransaction
@@ -52,7 +51,7 @@ class TJobQueryTags(TypedDict):
pipeline_name: str


class SqlClientBase(SupportsRelationshipAccess, ABC, Generic[TNativeConn]):
class SqlClientBase(ABC, Generic[TNativeConn]):
dbapi: ClassVar[DBApi] = None

database_name: Optional[str]
@@ -286,16 +285,6 @@ def _truncate_table_sql(self, qualified_table_name: str) -> str:
else:
return f"DELETE FROM {qualified_table_name} WHERE 1=1;"

@contextmanager
def cursor_for_relation(
    self, *, table: str = None, sql: str = None, prepare_tables: List[str] = None
) -> Generator[DBApiCursor, Any, Any]:
    """Opens a DBAPI cursor over `table` or the explicit `sql` query.

    Args:
        table: Table to select from; used only when `sql` is not given.
        sql: Explicit query; takes precedence over `table`.
        prepare_tables: Unused here — TODO confirm whether any caller relies on it.

    Yields:
        DBApiCursor: valid only inside the context.
    """
    if not sql:
        # default to a full-table read over the qualified table name
        table = self.make_qualified_table_name(table)
        sql = f"SELECT * FROM {table}"
    with self.execute_query(sql) as cursor:
        yield cursor


class DBApiCursorImpl(DBApiCursor):
"""A DBApi Cursor wrapper with dataframes reading functionality"""
4 changes: 1 addition & 3 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1704,6 +1704,4 @@ def __getstate__(self) -> Any:
@property
def dataset(self) -> SupportsReadDataset:
    """Access helper to dataset"""
    # NOTE(review): the scraped diff left the pre-commit body (local import of
    # Dataset + `return Dataset(...)`) in front of the new return, making it
    # unreachable. Post-commit, dataset creation is delegated to the destination
    # client so each destination can supply its own SupportsReadDataset.
    return self.destination_client().dataset()

0 comments on commit 28cb282

Please sign in to comment.