From 4ee65a8269cd0309f8bf72fda6546275cbbc6491 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 8 Oct 2024 14:30:56 +0200 Subject: [PATCH] data pond: expose readable datasets as dataframes and arrow tables (#1507) * add simple ibis helper * start working on dataframe reading interface * a bit more work * first simple implementation * small change * more work on dataset * some work on filesystem destination * add support for parquet files and compression on jsonl files in filesystem dataframe implementation * fix test after devel merge * add nice composable pipeline example * small updates to demo * enable tests for all bucket providers remove resource based dataset accessor * fix tests * create views in duckdb filesystem accessor * move to relations based interface * add generic duckdb interface to filesystem * move code for accessing frames and tables to the cursor and use duckdb dbapi cursor in filesystem * add native db api cursor fetching to exposed dataset * some small changes * switch dataaccess pandas to pyarrow * add native bigquery support for df and arrow tables * change iter functions to always expect chunk size (None will default to full frame/table) * add native implementation for databricks * add dremio native implementation for full frames and tables * fix filesystem test make filesystem duckdb instance use glob pattern * add test for evolving filesystem * fix empty dataframe retrieval * remove old df test * clean up interfaces a bit (more to come?) remove pipeline dependency from dataset * move dataset creation into destination client and clean up interfaces / reference a bit more * renames some interfaces and adds brief docstrings * add filesystem cached duckdb and remove the need to declare needed views for filesystem * fix tests for snowflake * make data set a function * fix db-types depdency for bigquery * create duckdb based sql client for filesystem * fix example pipeline * enable filesystem sql client to work on streamlit * add comments * rename sql to query remove unneeded code * fix tests that rely on sql client * post merge cleanups * move imports around a bit * exclude abfss buckets from test * add support for arrow schema creation from known dlt schema * re-use sqldatabase code for cursors * fix bug * add default columns where needed * add sql glot to filesystem deps * store filesystem tables in correct dataset * move cursor columns location * fix snowflake and mssql disable tests with sftp * clean up compose files a bit * fix sqlalchemy * add mysql docker compose file * fix linting * prepare hint checking * disable part of state test * enable hint check * add column type support for filesystem json * rename dataset implementation to DBAPI remove dataset specific code from destination client * wrap functions in dbapi readable dataset * remove example pipeline * rename test_decimal_name * make column code a bit clearer and fix mssql again * rename df methods to pandas * fix bug in default columns * fix hints test and columns bug removes some uneeded code * catch mysql error if no rows returned * add exceptions for not implemented bucket and filetypes * fix docs * add config section for getting pipeline clients * set default dataset in filesystem sqlclient * add config section for sync_destination * rename readablerelation methods * use more functions of the duckdb sql client in filesystem version * update dependencies * use active pipeline capabilities if available for arrow table * update types * rename dataset accessor function * add test for accessing 
tables with unquqlified tablename * fix sql client * add duckdb native support for azure, s3 and gcs (via s3) * some typing * add dataframes tests back in * add join table and update view tests for filesystem * start adding tests for creating views on remote duckdb * fix snippets * fix some dependencies and mssql/synapse tests * fix bigquery dependencies and abfss tests * add tests for adding view to external dbs and persistent secrets * add support for delta tables * add duckdb to read interface tests * fix delta tests * make default secret name derived from bucket url * try fix azure tests again * fix df access tests * PR fixes * correct internal table access * allow datasets without schema * skips parametrized queries, skips tables from non-dataset schemas * move filesystem specific sql_client tests to correct location and test a few more things * fix sql client tests * make secret name when dropping optional * fix gs test * remove moved filesystem tests from test_read_interfaces * fix sql client tests again... :) * clear duckdb secrets * disable secrets deleting for delta tests --------- Co-authored-by: Marcin Rudolf --- .github/workflows/test_destinations.yml | 5 +- .github/workflows/test_doc_snippets.yml | 2 +- .github/workflows/test_local_destinations.yml | 4 +- .github/workflows/test_pyarrow17.yml | 8 +- .../test_sqlalchemy_destinations.yml | 3 - Makefile | 9 +- dlt/common/data_writers/writers.py | 17 +- dlt/common/destination/reference.py | 85 ++++- dlt/common/libs/pandas.py | 1 + dlt/common/libs/pyarrow.py | 147 ++++++++ dlt/common/typing.py | 1 + dlt/destinations/dataset.py | 99 ++++++ dlt/destinations/fs_client.py | 3 +- dlt/destinations/impl/athena/athena.py | 2 +- dlt/destinations/impl/bigquery/sql_client.py | 30 +- .../impl/databricks/sql_client.py | 39 ++- dlt/destinations/impl/dremio/sql_client.py | 10 +- dlt/destinations/impl/duckdb/configuration.py | 6 +- dlt/destinations/impl/duckdb/sql_client.py | 48 ++- .../impl/filesystem/filesystem.py | 41 ++- .../impl/filesystem/sql_client.py | 279 +++++++++++++++ dlt/destinations/impl/mssql/mssql.py | 5 +- dlt/destinations/impl/mssql/sql_client.py | 3 +- dlt/destinations/impl/postgres/sql_client.py | 3 +- dlt/destinations/impl/snowflake/sql_client.py | 3 +- .../impl/sqlalchemy/db_api_client.py | 15 +- dlt/destinations/job_client_impl.py | 17 +- dlt/destinations/sql_client.py | 97 ++++- dlt/destinations/typing.py | 44 +-- dlt/helpers/streamlit_app/pages/load_info.py | 2 +- dlt/pipeline/pipeline.py | 21 +- dlt/sources/sql_database/arrow_helpers.py | 147 +------- dlt/sources/sql_database/helpers.py | 2 +- .../dlt-ecosystem/transformations/pandas.md | 2 +- .../visualizations/exploring-the-data.md | 2 +- poetry.lock | 142 +++++++- pyproject.toml | 8 +- tests/load/filesystem/test_sql_client.py | 330 ++++++++++++++++++ tests/load/pipeline/test_restore_state.py | 4 + tests/load/sqlalchemy/docker-compose.yml | 16 + tests/load/test_read_interfaces.py | 302 ++++++++++++++++ tests/load/test_sql_client.py | 10 +- tests/load/utils.py | 14 +- .../load/weaviate/docker-compose.yml | 0 .../sql_database/test_arrow_helpers.py | 4 +- 45 files changed, 1734 insertions(+), 298 deletions(-) create mode 100644 dlt/destinations/dataset.py create mode 100644 dlt/destinations/impl/filesystem/sql_client.py create mode 100644 tests/load/filesystem/test_sql_client.py create mode 100644 tests/load/sqlalchemy/docker-compose.yml create mode 100644 tests/load/test_read_interfaces.py rename .github/weaviate-compose.yml => tests/load/weaviate/docker-compose.yml 
(100%) diff --git a/.github/workflows/test_destinations.yml b/.github/workflows/test_destinations.yml index ada73b85d9..95fbd83ad9 100644 --- a/.github/workflows/test_destinations.yml +++ b/.github/workflows/test_destinations.yml @@ -77,11 +77,14 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake + run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml + - name: clear duckdb secrets and cache + run: rm -rf ~/.duckdb + - run: | poetry run pytest tests/load --ignore tests/load/sources -m "essential" name: Run essential tests Linux diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml index 2bff0df899..faa2c59a0b 100644 --- a/.github/workflows/test_doc_snippets.yml +++ b/.github/workflows/test_doc_snippets.yml @@ -60,7 +60,7 @@ jobs: uses: actions/checkout@master - name: Start weaviate - run: docker compose -f ".github/weaviate-compose.yml" up -d + run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d - name: Setup Python uses: actions/setup-python@v4 diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index 8911e05ecc..51a078b1ab 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -73,7 +73,7 @@ jobs: uses: actions/checkout@master - name: Start weaviate - run: docker compose -f ".github/weaviate-compose.yml" up -d + run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d - name: Setup Python uses: actions/setup-python@v4 @@ -122,7 +122,7 @@ jobs: - name: Stop weaviate if: always() - run: docker compose -f ".github/weaviate-compose.yml" down -v + run: docker compose -f "tests/load/weaviate/docker-compose.yml" down -v - name: Stop SFTP server if: always() diff --git a/.github/workflows/test_pyarrow17.yml b/.github/workflows/test_pyarrow17.yml index 78d6742ac1..dc776e4ce1 100644 --- a/.github/workflows/test_pyarrow17.yml +++ b/.github/workflows/test_pyarrow17.yml @@ -65,14 +65,18 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-pyarrow17 - name: Install dependencies - run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E gs -E s3 -E az + run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E duckdb -E filesystem -E gs -E s3 -E az + - name: Upgrade pyarrow run: poetry run pip install pyarrow==17.0.0 - + - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml + - name: clear duckdb secrets and cache + run: rm -rf ~/.duckdb + - name: Run needspyarrow17 tests Linux run: | poetry run pytest tests/libs -m "needspyarrow17" diff --git a/.github/workflows/test_sqlalchemy_destinations.yml b/.github/workflows/test_sqlalchemy_destinations.yml index 5da2dac04b..a38d644158 100644 --- a/.github/workflows/test_sqlalchemy_destinations.yml +++ b/.github/workflows/test_sqlalchemy_destinations.yml @@ -94,6 +94,3 @@ jobs: # always run full suite, also on branches - run: poetry run pytest tests/load -x --ignore tests/load/sources name: Run tests Linux - env: - 
DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:root@127.0.0.1:3306/dlt_data # Use root cause we need to create databases - DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite diff --git a/Makefile b/Makefile index 3a99d96e5e..4a786ed528 100644 --- a/Makefile +++ b/Makefile @@ -109,4 +109,11 @@ test-build-images: build-library preprocess-docs: # run docs preprocessing to run a few checks and ensure examples can be parsed - cd docs/website && npm i && npm run preprocess-docs \ No newline at end of file + cd docs/website && npm i && npm run preprocess-docs + +start-test-containers: + docker compose -f "tests/load/dremio/docker-compose.yml" up -d + docker compose -f "tests/load/postgres/docker-compose.yml" up -d + docker compose -f "tests/load/weaviate/docker-compose.yml" up -d + docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d + docker compose -f "tests/load/sqlalchemy/docker-compose.yml" up -d diff --git a/dlt/common/data_writers/writers.py b/dlt/common/data_writers/writers.py index d6be15abdd..b3b997629f 100644 --- a/dlt/common/data_writers/writers.py +++ b/dlt/common/data_writers/writers.py @@ -320,23 +320,10 @@ def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter": ) def write_header(self, columns_schema: TTableSchemaColumns) -> None: - from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype + from dlt.common.libs.pyarrow import columns_to_arrow # build schema - self.schema = pyarrow.schema( - [ - pyarrow.field( - name, - get_py_arrow_datatype( - schema_item, - self._caps, - self.timestamp_timezone, - ), - nullable=is_nullable_column(schema_item), - ) - for name, schema_item in columns_schema.items() - ] - ) + self.schema = columns_to_arrow(columns_schema, self._caps, self.timestamp_timezone) # find row items that are of the json type (could be abstracted out for use in other writers?) 
self.nested_indices = [ i for i, field in columns_schema.items() if field["data_type"] == "json" diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 527b9419e8..0c572379de 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod import dataclasses from importlib import import_module +from contextlib import contextmanager + from types import TracebackType from typing import ( Callable, @@ -18,24 +20,33 @@ Any, TypeVar, Generic, + Generator, + TYPE_CHECKING, + Protocol, + Tuple, + AnyStr, ) from typing_extensions import Annotated import datetime # noqa: 251 import inspect from dlt.common import logger, pendulum + from dlt.common.configuration.specs.base_configuration import extract_inner_hint from dlt.common.destination.typing import PreparedTableSchema from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types from dlt.common.exceptions import TerminalException from dlt.common.metrics import LoadJobMetrics from dlt.common.normalizers.naming import NamingConvention -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema.typing import TTableSchemaColumns + +from dlt.common.schema import Schema, TSchemaTables, TTableSchema from dlt.common.schema.typing import ( C_DLT_LOAD_ID, TLoaderReplaceStrategy, ) from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table + from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration from dlt.common.destination.capabilities import DestinationCapabilitiesContext @@ -49,6 +60,8 @@ from dlt.common.storages import FileStorage from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc +from dlt.common.exceptions import MissingDependencyException + TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") @@ -56,6 +69,17 @@ DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}" +if TYPE_CHECKING: + try: + from dlt.common.libs.pandas import DataFrame + from dlt.common.libs.pyarrow import Table as ArrowTable + except MissingDependencyException: + DataFrame = Any + ArrowTable = Any +else: + DataFrame = Any + ArrowTable = Any + class StorageSchemaInfo(NamedTuple): version_hash: str @@ -442,6 +466,65 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRe return [] +class SupportsReadableRelation(Protocol): + """A readable relation retrieved from a destination that supports it""" + + schema_columns: TTableSchemaColumns + """Known dlt table columns for this relation""" + + def df(self, chunk_size: int = None) -> Optional[DataFrame]: + """Fetches the results as data frame. For large queries the results may be chunked + + Fetches the results into a data frame. The default implementation uses helpers in `pandas.io.sql` to generate Pandas data frame. + This function will try to use native data frame generation for particular destination. For `BigQuery`: `QueryJob.to_dataframe` is used. + For `duckdb`: `DuckDBPyConnection.df' + + Args: + chunk_size (int, optional): Will chunk the results into several data frames. 
Defaults to None + **kwargs (Any): Additional parameters which will be passed to native data frame generation function. + + Returns: + Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results + """ + ... + + def arrow(self, chunk_size: int = None) -> Optional[ArrowTable]: ... + + def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: ... + + def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: ... + + def fetchall(self) -> List[Tuple[Any, ...]]: ... + + def fetchmany(self, chunk_size: int) -> List[Tuple[Any, ...]]: ... + + def iter_fetch(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]: ... + + def fetchone(self) -> Optional[Tuple[Any, ...]]: ... + + +class DBApiCursor(SupportsReadableRelation): + """Protocol for DBAPI cursor""" + + description: Tuple[Any, ...] + + native_cursor: "DBApiCursor" + """Cursor implementation native to current destination""" + + def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ... + def close(self) -> None: ... + + +class SupportsReadableDataset(Protocol): + """A readable dataset retrieved from a destination, has support for creating readable relations for a query or table""" + + def __call__(self, query: Any) -> SupportsReadableRelation: ... + + def __getitem__(self, table: str) -> SupportsReadableRelation: ... + + def __getattr__(self, table: str) -> SupportsReadableRelation: ... + + class JobClientBase(ABC): def __init__( self, diff --git a/dlt/common/libs/pandas.py b/dlt/common/libs/pandas.py index 022aa9b9cd..a165ea8747 100644 --- a/dlt/common/libs/pandas.py +++ b/dlt/common/libs/pandas.py @@ -3,6 +3,7 @@ try: import pandas + from pandas import DataFrame except ModuleNotFoundError: raise MissingDependencyException("dlt Pandas Helpers", ["pandas"]) diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index adba832c43..805b43b163 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -18,6 +18,8 @@ from dlt.common.pendulum import pendulum from dlt.common.exceptions import MissingDependencyException from dlt.common.schema.typing import C_DLT_ID, C_DLT_LOAD_ID, TTableSchemaColumns +from dlt.common import logger, json +from dlt.common.json import custom_encode, map_nested_in_place from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.schema.typing import TColumnType @@ -31,6 +33,7 @@ import pyarrow.compute import pyarrow.dataset from pyarrow.parquet import ParquetFile + from pyarrow import Table except ModuleNotFoundError: raise MissingDependencyException( "dlt pyarrow helpers", @@ -394,6 +397,37 @@ def py_arrow_to_table_schema_columns(schema: pyarrow.Schema) -> TTableSchemaColu return result +def columns_to_arrow( + columns: TTableSchemaColumns, + caps: DestinationCapabilitiesContext, + timestamp_timezone: str = "UTC", +) -> pyarrow.Schema: + """Convert a table schema columns dict to a pyarrow schema. 
+ + Args: + columns (TTableSchemaColumns): table schema columns + + Returns: + pyarrow.Schema: pyarrow schema + + """ + return pyarrow.schema( + [ + pyarrow.field( + name, + get_py_arrow_datatype( + schema_item, + caps or DestinationCapabilitiesContext.generic_capabilities(), + timestamp_timezone, + ), + nullable=schema_item.get("nullable", True), + ) + for name, schema_item in columns.items() + if schema_item.get("data_type") is not None + ] + ) + + def get_parquet_metadata(parquet_file: TFileOrPath) -> Tuple[int, pyarrow.Schema]: """Gets parquet file metadata (including row count and schema) @@ -531,6 +565,119 @@ def concat_batches_and_tables_in_order( return pyarrow.concat_tables(tables, promote_options="none") +def row_tuples_to_arrow( + rows: Sequence[Any], caps: DestinationCapabilitiesContext, columns: TTableSchemaColumns, tz: str +) -> Any: + """Converts the rows to an arrow table using the columns schema. + Columns missing `data_type` will be inferred from the row data. + Columns with object types not supported by arrow are excluded from the resulting table. + """ + from dlt.common.libs.pyarrow import pyarrow as pa + import numpy as np + + try: + from pandas._libs import lib + + pivoted_rows = lib.to_object_array_tuples(rows).T + except ImportError: + logger.info( + "Pandas not installed, reverting to numpy.asarray to create a table which is slower" + ) + pivoted_rows = np.asarray(rows, dtype="object", order="k").T # type: ignore[call-overload] + + columnar = { + col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(columns))) + } + columnar_known_types = { + col["name"]: columnar[col["name"]] + for col in columns.values() + if col.get("data_type") is not None + } + columnar_unknown_types = { + col["name"]: columnar[col["name"]] + for col in columns.values() + if col.get("data_type") is None + } + + arrow_schema = columns_to_arrow(columns, caps, tz) + + for idx in range(0, len(arrow_schema.names)): + field = arrow_schema.field(idx) + py_type = type(rows[0][idx]) + # cast double / float ndarrays to decimals if type mismatch, looks like decimals and floats are often mixed up in dialects + if pa.types.is_decimal(field.type) and issubclass(py_type, (str, float)): + logger.warning( + f"Field {field.name} was reflected as decimal type, but rows contains" + f" {py_type.__name__}. Additional cast is required which may slow down arrow table" + " generation." + ) + float_array = pa.array(columnar_known_types[field.name], type=pa.float64()) + columnar_known_types[field.name] = float_array.cast(field.type, safe=False) + if issubclass(py_type, (dict, list)): + logger.warning( + f"Field {field.name} was reflected as JSON type and needs to be serialized back to" + " string to be placed in arrow table. This will slow data extraction down. You" + " should cast JSON field to STRING in your database system ie. by creating and" + " extracting an SQL VIEW that selects with cast." + ) + json_str_array = pa.array( + [None if s is None else json.dumps(s) for s in columnar_known_types[field.name]] + ) + columnar_known_types[field.name] = json_str_array + + # If there are unknown type columns, first create a table to infer their types + if columnar_unknown_types: + new_schema_fields = [] + for key in list(columnar_unknown_types): + arrow_col: Optional[pa.Array] = None + try: + arrow_col = pa.array(columnar_unknown_types[key]) + if pa.types.is_null(arrow_col.type): + logger.warning( + f"Column {key} contains only NULL values and data type could not be" + " inferred. 
This column is removed from the arrow table" + ) + continue + + except pa.ArrowInvalid as e: + # Try coercing types not supported by arrow to a json friendly format + # E.g. dataclasses -> dict, UUID -> str + try: + arrow_col = pa.array( + map_nested_in_place(custom_encode, list(columnar_unknown_types[key])) + ) + logger.warning( + f"Column {key} contains a data type which is not supported by pyarrow and" + f" got converted into {arrow_col.type}. This slows down arrow table" + " generation." + ) + except (pa.ArrowInvalid, TypeError): + logger.warning( + f"Column {key} contains a data type which is not supported by pyarrow. This" + f" column will be ignored. Error: {e}" + ) + if arrow_col is not None: + columnar_known_types[key] = arrow_col + new_schema_fields.append( + pa.field( + key, + arrow_col.type, + nullable=columns[key]["nullable"], + ) + ) + + # New schema + column_order = {name: idx for idx, name in enumerate(columns)} + arrow_schema = pa.schema( + sorted( + list(arrow_schema) + new_schema_fields, + key=lambda x: column_order[x.name], + ) + ) + + return pa.Table.from_pydict(columnar_known_types, schema=arrow_schema) + + class NameNormalizationCollision(ValueError): def __init__(self, reason: str) -> None: msg = f"Arrow column name collision after input data normalization. {reason}" diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 8d18d84400..4bdfa27ad9 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -79,6 +79,7 @@ REPattern = _REPattern PathLike = os.PathLike + AnyType: TypeAlias = Any NoneType = type(None) DictStrAny: TypeAlias = Dict[str, Any] diff --git a/dlt/destinations/dataset.py b/dlt/destinations/dataset.py new file mode 100644 index 0000000000..a5584851e9 --- /dev/null +++ b/dlt/destinations/dataset.py @@ -0,0 +1,99 @@ +from typing import Any, Generator, AnyStr, Optional + +from contextlib import contextmanager +from dlt.common.destination.reference import ( + SupportsReadableRelation, + SupportsReadableDataset, +) + +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.destinations.sql_client import SqlClientBase +from dlt.common.schema import Schema + + +class ReadableDBAPIRelation(SupportsReadableRelation): + def __init__( + self, + *, + client: SqlClientBase[Any], + query: Any, + schema_columns: TTableSchemaColumns = None, + ) -> None: + """Create a lazily evaluated relation for the dataset of a destination""" + self.client = client + self.schema_columns = schema_columns + self.query = query + + # wire protocol functions + self.df = self._wrap_func("df") # type: ignore + self.arrow = self._wrap_func("arrow") # type: ignore + self.fetchall = self._wrap_func("fetchall") # type: ignore + self.fetchmany = self._wrap_func("fetchmany") # type: ignore + self.fetchone = self._wrap_func("fetchone") # type: ignore + + self.iter_df = self._wrap_iter("iter_df") # type: ignore + self.iter_arrow = self._wrap_iter("iter_arrow") # type: ignore + self.iter_fetch = self._wrap_iter("iter_fetch") # type: ignore + + @contextmanager + def cursor(self) -> Generator[SupportsReadableRelation, Any, Any]: + """Gets a DBApiCursor for the current relation""" + with self.client as client: + # this hacky code is needed for mssql to disable autocommit, read iterators + # will not work otherwise.
in the future we should be able to create a readonly + client which will do this automatically + if hasattr(self.client, "_conn") and hasattr(self.client._conn, "autocommit"): + self.client._conn.autocommit = False + with client.execute_query(self.query) as cursor: + if self.schema_columns: + cursor.schema_columns = self.schema_columns + yield cursor + + def _wrap_iter(self, func_name: str) -> Any: + """wrap SupportsReadableRelation generators in cursor context""" + + def _wrap(*args: Any, **kwargs: Any) -> Any: + with self.cursor() as cursor: + yield from getattr(cursor, func_name)(*args, **kwargs) + + return _wrap + + def _wrap_func(self, func_name: str) -> Any: + """wrap SupportsReadableRelation functions in cursor context""" + + def _wrap(*args: Any, **kwargs: Any) -> Any: + with self.cursor() as cursor: + return getattr(cursor, func_name)(*args, **kwargs) + + return _wrap + + +class ReadableDBAPIDataset(SupportsReadableDataset): + """Access to dataframes and arrow tables in the destination dataset via dbapi""" + + def __init__(self, client: SqlClientBase[Any], schema: Optional[Schema]) -> None: + self.client = client + self.schema = schema + + def __call__( + self, query: Any, schema_columns: TTableSchemaColumns = None + ) -> ReadableDBAPIRelation: + schema_columns = schema_columns or {} + return ReadableDBAPIRelation(client=self.client, query=query, schema_columns=schema_columns) # type: ignore[abstract] + + def table(self, table_name: str) -> SupportsReadableRelation: + # prepare query for table relation + schema_columns = ( + self.schema.tables.get(table_name, {}).get("columns", {}) if self.schema else {} + ) + table_name = self.client.make_qualified_table_name(table_name) + query = f"SELECT * FROM {table_name}" + return self(query, schema_columns) + + def __getitem__(self, table_name: str) -> SupportsReadableRelation: + """access a table via dict notation""" + return self.table(table_name) + + def __getattr__(self, table_name: str) -> SupportsReadableRelation: + """access a table via property notation""" + return self.table(table_name) diff --git a/dlt/destinations/fs_client.py b/dlt/destinations/fs_client.py index 14e77b6b4e..ab4c91544a 100644 --- a/dlt/destinations/fs_client.py +++ b/dlt/destinations/fs_client.py @@ -1,5 +1,6 @@ -import gzip from typing import Iterable, cast, Any, List + +import gzip from abc import ABC, abstractmethod from fsspec import AbstractFileSystem diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 72611a9568..a2e2566a76 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -56,7 +56,7 @@ raise_database_error, raise_open_connection_error, ) -from dlt.destinations.typing import DBApiCursor +from dlt.common.destination.reference import DBApiCursor from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset from dlt.destinations.job_impl import FinalizedLoadJobWithFollowupJobs, FinalizedLoadJob from dlt.destinations.impl.athena.configuration import AthenaClientConfiguration diff --git a/dlt/destinations/impl/bigquery/sql_client.py b/dlt/destinations/impl/bigquery/sql_client.py index c56742f1ff..650db1d8b9 100644 --- a/dlt/destinations/impl/bigquery/sql_client.py +++ b/dlt/destinations/impl/bigquery/sql_client.py @@ -23,7 +23,8 @@ raise_database_error, raise_open_connection_error, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame +from dlt.destinations.typing import DBApi, DBTransaction, DataFrame, ArrowTable +from
dlt.common.destination.reference import DBApiCursor # terminal reasons as returned in BQ gRPC error response @@ -44,32 +45,15 @@ class BigQueryDBApiCursorImpl(DBApiCursorImpl): """Use native BigQuery data frame support if available""" native_cursor: BQDbApiCursor # type: ignore - df_iterator: Generator[Any, None, None] def __init__(self, curr: DBApiCursor) -> None: super().__init__(curr) - self.df_iterator = None - def df(self, chunk_size: Optional[int] = None, **kwargs: Any) -> DataFrame: - query_job: bigquery.QueryJob = getattr( - self.native_cursor, "_query_job", self.native_cursor.query_job - ) - if self.df_iterator: - return next(self.df_iterator, None) - try: - if chunk_size is not None: - # create iterator with given page size - self.df_iterator = query_job.result(page_size=chunk_size).to_dataframe_iterable() - return next(self.df_iterator, None) - return query_job.to_dataframe(**kwargs) - except ValueError as ex: - # no pyarrow/db-types, fallback to our implementation - logger.warning(f"Native BigQuery pandas reader could not be used: {str(ex)}") - return super().df(chunk_size=chunk_size) - - def close(self) -> None: - if self.df_iterator: - self.df_iterator.close() + def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: + yield from self.native_cursor.query_job.result(page_size=chunk_size).to_dataframe_iterable() + + def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: + yield from self.native_cursor.query_job.result(page_size=chunk_size).to_arrow_iterable() class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction): diff --git a/dlt/destinations/impl/databricks/sql_client.py b/dlt/destinations/impl/databricks/sql_client.py index 8228fa06a4..88d47410d5 100644 --- a/dlt/destinations/impl/databricks/sql_client.py +++ b/dlt/destinations/impl/databricks/sql_client.py @@ -1,5 +1,17 @@ from contextlib import contextmanager, suppress -from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Tuple, Union, Dict +from typing import ( + Any, + AnyStr, + ClassVar, + Generator, + Iterator, + Optional, + Sequence, + List, + Tuple, + Union, + Dict, +) from databricks import sql as databricks_lib @@ -21,25 +33,30 @@ raise_database_error, raise_open_connection_error, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame +from dlt.destinations.typing import ArrowTable, DBApi, DBTransaction, DataFrame from dlt.destinations.impl.databricks.configuration import DatabricksCredentials +from dlt.common.destination.reference import DBApiCursor class DatabricksCursorImpl(DBApiCursorImpl): """Use native data frame support if available""" native_cursor: DatabricksSqlCursor # type: ignore[assignment] - vector_size: ClassVar[int] = 2048 + vector_size: ClassVar[int] = 2048 # vector size is 2048 - def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame: + def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: if chunk_size is None: - return self.native_cursor.fetchall_arrow().to_pandas() - else: - df = self.native_cursor.fetchmany_arrow(chunk_size).to_pandas() - if df.shape[0] == 0: - return None - else: - return df + yield self.native_cursor.fetchall_arrow() + return + while True: + table = self.native_cursor.fetchmany_arrow(chunk_size) + if table.num_rows == 0: + return + yield table + + def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: + for table in self.iter_arrow(chunk_size=chunk_size): + yield table.to_pandas() class 
DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction): diff --git a/dlt/destinations/impl/dremio/sql_client.py b/dlt/destinations/impl/dremio/sql_client.py index 7dee056da7..030009c74b 100644 --- a/dlt/destinations/impl/dremio/sql_client.py +++ b/dlt/destinations/impl/dremio/sql_client.py @@ -18,7 +18,8 @@ raise_database_error, raise_open_connection_error, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame +from dlt.destinations.typing import DBApi, DBTransaction, DataFrame +from dlt.common.destination.reference import DBApiCursor class DremioCursorImpl(DBApiCursorImpl): @@ -26,9 +27,14 @@ class DremioCursorImpl(DBApiCursorImpl): def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]: if chunk_size is None: - return self.native_cursor.fetch_arrow_table().to_pandas() + return self.arrow(chunk_size=chunk_size).to_pandas() return super().df(chunk_size=chunk_size, **kwargs) + def arrow(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]: + if chunk_size is None: + return self.native_cursor.fetch_arrow_table() + return super().arrow(chunk_size=chunk_size, **kwargs) + class DremioSqlClient(SqlClientBase[pydremio.DremioConnection]): dbapi: ClassVar[DBApi] = pydremio diff --git a/dlt/destinations/impl/duckdb/configuration.py b/dlt/destinations/impl/duckdb/configuration.py index ec58d66c8b..33a2bb7f78 100644 --- a/dlt/destinations/impl/duckdb/configuration.py +++ b/dlt/destinations/impl/duckdb/configuration.py @@ -83,6 +83,11 @@ def parse_native_representation(self, native_value: Any) -> None: else: raise + @property + def has_open_connection(self) -> bool: + """Returns true if connection was not yet created or no connections were borrowed in case of external connection""" + return not hasattr(self, "_conn") or self._conn_borrows == 0 + def _get_conn_config(self) -> Dict[str, Any]: return {} @@ -90,7 +95,6 @@ def _conn_str(self) -> str: return self.database def _delete_conn(self) -> None: - # print("Closing conn because is owner") self._conn.close() delattr(self, "_conn") diff --git a/dlt/destinations/impl/duckdb/sql_client.py b/dlt/destinations/impl/duckdb/sql_client.py index 80bbbedc9c..89a522c8f7 100644 --- a/dlt/destinations/impl/duckdb/sql_client.py +++ b/dlt/destinations/impl/duckdb/sql_client.py @@ -1,7 +1,9 @@ import duckdb +import math + from contextlib import contextmanager -from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence +from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, Generator from dlt.common.destination import DestinationCapabilitiesContext from dlt.destinations.exceptions import ( @@ -9,7 +11,7 @@ DatabaseTransientException, DatabaseUndefinedRelation, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame +from dlt.destinations.typing import DBApi, DBTransaction, DataFrame, ArrowTable from dlt.destinations.sql_client import ( SqlClientBase, DBApiCursorImpl, @@ -18,26 +20,42 @@ ) from dlt.destinations.impl.duckdb.configuration import DuckDbBaseCredentials +from dlt.common.destination.reference import DBApiCursor class DuckDBDBApiCursorImpl(DBApiCursorImpl): """Use native duckdb data frame support if available""" native_cursor: duckdb.DuckDBPyConnection # type: ignore - vector_size: ClassVar[int] = 2048 - - def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame: - if chunk_size is None: - return self.native_cursor.df(**kwargs) - else: - multiple = chunk_size // self.vector_size + ( - 0 if self.vector_size % 
chunk_size == 0 else 1 - ) - df = self.native_cursor.fetch_df_chunk(multiple, **kwargs) + vector_size: ClassVar[int] = 2048 # vector size is 2048 + + def _get_page_count(self, chunk_size: int) -> int: + """get the page count for vector size""" + if chunk_size < self.vector_size: + return 1 + return math.floor(chunk_size / self.vector_size) + + def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: + # full frame + if not chunk_size: + yield self.native_cursor.fetch_df() + return + # iterate + while True: + df = self.native_cursor.fetch_df_chunk(self._get_page_count(chunk_size)) if df.shape[0] == 0: - return None - else: - return df + break + yield df + + def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: + if not chunk_size: + yield self.native_cursor.fetch_arrow_table() + return + # iterate + try: + yield from self.native_cursor.fetch_record_batch(chunk_size) + except StopIteration: + pass class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction): diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 3f2f793559..d6d9865a06 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,9 +1,23 @@ import posixpath import os import base64 - +from contextlib import contextmanager from types import TracebackType -from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any +from typing import ( + ContextManager, + List, + Type, + Iterable, + Iterator, + Optional, + Tuple, + Sequence, + cast, + Generator, + Literal, + Any, + Dict, +) from fsspec import AbstractFileSystem from contextlib import contextmanager @@ -23,10 +37,12 @@ TPipelineStateDoc, load_package as current_load_package, ) +from dlt.destinations.sql_client import DBApiCursor, WithSqlClient, SqlClientBase from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import ( FollowupJobRequest, PreparedTableSchema, + SupportsReadableRelation, TLoadJobState, RunnableLoadJob, JobClientBase, @@ -38,6 +54,7 @@ LoadJob, ) from dlt.common.destination.exceptions import DestinationUndefinedEntity + from dlt.destinations.job_impl import ( ReferenceFollowupJobRequest, FinalizedLoadJob, @@ -46,6 +63,7 @@ from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration from dlt.destinations import path_utils from dlt.destinations.fs_client import FSClientBase +from dlt.destinations.dataset import ReadableDBAPIDataset from dlt.destinations.utils import verify_schema_merge_disposition INIT_FILE_NAME = "init" @@ -209,7 +227,9 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRe return jobs -class FilesystemClient(FSClientBase, JobClientBase, WithStagingDataset, WithStateSync): +class FilesystemClient( + FSClientBase, WithSqlClient, JobClientBase, WithStagingDataset, WithStateSync +): """filesystem client storing jobs in memory""" fs_client: AbstractFileSystem @@ -238,6 +258,21 @@ def __init__( # cannot be replaced and we cannot initialize folders consistently self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout) self.dataset_name = self.config.normalize_dataset_name(self.schema) + self._sql_client: SqlClientBase[Any] = None + + @property + def sql_client(self) -> SqlClientBase[Any]: + # we use an inner import here, since the sql client depends on duckdb and will + # only be used for read access on data, some 
users will not need the dependency + from dlt.destinations.impl.filesystem.sql_client import FilesystemSqlClient + + if not self._sql_client: + self._sql_client = FilesystemSqlClient(self) + return self._sql_client + + @sql_client.setter + def sql_client(self, client: SqlClientBase[Any]) -> None: + self._sql_client = client def drop_storage(self) -> None: if self.is_storage_initialized(): diff --git a/dlt/destinations/impl/filesystem/sql_client.py b/dlt/destinations/impl/filesystem/sql_client.py new file mode 100644 index 0000000000..87aa254e96 --- /dev/null +++ b/dlt/destinations/impl/filesystem/sql_client.py @@ -0,0 +1,279 @@ +from typing import Any, Iterator, AnyStr, List, cast, TYPE_CHECKING, Dict + +import os +import re + +import dlt + +import duckdb + +import sqlglot +import sqlglot.expressions as exp +from dlt.common import logger + +from contextlib import contextmanager + +from dlt.common.destination.reference import DBApiCursor +from dlt.common.destination.typing import PreparedTableSchema + +from dlt.destinations.sql_client import raise_database_error + +from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient +from dlt.destinations.impl.duckdb.factory import duckdb as duckdb_factory, DuckDbCredentials +from dlt.common.configuration.specs import ( + AwsCredentials, + AzureServicePrincipalCredentialsWithoutDefaults, + AzureCredentialsWithoutDefaults, +) + +SUPPORTED_PROTOCOLS = ["gs", "gcs", "s3", "file", "memory", "az", "abfss"] + +if TYPE_CHECKING: + from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +else: + FilesystemClient = Any + + +class FilesystemSqlClient(DuckDbSqlClient): + memory_db: duckdb.DuckDBPyConnection = None + """Internally created in-mem database in case external is not provided""" + + def __init__( + self, + fs_client: FilesystemClient, + dataset_name: str = None, + credentials: DuckDbCredentials = None, + ) -> None: + # if no credentials are passed from the outside + # we know to keep an in memory instance here + if not credentials: + self.memory_db = duckdb.connect(":memory:") + credentials = DuckDbCredentials(self.memory_db) + + super().__init__( + dataset_name=dataset_name or fs_client.dataset_name, + staging_dataset_name=None, + credentials=credentials, + capabilities=duckdb_factory()._raw_capabilities(), + ) + self.fs_client = fs_client + + if self.fs_client.config.protocol not in SUPPORTED_PROTOCOLS: + raise NotImplementedError( + f"Protocol {self.fs_client.config.protocol} currently not supported for" + f" FilesystemSqlClient. Supported protocols are {SUPPORTED_PROTOCOLS}." 
+ ) + + def _create_default_secret_name(self) -> str: + regex = re.compile("[^a-zA-Z]") + escaped_bucket_name = regex.sub("", self.fs_client.config.bucket_url.lower()) + return f"secret_{escaped_bucket_name}" + + def drop_authentication(self, secret_name: str = None) -> None: + if not secret_name: + secret_name = self._create_default_secret_name() + self._conn.sql(f"DROP PERSISTENT SECRET IF EXISTS {secret_name}") + + def create_authentication(self, persistent: bool = False, secret_name: str = None) -> None: + if not secret_name: + secret_name = self._create_default_secret_name() + + persistent_stmt = "" + if persistent: + persistent_stmt = " PERSISTENT " + + # abfss buckets have an @ component + scope = self.fs_client.config.bucket_url + if "@" in scope: + scope = scope.split("@")[0] + + # add secrets required for creating views + if self.fs_client.config.protocol == "s3": + aws_creds = cast(AwsCredentials, self.fs_client.config.credentials) + endpoint = ( + aws_creds.endpoint_url.replace("https://", "") + if aws_creds.endpoint_url + else "s3.amazonaws.com" + ) + self._conn.sql(f""" + CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} ( + TYPE S3, + KEY_ID '{aws_creds.aws_access_key_id}', + SECRET '{aws_creds.aws_secret_access_key}', + REGION '{aws_creds.region_name}', + ENDPOINT '{endpoint}', + SCOPE '{scope}' + );""") + + # azure with storage account creds + elif self.fs_client.config.protocol in ["az", "abfss"] and isinstance( + self.fs_client.config.credentials, AzureCredentialsWithoutDefaults + ): + azsa_creds = self.fs_client.config.credentials + self._conn.sql(f""" + CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} ( + TYPE AZURE, + CONNECTION_STRING 'AccountName={azsa_creds.azure_storage_account_name};AccountKey={azsa_creds.azure_storage_account_key}', + SCOPE '{scope}' + );""") + + # azure with service principal creds + elif self.fs_client.config.protocol in ["az", "abfss"] and isinstance( + self.fs_client.config.credentials, AzureServicePrincipalCredentialsWithoutDefaults + ): + azsp_creds = self.fs_client.config.credentials + self._conn.sql(f""" + CREATE OR REPLACE {persistent_stmt} SECRET {secret_name} ( + TYPE AZURE, + PROVIDER SERVICE_PRINCIPAL, + TENANT_ID '{azsp_creds.azure_tenant_id}', + CLIENT_ID '{azsp_creds.azure_client_id}', + CLIENT_SECRET '{azsp_creds.azure_client_secret}', + ACCOUNT_NAME '{azsp_creds.azure_storage_account_name}', + SCOPE '{scope}' + );""") + elif persistent: + raise Exception( + "Cannot create persistent secret for filesystem protocol" + f" {self.fs_client.config.protocol}. If you are trying to use persistent secrets" + " with gs/gcs, please use the s3 compatibility layer." + ) + + # native google storage implementation is not supported.. + elif self.fs_client.config.protocol in ["gs", "gcs"]: + logger.warn( + "For gs/gcs access via duckdb please use the gs/gcs s3 compatibility layer. Falling" + " back to fsspec."
+ ) + self._conn.register_filesystem(self.fs_client.fs_client) + + # for memory we also need to register filesystem + elif self.fs_client.config.protocol == "memory": + self._conn.register_filesystem(self.fs_client.fs_client) + + # the line below solves problems with certificate path lookup on linux + # see duckdb docs + if self.fs_client.config.protocol in ["az", "abfss"]: + self._conn.sql("SET azure_transport_option_type = 'curl';") + + def open_connection(self) -> duckdb.DuckDBPyConnection: + # we keep the in memory instance around, so if this prop is set, return it + first_connection = self.credentials.has_open_connection + super().open_connection() + + if first_connection: + # set up dataset + if not self.has_dataset(): + self.create_dataset() + self._conn.sql(f"USE {self.fully_qualified_dataset_name()}") + + # create authentication to data provider + self.create_authentication() + + return self._conn + + @raise_database_error + def create_views_for_tables(self, tables: Dict[str, str]) -> None: + """Add the required tables as views to the duckdb in memory instance""" + + # create all tables in duck instance + for table_name in tables.keys(): + view_name = tables[table_name] + + if table_name not in self.fs_client.schema.tables: + # unknown views will not be created + continue + + # only create view if it does not exist in the current schema yet + existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()] + if view_name in existing_tables: + continue + + # discover file type + schema_table = cast(PreparedTableSchema, self.fs_client.schema.tables[table_name]) + folder = self.fs_client.get_table_dir(table_name) + files = self.fs_client.list_table_files(table_name) + first_file_type = os.path.splitext(files[0])[1][1:] + + # build files string + supports_wildcard_notation = self.fs_client.config.protocol != "abfss" + protocol = ( + "" if self.fs_client.is_local_filesystem else f"{self.fs_client.config.protocol}://" + ) + resolved_folder = f"{protocol}{folder}" + resolved_files_string = f"'{resolved_folder}/**/*.{first_file_type}'" + if not supports_wildcard_notation: + resolved_files_string = ",".join(map(lambda f: f"'{protocol}{f}'", files)) + + # build columns definition + type_mapper = self.capabilities.get_type_mapper() + columns = ",".join( + map( + lambda c: ( + f'{self.escape_column_name(c["name"])}:' + f' "{type_mapper.to_destination_type(c, schema_table)}"' + ), + self.fs_client.schema.tables[table_name]["columns"].values(), + ) + ) + + # discover whether compression is enabled + compression = ( + "" + if dlt.config.get("data_writer.disable_compression") + else ", compression = 'gzip'" + ) + + # dlt tables are never compressed for now... + if table_name in self.fs_client.schema.dlt_table_names(): + compression = "" + + # create from statement + from_statement = "" + if schema_table.get("table_format") == "delta": + from_statement = f"delta_scan('{resolved_folder}')" + elif first_file_type == "parquet": + from_statement = f"read_parquet([{resolved_files_string}])" + elif first_file_type == "jsonl": + from_statement = ( + f"read_json([{resolved_files_string}], columns = {{{columns}}}) {compression}" + ) + else: + raise NotImplementedError( + f"Unknown filetype {first_file_type} for table {table_name}. Currently only" + " jsonl and parquet files as well as delta tables are supported."
+ ) + + # create table + view_name = self.make_qualified_table_name(view_name) + create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}" + self._conn.execute(create_table_sql_base) + + @contextmanager + @raise_database_error + def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: + # skip parametrized queries, we could also render them but currently user is not able to + # do parametrized queries via dataset interface + if not args and not kwargs: + # find all tables to preload + expression = sqlglot.parse_one(query, read="duckdb") # type: ignore + load_tables: Dict[str, str] = {} + for table in expression.find_all(exp.Table): + # sqlglot has tables without tables ie. schemas are tables + if not table.this: + continue + schema = table.db + # add only tables from the dataset schema + if not schema or schema.lower() == self.dataset_name.lower(): + load_tables[table.name] = table.name + + if load_tables: + self.create_views_for_tables(load_tables) + + with super().execute_query(query, *args, **kwargs) as cursor: + yield cursor + + def __del__(self) -> None: + if self.memory_db: + self.memory_db.close() + self.memory_db = None diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 9eabfcf392..27aebe07f2 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -1,6 +1,9 @@ from typing import Dict, Optional, Sequence, List, Any -from dlt.common.destination.reference import FollowupJobRequest, PreparedTableSchema +from dlt.common.destination.reference import ( + FollowupJobRequest, + PreparedTableSchema, +) from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.schema import TColumnSchema, TColumnHint, Schema from dlt.common.schema.typing import TColumnType diff --git a/dlt/destinations/impl/mssql/sql_client.py b/dlt/destinations/impl/mssql/sql_client.py index e1b51743f5..6ec2beb95e 100644 --- a/dlt/destinations/impl/mssql/sql_client.py +++ b/dlt/destinations/impl/mssql/sql_client.py @@ -13,7 +13,7 @@ DatabaseTransientException, DatabaseUndefinedRelation, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction +from dlt.destinations.typing import DBApi, DBTransaction from dlt.destinations.sql_client import ( DBApiCursorImpl, SqlClientBase, @@ -22,6 +22,7 @@ ) from dlt.destinations.impl.mssql.configuration import MsSqlCredentials +from dlt.common.destination.reference import DBApiCursor def handle_datetimeoffset(dto_value: bytes) -> datetime: diff --git a/dlt/destinations/impl/postgres/sql_client.py b/dlt/destinations/impl/postgres/sql_client.py index d867248196..a97c8511f1 100644 --- a/dlt/destinations/impl/postgres/sql_client.py +++ b/dlt/destinations/impl/postgres/sql_client.py @@ -17,7 +17,8 @@ DatabaseTransientException, DatabaseUndefinedRelation, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction +from dlt.common.destination.reference import DBApiCursor +from dlt.destinations.typing import DBApi, DBTransaction from dlt.destinations.sql_client import ( DBApiCursorImpl, SqlClientBase, diff --git a/dlt/destinations/impl/snowflake/sql_client.py b/dlt/destinations/impl/snowflake/sql_client.py index 8d11c23363..e52c5424d3 100644 --- a/dlt/destinations/impl/snowflake/sql_client.py +++ b/dlt/destinations/impl/snowflake/sql_client.py @@ -16,8 +16,9 @@ raise_database_error, raise_open_connection_error, ) -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame +from 
dlt.destinations.typing import DBApi, DBTransaction, DataFrame from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials +from dlt.common.destination.reference import DBApiCursor class SnowflakeCursorImpl(DBApiCursorImpl): diff --git a/dlt/destinations/impl/sqlalchemy/db_api_client.py b/dlt/destinations/impl/sqlalchemy/db_api_client.py index 7bc64240e1..a407e53d70 100644 --- a/dlt/destinations/impl/sqlalchemy/db_api_client.py +++ b/dlt/destinations/impl/sqlalchemy/db_api_client.py @@ -17,6 +17,7 @@ import sqlalchemy as sa from sqlalchemy.engine import Connection +from sqlalchemy.exc import ResourceClosedError from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import PreparedTableSchema @@ -27,11 +28,13 @@ LoadClientNotConnected, DatabaseException, ) -from dlt.destinations.typing import DBTransaction, DBApiCursor -from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl +from dlt.common.destination.reference import DBApiCursor +from dlt.destinations.typing import DBTransaction +from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.impl.sqlalchemy.configuration import SqlalchemyCredentials from dlt.destinations.impl.sqlalchemy.alter_table import MigrationMaker from dlt.common.typing import TFun +from dlt.destinations.sql_client import DBApiCursorImpl class SqlaTransactionWrapper(DBTransaction): @@ -77,8 +80,14 @@ def __init__(self, curr: sa.engine.CursorResult) -> None: self.fetchone = curr.fetchone # type: ignore[assignment] self.fetchmany = curr.fetchmany # type: ignore[assignment] + self.set_default_schema_columns() + def _get_columns(self) -> List[str]: - return list(self.native_cursor.keys()) # type: ignore[attr-defined] + try: + return list(self.native_cursor.keys()) # type: ignore[attr-defined] + except ResourceClosedError: + # this happens if no rows are returned + return [] # @property # def description(self) -> Any: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 0ddded98b6..0fca64d7ba 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -14,9 +14,11 @@ Type, Iterable, Iterator, + Generator, ) import zlib import re +from contextlib import contextmanager from dlt.common import pendulum, logger from dlt.common.json import json @@ -41,6 +43,7 @@ PreparedTableSchema, StateInfo, StorageSchemaInfo, + SupportsReadableDataset, WithStateSync, DestinationClientConfiguration, DestinationClientDwhConfiguration, @@ -51,7 +54,9 @@ JobClientBase, HasFollowupJobs, CredentialsConfiguration, + SupportsReadableRelation, ) +from dlt.destinations.dataset import ReadableDBAPIDataset from dlt.destinations.exceptions import DatabaseUndefinedRelation from dlt.destinations.job_impl import ( @@ -59,7 +64,7 @@ ) from dlt.destinations.sql_jobs import SqlMergeFollowupJob, SqlStagingCopyFollowupJob from dlt.destinations.typing import TNativeConn -from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.sql_client import SqlClientBase, WithSqlClient from dlt.destinations.utils import ( get_pipeline_state_query_columns, info_schema_null_to_bool, @@ -123,7 +128,7 @@ def __init__( self._bucket_path = ReferenceFollowupJobRequest.resolve_reference(file_path) -class SqlJobClientBase(JobClientBase, WithStateSync): +class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync): INFO_TABLES_QUERY_THRESHOLD: ClassVar[int] = 1000 """Fallback to querying all tables in the information schema if checking
more than threshold""" @@ -153,6 +158,14 @@ def __init__( assert isinstance(config, DestinationClientDwhConfiguration) self.config: DestinationClientDwhConfiguration = config + @property + def sql_client(self) -> SqlClientBase[TNativeConn]: + return self._sql_client + + @sql_client.setter + def sql_client(self, client: SqlClientBase[TNativeConn]) -> None: + self._sql_client = client + def drop_storage(self) -> None: self.sql_client.drop_dataset() diff --git a/dlt/destinations/sql_client.py b/dlt/destinations/sql_client.py index 96f18cea3d..51f3211f1b 100644 --- a/dlt/destinations/sql_client.py +++ b/dlt/destinations/sql_client.py @@ -16,18 +16,29 @@ Type, AnyStr, List, + Generator, TypedDict, + cast, ) from dlt.common.typing import TFun +from dlt.common.schema.typing import TTableSchemaColumns from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.utils import concat_strings_with_limit +from dlt.common.destination.reference import JobClientBase from dlt.destinations.exceptions import ( DestinationConnectionError, LoadClientNotConnected, ) -from dlt.destinations.typing import DBApi, TNativeConn, DBApiCursor, DataFrame, DBTransaction +from dlt.destinations.typing import ( + DBApi, + TNativeConn, + DataFrame, + DBTransaction, + ArrowTable, +) +from dlt.common.destination.reference import DBApiCursor class TJobQueryTags(TypedDict): @@ -292,6 +303,20 @@ def _truncate_table_sql(self, qualified_table_name: str) -> str: return f"DELETE FROM {qualified_table_name} WHERE 1=1;" +class WithSqlClient(JobClientBase): + @property + @abstractmethod + def sql_client(self) -> SqlClientBase[TNativeConn]: ... + + def __enter__(self) -> "WithSqlClient": + return self + + def __exit__( + self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType + ) -> None: + pass + + class DBApiCursorImpl(DBApiCursor): """A DBApi Cursor wrapper with dataframes reading functionality""" @@ -304,11 +329,20 @@ def __init__(self, curr: DBApiCursor) -> None: self.fetchmany = curr.fetchmany # type: ignore self.fetchone = curr.fetchone # type: ignore + self.set_default_schema_columns() + def __getattr__(self, name: str) -> Any: return getattr(self.native_cursor, name) def _get_columns(self) -> List[str]: - return [c[0] for c in self.native_cursor.description] + if self.native_cursor.description: + return [c[0] for c in self.native_cursor.description] + return [] + + def set_default_schema_columns(self) -> None: + self.schema_columns = cast( + TTableSchemaColumns, {c: {"name": c, "nullable": True} for c in self._get_columns()} + ) def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]: """Fetches results as data frame in full or in specified chunks. @@ -316,18 +350,55 @@ def df(self, chunk_size: int = None, **kwargs: Any) -> Optional[DataFrame]: May use native pandas/arrow reader if available. Depending on the native implementation chunk size may vary. """ - from dlt.common.libs.pandas_sql import _wrap_result + try: + return next(self.iter_df(chunk_size=chunk_size)) + except StopIteration: + return None - columns = self._get_columns() - if chunk_size is None: - return _wrap_result(self.native_cursor.fetchall(), columns, **kwargs) - else: - df = _wrap_result(self.native_cursor.fetchmany(chunk_size), columns, **kwargs) - # if no rows return None - if df.shape[0] == 0: - return None - else: - return df + def arrow(self, chunk_size: int = None, **kwargs: Any) -> Optional[ArrowTable]: + """Fetches results as data frame in full or in specified chunks. 
+ + May use native pandas/arrow reader if available. Depending on + the native implementation chunk size may vary. + """ + try: + return next(self.iter_arrow(chunk_size=chunk_size)) + except StopIteration: + return None + + def iter_fetch(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]: + while True: + if not (result := self.fetchmany(chunk_size)): + return + yield result + + def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: + """Default implementation converts arrow to df""" + from dlt.common.libs.pandas import pandas as pd + + for table in self.iter_arrow(chunk_size=chunk_size): + # NOTE: we go via arrow table, types are created for arrow if columns are known + # https://github.com/apache/arrow/issues/38644 for reference on types_mapper + yield table.to_pandas() + + def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: + """Default implementation converts query result to arrow table""" + from dlt.common.libs.pyarrow import row_tuples_to_arrow + from dlt.common.configuration.container import Container + + # get capabilities of possibly currently active pipeline + caps = ( + Container().get(DestinationCapabilitiesContext) + or DestinationCapabilitiesContext.generic_capabilities() + ) + + if not chunk_size: + result = self.fetchall() + yield row_tuples_to_arrow(result, caps, self.schema_columns, tz="UTC") + return + + for result in self.iter_fetch(chunk_size=chunk_size): + yield row_tuples_to_arrow(result, caps, self.schema_columns, tz="UTC") def raise_database_error(f: TFun) -> TFun: diff --git a/dlt/destinations/typing.py b/dlt/destinations/typing.py index 99ffed01fd..c809bf3230 100644 --- a/dlt/destinations/typing.py +++ b/dlt/destinations/typing.py @@ -1,17 +1,22 @@ -from typing import Any, AnyStr, List, Type, Optional, Protocol, Tuple, TypeVar +from typing import Any, AnyStr, List, Type, Optional, Protocol, Tuple, TypeVar, Generator + + +# native connection +TNativeConn = TypeVar("TNativeConn", bound=Any) try: from pandas import DataFrame except ImportError: DataFrame: Type[Any] = None # type: ignore -# native connection -TNativeConn = TypeVar("TNativeConn", bound=Any) +try: + from pyarrow import Table as ArrowTable +except ImportError: + ArrowTable: Type[Any] = None # type: ignore class DBTransaction(Protocol): def commit_transaction(self) -> None: ... - def rollback_transaction(self) -> None: ... @@ -19,34 +24,3 @@ class DBApi(Protocol): threadsafety: int apilevel: str paramstyle: str - - -class DBApiCursor(Protocol): - """Protocol for DBAPI cursor""" - - description: Tuple[Any, ...] - - native_cursor: "DBApiCursor" - """Cursor implementation native to current destination""" - - def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ... - def fetchall(self) -> List[Tuple[Any, ...]]: ... - def fetchmany(self, size: int = ...) -> List[Tuple[Any, ...]]: ... - def fetchone(self) -> Optional[Tuple[Any, ...]]: ... - def close(self) -> None: ... - - def df(self, chunk_size: int = None, **kwargs: None) -> Optional[DataFrame]: - """Fetches the results as data frame. For large queries the results may be chunked - - Fetches the results into a data frame. The default implementation uses helpers in `pandas.io.sql` to generate Pandas data frame. - This function will try to use native data frame generation for particular destination. For `BigQuery`: `QueryJob.to_dataframe` is used. - For `duckdb`: `DuckDBPyConnection.df' - - Args: - chunk_size (int, optional): Will chunk the results into several data frames. 
Defaults to None - **kwargs (Any): Additional parameters which will be passed to native data frame generation function. - - Returns: - Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results - """ - ... diff --git a/dlt/helpers/streamlit_app/pages/load_info.py b/dlt/helpers/streamlit_app/pages/load_info.py index ee13cf2531..699e786410 100644 --- a/dlt/helpers/streamlit_app/pages/load_info.py +++ b/dlt/helpers/streamlit_app/pages/load_info.py @@ -27,7 +27,7 @@ def write_load_status_page(pipeline: Pipeline) -> None: ) if loads_df is not None: - selected_load_id = st.selectbox("Select load id", loads_df) + selected_load_id: str = st.selectbox("Select load id", loads_df) schema = pipeline.default_schema st.markdown("**Number of loaded rows:**") diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 54e576b5fc..39ccde42d9 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -16,6 +16,7 @@ get_type_hints, ContextManager, Dict, + Literal, ) from dlt import version @@ -82,6 +83,7 @@ DestinationClientStagingConfiguration, DestinationClientStagingConfiguration, DestinationClientDwhWithStagingConfiguration, + SupportsReadableDataset, ) from dlt.common.normalizers.naming import NamingConvention from dlt.common.pipeline import ( @@ -108,9 +110,10 @@ from dlt.extract.extract import Extract, data_to_sources from dlt.normalize import Normalize from dlt.normalize.configuration import NormalizeConfiguration -from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.sql_client import SqlClientBase, WithSqlClient from dlt.destinations.fs_client import FSClientBase from dlt.destinations.job_client_impl import SqlJobClientBase +from dlt.destinations.dataset import ReadableDBAPIDataset from dlt.load.configuration import LoaderConfiguration from dlt.load import Load @@ -444,6 +447,7 @@ def extract( workers, refresh=refresh or self.refresh, ) + # this will update state version hash so it will not be extracted again by with_state_sync self._bump_version_and_extract_state( self._container[StateInjectableContext].state, @@ -1005,7 +1009,12 @@ def sql_client(self, schema_name: str = None) -> SqlClientBase[Any]: # "Sql Client is not available in a pipeline without a default schema. Extract some data first or restore the pipeline from the destination using 'restore_from_destination' flag. There's also `_inject_schema` method for advanced users." # ) schema = self._get_schema_or_create(schema_name) - return self._sql_job_client(schema).sql_client + client_config = self._get_destination_client_initial_config() + client = self._get_destination_clients(schema, client_config)[0] + if isinstance(client, WithSqlClient): + return client.sql_client + else: + raise SqlClientNotAvailable(self.pipeline_name, self.destination.destination_name) def _fs_client(self, schema_name: str = None) -> FSClientBase: """Returns a filesystem client configured to point to the right folder / bucket for each table. 
@@ -1707,3 +1716,11 @@ def _save_state(self, state: TPipelineState) -> None: def __getstate__(self) -> Any: # pickle only the SupportsPipeline protocol fields return {"pipeline_name": self.pipeline_name} + + def _dataset(self, dataset_type: Literal["dbapi", "ibis"] = "dbapi") -> SupportsReadableDataset: + """Access helper to dataset""" + if dataset_type == "dbapi": + return ReadableDBAPIDataset( + self.sql_client(), schema=self.default_schema if self.default_schema_name else None + ) + raise NotImplementedError(f"Dataset of type {dataset_type} not implemented") diff --git a/dlt/sources/sql_database/arrow_helpers.py b/dlt/sources/sql_database/arrow_helpers.py index 898d8c3280..1f72205a2a 100644 --- a/dlt/sources/sql_database/arrow_helpers.py +++ b/dlt/sources/sql_database/arrow_helpers.py @@ -1,150 +1,25 @@ -from typing import Any, Sequence, Optional +from typing import Any, Sequence from dlt.common.schema.typing import TTableSchemaColumns -from dlt.common import logger, json + from dlt.common.configuration import with_config from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.json import custom_encode, map_nested_in_place - -from .schema_types import RowAny +from dlt.common.libs.pyarrow import ( + row_tuples_to_arrow as _row_tuples_to_arrow, +) @with_config -def columns_to_arrow( - columns_schema: TTableSchemaColumns, +def row_tuples_to_arrow( + rows: Sequence[Any], caps: DestinationCapabilitiesContext = None, - tz: str = "UTC", + columns: TTableSchemaColumns = None, + tz: str = None, ) -> Any: """Converts `column_schema` to arrow schema using `caps` and `tz`. `caps` are injected from the container - which is always the case if run within the pipeline. This will generate arrow schema compatible with the destination. Otherwise generic capabilities are used """ - from dlt.common.libs.pyarrow import pyarrow as pa, get_py_arrow_datatype - from dlt.common.destination.capabilities import DestinationCapabilitiesContext - - return pa.schema( - [ - pa.field( - name, - get_py_arrow_datatype( - schema_item, - caps or DestinationCapabilitiesContext.generic_capabilities(), - tz, - ), - nullable=schema_item.get("nullable", True), - ) - for name, schema_item in columns_schema.items() - if schema_item.get("data_type") is not None - ] + return _row_tuples_to_arrow( + rows, caps or DestinationCapabilitiesContext.generic_capabilities(), columns, tz ) - - -def row_tuples_to_arrow(rows: Sequence[RowAny], columns: TTableSchemaColumns, tz: str) -> Any: - """Converts the rows to an arrow table using the columns schema. - Columns missing `data_type` will be inferred from the row data. - Columns with object types not supported by arrow are excluded from the resulting table. 
- """ - from dlt.common.libs.pyarrow import pyarrow as pa - import numpy as np - - try: - from pandas._libs import lib - - pivoted_rows = lib.to_object_array_tuples(rows).T - except ImportError: - logger.info( - "Pandas not installed, reverting to numpy.asarray to create a table which is slower" - ) - pivoted_rows = np.asarray(rows, dtype="object", order="k").T # type: ignore[call-overload] - - columnar = { - col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(columns))) - } - columnar_known_types = { - col["name"]: columnar[col["name"]] - for col in columns.values() - if col.get("data_type") is not None - } - columnar_unknown_types = { - col["name"]: columnar[col["name"]] - for col in columns.values() - if col.get("data_type") is None - } - - arrow_schema = columns_to_arrow(columns, tz=tz) - - for idx in range(0, len(arrow_schema.names)): - field = arrow_schema.field(idx) - py_type = type(rows[0][idx]) - # cast double / float ndarrays to decimals if type mismatch, looks like decimals and floats are often mixed up in dialects - if pa.types.is_decimal(field.type) and issubclass(py_type, (str, float)): - logger.warning( - f"Field {field.name} was reflected as decimal type, but rows contains" - f" {py_type.__name__}. Additional cast is required which may slow down arrow table" - " generation." - ) - float_array = pa.array(columnar_known_types[field.name], type=pa.float64()) - columnar_known_types[field.name] = float_array.cast(field.type, safe=False) - if issubclass(py_type, (dict, list)): - logger.warning( - f"Field {field.name} was reflected as JSON type and needs to be serialized back to" - " string to be placed in arrow table. This will slow data extraction down. You" - " should cast JSON field to STRING in your database system ie. by creating and" - " extracting an SQL VIEW that selects with cast." - ) - json_str_array = pa.array( - [None if s is None else json.dumps(s) for s in columnar_known_types[field.name]] - ) - columnar_known_types[field.name] = json_str_array - - # If there are unknown type columns, first create a table to infer their types - if columnar_unknown_types: - new_schema_fields = [] - for key in list(columnar_unknown_types): - arrow_col: Optional[pa.Array] = None - try: - arrow_col = pa.array(columnar_unknown_types[key]) - if pa.types.is_null(arrow_col.type): - logger.warning( - f"Column {key} contains only NULL values and data type could not be" - " inferred. This column is removed from a arrow table" - ) - continue - - except pa.ArrowInvalid as e: - # Try coercing types not supported by arrow to a json friendly format - # E.g. dataclasses -> dict, UUID -> str - try: - arrow_col = pa.array( - map_nested_in_place(custom_encode, list(columnar_unknown_types[key])) - ) - logger.warning( - f"Column {key} contains a data type which is not supported by pyarrow and" - f" got converted into {arrow_col.type}. This slows down arrow table" - " generation." - ) - except (pa.ArrowInvalid, TypeError): - logger.warning( - f"Column {key} contains a data type which is not supported by pyarrow. This" - f" column will be ignored. 
Error: {e}" - ) - if arrow_col is not None: - columnar_known_types[key] = arrow_col - new_schema_fields.append( - pa.field( - key, - arrow_col.type, - nullable=columns[key]["nullable"], - ) - ) - - # New schema - column_order = {name: idx for idx, name in enumerate(columns)} - arrow_schema = pa.schema( - sorted( - list(arrow_schema) + new_schema_fields, - key=lambda x: column_order[x.name], - ) - ) - - return pa.Table.from_pydict(columnar_known_types, schema=arrow_schema) diff --git a/dlt/sources/sql_database/helpers.py b/dlt/sources/sql_database/helpers.py index 1d758fe882..fccd59831e 100644 --- a/dlt/sources/sql_database/helpers.py +++ b/dlt/sources/sql_database/helpers.py @@ -146,7 +146,7 @@ def _load_rows(self, query: SelectAny, backend_kwargs: Dict[str, Any]) -> TDataI yield df elif self.backend == "pyarrow": yield row_tuples_to_arrow( - partition, self.columns, tz=backend_kwargs.get("tz", "UTC") + partition, columns=self.columns, tz=backend_kwargs.get("tz", "UTC") ) def _load_rows_connectorx( diff --git a/docs/website/docs/dlt-ecosystem/transformations/pandas.md b/docs/website/docs/dlt-ecosystem/transformations/pandas.md index 4125e4e114..cda4855268 100644 --- a/docs/website/docs/dlt-ecosystem/transformations/pandas.md +++ b/docs/website/docs/dlt-ecosystem/transformations/pandas.md @@ -22,7 +22,7 @@ with pipeline.sql_client() as client: with client.execute_query( 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' ) as table: - # calling `df` on a cursor returns the data as a data frame + # calling `df` on a cursor, returns the data as a pandas data frame reactions = table.df() counts = reactions.sum(0).sort_values(0, ascending=False) ``` diff --git a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md b/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md index 65c937ef77..2d7a7642c2 100644 --- a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md +++ b/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md @@ -65,7 +65,7 @@ with pipeline.sql_client() as client: with client.execute_query( 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' ) as table: - # calling `df` on a cursor returns the data as a DataFrame + # calling `df` on a cursor, returns the data as a pandas DataFrame reactions = table.df() counts = reactions.sum(0).sort_values(0, ascending=False) ``` diff --git a/poetry.lock b/poetry.lock index 12c0d75d1e..25f9164c0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. 
[[package]] name = "about-time" @@ -2194,6 +2194,23 @@ urllib3 = ">=1.26" alembic = ["alembic (>=1.0.11,<2.0.0)", "sqlalchemy (>=2.0.21)"] sqlalchemy = ["sqlalchemy (>=2.0.21)"] +[[package]] +name = "db-dtypes" +version = "1.3.0" +description = "Pandas Data Types for SQL systems (BigQuery, Spanner)" +optional = true +python-versions = ">=3.7" +files = [ + {file = "db_dtypes-1.3.0-py2.py3-none-any.whl", hash = "sha256:7e65c59f849ccbe6f7bc4d0253edcc212a7907662906921caba3e4aadd0bc277"}, + {file = "db_dtypes-1.3.0.tar.gz", hash = "sha256:7bcbc8858b07474dc85b77bb2f3ae488978d1336f5ea73b58c39d9118bc3e91b"}, +] + +[package.dependencies] +numpy = ">=1.16.6" +packaging = ">=17.0" +pandas = ">=0.24.2" +pyarrow = ">=3.0.0" + [[package]] name = "dbt-athena-community" version = "1.7.1" @@ -3788,6 +3805,106 @@ files = [ {file = "google_re2-1.1-4-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f4d4f0823e8b2f6952a145295b1ff25245ce9bb136aff6fe86452e507d4c1dd"}, {file = "google_re2-1.1-4-cp39-cp39-win32.whl", hash = "sha256:1afae56b2a07bb48cfcfefaa15ed85bae26a68f5dc7f9e128e6e6ea36914e847"}, {file = "google_re2-1.1-4-cp39-cp39-win_amd64.whl", hash = "sha256:aa7d6d05911ab9c8adbf3c225a7a120ab50fd2784ac48f2f0d140c0b7afc2b55"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:222fc2ee0e40522de0b21ad3bc90ab8983be3bf3cec3d349c80d76c8bb1a4beb"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:d4763b0b9195b72132a4e7de8e5a9bf1f05542f442a9115aa27cfc2a8004f581"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:209649da10c9d4a93d8a4d100ecbf9cc3b0252169426bec3e8b4ad7e57d600cf"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:68813aa333c1604a2df4a495b2a6ed065d7c8aebf26cc7e7abb5a6835d08353c"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:370a23ec775ad14e9d1e71474d56f381224dcf3e72b15d8ca7b4ad7dd9cd5853"}, + {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:14664a66a3ddf6bc9e56f401bf029db2d169982c53eff3f5876399104df0e9a6"}, + {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ea3722cc4932cbcebd553b69dce1b4a73572823cff4e6a244f1c855da21d511"}, + {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e14bb264c40fd7c627ef5678e295370cd6ba95ca71d835798b6e37502fc4c690"}, + {file = "google_re2-1.1-5-cp310-cp310-win32.whl", hash = "sha256:39512cd0151ea4b3969c992579c79b423018b464624ae955be685fc07d94556c"}, + {file = "google_re2-1.1-5-cp310-cp310-win_amd64.whl", hash = "sha256:ac66537aa3bc5504320d922b73156909e3c2b6da19739c866502f7827b3f9fdf"}, + {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b5ea68d54890c9edb1b930dcb2658819354e5d3f2201f811798bbc0a142c2b4"}, + {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:33443511b6b83c35242370908efe2e8e1e7cae749c766b2b247bf30e8616066c"}, + {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:413d77bdd5ba0bfcada428b4c146e87707452ec50a4091ec8e8ba1413d7e0619"}, + {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:5171686e43304996a34baa2abcee6f28b169806d0e583c16d55e5656b092a414"}, + {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:3b284db130283771558e31a02d8eb8fb756156ab98ce80035ae2e9e3a5f307c4"}, + {file = 
"google_re2-1.1-5-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:296e6aed0b169648dc4b870ff47bd34c702a32600adb9926154569ef51033f47"}, + {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:38d50e68ead374160b1e656bbb5d101f0b95fb4cc57f4a5c12100155001480c5"}, + {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2a0416a35921e5041758948bcb882456916f22845f66a93bc25070ef7262b72a"}, + {file = "google_re2-1.1-5-cp311-cp311-win32.whl", hash = "sha256:a1d59568bbb5de5dd56dd6cdc79907db26cce63eb4429260300c65f43469e3e7"}, + {file = "google_re2-1.1-5-cp311-cp311-win_amd64.whl", hash = "sha256:72f5a2f179648b8358737b2b493549370debd7d389884a54d331619b285514e3"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:cbc72c45937b1dc5acac3560eb1720007dccca7c9879138ff874c7f6baf96005"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5fadd1417fbef7235fa9453dba4eb102e6e7d94b1e4c99d5fa3dd4e288d0d2ae"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:040f85c63cc02696485b59b187a5ef044abe2f99b92b4fb399de40b7d2904ccc"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:64e3b975ee6d9bbb2420494e41f929c1a0de4bcc16d86619ab7a87f6ea80d6bd"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:8ee370413e00f4d828eaed0e83b8af84d7a72e8ee4f4bd5d3078bc741dfc430a"}, + {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:5b89383001079323f693ba592d7aad789d7a02e75adb5d3368d92b300f5963fd"}, + {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:63cb4fdfbbda16ae31b41a6388ea621510db82feb8217a74bf36552ecfcd50ad"}, + {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9ebedd84ae8be10b7a71a16162376fd67a2386fe6361ef88c622dcf7fd679daf"}, + {file = "google_re2-1.1-5-cp312-cp312-win32.whl", hash = "sha256:c8e22d1692bc2c81173330c721aff53e47ffd3c4403ff0cd9d91adfd255dd150"}, + {file = "google_re2-1.1-5-cp312-cp312-win_amd64.whl", hash = "sha256:5197a6af438bb8c4abda0bbe9c4fbd6c27c159855b211098b29d51b73e4cbcf6"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:b6727e0b98417e114b92688ad2aa256102ece51f29b743db3d831df53faf1ce3"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:711e2b6417eb579c61a4951029d844f6b95b9b373b213232efd413659889a363"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_arm64.whl", hash = "sha256:71ae8b3df22c5c154c8af0f0e99d234a450ef1644393bc2d7f53fc8c0a1e111c"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:94a04e214bc521a3807c217d50cf099bbdd0c0a80d2d996c0741dbb995b5f49f"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:a770f75358508a9110c81a1257721f70c15d9bb592a2fb5c25ecbd13566e52a5"}, + {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_x86_64.whl", hash = "sha256:07c9133357f7e0b17c6694d5dcb82e0371f695d7c25faef2ff8117ef375343ff"}, + {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:204ca6b1cf2021548f4a9c29ac015e0a4ab0a7b6582bf2183d838132b60c8fda"}, + {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f0b95857c2c654f419ca684ec38c9c3325c24e6ba7d11910a5110775a557bb18"}, + {file = "google_re2-1.1-5-cp38-cp38-win32.whl", hash = 
"sha256:347ac770e091a0364e822220f8d26ab53e6fdcdeaec635052000845c5a3fb869"}, + {file = "google_re2-1.1-5-cp38-cp38-win_amd64.whl", hash = "sha256:ec32bb6de7ffb112a07d210cf9f797b7600645c2d5910703fa07f456dd2150e0"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:eb5adf89060f81c5ff26c28e261e6b4997530a923a6093c9726b8dec02a9a326"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a22630c9dd9ceb41ca4316bccba2643a8b1d5c198f21c00ed5b50a94313aaf10"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:544dc17fcc2d43ec05f317366375796351dec44058e1164e03c3f7d050284d58"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:19710af5ea88751c7768575b23765ce0dfef7324d2539de576f75cdc319d6654"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:f82995a205e08ad896f4bd5ce4847c834fab877e1772a44e5f262a647d8a1dec"}, + {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:63533c4d58da9dc4bc040250f1f52b089911699f0368e0e6e15f996387a984ed"}, + {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:79e00fcf0cb04ea35a22b9014712d448725ce4ddc9f08cc818322566176ca4b0"}, + {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bc41afcefee2da6c4ed883a93d7f527c4b960cd1d26bbb0020a7b8c2d341a60a"}, + {file = "google_re2-1.1-5-cp39-cp39-win32.whl", hash = "sha256:486730b5e1f1c31b0abc6d80abe174ce4f1188fe17d1b50698f2bf79dc6e44be"}, + {file = "google_re2-1.1-5-cp39-cp39-win_amd64.whl", hash = "sha256:4de637ca328f1d23209e80967d1b987d6b352cd01b3a52a84b4d742c69c3da6c"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:621e9c199d1ff0fdb2a068ad450111a84b3bf14f96dfe5a8a7a0deae5f3f4cce"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:220acd31e7dde95373f97c3d1f3b3bd2532b38936af28b1917ee265d25bebbf4"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:db34e1098d164f76251a6ece30e8f0ddfd65bb658619f48613ce71acb3f9cbdb"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:5152bac41d8073977582f06257219541d0fc46ad99b0bbf30e8f60198a43b08c"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:6191294799e373ee1735af91f55abd23b786bdfd270768a690d9d55af9ea1b0d"}, + {file = "google_re2-1.1-6-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:070cbafbb4fecbb02e98feb28a1eb292fb880f434d531f38cc33ee314b521f1f"}, + {file = "google_re2-1.1-6-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8437d078b405a59a576cbed544490fe041140f64411f2d91012e8ec05ab8bf86"}, + {file = "google_re2-1.1-6-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f00f9a9af8896040e37896d9b9fc409ad4979f1ddd85bb188694a7d95ddd1164"}, + {file = "google_re2-1.1-6-cp310-cp310-win32.whl", hash = "sha256:df26345f229a898b4fd3cafd5f82259869388cee6268fc35af16a8e2293dd4e5"}, + {file = "google_re2-1.1-6-cp310-cp310-win_amd64.whl", hash = "sha256:3665d08262c57c9b28a5bdeb88632ad792c4e5f417e5645901695ab2624f5059"}, + {file = "google_re2-1.1-6-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:b26b869d8aa1d8fe67c42836bf3416bb72f444528ee2431cfb59c0d3e02c6ce3"}, + {file = "google_re2-1.1-6-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:41fd4486c57dea4f222a6bb7f1ff79accf76676a73bdb8da0fcbd5ba73f8da71"}, + {file = 
"google_re2-1.1-6-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:0ee378e2e74e25960070c338c28192377c4dd41e7f4608f2688064bd2badc41e"}, + {file = "google_re2-1.1-6-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:a00cdbf662693367b36d075b29feb649fd7ee1b617cf84f85f2deebeda25fc64"}, + {file = "google_re2-1.1-6-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:4c09455014217a41499432b8c8f792f25f3df0ea2982203c3a8c8ca0e7895e69"}, + {file = "google_re2-1.1-6-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:6501717909185327935c7945e23bb5aa8fc7b6f237b45fe3647fa36148662158"}, + {file = "google_re2-1.1-6-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3510b04790355f199e7861c29234081900e1e1cbf2d1484da48aa0ba6d7356ab"}, + {file = "google_re2-1.1-6-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8c0e64c187ca406764f9e9ad6e750d62e69ed8f75bf2e865d0bfbc03b642361c"}, + {file = "google_re2-1.1-6-cp311-cp311-win32.whl", hash = "sha256:2a199132350542b0de0f31acbb3ca87c3a90895d1d6e5235f7792bb0af02e523"}, + {file = "google_re2-1.1-6-cp311-cp311-win_amd64.whl", hash = "sha256:83bdac8ceaece8a6db082ea3a8ba6a99a2a1ee7e9f01a9d6d50f79c6f251a01d"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:81985ff894cd45ab5a73025922ac28c0707759db8171dd2f2cc7a0e856b6b5ad"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5635af26065e6b45456ccbea08674ae2ab62494008d9202df628df3b267bc095"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:813b6f04de79f4a8fdfe05e2cb33e0ccb40fe75d30ba441d519168f9d958bd54"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:5ec2f5332ad4fd232c3f2d6748c2c7845ccb66156a87df73abcc07f895d62ead"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:5a687b3b32a6cbb731647393b7c4e3fde244aa557f647df124ff83fb9b93e170"}, + {file = "google_re2-1.1-6-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:39a62f9b3db5d3021a09a47f5b91708b64a0580193e5352751eb0c689e4ad3d7"}, + {file = "google_re2-1.1-6-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ca0f0b45d4a1709cbf5d21f355e5809ac238f1ee594625a1e5ffa9ff7a09eb2b"}, + {file = "google_re2-1.1-6-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a64b3796a7a616c7861247bd061c9a836b5caf0d5963e5ea8022125601cf7b09"}, + {file = "google_re2-1.1-6-cp312-cp312-win32.whl", hash = "sha256:32783b9cb88469ba4cd9472d459fe4865280a6b1acdad4480a7b5081144c4eb7"}, + {file = "google_re2-1.1-6-cp312-cp312-win_amd64.whl", hash = "sha256:259ff3fd2d39035b9cbcbf375995f83fa5d9e6a0c5b94406ff1cc168ed41d6c6"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:e4711bcffe190acd29104d8ecfea0c0e42b754837de3fb8aad96e6cc3c613cdc"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:4d081cce43f39c2e813fe5990e1e378cbdb579d3f66ded5bade96130269ffd75"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_13_0_arm64.whl", hash = "sha256:4f123b54d48450d2d6b14d8fad38e930fb65b5b84f1b022c10f2913bd956f5b5"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:e1928b304a2b591a28eb3175f9db7f17c40c12cf2d4ec2a85fdf1cc9c073ff91"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:3a69f76146166aec1173003c1f547931bdf288c6b135fda0020468492ac4149f"}, + {file = "google_re2-1.1-6-cp38-cp38-macosx_14_0_x86_64.whl", hash = 
"sha256:fc08c388f4ebbbca345e84a0c56362180d33d11cbe9ccfae663e4db88e13751e"}, + {file = "google_re2-1.1-6-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b057adf38ce4e616486922f2f47fc7d19c827ba0a7f69d540a3664eba2269325"}, + {file = "google_re2-1.1-6-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4138c0b933ab099e96f5d8defce4486f7dfd480ecaf7f221f2409f28022ccbc5"}, + {file = "google_re2-1.1-6-cp38-cp38-win32.whl", hash = "sha256:9693e45b37b504634b1abbf1ee979471ac6a70a0035954592af616306ab05dd6"}, + {file = "google_re2-1.1-6-cp38-cp38-win_amd64.whl", hash = "sha256:5674d437baba0ea287a5a7f8f81f24265d6ae8f8c09384e2ef7b6f84b40a7826"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:7783137cb2e04f458a530c6d0ee9ef114815c1d48b9102f023998c371a3b060e"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a49b7153935e7a303675f4deb5f5d02ab1305adefc436071348706d147c889e0"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:a96a8bb309182090704593c60bdb369a2756b38fe358bbf0d40ddeb99c71769f"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:dff3d4be9f27ef8ec3705eed54f19ef4ab096f5876c15fe011628c69ba3b561c"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:40f818b0b39e26811fa677978112a8108269977fdab2ba0453ac4363c35d9e66"}, + {file = "google_re2-1.1-6-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:8a7e53538cdb40ef4296017acfbb05cab0c19998be7552db1cfb85ba40b171b9"}, + {file = "google_re2-1.1-6-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6ee18e7569fb714e5bb8c42809bf8160738637a5e71ed5a4797757a1fb4dc4de"}, + {file = "google_re2-1.1-6-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1cda4f6d1a7d5b43ea92bc395f23853fba0caf8b1e1efa6e8c48685f912fcb89"}, + {file = "google_re2-1.1-6-cp39-cp39-win32.whl", hash = "sha256:6a9cdbdc36a2bf24f897be6a6c85125876dc26fea9eb4247234aec0decbdccfd"}, + {file = "google_re2-1.1-6-cp39-cp39-win_amd64.whl", hash = "sha256:73f646cecfad7cc5b4330b4192c25f2e29730a3b8408e089ffd2078094208196"}, ] [[package]] @@ -8693,6 +8810,21 @@ toml = {version = "*", markers = "python_version < \"3.11\""} tqdm = "*" typing-extensions = "*" +[[package]] +name = "sqlglot" +version = "25.23.2" +description = "An easily customizable SQL parser and transpiler" +optional = true +python-versions = ">=3.7" +files = [ + {file = "sqlglot-25.23.2-py3-none-any.whl", hash = "sha256:52b8c82da4b338fe5163395d6dbc4346fb39142d2735b0b662fc70a28b71472c"}, + {file = "sqlglot-25.23.2.tar.gz", hash = "sha256:fbf384de30f83ba01c47f1b953509da2edc0b4c906e6c5491a90c8accbd6ed26"}, +] + +[package.extras] +dev = ["duckdb (>=0.6)", "maturin (>=1.4,<2.0)", "mypy", "pandas", "pandas-stubs", "pdoc", "pre-commit", "python-dateutil", "pytz", "ruff (==0.4.3)", "types-python-dateutil", "types-pytz", "typing-extensions"] +rs = ["sqlglotrs (==0.2.12)"] + [[package]] name = "sqlparse" version = "0.4.4" @@ -9801,15 +9933,15 @@ cffi = ["cffi (>=1.11)"] [extras] athena = ["botocore", "pyarrow", "pyathena", "s3fs"] az = ["adlfs"] -bigquery = ["gcsfs", "google-cloud-bigquery", "grpcio", "pyarrow"] +bigquery = ["db-dtypes", "gcsfs", "google-cloud-bigquery", "grpcio", "pyarrow"] cli = ["cron-descriptor", "pipdeptree"] clickhouse = ["adlfs", "clickhouse-connect", "clickhouse-driver", "gcsfs", "pyarrow", "s3fs"] databricks = ["databricks-sql-connector"] deltalake = ["deltalake", "pyarrow"] dremio = 
["pyarrow"] duckdb = ["duckdb"] -filesystem = ["botocore", "s3fs"] -gcp = ["gcsfs", "google-cloud-bigquery", "grpcio"] +filesystem = ["botocore", "s3fs", "sqlglot"] +gcp = ["db-dtypes", "gcsfs", "google-cloud-bigquery", "grpcio"] gs = ["gcsfs"] lancedb = ["lancedb", "pyarrow", "tantivy"] motherduck = ["duckdb", "pyarrow"] @@ -9829,4 +9961,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "985bb75a9579b44a5f9fd029ade1cc77455b544f2e18f9741b1d0d89bd188537" +content-hash = "e0407ef0b20740989cddd3a9fba109bb4a3ce3a2699e9bf5f48a08e480c42225" diff --git a/pyproject.toml b/pyproject.toml index 7b378429cd..19c601f790 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,18 +84,20 @@ lancedb = { version = ">=0.8.2", optional = true, markers = "python_version >= ' tantivy = { version = ">= 0.22.0", optional = true } deltalake = { version = ">=0.19.0", optional = true } sqlalchemy = { version = ">=1.4", optional = true } -alembic = {version = "^1.13.2", optional = true} +alembic = {version = ">1.10.0", optional = true} paramiko = {version = ">=3.3.0", optional = true} +sqlglot = {version = ">=20.0.0", optional = true} +db-dtypes = { version = ">=1.2.0", optional = true } [tool.poetry.extras] gcp = ["grpcio", "google-cloud-bigquery", "db-dtypes", "gcsfs"] # bigquery is alias on gcp extras -bigquery = ["grpcio", "google-cloud-bigquery", "pyarrow", "db-dtypes", "gcsfs"] +bigquery = ["grpcio", "google-cloud-bigquery", "pyarrow", "gcsfs", "db-dtypes"] postgres = ["psycopg2-binary", "psycopg2cffi"] redshift = ["psycopg2-binary", "psycopg2cffi"] parquet = ["pyarrow"] duckdb = ["duckdb"] -filesystem = ["s3fs", "botocore"] +filesystem = ["s3fs", "botocore", "sqlglot"] s3 = ["s3fs", "botocore"] gs = ["gcsfs"] az = ["adlfs"] diff --git a/tests/load/filesystem/test_sql_client.py b/tests/load/filesystem/test_sql_client.py new file mode 100644 index 0000000000..a5344e14e1 --- /dev/null +++ b/tests/load/filesystem/test_sql_client.py @@ -0,0 +1,330 @@ +"""Test the duckdb supported sql client for special internal features""" + + +from typing import Any + +import pytest +import dlt +import os + +from dlt import Pipeline +from dlt.common.utils import uniq_id + +from tests.load.utils import ( + destinations_configs, + DestinationTestConfiguration, + GCS_BUCKET, + SFTP_BUCKET, + MEMORY_BUCKET, +) +from dlt.destinations import filesystem +from tests.utils import TEST_STORAGE_ROOT +from dlt.destinations.exceptions import DatabaseUndefinedRelation + + +def _run_dataset_checks( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_format: Any = None, + alternate_access_pipeline: Pipeline = None, +) -> None: + total_records = 200 + + TEST_SECRET_NAME = "TEST_SECRET" + uniq_id() + + # only some buckets have support for persistent secrets + needs_persistent_secrets = ( + destination_config.bucket_url.startswith("s3") + or destination_config.bucket_url.startswith("az") + or destination_config.bucket_url.startswith("abfss") + ) + + unsupported_persistent_secrets = destination_config.bucket_url.startswith("gs") + + @dlt.source() + def source(): + @dlt.resource( + table_format=table_format, + write_disposition="replace", + ) + def items(): + yield from [ + { + "id": i, + "children": [{"id": i + 100}, {"id": i + 1000}], + } + for i in range(total_records) + ] + + @dlt.resource( + table_format=table_format, + write_disposition="replace", + ) + def double_items(): + yield from [ + { + "id": i, + "double_id": i * 2, + } + for i in 
range(total_records) + ] + + return [items, double_items] + + # run source + pipeline.run(source(), loader_file_format=destination_config.file_format) + + if alternate_access_pipeline: + pipeline.destination = alternate_access_pipeline.destination + + import duckdb + from duckdb import HTTPException, IOException, InvalidInputException + from dlt.destinations.impl.filesystem.sql_client import ( + FilesystemSqlClient, + DuckDbCredentials, + ) + + # check we can create new tables from the views + with pipeline.sql_client() as c: + c.execute_sql( + "CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN" + " double_items as di ON (i.id = di.id));" + ) + with c.execute_query("SELECT * FROM items_joined ORDER BY id ASC;") as cursor: + joined_table = cursor.fetchall() + assert len(joined_table) == total_records + assert list(joined_table[0]) == [0, 0] + assert list(joined_table[5]) == [5, 10] + assert list(joined_table[10]) == [10, 20] + + # inserting values into a view should fail gracefully + with pipeline.sql_client() as c: + try: + c.execute_sql("INSERT INTO double_items VALUES (1, 2)") + except Exception as exc: + assert "double_items is not an table" in str(exc) + + # check that no automated views are created for a schema different than + # the known one + with pipeline.sql_client() as c: + c.execute_sql("CREATE SCHEMA other_schema;") + with pytest.raises(DatabaseUndefinedRelation): + with c.execute_query("SELECT * FROM other_schema.items ORDER BY id ASC;") as cursor: + pass + # correct dataset view works + with c.execute_query(f"SELECT * FROM {c.dataset_name}.items ORDER BY id ASC;") as cursor: + table = cursor.fetchall() + assert len(table) == total_records + # no dataset prefix works + with c.execute_query("SELECT * FROM items ORDER BY id ASC;") as cursor: + table = cursor.fetchall() + assert len(table) == total_records + + # + # tests with external duckdb instance + # + + duck_db_location = TEST_STORAGE_ROOT + "/" + uniq_id() + + def _external_duckdb_connection() -> duckdb.DuckDBPyConnection: + external_db = duckdb.connect(duck_db_location) + # the line below solves problems with certificate path lookup on linux, see duckdb docs + external_db.sql("SET azure_transport_option_type = 'curl';") + return external_db + + def _fs_sql_client_for_external_db( + connection: duckdb.DuckDBPyConnection, + ) -> FilesystemSqlClient: + return FilesystemSqlClient( + dataset_name="second", + fs_client=pipeline.destination_client(), # type: ignore + credentials=DuckDbCredentials(connection), + ) + + # we create a duckdb with a table an see wether we can add more views from the fs client + external_db = _external_duckdb_connection() + external_db.execute("CREATE SCHEMA first;") + external_db.execute("CREATE SCHEMA second;") + external_db.execute("CREATE TABLE first.items AS SELECT i FROM range(0, 3) t(i)") + assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3 + + fs_sql_client = _fs_sql_client_for_external_db(external_db) + with fs_sql_client as sql_client: + sql_client.create_views_for_tables( + {"items": "referenced_items", "_dlt_loads": "_dlt_loads"} + ) + + # views exist + assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records + assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3 + external_db.close() + + # in case we are not connecting to a bucket, views should still be here after connection reopen + if not needs_persistent_secrets and not unsupported_persistent_secrets: + external_db = 
_external_duckdb_connection() + assert ( + len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) + == total_records + ) + external_db.close() + return + + # in other cases secrets are not available and this should fail + external_db = _external_duckdb_connection() + with pytest.raises((HTTPException, IOException, InvalidInputException)): + assert ( + len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) + == total_records + ) + external_db.close() + + # gs does not support persistent secrest, so we can't do further checks + if unsupported_persistent_secrets: + return + + # create secret + external_db = _external_duckdb_connection() + fs_sql_client = _fs_sql_client_for_external_db(external_db) + with fs_sql_client as sql_client: + fs_sql_client.create_authentication(persistent=True, secret_name=TEST_SECRET_NAME) + external_db.close() + + # now this should work + external_db = _external_duckdb_connection() + assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records + + # NOTE: when running this on CI, there seem to be some kind of race conditions that prevent + # secrets from being removed as it does not find the file... We'll need to investigate this. + return + + # now drop the secrets again + fs_sql_client = _fs_sql_client_for_external_db(external_db) + with fs_sql_client as sql_client: + fs_sql_client.drop_authentication(TEST_SECRET_NAME) + external_db.close() + + # fails again + external_db = _external_duckdb_connection() + with pytest.raises((HTTPException, IOException, InvalidInputException)): + assert ( + len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) + == total_records + ) + external_db.close() + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + local_filesystem_configs=True, + all_buckets_filesystem_configs=True, + bucket_exclude=[SFTP_BUCKET, MEMORY_BUCKET], + ), # TODO: make SFTP work + ids=lambda x: x.name, +) +def test_read_interfaces_filesystem(destination_config: DestinationTestConfiguration) -> None: + # we force multiple files per table, they may only hold 700 items + os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700" + + if destination_config.file_format not in ["parquet", "jsonl"]: + pytest.skip( + f"Test only works for jsonl and parquet, given: {destination_config.file_format}" + ) + + pipeline = destination_config.setup_pipeline( + "read_pipeline", + dataset_name="read_test", + dev_mode=True, + ) + + _run_dataset_checks(pipeline, destination_config) + + # for gcs buckets we additionally test the s3 compat layer + if destination_config.bucket_url == GCS_BUCKET: + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) + pipeline = destination_config.setup_pipeline( + "read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket + ) + _run_dataset_checks(pipeline, destination_config) + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + table_format_filesystem_configs=True, + with_table_format="delta", + bucket_exclude=[SFTP_BUCKET, MEMORY_BUCKET], + # NOTE: delta does not work on memory buckets + ), + ids=lambda x: x.name, +) +def test_delta_tables(destination_config: DestinationTestConfiguration) -> None: + os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700" + + pipeline = destination_config.setup_pipeline( + "read_pipeline", + dataset_name="read_test", + ) + + # in case of gcs we use the s3 
compat layer for reading + # for writing we still need to use the gc authentication, as delta_rs seems to use + # methods on the s3 interface that are not implemented by gcs + access_pipeline = pipeline + if destination_config.bucket_url == GCS_BUCKET: + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) + access_pipeline = destination_config.setup_pipeline( + "read_pipeline", dataset_name="read_test", destination=gcp_bucket + ) + + _run_dataset_checks( + pipeline, + destination_config, + table_format="delta", + alternate_access_pipeline=access_pipeline, + ) + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs(local_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_evolving_filesystem(destination_config: DestinationTestConfiguration) -> None: + """test that files with unequal schemas still work together""" + + if destination_config.file_format not in ["parquet", "jsonl"]: + pytest.skip( + f"Test only works for jsonl and parquet, given: {destination_config.file_format}" + ) + + @dlt.resource(table_name="items") + def items(): + yield from [{"id": i} for i in range(20)] + + pipeline = destination_config.setup_pipeline( + "read_pipeline", + dataset_name="read_test", + dev_mode=True, + ) + + pipeline.run([items()], loader_file_format=destination_config.file_format) + + df = pipeline._dataset().items.df() + assert len(df.index) == 20 + + @dlt.resource(table_name="items") + def items2(): + yield from [{"id": i, "other_value": "Blah"} for i in range(20, 50)] + + pipeline.run([items2()], loader_file_format=destination_config.file_format) + + # check df and arrow access + assert len(pipeline._dataset().items.df().index) == 50 + assert pipeline._dataset().items.arrow().num_rows == 50 diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index 050636c491..51cb392b29 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -674,6 +674,10 @@ def some_data(param: str) -> Any: # nevertheless this is potentially dangerous situation 🤷 assert ra_production_p.state == prod_state + # for now skip sql client tests for filesystem + if destination_config.destination_type == "filesystem": + return + # get all the states, notice version 4 twice (one from production, the other from local) try: with p.sql_client() as client: diff --git a/tests/load/sqlalchemy/docker-compose.yml b/tests/load/sqlalchemy/docker-compose.yml new file mode 100644 index 0000000000..29375a0f2e --- /dev/null +++ b/tests/load/sqlalchemy/docker-compose.yml @@ -0,0 +1,16 @@ +# Use root/example as user/password credentials +version: '3.1' + +services: + + db: + image: mysql:8 + restart: always + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: dlt_data + MYSQL_USER: loader + MYSQL_PASSWORD: loader + ports: + - 3306:3306 + # (this is just an example, not intended to be a production configuration) diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py new file mode 100644 index 0000000000..e093e4d670 --- /dev/null +++ b/tests/load/test_read_interfaces.py @@ -0,0 +1,302 @@ +from typing import Any + +import pytest +import dlt +import os + +from dlt import Pipeline +from dlt.common import Decimal +from dlt.common.utils import uniq_id + +from typing import List +from functools import reduce + +from tests.load.utils import ( + destinations_configs, + DestinationTestConfiguration, + GCS_BUCKET, + 
SFTP_BUCKET, + MEMORY_BUCKET, +) +from dlt.destinations import filesystem +from tests.utils import TEST_STORAGE_ROOT + + +def _run_dataset_checks( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_format: Any = None, + alternate_access_pipeline: Pipeline = None, +) -> None: + destination_type = pipeline.destination_client().config.destination_type + + skip_df_chunk_size_check = False + expected_columns = ["id", "decimal", "other_decimal", "_dlt_load_id", "_dlt_id"] + if destination_type == "bigquery": + chunk_size = 50 + total_records = 80 + elif destination_type == "mssql": + chunk_size = 700 + total_records = 1000 + else: + chunk_size = 2048 + total_records = 3000 + + # on filesystem one chunk is one file and not the default vector size + if destination_type == "filesystem": + skip_df_chunk_size_check = True + + # we always expect 2 chunks based on the above setup + expected_chunk_counts = [chunk_size, total_records - chunk_size] + + @dlt.source() + def source(): + @dlt.resource( + table_format=table_format, + write_disposition="replace", + columns={ + "id": {"data_type": "bigint"}, + # we add a decimal with precision to see wether the hints are preserved + "decimal": {"data_type": "decimal", "precision": 10, "scale": 3}, + "other_decimal": {"data_type": "decimal", "precision": 12, "scale": 3}, + }, + ) + def items(): + yield from [ + { + "id": i, + "children": [{"id": i + 100}, {"id": i + 1000}], + "decimal": Decimal("10.433"), + "other_decimal": Decimal("10.433"), + } + for i in range(total_records) + ] + + @dlt.resource( + table_format=table_format, + write_disposition="replace", + columns={ + "id": {"data_type": "bigint"}, + "double_id": {"data_type": "bigint"}, + }, + ) + def double_items(): + yield from [ + { + "id": i, + "double_id": i * 2, + } + for i in range(total_records) + ] + + return [items, double_items] + + # run source + s = source() + pipeline.run(s, loader_file_format=destination_config.file_format) + + if alternate_access_pipeline: + pipeline.destination = alternate_access_pipeline.destination + + # access via key + table_relationship = pipeline._dataset()["items"] + + # full frame + df = table_relationship.df() + assert len(df.index) == total_records + + # + # check dataframes + # + + # chunk + df = table_relationship.df(chunk_size=chunk_size) + if not skip_df_chunk_size_check: + assert len(df.index) == chunk_size + # lowercase results for the snowflake case + assert set(df.columns.values) == set(expected_columns) + + # iterate all dataframes + frames = list(table_relationship.iter_df(chunk_size=chunk_size)) + if not skip_df_chunk_size_check: + assert [len(df.index) for df in frames] == expected_chunk_counts + + # check all items are present + ids = reduce(lambda a, b: a + b, [f[expected_columns[0]].to_list() for f in frames]) + assert set(ids) == set(range(total_records)) + + # access via prop + table_relationship = pipeline._dataset().items + + # + # check arrow tables + # + + # full table + table = table_relationship.arrow() + assert table.num_rows == total_records + + # chunk + table = table_relationship.arrow(chunk_size=chunk_size) + assert set(table.column_names) == set(expected_columns) + assert table.num_rows == chunk_size + + # check frame amount and items counts + tables = list(table_relationship.iter_arrow(chunk_size=chunk_size)) + assert [t.num_rows for t in tables] == expected_chunk_counts + + # check all items are present + ids = reduce(lambda a, b: a + b, [t.column(expected_columns[0]).to_pylist() for t in tables]) + 
assert set(ids) == set(range(total_records)) + + # check fetch accessors + table_relationship = pipeline._dataset().items + + # check accessing one item + one = table_relationship.fetchone() + assert one[0] in range(total_records) + + # check fetchall + fall = table_relationship.fetchall() + assert len(fall) == total_records + assert {item[0] for item in fall} == set(range(total_records)) + + # check fetchmany + many = table_relationship.fetchmany(chunk_size) + assert len(many) == chunk_size + + # check iterfetchmany + chunks = list(table_relationship.iter_fetch(chunk_size=chunk_size)) + assert [len(chunk) for chunk in chunks] == expected_chunk_counts + ids = reduce(lambda a, b: a + b, [[item[0] for item in chunk] for chunk in chunks]) + assert set(ids) == set(range(total_records)) + + # check that hints are carried over to arrow table + expected_decimal_precision = 10 + expected_decimal_precision_2 = 12 + if destination_config.destination_type == "bigquery": + # bigquery does not allow precision configuration.. + expected_decimal_precision = 38 + expected_decimal_precision_2 = 38 + assert ( + table_relationship.arrow().schema.field("decimal").type.precision + == expected_decimal_precision + ) + assert ( + table_relationship.arrow().schema.field("other_decimal").type.precision + == expected_decimal_precision_2 + ) + + # simple check that query also works + tname = pipeline.sql_client().make_qualified_table_name("items") + query_relationship = pipeline._dataset()(f"select * from {tname} where id < 20") + + # we selected the first 20 + table = query_relationship.arrow() + assert table.num_rows == 20 + + # check join query + tdname = pipeline.sql_client().make_qualified_table_name("double_items") + query = ( + f"SELECT i.id, di.double_id FROM {tname} as i JOIN {tdname} as di ON (i.id = di.id) WHERE" + " i.id < 20 ORDER BY i.id ASC" + ) + join_relationship = pipeline._dataset()(query) + table = join_relationship.fetchall() + assert len(table) == 20 + assert list(table[0]) == [0, 0] + assert list(table[5]) == [5, 10] + assert list(table[10]) == [10, 20] + + # check loads table access + loads_table = pipeline._dataset()[pipeline.default_schema.loads_table_name] + loads_table.fetchall() + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True), + ids=lambda x: x.name, +) +def test_read_interfaces_sql(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline( + "read_pipeline", dataset_name="read_test", dev_mode=True + ) + _run_dataset_checks(pipeline, destination_config) + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + local_filesystem_configs=True, + all_buckets_filesystem_configs=True, + bucket_exclude=[SFTP_BUCKET, MEMORY_BUCKET], + ), # TODO: make SFTP work + ids=lambda x: x.name, +) +def test_read_interfaces_filesystem(destination_config: DestinationTestConfiguration) -> None: + # we force multiple files per table, they may only hold 700 items + os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700" + + if destination_config.file_format not in ["parquet", "jsonl"]: + pytest.skip( + f"Test only works for jsonl and parquet, given: {destination_config.file_format}" + ) + + pipeline = destination_config.setup_pipeline( + "read_pipeline", + dataset_name="read_test", + dev_mode=True, + ) + + _run_dataset_checks(pipeline, destination_config) + + # for gcs buckets we additionally test the s3 compat layer + if 
destination_config.bucket_url == GCS_BUCKET: + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) + pipeline = destination_config.setup_pipeline( + "read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket + ) + _run_dataset_checks(pipeline, destination_config) + + +@pytest.mark.essential +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + table_format_filesystem_configs=True, + with_table_format="delta", + bucket_exclude=[SFTP_BUCKET, MEMORY_BUCKET], + ), + ids=lambda x: x.name, +) +def test_delta_tables(destination_config: DestinationTestConfiguration) -> None: + os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "700" + + pipeline = destination_config.setup_pipeline( + "read_pipeline", + dataset_name="read_test", + ) + + # in case of gcs we use the s3 compat layer for reading + # for writing we still need to use the gc authentication, as delta_rs seems to use + # methods on the s3 interface that are not implemented by gcs + access_pipeline = pipeline + if destination_config.bucket_url == GCS_BUCKET: + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) + access_pipeline = destination_config.setup_pipeline( + "read_pipeline", dataset_name="read_test", destination=gcp_bucket + ) + + _run_dataset_checks( + pipeline, + destination_config, + table_format="delta", + alternate_access_pipeline=access_pipeline, + ) diff --git a/tests/load/test_sql_client.py b/tests/load/test_sql_client.py index 199b4b83b7..3636b3e53a 100644 --- a/tests/load/test_sql_client.py +++ b/tests/load/test_sql_client.py @@ -347,9 +347,13 @@ def test_execute_df(client: SqlJobClientBase) -> None: f"SELECT * FROM {f_q_table_name} ORDER BY col ASC" ) as curr: # be compatible with duckdb vector size - df_1 = curr.df(chunk_size=chunk_size) - df_2 = curr.df(chunk_size=chunk_size) - df_3 = curr.df(chunk_size=chunk_size) + iterator = curr.iter_df(chunk_size) + df_1 = next(iterator) + df_2 = next(iterator) + try: + df_3 = next(iterator) + except StopIteration: + df_3 = None # Force lower case df columns, snowflake has all cols uppercase for df in [df_1, df_2, df_3]: if df is not None: diff --git a/tests/load/utils.py b/tests/load/utils.py index 9cfb6984a5..268d24ded2 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -162,7 +162,7 @@ class DestinationTestConfiguration: supports_dbt: bool = True disable_compression: bool = False dev_mode: bool = False - credentials: Optional[Union[CredentialsConfiguration, Dict[str, Any]]] = None + credentials: Optional[Union[CredentialsConfiguration, Dict[str, Any], str]] = None env_vars: Optional[Dict[str, str]] = None destination_name: Optional[str] = None @@ -215,8 +215,11 @@ def setup(self) -> None: os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" if self.credentials is not None: - for key, value in dict(self.credentials).items(): - os.environ[f"DESTINATION__CREDENTIALS__{key.upper()}"] = str(value) + if isinstance(self.credentials, str): + os.environ["DESTINATION__CREDENTIALS"] = self.credentials + else: + for key, value in dict(self.credentials).items(): + os.environ[f"DESTINATION__CREDENTIALS__{key.upper()}"] = str(value) if self.env_vars is not None: for k, v in self.env_vars.items(): @@ -334,12 +337,16 @@ def destinations_configs( supports_merge=True, supports_dbt=False, destination_name="sqlalchemy_mysql", + credentials=( # Use root cause we need to create databases, + "mysql://root:root@127.0.0.1:3306/dlt_data" 
+ ), ), DestinationTestConfiguration( destination_type="sqlalchemy", supports_merge=True, supports_dbt=False, destination_name="sqlalchemy_sqlite", + credentials="sqlite:///_storage/dl_data.sqlite", ), ] @@ -589,6 +596,7 @@ def destinations_configs( bucket_url=bucket, extra_info=bucket, supports_merge=False, + file_format="parquet", ) ] diff --git a/.github/weaviate-compose.yml b/tests/load/weaviate/docker-compose.yml similarity index 100% rename from .github/weaviate-compose.yml rename to tests/load/weaviate/docker-compose.yml diff --git a/tests/sources/sql_database/test_arrow_helpers.py b/tests/sources/sql_database/test_arrow_helpers.py index 8328bed89b..abd063889c 100644 --- a/tests/sources/sql_database/test_arrow_helpers.py +++ b/tests/sources/sql_database/test_arrow_helpers.py @@ -65,7 +65,7 @@ def test_row_tuples_to_arrow_unknown_types(all_unknown: bool) -> None: col.pop("data_type", None) # Call the function - result = row_tuples_to_arrow(rows, columns, tz="UTC") # type: ignore[arg-type] + result = row_tuples_to_arrow(rows, columns=columns, tz="UTC") # type: ignore # Result is arrow table containing all columns in original order with correct types assert result.num_columns == len(columns) @@ -98,7 +98,7 @@ def test_row_tuples_to_arrow_detects_range_type() -> None: (IntRange(3, 30),), ] result = row_tuples_to_arrow( - rows=rows, # type: ignore[arg-type] + rows=rows, columns={"range_col": {"name": "range_col", "nullable": False}}, tz="UTC", )