Commit

updates duckdb/motherduck load job, adds full ci for motherduck and updates docs (#1674)

* adds full ci for motherduck and updates docs

* drops parquet locks from duckdb, matches parquet to columns by name, allows full jsonl loading

* fixes basic job and sql client tests so motherduck+parquet runs

* adds parallel parquet loading test
rudolfix authored Aug 12, 2024
1 parent 7aeb6c8 commit 250c2e2
Showing 14 changed files with 403 additions and 109 deletions.
80 changes: 80 additions & 0 deletions .github/workflows/test_destination_motherduck.yml
@@ -0,0 +1,80 @@

name: dest | motherduck

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

  ACTIVE_DESTINATIONS: "[\"motherduck\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

  run_loader:
    name: dest | motherduck tests
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-motherduck

      - name: Install dependencies
        run: poetry install --no-interaction -E motherduck -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

      - name: create secrets.toml
        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

      - run: |
          poetry run pytest tests/load -m "essential"
        name: Run essential tests Linux
        if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}

      - run: |
          poetry run pytest tests/load
        name: Run all tests Linux
        if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
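For local debugging, the same two test selections can be reproduced through pytest's Python API. This is a minimal sketch, assuming the environment the workflow prepares (the `tests/.dlt/secrets.toml` file and the `ACTIVE_DESTINATIONS` / `ALL_FILESYSTEM_DRIVERS` variables) is already in place; it is not part of this commit.

```python
import pytest

# PRs without the "ci full" label run only the tests marked "essential"
pytest.main(["tests/load", "-m", "essential"])

# nightly / "ci full" runs execute the whole load test suite:
# pytest.main(["tests/load"])
```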
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -28,7 +28,7 @@ env:
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
  # Test redshift and filesystem with all buckets
  # postgres runs again here so we can test on mac/windows
  ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\", \"motherduck\"]"
  ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"

jobs:
  get_docs_changes:
30 changes: 9 additions & 21 deletions dlt/destinations/impl/duckdb/duck.py
@@ -19,10 +19,6 @@

HINT_TO_POSTGRES_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}

# duckdb cannot load PARQUET to the same table in parallel. so serialize it per table
PARQUET_TABLE_LOCK = threading.Lock()
TABLES_LOCKS: Dict[str, threading.Lock] = {}


class DuckDbTypeMapper(TypeMapper):
    sct_to_unbound_dbt = {
@@ -123,28 +119,20 @@ def run(self) -> None:

        qualified_table_name = self._sql_client.make_qualified_table_name(self.load_table_name)
        if self._file_path.endswith("parquet"):
            source_format = "PARQUET"
            options = ""
            # lock when creating a new lock
            with PARQUET_TABLE_LOCK:
                # create or get lock per table name
                lock: threading.Lock = TABLES_LOCKS.setdefault(
                    qualified_table_name, threading.Lock()
                )
            source_format = "read_parquet"
            options = ", union_by_name=true"
        elif self._file_path.endswith("jsonl"):
            # NOTE: loading JSON does not work in practice on duckdb: the missing keys fail the load instead of being interpreted as NULL
            source_format = "JSON"  # newline delimited, compression auto
            options = ", COMPRESSION GZIP" if FileStorage.is_gzipped(self._file_path) else ""
            lock = None
            source_format = "read_json"  # newline delimited, compression auto
            options = ", COMPRESSION=GZIP" if FileStorage.is_gzipped(self._file_path) else ""
        else:
            raise ValueError(self._file_path)

        with maybe_context(lock):
            with self._sql_client.begin_transaction():
                self._sql_client.execute_sql(
                    f"COPY {qualified_table_name} FROM '{self._file_path}' ( FORMAT"
                    f" {source_format} {options});"
                )
        with self._sql_client.begin_transaction():
            self._sql_client.execute_sql(
                f"INSERT INTO {qualified_table_name} BY NAME SELECT * FROM"
                f" {source_format}('{self._file_path}' {options});"
            )


class DuckDbClient(InsertValuesJobClient):
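The new job emits `INSERT INTO ... BY NAME SELECT * FROM read_parquet(..., union_by_name=true)` instead of `COPY ... (FORMAT PARQUET)`, so parquet columns are matched to table columns by name and the per-table locks become unnecessary. Below is a minimal, self-contained sketch of that SQL shape; the `items` table and the file name are illustrative and not part of this commit.

```python
import duckdb

con = duckdb.connect()  # in-memory database, just for illustration
con.execute("CREATE TABLE items (id INTEGER, name VARCHAR, created_at TIMESTAMP)")

# write a parquet file (into the working directory) with only a subset of the table's columns
con.execute("COPY (SELECT 1 AS id, 'a' AS name) TO 'items.parquet' (FORMAT PARQUET)")

# BY NAME matches columns by name; the missing created_at column is filled with NULL,
# and union_by_name aligns schemas when several parquet files are read at once
con.execute(
    "INSERT INTO items BY NAME "
    "SELECT * FROM read_parquet('items.parquet', union_by_name=true)"
)
print(con.execute("SELECT * FROM items").fetchall())  # [(1, 'a', None)]
```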
3 changes: 2 additions & 1 deletion dlt/destinations/impl/motherduck/factory.py
@@ -33,10 +33,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
        caps.is_max_query_length_in_bytes = True
        caps.max_text_data_type_length = 1024 * 1024 * 1024
        caps.is_max_text_data_type_length_in_bytes = True
        caps.supports_ddl_transactions = False
        caps.supports_ddl_transactions = True
        caps.alter_add_multi_column = False
        caps.supports_truncate_command = False
        caps.supported_merge_strategies = ["delete-insert", "scd2"]
        caps.max_parallel_load_jobs = 8

        return caps

2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/duckdb.md
@@ -63,7 +63,7 @@ You can configure the following file formats to load data to duckdb:
:::note
`duckdb` cannot COPY many parquet files to a single table from multiple threads. In this situation, `dlt` serializes the loads. Still, that may be faster than INSERT.
:::
* [jsonl](../file-formats/jsonl.md) **is supported but does not work if JSON fields are optional. The missing keys fail the COPY instead of being interpreted as NULL.**
* [jsonl](../file-formats/jsonl.md)

:::tip
`duckdb` has [timestamp types](https://duckdb.org/docs/sql/data_types/timestamp.html) with resolutions from milliseconds to nanoseconds. However
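The jsonl caveat above could be dropped because `read_json` fills keys that are missing from a record with NULL instead of failing the load. A small self-contained sketch, with the `events` table and file name made up for illustration:

```python
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE events (id INTEGER, name VARCHAR, note VARCHAR)")

# a jsonl file where the second record omits the optional "note" key
with open("events.jsonl", "w") as f:
    f.write('{"id": 1, "name": "a", "note": "x"}\n')
    f.write('{"id": 2, "name": "b"}\n')

# the missing key loads as NULL rather than aborting the COPY/INSERT
con.execute("INSERT INTO events BY NAME SELECT * FROM read_json('events.jsonl')")
print(con.execute("SELECT * FROM events ORDER BY id").fetchall())
# [(1, 'a', 'x'), (2, 'b', None)]
```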
42 changes: 17 additions & 25 deletions docs/website/docs/dlt-ecosystem/destinations/motherduck.md
@@ -1,11 +1,10 @@
---
title: 🧪 MotherDuck
title: MotherDuck
description: MotherDuck `dlt` destination
keywords: [MotherDuck, duckdb, destination, data warehouse]
---

# MotherDuck
> 🧪 MotherDuck is still invitation-only and is being intensively tested. Please see the limitations/problems at the end.

## Install dlt with MotherDuck
**To install the dlt library with MotherDuck dependencies:**
Expand Down Expand Up @@ -50,11 +49,19 @@ Alternatively, you can use the connection string syntax.
motherduck.credentials="md:///dlt_data_3?token=<my service token>"
```

:::tip
MotherDuck now supports configurable **access tokens**. Please refer to the [documentation](https://motherduck.com/docs/key-tasks/authenticating-to-motherduck/#authentication-using-an-access-token).
:::

**4. Run the pipeline**
```sh
python3 chess_pipeline.py
```

### MotherDuck connection identifier
We enable MotherDuck to identify that a connection was created by `dlt`. MotherDuck uses this identifier to better understand the usage patterns
associated with the `dlt` integration. The connection identifier is `dltHub_dlt/DLT_VERSION(OS_NAME)`.

## Write disposition
All write dispositions are supported.

@@ -64,22 +71,19 @@ By default, Parquet files and the `COPY` command are used to move files to the r
The **INSERT** format is also supported and will execute large INSERT queries directly into the remote database. This method is significantly slower and may exceed the maximum query size, so it is not advised.

## dbt support
This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-duckdb](https://github.com/jwills/dbt-duckdb), which is a community-supported package. `dbt` version >= 1.5 is required (which is the current `dlt` default.)
This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-duckdb](https://github.com/jwills/dbt-duckdb), which is a community-supported package. `dbt` version >= 1.7 is required.

## Multi-statement transaction support
MotherDuck supports multi-statement transactions since `duckdb 0.10.2`.

## Syncing of `dlt` state
This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination).

## Automated tests
Each destination must pass a few hundred automatic tests. MotherDuck is passing these tests (except for the transactions, of course). However, we have encountered issues with ATTACH timeouts when connecting, which makes running such a number of tests unstable. Tests on CI are disabled.
## Troubleshooting

## Troubleshooting / limitations

### I see a lot of errors in the log like DEADLINE_EXCEEDED or Connection timed out
MotherDuck is very sensitive to the quality of the internet connection and the **number of workers used to load data**. Decrease the number of workers and ensure your internet connection is stable. We have not found any way to increase these timeouts yet.

### MotherDuck does not support transactions.
Do not use `begin`, `commit`, and `rollback` on `dlt` **sql_client** or on the duckdb dbapi connection. It has no effect on DML statements (they are autocommit). It confuses the query engine for DDL (tables not found, etc.).
If your connection is of poor quality and you get a timeout when executing a DML query, it may happen that your transaction got executed.
### My database is attached in read-only mode
For example: `Error: Invalid Input Error: Cannot execute statement of type "CREATE" on database "dlt_data" which is attached in read-only mode!`
We encountered this problem with databases created on `duckdb 0.9.x` and then migrated to `0.10.x`. After the switch to `1.0.x` on MotherDuck, all our databases showed a "read-only" permission in the UI. We could not figure out how to change it, so we dropped and recreated our databases.

### I see an exception about a missing home_dir when opening an `md:` connection.
Some internal component (HTTPS) requires the **HOME** env variable to be present. Export this variable in your shell. Here is what we do in our tests:
Expand All @@ -88,17 +92,5 @@ os.environ["HOME"] = "/tmp"
```
before opening the connection.

### I see some watchdog timeouts.
We also see them.
```text
'ATTACH_DATABASE': keepalive watchdog timeout
```
Our observation is that if you write a lot of data into the database, then close the connection and then open it again to write, there's a chance of such a timeout. A possible **WAL** file is being written to the remote duckdb database.

### Invalid Input Error: Initialization function "motherduck_init" from file
Use `duckdb 0.8.1` or above.

### Motherduck connection identifier
We enable Motherduck to identify that the connection is created by `dlt`. Motherduck will use this identifier to better understand the usage patterns
associated with `dlt` integration. The connection identifier is `dltHub_dlt/DLT_VERSION(OS_NAME)`.
<!--@@@DLT_TUBA motherduck-->
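Putting the docs above together, here is a hedged sketch of configuring the destination in code with the `md:` connection string and fewer load workers, which is the mitigation suggested for the DEADLINE_EXCEEDED / timeout errors. The token environment variable, dataset name, and sample rows are assumptions for illustration, not part of this commit.

```python
import os
import dlt

# keep the number of parallel load workers low if the connection is flaky
# (dlt reads LOAD__WORKERS from the environment, equivalent to [load] workers in config.toml)
os.environ["LOAD__WORKERS"] = "4"

pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination=dlt.destinations.motherduck(
        # connection string syntax from the docs above; MOTHERDUCK_TOKEN is an assumed env var
        credentials=f"md:///dlt_data_3?token={os.environ['MOTHERDUCK_TOKEN']}"
    ),
    dataset_name="chess_players_games_data",
)
load_info = pipeline.run(
    [{"id": 1, "username": "magnuscarlsen"}], table_name="players"
)
print(load_info)
```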
8 changes: 4 additions & 4 deletions tests/cases.py
@@ -178,7 +178,7 @@ def table_update_and_row(
Optionally exclude some data types from the schema and row.
"""
column_schemas = deepcopy(TABLE_UPDATE_COLUMNS_SCHEMA)
data_row = deepcopy(TABLE_ROW_ALL_DATA_TYPES)
data_row = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)
exclude_col_names = list(exclude_columns or [])
if exclude_types:
exclude_col_names.extend(
Expand All @@ -203,7 +203,7 @@ def assert_all_data_types_row(
# content must equal
# print(db_row)
schema = schema or TABLE_UPDATE_COLUMNS_SCHEMA
expected_row = expected_row or TABLE_ROW_ALL_DATA_TYPES
expected_row = expected_row or TABLE_ROW_ALL_DATA_TYPES_DATETIMES

# Include only columns requested in schema
if isinstance(db_row, dict):
Expand Down Expand Up @@ -274,8 +274,8 @@ def assert_all_data_types_row(
# then it must be json
db_mapping["col9"] = json.loads(db_mapping["col9"])

if "col10" in db_mapping:
db_mapping["col10"] = db_mapping["col10"].isoformat()
# if "col10" in db_mapping:
# db_mapping["col10"] = db_mapping["col10"].isoformat()
if "col11" in db_mapping:
db_mapping["col11"] = ensure_pendulum_time(db_mapping["col11"]).isoformat()

9 changes: 9 additions & 0 deletions tests/common/test_json.py
@@ -217,6 +217,15 @@ def test_json_pendulum(json_impl: SupportsJson) -> None:
assert s_r == pendulum.parse(dt_str_z)


# @pytest.mark.parametrize("json_impl", _JSON_IMPL)
# def test_json_timedelta(json_impl: SupportsJson) -> None:
# from datetime import timedelta
# start_date = pendulum.parse("2005-04-02T20:37:37.358236Z")
# delta = pendulum.interval(start_date, pendulum.now())
# assert isinstance(delta, timedelta)
# print(str(delta.as_timedelta()))


@pytest.mark.parametrize("json_impl", _JSON_IMPL)
def test_json_named_tuple(json_impl: SupportsJson) -> None:
assert (