Support DuckDBPyRelation kernel computations (#37)
MarcSkovMadsen authored Nov 10, 2024
1 parent a700b38 commit 30a9ed4
Showing 7 changed files with 196 additions and 29 deletions.
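For context, the user-facing effect of this commit is that a DuckDBPyRelation can be passed directly to GraphicWalker with kernel computation enabled. A minimal sketch of that usage, assuming the `kernel_computation` parameter used in the example app below and a Panel server context:

```python
import duckdb
import pandas as pd
import panel as pn

from panel_gwalker import GraphicWalker

pn.extension()

DATA = "https://datasets.holoviz.org/significant_earthquakes/v1/significant_earthquakes.parquet"
df_pandas = pd.read_parquet(DATA)

# A DuckDBPyRelation over the pandas frame; after this commit it can be used
# with kernel_computation=True so aggregations run in the Python kernel.
relation = duckdb.sql("SELECT * FROM df_pandas")

GraphicWalker(relation, kernel_computation=True).servable()
```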
3 changes: 3 additions & 0 deletions .gitignore
@@ -165,3 +165,6 @@ cython_debug/
#.idea/
script.*
tmp_graphic_walker.json
tmp.db
tmp.ibis.db
tmp.db.wal
2 changes: 1 addition & 1 deletion README.md
@@ -188,8 +188,8 @@ Our dream is that this package is super simple to use and supports your use case
| ---- | - | - | - |
| Pandas ||| |
| Polars ||| |
| DuckDB Relation ||| |
| Dask ||| [Not supported by Pygwalker](https://github.com/Kanaries/pygwalker/issues/658) |
| DuckDB Relation ||| [Not supported by Pygwalker](https://github.com/Kanaries/pygwalker/issues/658) |
| Pygwalker Database Connector ||| [Not supported by Narwhals](https://github.com/narwhals-dev/narwhals/issues/1289) |

Other backends might be supported if they are supported by both [Narwhals](https://github.com/narwhals-dev/narwhals) and [PygWalker](https://github.com/Kanaries/pygwalker).
24 changes: 19 additions & 5 deletions examples/reference/backends.py
@@ -11,27 +11,41 @@

DATA = "https://datasets.holoviz.org/significant_earthquakes/v1/significant_earthquakes.parquet"
df_pandas = pd.read_parquet(DATA)
duckdb_relation = duckdb.sql("SELECT * FROM df_pandas")
duckdb_simple = duckdb.sql("SELECT * FROM df_pandas")

con_in_memory = duckdb.connect(":memory:")
duckdb_in_memory = con_in_memory.sql("SELECT * FROM df_pandas")

con_persistent = duckdb.connect("tmp.db")
duckdb_persistent = con_persistent.sql("SELECT * FROM df_pandas")

DATAFRAMES = {
"pandas": df_pandas,
"polars": pl.read_parquet(DATA),
"dask": dd.read_parquet(DATA, npartitions=1),
"duckdb": duckdb_relation,
"duckdb-simple": duckdb_simple,
"duckdb in-memory": duckdb_in_memory,
"duckdb persistent": duckdb_persistent,
}

select = pn.widgets.Select(options=list(DATAFRAMES), name="Data Source")
kernel_computation = pn.widgets.Checkbox(name="Kernel Computation", value=False)

if pn.state.location:
pn.state.location.sync(select, {"value": "backend"})
pn.state.location.sync(kernel_computation, {"value": "kernel_computation"})


@pn.depends(select, kernel_computation)
def get_data(value, kernel_computation):
data = DATAFRAMES[value]
data = DATAFRAMES.get(value, None)
if data is None:
return "Not a valid option"
if not kernel_computation:
try:
data = data.head(10)
data = data.head(1000)
except:
data = data.df().head(10)
data = data.df().head(1000)
try:
return GraphicWalker(
data,
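The `get_data` callback above falls back to `data.df().head(1000)` because a DuckDBPyRelation does not expose a pandas-style `head()`. For reference, a preview can also be taken lazily with the relation API; a small sketch, assuming a recent duckdb where `limit()` and `df()` are available:

```python
import duckdb
import pandas as pd

df_pandas = pd.DataFrame({"a": range(10_000)})
relation = duckdb.sql("SELECT * FROM df_pandas")

# Push the row limit into DuckDB instead of materializing everything first
preview = relation.limit(1000).df()
print(len(preview))  # 1000
```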
88 changes: 73 additions & 15 deletions src/panel_gwalker/_pygwalker.py
@@ -1,5 +1,7 @@
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol
import sys
import weakref
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, runtime_checkable

import pandas as pd
from sqlalchemy import create_engine, text
@@ -37,6 +39,44 @@ def _convert_to_field_spec(spec: dict) -> dict:
}


@runtime_checkable
class ConnectorP(Protocol):
def query_datas(self, sql: str) -> List[Dict[str, Any]]: ...

@property
def dialect_name(self) -> str: ...


def _get_data_parser_non_pygwalker(
object, fields_specs, infer_string_to_date, infer_number_to_dimension, other_params
):
if isinstance(object, ConnectorP):
from pygwalker.data_parsers.database_parser import DatabaseDataParser

__classname2method[DatabaseDataParser] = (DatabaseDataParser, "connector")
return __classname2method[DatabaseDataParser]

return object, parser, name


class DuckDBPyRelationConnector(ConnectorP):
def __init__(self, relation):
self.relation = relation
self.view_sql = "SELECT * FROM __relation"

def query_datas(self, sql: str) -> List[Dict[str, Any]]:
__relation = self.relation

result = self.relation.query("__relation", sql).fetchall()
columns = self.relation.query("__relation", sql).columns
records = [dict(zip(columns, row)) for row in result]
return records

@property
def dialect_name(self) -> str:
return "duckdb"


def get_data_parser(
object,
field_specs: List[dict], # FieldSpec
@@ -45,24 +85,42 @@ def get_data_parser(
other_params: Dict[str, Any],
) -> "BaseDataParser":
try:
from pygwalker import data_parsers
from pygwalker.data_parsers.base import FieldSpec
from pygwalker.services.data_parsers import _get_data_parser
from pygwalker.data_parsers.database_parser import DatabaseDataParser
from pygwalker.services.data_parsers import (
__classname2method,
)
from pygwalker.services.data_parsers import (
_get_data_parser as _get_data_parser_pygwalker,
)
except ImportError as exc:
raise ImportError(
"Server dependencies are not installed. Please: pip install panel-graphic-walker[kernel]."
) from exc

custom_connector = None
if "duckdb" in sys.modules:
from duckdb.duckdb import DuckDBPyRelation

if isinstance(object, DuckDBPyRelation):
custom_connector = DuckDBPyRelationConnector(object)

if custom_connector:
object = custom_connector
__classname2method[DatabaseDataParser] = (DatabaseDataParser, "connector")
parser, name = __classname2method[DatabaseDataParser]
else:
try:
parser, name = _get_data_parser_pygwalker(object)
except TypeError as exc:
msg = f"Data type {type(object)} is currently not supported"
raise NotImplementedError(msg) from exc

_field_specs = [FieldSpec(**_convert_to_field_spec(spec)) for spec in field_specs]
try:
parser, name = _get_data_parser(object)
return parser(
object,
_field_specs,
infer_string_to_date,
infer_number_to_dimension,
other_params,
)
except TypeError as exc:
msg = f"Data type {type(object)} is currently not supported"
raise NotImplementedError(msg) from exc
return parser(
object,
_field_specs,
infer_string_to_date,
infer_number_to_dimension,
other_params,
)
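To make the new dispatch concrete, here is a small sketch of how the connector is meant to be used, assuming `DuckDBPyRelationConnector` and `get_data_parser` are importable from `panel_gwalker._pygwalker` and the `[kernel]` extra (pygwalker) is installed:

```python
import duckdb
import pandas as pd

from panel_gwalker._pygwalker import DuckDBPyRelationConnector, get_data_parser

df_pandas = pd.DataFrame({"a": [1, 2, 3]})
relation = duckdb.sql("SELECT * FROM df_pandas")

# The connector exposes the relation to pygwalker's DatabaseDataParser:
# SQL sent by pygwalker is executed against the alias "__relation".
connector = DuckDBPyRelationConnector(relation)
print(connector.query_datas("SELECT SUM(a) AS total FROM __relation"))
# -> [{'total': 6}]

# get_data_parser detects DuckDBPyRelation objects, wraps them in the
# connector and routes them to the DatabaseDataParser branch.
parser = get_data_parser(relation, [], False, False, {})
```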
39 changes: 36 additions & 3 deletions tests/conftest.py
@@ -15,17 +15,50 @@
from panel_gwalker import GraphicWalker
from panel_gwalker._utils import _raw_fields

# Using duckdb relations as fixtures requires special care
# See https://github.com/duckdb/duckdb/issues/14771

@pytest.fixture(params=["pandas", "polars", "dask", "duckdb"])
def data(request, tmp_path):

@pytest.fixture(scope="session")
def memory_conn():
con = duckdb.connect()
con.execute("CREATE TABLE df_pandas (a INTEGER)")
con.execute("INSERT INTO df_pandas VALUES (1), (2), (3)")
return con


@pytest.fixture()
def persistent_conn(tmp_path):
database = (tmp_path / "tmp.db").as_posix()
con = duckdb.connect(database)
con.execute("CREATE TABLE df_pandas (a INTEGER)")
con.execute("INSERT INTO df_pandas VALUES (1), (2), (3)")
return con


@pytest.fixture(
params=[
"pandas",
"polars",
"dask",
"duckdb-simple",
"duckdb-in-memory",
"duckdb-persistent",
]
)
def data(request, tmp_path, memory_conn, persistent_conn):
if request.param == "pandas":
return pd.DataFrame({"a": [1, 2, 3]})
if request.param == "polars":
return pl.DataFrame({"a": [1, 2, 3]})
if request.param == "dask":
return dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
if request.param == "duckdb":
if request.param == "duckdb-simple":
df_pandas = pd.DataFrame({"a": [1, 2, 3]})
return duckdb.sql("SELECT * FROM df_pandas")
if request.param == "duckdb-in-memory":
return memory_conn.sql("SELECT * FROM df_pandas")
if request.param == "duckdb-persistent":
return persistent_conn.sql("SELECT * FROM df_pandas")
else:
raise ValueError(f"Unknown data type: {request.param}")
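The special care mentioned in the comment above is about relation lifetime: a DuckDBPyRelation is only a view over its connection, so the connection must stay alive for as long as the fixture value is used. A minimal illustration of the pitfall, assuming that closing the connection invalidates relations created from it:

```python
import duckdb

con = duckdb.connect(":memory:")
con.execute("CREATE TABLE df_pandas (a INTEGER)")
con.execute("INSERT INTO df_pandas VALUES (1), (2), (3)")
relation = con.sql("SELECT * FROM df_pandas")

print(relation.fetchall())  # [(1,), (2,), (3,)] while the connection is open

con.close()
try:
    relation.fetchall()  # the relation no longer has a live connection
except duckdb.Error as exc:
    print(f"relation unusable after close: {exc}")
```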
62 changes: 62 additions & 0 deletions tests/test_duckdb.py
@@ -0,0 +1,62 @@
from typing import Any, Dict, List

import duckdb
import pandas as pd
import pytest


class Connector:
def __init__(self, relation):
self.relation = relation
self.view_sql = "SELECT * FROM __relation"

def query_datas(self, sql: str) -> List[Dict[str, Any]]:
__relation = self.relation

result = self.relation.query("__relation", sql).fetchall()
columns = self.relation.query("__relation", sql).columns
records = [dict(zip(columns, row)) for row in result]
return records

@property
def dialect_name(self) -> str:
return "duckdb"


@pytest.fixture(params=["in-memory", "persistent"])
def con(request, tmp_path):
if request.param == "in-memory":
database = ":memory:"
else:
database = (tmp_path / "tmp.db").as_posix()
con = duckdb.connect(database)
con.execute("CREATE TABLE df_pandas (a INTEGER)")
con.execute("INSERT INTO df_pandas VALUES (1), (2), (3)")
return con


@pytest.fixture
def data(con):
return con.sql("SELECT * FROM df_pandas")


def test_connector_simple_works():
df_pandas = pd.DataFrame({"a": [1, 2, 3]})
data = duckdb.sql("SELECT * FROM df_pandas")
connector = Connector(data)
assert connector.dialect_name == "duckdb"
assert connector.query_datas("SELECT * FROM __relation") == [
{"a": 1},
{"a": 2},
{"a": 3},
]


def test_connector_advanced_which_does_not_work(data):
connector = Connector(data)
assert connector.dialect_name == "duckdb"
assert connector.query_datas("SELECT * FROM __relation") == [
{"a": 1},
{"a": 2},
{"a": 3},
]
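Both the test Connector above and DuckDBPyRelationConnector rely on DuckDB's relation aliasing, where `DuckDBPyRelation.query(virtual_table_name, sql)` runs an SQL string against the relation registered under a temporary name. A standalone sketch of just that mechanism:

```python
import duckdb
import pandas as pd

df_pandas = pd.DataFrame({"a": [1, 2, 3]})
relation = duckdb.sql("SELECT * FROM df_pandas")

# Query the relation itself through the alias "__relation"
result = relation.query(
    "__relation", "SELECT COUNT(*) AS n, SUM(a) AS total FROM __relation"
)
print(result.columns)     # ['n', 'total']
print(result.fetchall())  # [(3, 6)]
```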
7 changes: 2 additions & 5 deletions tests/test_pygwalker.py
@@ -1,12 +1,9 @@
import dask.dataframe as dd
import duckdb
import pytest

from panel_gwalker._gwalker import get_data_parser


def test_get_data_parser(data):
if isinstance(data, (dd.DataFrame, duckdb.duckdb.DuckDBPyRelation)):
pytest.xfail(f"Unsupported data type: {type(data)}")

if str(type(data)) == "<class 'dask_expr._collection.DataFrame'>":
pytest.xfail("Dask DataFrame is not supported yet")
assert get_data_parser(data, [], False, False, {})
