add motherduck examples for duckdb plugin #1730

Open
wants to merge 4 commits into
base: master
7 changes: 4 additions & 3 deletions examples/duckdb_plugin/README.md
@@ -6,9 +6,9 @@
.. tags:: Integration, Data, Analytics, Beginner
```

[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics.
[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics. MotherDuck is a collaborative data warehouse that extends the power of DuckDB to the cloud.

The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow.
The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow, either in-process with DuckDB, in the cloud with MotherDuck, or as a hybrid of the two.

To install the Flytekit DuckDB plugin, run the following command:

@@ -18,8 +18,9 @@ pip install flytekitplugins-duckdb

The Flytekit DuckDB plugin includes the {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBQuery` task, which allows you to specify the following parameters:

- `query`: The DuckDB query to execute.
- `query`: The DuckDB query to execute. This parameter is optional at initialization because the query can instead be passed at run time.
- `inputs`: The query parameters to be used during query execution. This can be a StructuredDataset, a string or a list.
- `provider`: This is a {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBProvider` or a callable that facilitates the connection to a remote database if desired.

```{auto-examples-toc}
duckdb_example
99 changes: 96 additions & 3 deletions examples/duckdb_plugin/duckdb_plugin/duckdb_example.py
@@ -7,13 +7,13 @@

# %%
import json
from typing import List
from typing import List, Tuple

import pandas as pd
import pyarrow as pa
from flytekit import kwtypes, task, workflow
from flytekit import Secret, dynamic, kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import StructuredDataset
from flytekitplugins.duckdb import DuckDBQuery
from flytekitplugins.duckdb import DuckDBProvider, DuckDBQuery
from typing_extensions import Annotated

# %% [markdown]
@@ -160,3 +160,96 @@ def params_wf(

if __name__ == "__main__":
    print(f"Running params_wf()... {params_wf()}")

# %% [markdown]
# ## Queries to MotherDuck
#
# The DuckDB plugin can be used to make DuckDB queries to a remote MotherDuck data warehouse by specifying the
# MotherDuck `DuckDBProvider` and passing a secret called `motherduck_token`. Hybrid queries can target remote
# data in MotherDuck and data local to a Flyte task at the same time.
#
# %%
# This query targets a sample_data.nyc.rideshare table in MotherDuck
motherduck_query = DuckDBQuery(
    name="motherduck_query",
    query="SELECT MEAN(trip_time) FROM sample_data.nyc.rideshare",
    provider=DuckDBProvider.MOTHERDUCK,
    secret_requests=[Secret(key="motherduck_token")],
)

# This query targets an e_commerce.year_09_10 table in MotherDuck and a DataFrame mydf local to the task
hybrid_motherduck_query = DuckDBQuery(
    name="my_query",
    query="""
    WITH HistoricalData AS
        (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM e_commerce.year_09_10),
    RecentData AS
        (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf)

    SELECT HistoricalData.CustomerCount_Historical, RecentData.CustomerCount_Recent FROM HistoricalData, RecentData
    """,
    inputs=kwtypes(mydf=pd.DataFrame),
    provider=DuckDBProvider.MOTHERDUCK,
    secret_requests=[Secret(key="motherduck_token")],
)


@workflow
def motherduck_wf(mydf: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    motherduck_response = motherduck_query()
    hybrid_response = hybrid_motherduck_query(mydf=mydf)
    return motherduck_response, hybrid_response


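if __name__ == "__main__":
    # motherduck_wf requires mydf; this sample DataFrame is illustrative only
    sample_df = pd.DataFrame({"Customer ID": [1, 2, 2, 3]})
    print(f"Running motherduck_wf()... {motherduck_wf(mydf=sample_df)}")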
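
The shape of the hybrid query's result can be checked without a MotherDuck account. The sketch below is a stand-in, not the plugin's behavior: it swaps DuckDB/MotherDuck for the standard-library `sqlite3` module, uses a plain `year_09_10` table in place of the remote `e_commerce.year_09_10` table, and a `mydf` table in place of the local DataFrame. All inserted rows are made-up sample data.

```python
import sqlite3

# Stand-in only: sqlite3 replaces DuckDB/MotherDuck so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")

# Hypothetical data: "year_09_10" plays the remote table, "mydf" the local DataFrame.
conn.execute('CREATE TABLE year_09_10 ("Customer ID" INTEGER)')
conn.execute('CREATE TABLE mydf ("Customer ID" INTEGER)')
conn.executemany("INSERT INTO year_09_10 VALUES (?)", [(1,), (2,), (2,), (3,)])
conn.executemany("INSERT INTO mydf VALUES (?)", [(3,), (4,), (4,)])

# Same CTE structure as the hybrid query: each CTE yields one row,
# so the implicit cross join yields exactly one row with both counts.
query = """
WITH HistoricalData AS
    (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM year_09_10),
RecentData AS
    (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf)
SELECT HistoricalData.CustomerCount_Historical, RecentData.CustomerCount_Recent
FROM HistoricalData, RecentData
"""

rows = conn.execute(query).fetchall()
print(rows)  # [(3, 2)]
```

Because each CTE aggregates to a single row, the comma join cannot multiply rows, which is why the workflow can expect a one-row result for the hybrid query.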

# %% [markdown]
# ## Runtime Queries
#
# If task logic is necessary to craft a query, the query can be passed at runtime rather than when the `DuckDBQuery` is
# initialized.
#
# %%
runtime_query_task = DuckDBQuery(
    name="runtime_query_task",
    inputs=kwtypes(query=str, mydf=pd.DataFrame),
)


@dynamic
def check_dataframe(mydf: pd.DataFrame) -> pd.DataFrame:
    # Inspect the DataFrame's columns to decide which query to run
    col_a_present = "column_a" in mydf.columns
    col_b_present = "column_b" in mydf.columns

    if col_a_present and col_b_present:
        query = """
        SELECT column_a, column_b
        FROM mydf
        WHERE column_a > 10 AND column_b < 100;
        """
    elif col_a_present:
        query = """
        SELECT column_a
        FROM mydf
        WHERE column_a > 10;
        """
    elif col_b_present:
        query = """
        SELECT column_b
        FROM mydf
        WHERE column_b < 100;
        """
    else:
        raise ValueError("Neither 'column_a' nor 'column_b' is present in the DataFrame.")

    # Pass the query at run time to the task that declares a `query` input
    return runtime_query_task(query=query, mydf=mydf)


@workflow
def runtime_query_wf(mydf: pd.DataFrame) -> pd.DataFrame:
    return check_dataframe(mydf=mydf)


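if __name__ == "__main__":
    # runtime_query_wf requires mydf; this sample DataFrame is illustrative only
    sample_df = pd.DataFrame({"column_a": [5, 15], "column_b": [50, 150]})
    print(f"Running runtime_query_wf()... {runtime_query_wf(mydf=sample_df)}")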
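
The column-dependent branching in `check_dataframe` can be exercised without Flyte or DuckDB by pulling the query selection into a plain function. The following is a minimal sketch under that assumption; `build_query` is a hypothetical helper that is not part of the PR or the plugin, and it emits single-line SQL rather than the multi-line strings used above.

```python
from typing import Iterable


def build_query(columns: Iterable[str], table: str = "mydf") -> str:
    """Hypothetical helper: choose a query from the columns that are present."""
    # Filter condition for each column the example knows about, in a fixed order.
    conditions = {"column_a": "column_a > 10", "column_b": "column_b < 100"}
    present = set(columns)
    selected = [name for name in conditions if name in present]

    if not selected:
        raise ValueError("Neither 'column_a' nor 'column_b' is present in the DataFrame.")

    select_list = ", ".join(selected)
    where_clause = " AND ".join(conditions[name] for name in selected)
    return f"SELECT {select_list} FROM {table} WHERE {where_clause}"


print(build_query(["column_a", "column_b", "other"]))
print(build_query(["column_b"]))
```

A function like this could be called inside the dynamic workflow with `mydf.columns`, keeping the branch logic unit-testable while the `DuckDBQuery` task only receives the finished string.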