Commit

data pond: expose readable datasets as dataframes and arrow tables (#1507)

* add simple ibis helper

* start working on dataframe reading interface

* a bit more work

* first simple implementation

* small change

* more work on dataset

* some work on filesystem destination

* add support for parquet files and compression on jsonl files in filesystem dataframe implementation

* fix test after devel merge

* add nice composable pipeline example

* small updates to demo

* enable tests for all bucket providers
remove resource based dataset accessor

* fix tests

* create views in duckdb filesystem accessor

* move to relations based interface

* add generic duckdb interface to filesystem

* move code for accessing frames and tables to the cursor and use duckdb dbapi cursor in filesystem

* add native db api cursor fetching to exposed dataset

* some small changes

* switch data access from pandas to pyarrow

* add native bigquery support for df and arrow tables

* change iter functions to always expect chunk size (None will default to full frame/table)

* add native implementation for databricks

* add dremio native implementation for full frames and tables

* fix filesystem test
make filesystem duckdb instance use glob pattern

* add test for evolving filesystem

* fix empty dataframe retrieval

* remove old df test

* clean up interfaces a bit (more to come?)
remove pipeline dependency from dataset

* move dataset creation into destination client and clean up interfaces / reference a bit more

* rename some interfaces and add brief docstrings

* add filesystem cached duckdb and remove the need to declare needed views for filesystem

* fix tests for snowflake

* make dataset a function

* fix db-types dependency for bigquery

* create duckdb based sql client for filesystem

* fix example pipeline

* enable filesystem sql client to work on streamlit

* add comments

* rename sql to query
remove unneeded code

* fix tests that rely on sql client

* post merge cleanups

* move imports around a bit

* exclude abfss buckets from test

* add support for arrow schema creation from known dlt schema

* re-use sqldatabase code for cursors

* fix bug

* add default columns where needed

* add sql glot to filesystem deps

* store filesystem tables in correct dataset

* move cursor columns location

* fix snowflake and mssql
disable tests with sftp

* clean up compose files a bit

* fix sqlalchemy

* add mysql docker compose file

* fix linting

* prepare hint checking

* disable part of state test

* enable hint check

* add column type support for filesystem json

* rename dataset implementation to DBAPI
remove dataset specific code from destination client

* wrap functions in dbapi readable dataset

* remove example pipeline

* rename test_decimal_name

* make column code a bit clearer and fix mssql again

* rename df methods to pandas

* fix bug in default columns

* fix hints test and columns bug
remove some unneeded code

* catch mysql error if no rows returned

* add exceptions for not implemented bucket and filetypes

* fix docs

* add config section for getting pipeline clients

* set default dataset in filesystem sqlclient

* add config section for sync_destination

* rename readablerelation methods

* use more functions of the duckdb sql client in filesystem version

* update dependencies

* use active pipeline capabilities if available for arrow table

* update types

* rename dataset accessor function

* add test for accessing tables with unqualified table name

* fix sql client

* add duckdb native support for azure, s3 and gcs (via s3)

* some typing

* add dataframes tests back in

* add join table and update view tests for filesystem

* start adding tests for creating views on remote duckdb

* fix snippets

* fix some dependencies and mssql/synapse tests

* fix bigquery dependencies and abfss tests

* add tests for adding view to external dbs and persistent secrets

* add support for delta tables

* add duckdb to read interface tests

* fix delta tests

* make default secret name derived from bucket url

* try fix azure tests again

* fix df access tests

* PR fixes

* correct internal table access

* allow datasets without schema

* skip parametrized queries, skip tables from non-dataset schemas

* move filesystem specific sql_client tests to correct location and test a few more things

* fix sql client tests

* make secret name when dropping optional

* fix gs test

* remove moved filesystem tests from test_read_interfaces

* fix sql client tests again... :)

* clear duckdb secrets

* disable secrets deleting for delta tests

---------

Co-authored-by: Marcin Rudolf <[email protected]>
sh-rp and rudolfix authored Oct 8, 2024
1 parent 2d07a43 commit 4ee65a8
Showing 45 changed files with 1,734 additions and 298 deletions.
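
For orientation, here is a minimal usage sketch of the interface this commit introduces: readable datasets that hand back relations as pandas data frames, arrow tables, or plain DBAPI rows. The `_dataset()` accessor name is an assumption (the PR itself renames the dataset accessor along the way), so treat the entry point as illustrative rather than final.

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="pond_demo", destination="duckdb", dataset_name="pond"
)
pipeline.run([{"id": 1, "value": 10}, {"id": 2, "value": 20}], table_name="items")

# assumed accessor name; the PR renames the dataset accessor, so this may differ
dataset = pipeline._dataset()

# relations can be addressed by table name or created from a query
items = dataset["items"]
totals = dataset("SELECT sum(value) AS total FROM items")

print(items.df())         # pandas DataFrame with the full table
print(items.arrow())      # pyarrow Table
print(totals.fetchall())  # DBAPI-style list of row tuples

# chunked iteration for results that do not fit in memory
for frame in items.iter_df(chunk_size=1000):
    print(len(frame))
```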
5 changes: 4 additions & 1 deletion .github/workflows/test_destinations.yml
@@ -77,11 +77,14 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- name: clear duckdb secrets and cache
run: rm -rf ~/.duckdb

- run: |
poetry run pytest tests/load --ignore tests/load/sources -m "essential"
name: Run essential tests Linux
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -60,7 +60,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
@@ -73,7 +73,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f "tests/load/weaviate/docker-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
@@ -122,7 +122,7 @@ jobs:

- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v
run: docker compose -f "tests/load/weaviate/docker-compose.yml" down -v

- name: Stop SFTP server
if: always()
8 changes: 6 additions & 2 deletions .github/workflows/test_pyarrow17.yml
@@ -65,14 +65,18 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-pyarrow17

- name: Install dependencies
run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E gs -E s3 -E az
run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E duckdb -E filesystem -E gs -E s3 -E az


- name: Upgrade pyarrow
run: poetry run pip install pyarrow==17.0.0

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- name: clear duckdb secrets and cache
run: rm -rf ~/.duckdb

- name: Run needspyarrow17 tests Linux
run: |
poetry run pytest tests/libs -m "needspyarrow17"
3 changes: 0 additions & 3 deletions .github/workflows/test_sqlalchemy_destinations.yml
@@ -94,6 +94,3 @@ jobs:
# always run full suite, also on branches
- run: poetry run pytest tests/load -x --ignore tests/load/sources
name: Run tests Linux
env:
DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:[email protected]:3306/dlt_data # Use root cause we need to create databases
DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite
9 changes: 8 additions & 1 deletion Makefile
@@ -109,4 +109,11 @@ test-build-images: build-library

preprocess-docs:
# run docs preprocessing to run a few checks and ensure examples can be parsed
cd docs/website && npm i && npm run preprocess-docs
cd docs/website && npm i && npm run preprocess-docs

start-test-containers:
docker compose -f "tests/load/dremio/docker-compose.yml" up -d
docker compose -f "tests/load/postgres/docker-compose.yml" up -d
docker compose -f "tests/load/weaviate/docker-compose.yml" up -d
docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
docker compose -f "tests/load/sqlalchemy/docker-compose.yml" up -d
17 changes: 2 additions & 15 deletions dlt/common/data_writers/writers.py
@@ -320,23 +320,10 @@ def _create_writer(self, schema: "pa.Schema") -> "pa.parquet.ParquetWriter":
)

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype
from dlt.common.libs.pyarrow import columns_to_arrow

# build schema
self.schema = pyarrow.schema(
[
pyarrow.field(
name,
get_py_arrow_datatype(
schema_item,
self._caps,
self.timestamp_timezone,
),
nullable=is_nullable_column(schema_item),
)
for name, schema_item in columns_schema.items()
]
)
self.schema = columns_to_arrow(columns_schema, self._caps, self.timestamp_timezone)
# find row items that are of the json type (could be abstracted out for use in other writers?)
self.nested_indices = [
i for i, field in columns_schema.items() if field["data_type"] == "json"
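
The change above swaps the inline pyarrow schema construction for the shared `columns_to_arrow` helper. As a rough, self-contained sketch of the mapping that helper centralizes (with a made-up, simplified type table standing in for `get_py_arrow_datatype`, destination capabilities, and timestamp timezone handling):

```python
from typing import Any, Dict

import pyarrow as pa

# simplified stand-in for dlt's type mapping; the real helper consults
# destination capabilities and the configured timestamp timezone
SIMPLE_TYPE_MAP = {
    "text": pa.string(),
    "bigint": pa.int64(),
    "double": pa.float64(),
    "bool": pa.bool_(),
    "timestamp": pa.timestamp("us", tz="UTC"),
}

def columns_to_arrow_sketch(columns_schema: Dict[str, Dict[str, Any]]) -> pa.Schema:
    """Build an arrow schema from a dlt-style columns dict (name -> hints)."""
    return pa.schema(
        pa.field(
            name,
            SIMPLE_TYPE_MAP[column["data_type"]],
            nullable=column.get("nullable", True),
        )
        for name, column in columns_schema.items()
    )

print(columns_to_arrow_sketch({
    "id": {"data_type": "bigint", "nullable": False},
    "value": {"data_type": "double"},
}))
```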
85 changes: 84 additions & 1 deletion dlt/common/destination/reference.py
@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
import dataclasses
from importlib import import_module
from contextlib import contextmanager

from types import TracebackType
from typing import (
Callable,
@@ -18,24 +20,33 @@
Any,
TypeVar,
Generic,
Generator,
TYPE_CHECKING,
Protocol,
Tuple,
AnyStr,
)
from typing_extensions import Annotated
import datetime # noqa: 251
import inspect

from dlt.common import logger, pendulum

from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.destination.utils import verify_schema_capabilities, verify_supported_data_types
from dlt.common.exceptions import TerminalException
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
TLoaderReplaceStrategy,
)
from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table

from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
@@ -49,13 +60,26 @@
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc
from dlt.common.exceptions import MissingDependencyException


TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")

DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}"

if TYPE_CHECKING:
try:
from dlt.common.libs.pandas import DataFrame
from dlt.common.libs.pyarrow import Table as ArrowTable
except MissingDependencyException:
DataFrame = Any
ArrowTable = Any
else:
DataFrame = Any
ArrowTable = Any


class StorageSchemaInfo(NamedTuple):
version_hash: str
Expand Down Expand Up @@ -442,6 +466,65 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRe
return []


class SupportsReadableRelation(Protocol):
"""A readable relation retrieved from a destination that supports it"""

schema_columns: TTableSchemaColumns
"""Known dlt table columns for this relation"""

def df(self, chunk_size: int = None) -> Optional[DataFrame]:
"""Fetches the results as data frame. For large queries the results may be chunked
Fetches the results into a data frame. The default implementation uses helpers in `pandas.io.sql` to generate Pandas data frame.
This function will try to use native data frame generation for particular destination. For `BigQuery`: `QueryJob.to_dataframe` is used.
For `duckdb`: `DuckDBPyConnection.df'
Args:
chunk_size (int, optional): Will chunk the results into several data frames. Defaults to None
**kwargs (Any): Additional parameters which will be passed to native data frame generation function.
Returns:
Optional[DataFrame]: A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results
"""
...

def arrow(self, chunk_size: int = None) -> Optional[ArrowTable]: ...

def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]: ...

def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]: ...

def fetchall(self) -> List[Tuple[Any, ...]]: ...

def fetchmany(self, chunk_size: int) -> List[Tuple[Any, ...]]: ...

def iter_fetch(self, chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]: ...

def fetchone(self) -> Optional[Tuple[Any, ...]]: ...


class DBApiCursor(SupportsReadableRelation):
"""Protocol for DBAPI cursor"""

description: Tuple[Any, ...]

native_cursor: "DBApiCursor"
"""Cursor implementation native to current destination"""

def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ...
def close(self) -> None: ...


class SupportsReadableDataset(Protocol):
"""A readable dataset retrieved from a destination, has support for creating readable relations for a query or table"""

def __call__(self, query: Any) -> SupportsReadableRelation: ...

def __getitem__(self, table: str) -> SupportsReadableRelation: ...

def __getattr__(self, table: str) -> SupportsReadableRelation: ...


class JobClientBase(ABC):
def __init__(
self,
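
To make the new protocols concrete, here is a hedged sketch of how a caller could consume any `SupportsReadableRelation`: stream arrow chunks into a parquet file without materializing the whole result. `relation_to_parquet` and its parameters are illustrative and not part of this diff.

```python
import pyarrow.parquet as pq

def relation_to_parquet(relation, path: str, chunk_size: int = 10_000) -> None:
    """Write a readable relation to a parquet file in chunks via iter_arrow()."""
    writer = None
    try:
        for table in relation.iter_arrow(chunk_size):
            if writer is None:
                # open the writer lazily so the schema comes from the first chunk
                writer = pq.ParquetWriter(path, table.schema)
            writer.write_table(table)
    finally:
        if writer is not None:
            writer.close()
```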
1 change: 1 addition & 0 deletions dlt/common/libs/pandas.py
@@ -3,6 +3,7 @@

try:
import pandas
from pandas import DataFrame
except ModuleNotFoundError:
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])
