Skip to content

Commit 0fd6a1e

Browse files
rchowell authored and kevinzwang committed
feat(catalog): Integrate session and catalog actions alongside existing APIs [2/3] (#3825)
This is PR [2/3] for integrating #3805 into Daft/main. This PR adds several implementations with basic end-to-end sanity tests. Most notably, this PR includes the bi-directional implementations for Catalogs and Tables. These enable us to use Python-backed implementations in Rust (for daft-connect and daft-sql) while also being able to return Catalog/Table Python objects for use with the Python APIs.

**Changes**

1. Adds several internal factories for attaching/detaching supported objects to the session.
2. Adds `Catalog.from_pydict` to cover basic needs and some backwards compatibility.
3. Adds `Identifier.from_str` for pyiceberg-style identifiers (these are entirely optional).
4. Fleshes out the [minimal session actions](https://github.com/Eventual-Inc/Daft/compare/rchowell/catalog-1-of-3...rchowell/catalog-2-of-3?expand=1#diff-fc2305e1560f8f7d5a974f58325f4b231bc638d6bacebbc795b6f8fb924114ccR1884-R1895) for a release.
5. Adds bindings for a single name-resolution abstraction, which will come in handy for case normalization in the near future.
6. Updates daft-connect and daft-sql to use `attach_table`.

Part [3/3] will cut over all deprecated APIs to use these new APIs and remove the unused artifacts. These APIs are enumerated in #3820.
1 parent 49783ad commit 0fd6a1e

27 files changed

+1080
-169
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ python = [
8181
"daft-stats/python",
8282
"daft-recordbatch/python",
8383
"daft-writers/python",
84-
"dep:daft-catalog-python-catalog",
8584
"dep:daft-connect",
8685
"daft-connect/python",
8786
"dep:pyo3",

daft/catalog/__iceberg.py

+18-12
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from pyiceberg.catalog import Catalog as InnerCatalog
99
from pyiceberg.table import Table as InnerTable
1010

11-
from daft.catalog import Catalog, Table
11+
from daft.catalog import Catalog, Identifier, Table
1212

1313
if TYPE_CHECKING:
1414
from daft.dataframe import DataFrame
@@ -34,16 +34,13 @@ def _from_obj(obj: object) -> IcebergCatalog:
3434
return c
3535
raise ValueError(f"Unsupported iceberg catalog type: {type(obj)}")
3636

37-
@property
38-
def inner(self) -> InnerCatalog:
39-
"""Returns the inner iceberg catalog."""
40-
return self._inner
41-
4237
###
4338
# get_*
4439
###
4540

46-
def get_table(self, name: str) -> IcebergTable:
41+
def get_table(self, name: str | Identifier) -> IcebergTable:
42+
if isinstance(name, Identifier):
43+
name = tuple(name) # type: ignore
4744
return IcebergTable(self._inner.load_table(name))
4845

4946
###
@@ -59,20 +56,29 @@ class IcebergTable(Table):
5956
_inner: InnerTable
6057

6158
def __init__(self, inner: InnerTable):
59+
"""DEPRECATED: Please use `Table.from_iceberg`; version 0.5.0!"""
60+
warnings.warn(
61+
"This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Table.from_iceberg` instead; version 0.5.0!",
62+
category=DeprecationWarning,
63+
)
6264
self._inner = inner
6365

66+
@staticmethod
67+
def _from_obj(obj: object) -> IcebergTable | None:
68+
"""Returns an IcebergTable if the given object can be adapted so."""
69+
if isinstance(obj, InnerTable):
70+
t = IcebergTable.__new__(IcebergTable)
71+
t._inner = obj
72+
return t
73+
raise ValueError(f"Unsupported iceberg table type: {type(obj)}")
74+
6475
@staticmethod
6576
def _try_from(obj: object) -> IcebergTable | None:
6677
"""Returns an IcebergTable if the given object can be adapted so."""
6778
if isinstance(obj, InnerTable):
6879
return IcebergTable(obj)
6980
return None
7081

71-
@property
72-
def inner(self) -> InnerTable:
73-
"""Returns the inner iceberg table."""
74-
return self._inner
75-
7682
def read(self) -> DataFrame:
7783
import daft
7884

daft/catalog/__init__.py

+123-12
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@
4242

4343
from abc import ABC, abstractmethod
4444
from collections.abc import Sequence
45-
from daft.daft import catalog as native_catalog
45+
from daft.daft import PyTableSource, catalog as native_catalog
4646
from daft.daft import PyIdentifier
4747
from daft.logical.builder import LogicalPlanBuilder
4848

4949
from daft.dataframe import DataFrame
5050

5151
from typing import TYPE_CHECKING
5252

53+
from daft.logical.schema import Schema
54+
5355
if TYPE_CHECKING:
5456
from daft.dataframe.dataframe import ColumnInputType
5557

@@ -142,9 +144,12 @@ def register_python_catalog(catalog: object, name: str | None = None) -> str:
142144
class Catalog(ABC):
143145
"""Interface for python catalog implementations."""
144146

145-
@property
146-
def inner(self) -> object | None:
147-
"""Returns the inner catalog object if this is an adapter."""
147+
@staticmethod
148+
def from_pydict(tables: dict[str, Table]) -> Catalog:
149+
"""Returns an in-memory catalog from the dictionary."""
150+
from daft.catalog.__memory import MemoryCatalog
151+
152+
return MemoryCatalog(tables)
148153

149154
@staticmethod
150155
def from_iceberg(obj: object) -> Catalog:
@@ -192,7 +197,7 @@ def list_tables(self, pattern: str | None = None) -> list[str]: ...
192197
###
193198

194199
@abstractmethod
195-
def get_table(self, name: str) -> Table: ...
200+
def get_table(self, name: str | Identifier) -> Table: ...
196201

197202
# TODO deprecated catalog APIs #3819
198203
def load_table(self, name: str) -> Table:
@@ -227,6 +232,12 @@ def __init__(self, *parts: str):
227232
raise ValueError("Identifier requires at least one part.")
228233
self._identifier = PyIdentifier(parts[:-1], parts[-1])
229234

235+
@staticmethod
236+
def _from_pyidentifier(identifier: PyIdentifier) -> Identifier:
237+
i = Identifier.__new__(Identifier)
238+
i._identifier = identifier
239+
return i
240+
230241
@staticmethod
231242
def from_sql(input: str, normalize: bool = False) -> Identifier:
232243
"""Parses an Identifier from an SQL string, normalizing to lowercase if specified.
@@ -243,6 +254,11 @@ def from_sql(input: str, normalize: bool = False) -> Identifier:
243254
i._identifier = PyIdentifier.from_sql(input, normalize)
244255
return i
245256

257+
@staticmethod
258+
def from_str(input: str) -> Identifier:
259+
"""Parses an Identifier from a dot-delimited Python string without normalization."""
260+
return Identifier(*input.split("."))
261+
246262
def __eq__(self, other: object) -> bool:
247263
if not isinstance(other, Identifier):
248264
return False
@@ -260,13 +276,60 @@ def __len__(self) -> int:
260276
def __repr__(self) -> str:
261277
return f"Identifier('{self._identifier.__repr__()}')"
262278

279+
def __str__(self) -> str:
280+
return ".".join(self)
281+
263282

264283
class Table(ABC):
265284
"""Interface for python table implementations."""
266285

267-
@property
268-
def inner(self) -> object | None:
269-
"""Returns the inner table object if this is an adapter."""
286+
@staticmethod
287+
def from_df(name: str, dataframe: DataFrame) -> Table:
288+
"""Returns a read-only table backed by the DataFrame."""
289+
from daft.catalog.__memory import MemoryTable
290+
291+
return MemoryTable(name, dataframe)
292+
293+
@staticmethod
294+
def from_iceberg(obj: object) -> Table:
295+
"""Returns a Daft Table instance from an Iceberg table."""
296+
try:
297+
from daft.catalog.__iceberg import IcebergTable
298+
299+
return IcebergTable._from_obj(obj)
300+
except ImportError:
301+
raise ImportError("Iceberg support not installed: pip install -U 'getdaft[iceberg]'")
302+
303+
@staticmethod
304+
def from_unity(obj: object) -> Table:
305+
"""Returns a Daft Table instance from a Unity table."""
306+
try:
307+
from daft.catalog.__unity import UnityTable
308+
309+
return UnityTable._from_obj(obj)
310+
except ImportError:
311+
raise ImportError("Unity support not installed: pip install -U 'getdaft[unity]'")
312+
313+
@staticmethod
314+
def _from_obj(obj: object) -> Table:
315+
"""Returns a Daft Table from a supported object type or raises an error."""
316+
raise ValueError(f"Unsupported table type: {type(obj)}")
317+
318+
# TODO catalog APIs part 3
319+
# @property
320+
# @abstractmethod
321+
# def name(self) -> str:
322+
# """Returns the table name."""
323+
324+
# TODO catalog APIs part 3
325+
# @property
326+
# @abstractmethod
327+
# def inner(self) -> object | None:
328+
# """Returns the inner table object if this is an adapter."""
329+
330+
@abstractmethod
331+
def read(self) -> DataFrame:
332+
"""Returns a DataFrame from this table."""
270333

271334
# TODO deprecated catalog APIs #3819
272335
def to_dataframe(self) -> DataFrame:
@@ -277,14 +340,62 @@ def to_dataframe(self) -> DataFrame:
277340
)
278341
return self.read()
279342

280-
@abstractmethod
281-
def read(self) -> DataFrame:
282-
"""Returns a DataFrame from this table."""
283-
284343
def select(self, *columns: ColumnInputType) -> DataFrame:
285344
"""Returns a DataFrame from this table with the selected columns."""
286345
return self.read().select(*columns)
287346

288347
def show(self, n: int = 8) -> None:
289348
"""Shows the first n rows from this table."""
290349
return self.read().show(n)
350+
351+
352+
class TableSource:
353+
"""A TableSource is used to create a new table; this could be a Schema or DataFrame."""
354+
355+
_source: PyTableSource
356+
357+
def __init__(self) -> None:
358+
raise ValueError("We do not support creating a TableSource via __init__")
359+
360+
@staticmethod
361+
def from_df(df: DataFrame) -> TableSource:
362+
s = TableSource.__new__(TableSource)
363+
s._source = PyTableSource.from_builder(df._builder._builder)
364+
return s
365+
366+
@staticmethod
367+
def _from_obj(obj: object = None) -> TableSource:
368+
# TODO for future sources, consider https://github.com/Eventual-Inc/Daft/pull/2864
369+
if obj is None:
370+
return TableSource._from_none()
371+
elif isinstance(obj, DataFrame):
372+
return TableSource.from_df(obj)
373+
elif isinstance(obj, str):
374+
return TableSource._from_path(obj)
375+
elif isinstance(obj, Schema):
376+
return TableSource._from_schema(obj)
377+
else:
378+
raise Exception(f"Unknown table source: {obj}")
379+
380+
@staticmethod
381+
def _from_none() -> TableSource:
382+
# for creating temp mutable tables, but we don't have those yet
383+
# s = TableSource.__new__(TableSource)
384+
# s._source = PyTableSource.empty()
385+
# return s
386+
# todo temp workaround just use an empty schema
387+
return TableSource._from_schema(Schema._from_fields([]))
388+
389+
@staticmethod
390+
def _from_schema(schema: Schema) -> TableSource:
391+
# we don't have mutable temp tables, so just make an empty view
392+
# s = TableSource.__new__(TableSource)
393+
# s._source = PyTableSource.from_schema(schema._schema)
394+
# return s
395+
# todo temp workaround until create_table is wired
396+
return TableSource.from_df(DataFrame._from_pylist([]))
397+
398+
@staticmethod
399+
def _from_path(path: str) -> TableSource:
400+
# for supporting daft.create_table("t", "/path/to/data") <-> CREATE TABLE t AS '/path/to/my.data'
401+
raise NotImplementedError("creating a table source from a path is not yet supported.")

daft/catalog/__memory.py

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""An in-memory implementation for the daft catalog abstractions."""
2+
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING
6+
7+
from daft.catalog import Catalog, Identifier, Table
8+
9+
if TYPE_CHECKING:
10+
from daft.dataframe.dataframe import DataFrame
11+
12+
13+
class MemoryCatalog(Catalog):
14+
"""An in-memory catalog is backed by a dictionary."""
15+
16+
_tables: dict[str, Table]
17+
18+
def __init__(self, tables: dict[str, Table]):
19+
self._tables = tables
20+
21+
###
22+
# list_*
23+
###
24+
25+
def list_tables(self, pattern: str | None = None) -> list[str]:
26+
if pattern is None:
27+
return list(self._tables.keys())
28+
return [path for path in self._tables.keys() if path.startswith(pattern)]
29+
30+
###
31+
# get_*
32+
###
33+
34+
def get_table(self, name: str | Identifier) -> Table:
35+
path = str(name)
36+
if path not in self._tables:
37+
raise ValueError(f"Table {path} does not exist.")
38+
return self._tables[path]
39+
40+
41+
class MemoryTable(Table):
42+
"""An in-memory table holds a reference to an existing dataframe."""
43+
44+
_name: str
45+
_inner: DataFrame
46+
47+
def __init__(self, name: str, inner: DataFrame):
48+
self._name = name
49+
self._inner = inner
50+
51+
###
52+
# DataFrame Methods
53+
###
54+
55+
def read(self) -> DataFrame:
56+
return self._inner

daft/catalog/__unity.py

+18-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import warnings
66
from typing import TYPE_CHECKING
77

8-
from daft.catalog import Catalog, Table
8+
from daft.catalog import Catalog, Identifier, Table
99
from daft.unity_catalog import UnityCatalog as InnerCatalog # noqa: TID253
1010
from daft.unity_catalog import UnityCatalogTable as InnerTable # noqa: TID253
1111

@@ -33,16 +33,13 @@ def _from_obj(obj: object) -> UnityCatalog:
3333
return c
3434
raise ValueError(f"Unsupported unity catalog type: {type(obj)}")
3535

36-
@property
37-
def inner(self) -> InnerCatalog:
38-
"""Returns the inner unity catalog."""
39-
return self._inner
40-
4136
###
4237
# get_*
4338
###
4439

45-
def get_table(self, name: str) -> UnityTable:
40+
def get_table(self, name: str | Identifier) -> UnityTable:
41+
if isinstance(name, Identifier):
42+
name = ".".join(name) # TODO unity qualified identifiers
4643
return UnityTable(self._inner.load_table(name))
4744

4845
###
@@ -74,8 +71,22 @@ class UnityTable(Table):
7471
_inner: InnerTable
7572

7673
def __init__(self, unity_table: InnerTable):
74+
"""DEPRECATED: Please use `Table.from_unity`; version 0.5.0!"""
75+
warnings.warn(
76+
"This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Table.from_unity` instead; version 0.5.0!",
77+
category=DeprecationWarning,
78+
)
7779
self._inner = unity_table
7880

81+
@staticmethod
82+
def _from_obj(obj: object) -> UnityTable | None:
83+
"""Returns a UnityTable if the given object can be adapted so."""
84+
if isinstance(obj, InnerTable):
85+
t = UnityTable.__new__(UnityTable)
86+
t._inner = obj
87+
return t
88+
raise ValueError(f"Unsupported unity table type: {type(obj)}")
89+
7990
@staticmethod
8091
def _try_from(obj: object) -> UnityTable | None:
8192
"""Returns an UnityTable if the given object can be adapted so."""

0 commit comments

Comments
 (0)