Skip to content

Commit

Permalink
extend dataset querying
Browse files Browse the repository at this point in the history
  • Loading branch information
zilto committed Mar 6, 2025
1 parent d6ec255 commit 918635d
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 16 deletions.
18 changes: 18 additions & 0 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,24 @@ def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
return table


def get_root_to_table_chain(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
"""Return a list of tables ordered from root to child using the (row_key - parent_key) references.
Similar functions:
- `get_root_table()` returns the root of the specified child instead of the full chain
- `get_nested_tables()` returns all children of the specified table as a chain
"""
chain: List[TTableSchema] = []

def _parent(t: TTableSchema) -> None:
chain.append(t)
if t.get("parent"):
_parent(tables[t["parent"]])

_parent(tables[table_name])
return chain[::-1]


def get_nested_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
"""Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents
Expand Down
27 changes: 27 additions & 0 deletions dlt/destinations/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import textwrap
from typing import Any, Union, TYPE_CHECKING, List

from dlt.common.json import json
Expand Down Expand Up @@ -128,11 +129,14 @@ def table(self, table_name: str) -> SupportsReadableRelation:
from dlt.helpers.ibis import create_unbound_ibis_table
from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation

# TODO use the self.ibis() connection instead of `self.sql_client` and work against the Ibis Schema
# creating unbound tables would no longer be required and queries could be evaluated lazily
unbound_table = create_unbound_ibis_table(self.sql_client, self.schema, table_name)
return ReadableIbisRelation( # type: ignore[abstract]
readable_dataset=self,
ibis_object=unbound_table,
columns_schema=self.schema.tables[table_name]["columns"],
table_name=table_name,
)
except MissingDependencyException:
# if ibis is explicitly requested, reraise
Expand Down Expand Up @@ -171,6 +175,29 @@ def row_counts(
# Execute query and build result dict
return self(query)

def list_load_ids(self, status: Union[int, list[int]] = 0, limit: int = 10) -> list[str]:
"""Return the list most recent `load_id`s in descending order.
If no `load_id` is found, return empty list.
"""
status_value = (status,) if isinstance(status, int) else tuple(status)
# TODO protect from SQL injection
query = textwrap.dedent(f"""SELECT load_id
FROM {self.schema.loads_table_name}
WHERE status IN {status_value}
ORDER BY load_id DESC
LIMIT {limit}""")
results = self.__call__(query=query).fetchall()
return [row[0] for row in results]

def latest_load_id(self, status: Union[int, list[int]] = 0) -> Union[str, None]:
"""Return the latest `load_id`.
If no `load_id` is found, return None
"""
results = self.list_load_ids(status=status, limit=1)
return results[0] if len(results) > 0 else None

def __getitem__(self, table_name: str) -> SupportsReadableRelation:
"""access of table via dict notation"""
return self.table(table_name)
Expand Down
162 changes: 161 additions & 1 deletion dlt/destinations/dataset/ibis_relation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Union, Sequence

from functools import partial

from dlt.common.exceptions import MissingDependencyException
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation
from dlt.common.schema import Schema
from dlt.common.schema.utils import get_root_table, get_root_to_table_chain
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation


if TYPE_CHECKING:
Expand Down Expand Up @@ -54,11 +57,13 @@ def __init__(
readable_dataset: ReadableDBAPIDataset,
ibis_object: Any = None,
columns_schema: TTableSchemaColumns = None,
table_name: str = None,
) -> None:
"""Create a lazy evaluated relation to for the dataset of a destination"""
super().__init__(readable_dataset=readable_dataset)
self._ibis_object = ibis_object
self._columns_schema = columns_schema
self._table_name = table_name

def query(self) -> Any:
"""build the query"""
Expand All @@ -85,6 +90,10 @@ def columns_schema(self) -> TTableSchemaColumns:
def columns_schema(self, new_value: TTableSchemaColumns) -> None:
raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed")

@property
def table_name(self) -> str:
return self._table_name

def compute_columns_schema(self) -> TTableSchemaColumns:
"""provide schema columns for the cursor, may be filtered by selected columns"""
# TODO: provide column lineage tracing with sqlglot lineage
Expand Down Expand Up @@ -173,6 +182,64 @@ def _get_filtered_columns_schema(self, columns: Sequence[str]) -> TTableSchemaCo
# here we just break the column schema inheritance chain
return None

def _filter_nested_table(self, filtered_root_table: Any) -> Any: # ibis.expr.types.Table
"""Filter the current table based on a filtered root table.
This takes the Ibis Table expression `filtered_root_table`, which has filtered rows,
and propagate the selection of `_dlt_id` on the root table via `row_key -> parent_key`
recursively.
To visualize it:
```
filtered_root_table = root.filter(root._dlt_load_id.isin(
["foo", "bar"]
))._dlt_id # root row_key
child2.filter(child2._dlt_parent_id.isin( # child2 parent_key
child1.filter(child1._dlt_parent_id.isin( # child1 parent_key
filtered_root_table
))._dlt_id # child1 row_key
))
```
Takes as input and returns a `ibis.expr.types.Table`
"""
from dlt.helpers.ibis import create_unbound_ibis_table

filtered_table = filtered_root_table
parent_row_key = None
for table in get_root_to_table_chain(self.schema.tables, self.table_name):
# the root table is already filtered, only set the parent_row_key
if parent_row_key is None:
parent_row_key = next(
col_name
for col_name, col in table["columns"].items()
if col.get("row_key") is True
)
continue

parent_key = None
row_key = None
for col_name, col in table["columns"].items():
if col.get("parent_key") is True:
parent_key = col_name
if col.get("row_key") is True:
row_key = col_name

# should always match a column because `get_root_to_table_chain()` returns tables based on parent_key / row_key
assert parent_key is not None
assert row_key is not None

ibis_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=table["name"]
)
filter_clause = ibis_table[parent_key].isin(filtered_table[parent_row_key])
# TODO the only operation to proxy if required
filtered_table = ibis_table.filter(filter_clause)

parent_row_key = row_key

return filtered_table

# forward ibis methods defined on interface
def limit(self, limit: int, **kwargs: Any) -> "ReadableIbisRelation":
"""limit the result to 'limit' items"""
Expand All @@ -186,6 +253,99 @@ def select(self, *columns: str) -> "ReadableIbisRelation":
"""set which columns will be selected"""
return self._proxy_expression_method("select", *columns) # type: ignore

def filter_by_load_ids(self, load_ids: Union[str, list[str]]) -> "ReadableIbisRelation":
"""Filter on matching `load_ids`."""
from dlt.helpers.ibis import create_unbound_ibis_table

load_ids = (
[
load_ids,
]
if isinstance(load_ids, str)
else load_ids
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)

# filter the root table
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"].isin(load_ids))
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

# TODO use the proxies? it's a bit hard to do all that proxying
return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

def filter_by_latest_load_id(self, status: Union[int, list[int]] = 0) -> "ReadableIbisRelation":
"""Filter on the most recent `load_id` with a specific status."""
from dlt.helpers.ibis import create_unbound_ibis_table

status = (
[
status,
]
if isinstance(status, int)
else status
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)
load_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=self.schema.loads_table_name
)

latest_load_id = load_table.filter(
load_table.status.isin(status)
).load_id.max() # lazy expression
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"] == latest_load_id)
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

def filter_by_load_status(self, status: Union[int, list[int]] = 0) -> "ReadableIbisRelation":
""""""
from dlt.helpers.ibis import create_unbound_ibis_table

status = (
[
status,
]
if isinstance(status, int)
else status
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)
load_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=self.schema.loads_table_name
)

load_ids = load_table.filter(load_table.status.isin(status)).load_id # lazy expression
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"].isin(load_ids))
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

# forward ibis comparison and math operators
def __lt__(self, other: Any) -> "ReadableIbisRelation":
return self._proxy_expression_method("__lt__", other) # type: ignore
Expand Down
Loading

0 comments on commit 918635d

Please sign in to comment.