From ec82fc9b5ae64ac38c9a2da38cab4652fe78243a Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Mon, 17 Feb 2025 15:42:33 -0800 Subject: [PATCH 1/6] feat(catalog): [1/3] prepare existing catalog APIs for integration --- daft/catalog/__iceberg.py | 48 ++++++++++++++ daft/catalog/__init__.py | 111 ++++++++++++++++++++++----------- daft/catalog/__unity.py | 63 +++++++++++++++++++ daft/catalog/pyiceberg.py | 35 +++-------- daft/catalog/python_catalog.py | 27 ++------ daft/catalog/unity.py | 52 +++------------ daft/daft/__init__.pyi | 13 ++++ daft/daft/catalog.pyi | 9 --- 8 files changed, 218 insertions(+), 140 deletions(-) create mode 100644 daft/catalog/__iceberg.py create mode 100644 daft/catalog/__unity.py diff --git a/daft/catalog/__iceberg.py b/daft/catalog/__iceberg.py new file mode 100644 index 0000000000..4bd5cf9216 --- /dev/null +++ b/daft/catalog/__iceberg.py @@ -0,0 +1,48 @@ +"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg().""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from daft.catalog import Catalog, Table + +if TYPE_CHECKING: + from pyiceberg.catalog import Catalog as InnerCatalog + from pyiceberg.table import Table as InnerTable + + from daft.dataframe import DataFrame + + +class IcebergCatalog(Catalog): + _inner: InnerCatalog + + def __init__(self, pyiceberg_catalog: InnerCatalog): + """DEPRECATED: Please use `Catalog.from_iceberg`.""" + self._inner = pyiceberg_catalog + + ### + # get_* + ### + + def get_table(self, name: str) -> IcebergTable: + return IcebergTable(self._inner.load_table(name)) + + ### + # list_* + ### + + def list_tables(self, pattern: str | None = None) -> list[str]: + namespace = pattern if pattern else "" # pyiceberg lists on namespaces + return [".".join(tup) for tup in self._inner.list_tables(namespace)] + + +class IcebergTable(Table): + _inner: InnerTable + + def __init__(self, inner: InnerTable): + self._inner = inner + + def read(self) -> DataFrame: + import daft + + return daft.read_iceberg(self._inner) diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py index dd1d923137..abaeaa0b0f 100644 --- a/daft/catalog/__init__.py +++ b/daft/catalog/__init__.py @@ -1,15 +1,12 @@ -"""The `daft.catalog` module contains functionality for Data Catalogs. +"""The `daft.catalog` module contains functionality for Catalogs. -A Data Catalog can be understood as a system/service for users to discover, access and query their data. -Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog +A Catalog can be understood as a system/service for users to discover, access and query their data. +Most commonly, users' data is represented as a "table". Some more modern Catalogs such as Unity Catalog also expose other types of data including files, ML models, registered functions and more. -Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog. +Examples of Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog. -Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This -is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment. - -**Data Catalog** +**Catalog** Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided. @@ -20,13 +17,14 @@ **Named Tables** -Daft allows also the registration of named tables, which have no catalog associated with them. +Daft allows also the registration of temporary tables, which have no catalog associated with them. -Note that named tables take precedence over the default catalog's table names when resolving names. +Note that temporary tables take precedence over catalog tables when resolving unqualified names. ```python df = daft.from_pydict({"foo": [1, 2, 3]}) +# TODO deprecated catalog APIs #3819 daft.catalog.register_table( "my_table", df, @@ -40,8 +38,10 @@ from __future__ import annotations +from abc import ABC, abstractmethod from collections.abc import Sequence from daft.daft import catalog as native_catalog +from daft.daft import PyIdentifier from daft.logical.builder import LogicalPlanBuilder from daft.dataframe import DataFrame @@ -49,22 +49,25 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from pyiceberg.catalog import Catalog as PyIcebergCatalog - from daft.unity_catalog import UnityCatalog + from daft.dataframe.dataframe import ColumnInputType __all__ = [ + "Catalog", "Identifier", + "Table", + # TODO deprecated catalog APIs #3819 "read_table", "register_python_catalog", "register_table", "unregister_catalog", ] -# Forward imports from the native catalog which don't require Python wrappers +# TODO deprecated catalog APIs #3819 unregister_catalog = native_catalog.unregister_catalog +# TODO deprecated catalog APIs #3819 def read_table(name: str) -> DataFrame: """Finds a table with the specified name and reads it as a DataFrame. @@ -84,6 +87,7 @@ def read_table(name: str) -> DataFrame: return DataFrame(LogicalPlanBuilder(native_logical_plan_builder)) +# TODO deprecated catalog APIs #3819 def register_table(name: str, dataframe: DataFrame) -> str: """Register a DataFrame as a named table. @@ -105,7 +109,8 @@ def register_table(name: str, dataframe: DataFrame) -> str: return native_catalog.register_table(name, dataframe._builder._builder) -def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str: +# TODO deprecated catalog APIs #3819 +def register_python_catalog(catalog: object, name: str | None = None) -> str: """Registers a Python catalog with Daft. Currently supports: @@ -129,35 +134,47 @@ def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog") """ - _PYICEBERG_AVAILABLE = False + # try from iceberg try: - from pyiceberg.catalog import Catalog as PyIcebergCatalog + from daft.catalog.__iceberg import IcebergCatalog, InnerCatalog - _PYICEBERG_AVAILABLE = True + if isinstance(catalog, InnerCatalog): + return native_catalog.register_python_catalog(IcebergCatalog(catalog), name) except ImportError: pass - - _UNITY_AVAILABLE = False + # try from unity try: - from daft.unity_catalog import UnityCatalog + from daft.catalog.__unity import UnityCatalog, InnerCatalog - _UNITY_AVAILABLE = True + if isinstance(catalog, InnerCatalog): + return native_catalog.register_python_catalog(UnityCatalog(catalog), name) except ImportError: pass + # err! unknown + raise ValueError(f"Unsupported catalog type: {type(catalog)}") + - python_catalog: PyIcebergCatalog - if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog): - from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor +class Catalog(ABC): + """Interface for python catalog implementations.""" - python_catalog = PyIcebergCatalogAdaptor(catalog) - elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog): - from daft.catalog.unity import UnityCatalogAdaptor + ### + # list_* + ### - python_catalog = UnityCatalogAdaptor(catalog) - else: - raise ValueError(f"Unsupported catalog type: {type(catalog)}") + @abstractmethod + def list_tables(self, pattern: str | None = None) -> list[str]: ... - return native_catalog.register_python_catalog(python_catalog, name) + ### + # get_* + ### + + @abstractmethod + def get_table(self, name: str) -> Table: ... + + # TODO deprecated catalog APIs #3819 + def load_table(self, name: str) -> Table: + """DEPRECATED: Please use get_table(name: str).""" + return self.get_table(name) class Identifier(Sequence): @@ -168,28 +185,27 @@ class Identifier(Sequence): >>> assert len(id) == 2 """ - _identifier: native_catalog.PyIdentifier + _identifier: PyIdentifier def __init__(self, *parts: str): """Creates an Identifier from its parts. Example: - >>> Identifier("schema", "table") - >>> # + >>> Identifier("namespace", "table") Returns: Identifier: A new identifier. """ if len(parts) < 1: raise ValueError("Identifier requires at least one part.") - self._identifier = native_catalog.PyIdentifier(parts[:-1], parts[-1]) + self._identifier = PyIdentifier(parts[:-1], parts[-1]) @staticmethod def from_sql(input: str, normalize: bool = False) -> Identifier: """Parses an Identifier from an SQL string, normalizing to lowercase if specified. Example: - >>> Identifier.from_sql("schema.table") == Identifier("schema", "table") + >>> Identifier.from_sql("namespace.table") == Identifier("namespace", "table") >>> Identifier.from_sql('"a.b"') == Identifier('"a.b."') >>> Identifier.from_sql('ABC."xYz"', normalize=True) == Identifier("abc", "xYz") @@ -197,7 +213,7 @@ def from_sql(input: str, normalize: bool = False) -> Identifier: Identifier: A new identifier. """ i = Identifier.__new__(Identifier) - i._identifier = native_catalog.PyIdentifier.from_sql(input, normalize) + i._identifier = PyIdentifier.from_sql(input, normalize) return i def __eq__(self, other: object) -> bool: @@ -216,3 +232,24 @@ def __len__(self) -> int: def __repr__(self) -> str: return f"Identifier('{self._identifier.__repr__()}')" + + +class Table(ABC): + """Interface for python table implementations.""" + + # TODO deprecated catalog APIs #3819 + def to_dataframe(self) -> DataFrame: + """DEPRECATED: Please use `read()`.""" + return self.read() + + @abstractmethod + def read(self) -> DataFrame: + """Returns a DataFrame from this table.""" + + def select(self, *columns: ColumnInputType) -> DataFrame: + """Returns a DataFrame from this table with the selected columns.""" + return self.read().select(*columns) + + def show(self, n: int = 8) -> None: + """Shows the first n rows from this table.""" + self.read().show(n) diff --git a/daft/catalog/__unity.py b/daft/catalog/__unity.py new file mode 100644 index 0000000000..2b7fdcd4d6 --- /dev/null +++ b/daft/catalog/__unity.py @@ -0,0 +1,63 @@ +"""WARNING! These APIs are internal; please use Catalog.from_unity() and Table.from_unity().""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from daft.catalog import Catalog, Table + +if TYPE_CHECKING: + from daft.dataframe import DataFrame + from daft.unity_catalog import UnityCatalog as InnerCatalog + from daft.unity_catalog import UnityCatalogTable as InnerTable + + +class UnityCatalog(Catalog): + _inner: InnerCatalog + + def __init__(self, unity_catalog: InnerCatalog): + """DEPRECATED: Please use `Catalog.from_unity`.""" + self._inner = unity_catalog + + ### + # get_* + ### + + def get_table(self, name: str) -> UnityTable: + return UnityTable(self._inner.load_table(name)) + + ### + # list_.* + ### + + def list_tables(self, pattern: str | None = None) -> list[str]: + if pattern is None or pattern == "": + return [ + tbl + for cat in self._inner.list_catalogs() + for schema in self._inner.list_schemas(cat) + for tbl in self._inner.list_tables(schema) + ] + num_namespaces = pattern.count(".") + if num_namespaces == 0: + catalog_name = pattern + return [tbl for schema in self._inner.list_schemas(catalog_name) for tbl in self._inner.list_tables(schema)] + elif num_namespaces == 1: + schema_name = pattern + return [tbl for tbl in self._inner.list_tables(schema_name)] + else: + raise ValueError( + f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {pattern}" + ) + + +class UnityTable(Table): + _inner: InnerTable + + def __init__(self, unity_table: InnerTable): + self._inner = unity_table + + def read(self) -> DataFrame: + import daft + + return daft.read_deltalake(self._inner) diff --git a/daft/catalog/pyiceberg.py b/daft/catalog/pyiceberg.py index bde293a488..a3989e218a 100644 --- a/daft/catalog/pyiceberg.py +++ b/daft/catalog/pyiceberg.py @@ -1,32 +1,11 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from pyiceberg.catalog import Catalog as PyIcebergCatalog - from pyiceberg.table import Table as PyIcebergTable - - from daft.dataframe import DataFrame - -from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable - - -class PyIcebergCatalogAdaptor(PythonCatalog): - def __init__(self, pyiceberg_catalog: PyIcebergCatalog): - self._catalog = pyiceberg_catalog - - def list_tables(self, prefix: str) -> list[str]: - return [".".join(tup) for tup in self._catalog.list_tables(prefix)] - - def load_table(self, name: str) -> PyIcebergTableAdaptor: - return PyIcebergTableAdaptor(self._catalog.load_table(name)) +"""DEPRECATED: Please use `Catalog.from_iceberg`, these APIs will be removed in the next release.""" +from __future__ import annotations -class PyIcebergTableAdaptor(PythonCatalogTable): - def __init__(self, pyiceberg_table: PyIcebergTable): - self._table = pyiceberg_table +from daft.catalog.__iceberg import IcebergCatalog, IcebergTable - def to_dataframe(self) -> DataFrame: - import daft +# TODO deprecated catalog APIs #3819 +PyIcebergCatalogAdaptor = IcebergCatalog - return daft.read_iceberg(self._table) +# TODO deprecated catalog APIs #3819 +PyIcebergTableAdaptor = IcebergTable diff --git a/daft/catalog/python_catalog.py b/daft/catalog/python_catalog.py index 2a0f942eac..dc5f736aa0 100644 --- a/daft/catalog/python_catalog.py +++ b/daft/catalog/python_catalog.py @@ -1,24 +1,9 @@ -from __future__ import annotations +"""DEPRECATED: Please use `from daft.catalog import Catalog, Table`.""" -from abc import abstractmethod -from typing import TYPE_CHECKING +from daft.catalog import Catalog, Table -if TYPE_CHECKING: - from daft.dataframe import DataFrame +# TODO deprecated catalog APIs #3819 +PythonCatalog = Catalog - -class PythonCatalog: - """Wrapper class for various Python implementations of Data Catalogs.""" - - @abstractmethod - def list_tables(self, prefix: str) -> list[str]: ... - - @abstractmethod - def load_table(self, name: str) -> PythonCatalogTable: ... - - -class PythonCatalogTable: - """Wrapper class for various Python implementations of Data Catalog Tables.""" - - @abstractmethod - def to_dataframe(self) -> DataFrame: ... +# TODO deprecated catalog APIs #3819 +PythonCatalogTable = Table diff --git a/daft/catalog/unity.py b/daft/catalog/unity.py index aed0b0233e..ab6df14b45 100644 --- a/daft/catalog/unity.py +++ b/daft/catalog/unity.py @@ -1,49 +1,11 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from daft.dataframe import DataFrame - from daft.unity_catalog import UnityCatalog, UnityCatalogTable - -from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable - - -class UnityCatalogAdaptor(PythonCatalog): - def __init__(self, unity_catalog: UnityCatalog): - self._catalog = unity_catalog - - def list_tables(self, prefix: str) -> list[str]: - num_namespaces = prefix.count(".") - if prefix == "": - return [ - tbl - for cat in self._catalog.list_catalogs() - for schema in self._catalog.list_schemas(cat) - for tbl in self._catalog.list_tables(schema) - ] - elif num_namespaces == 0: - catalog_name = prefix - return [ - tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema) - ] - elif num_namespaces == 1: - schema_name = prefix - return [tbl for tbl in self._catalog.list_tables(schema_name)] - else: - raise ValueError( - f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}" - ) - - def load_table(self, name: str) -> UnityTableAdaptor: - return UnityTableAdaptor(self._catalog.load_table(name)) +"""DEPRECATED: Please use `Catalog.from_unity`, these APIs will be removed in the next release.""" +from __future__ import annotations -class UnityTableAdaptor(PythonCatalogTable): - def __init__(self, unity_table: UnityCatalogTable): - self._table = unity_table +from daft.catalog.__unity import UnityCatalog, UnityTable - def to_dataframe(self) -> DataFrame: - import daft +# TODO deprecated catalog APIs #3819 +UnityCatalogAdaptor = UnityCatalog - return daft.read_deltalake(self._table) +# TODO deprecated catalog APIs #3819 +UnityTableAdaptor = UnityTable diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index fe90e5222d..7d5d5d4b27 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1852,6 +1852,19 @@ class SystemInfo: def total_memory(self) -> int: ... def cpu_count(self) -> int | None: ... +### +# daft-catalog +### + +class PyIdentifier: + def __init__(self, namespace: tuple[str, ...], name: str): ... + @staticmethod + def from_sql(input: str, normalize: bool): ... + def eq(self, other: PyIdentifier) -> bool: ... + def getitem(self, index: int) -> str: ... + def __len__(self) -> int: ... + def __repr__(self) -> str: ... + ### # daft-session ### diff --git a/daft/daft/catalog.pyi b/daft/daft/catalog.pyi index bb8457afa0..55c5cb50c8 100644 --- a/daft/daft/catalog.pyi +++ b/daft/daft/catalog.pyi @@ -5,15 +5,6 @@ from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder if TYPE_CHECKING: from daft.catalog.python_catalog import PythonCatalog -class PyIdentifier: - def __init__(self, namespace: tuple[str, ...], name: str): ... - @staticmethod - def from_sql(input: str, normalize: bool): ... - def eq(self, other: PyIdentifier) -> bool: ... - def getitem(self, index: int) -> str: ... - def __len__(self) -> int: ... - def __repr__(self) -> str: ... - def read_table(name: str) -> PyLogicalPlanBuilder: ... def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ... def register_python_catalog(catalog: PythonCatalog, catalog_name: str | None) -> str: ... From 708396c2560e5eed43719944849e10271582272d Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Mon, 17 Feb 2025 16:04:12 -0800 Subject: [PATCH 2/6] Fix PyIdentifier import --- daft/daft/catalog.pyi | 9 +++++++++ src/daft-catalog/src/python.rs | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/daft/daft/catalog.pyi b/daft/daft/catalog.pyi index 55c5cb50c8..bb8457afa0 100644 --- a/daft/daft/catalog.pyi +++ b/daft/daft/catalog.pyi @@ -5,6 +5,15 @@ from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder if TYPE_CHECKING: from daft.catalog.python_catalog import PythonCatalog +class PyIdentifier: + def __init__(self, namespace: tuple[str, ...], name: str): ... + @staticmethod + def from_sql(input: str, normalize: bool): ... + def eq(self, other: PyIdentifier) -> bool: ... + def getitem(self, index: int) -> str: ... + def __len__(self) -> int: ... + def __repr__(self) -> str: ... + def read_table(name: str) -> PyLogicalPlanBuilder: ... def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ... def register_python_catalog(catalog: PythonCatalog, catalog_name: str | None) -> str: ... diff --git a/src/daft-catalog/src/python.rs b/src/daft-catalog/src/python.rs index 1bbea213c7..6826f8802d 100644 --- a/src/daft-catalog/src/python.rs +++ b/src/daft-catalog/src/python.rs @@ -144,8 +144,10 @@ impl From for PyIdentifier { /// Defines the python daft. pub fn register_modules<'py>(parent: &Bound<'py, PyModule>) -> PyResult> { + // daft.daft.PyIdentifier + parent.add_class::()?; + let module = PyModule::new(parent.py(), "catalog")?; - module.add_class::()?; module.add_wrapped(wrap_pyfunction!(py_read_table))?; module.add_wrapped(wrap_pyfunction!(py_register_table))?; module.add_wrapped(wrap_pyfunction!(py_unregister_catalog))?; From 6d337203ce7f08e83eead87bb18c2e69ad6f63ac Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Tue, 18 Feb 2025 09:22:41 -0800 Subject: [PATCH 3/6] Fix polymorphic Catalog factory methods --- daft/catalog/__iceberg.py | 30 +++++++++++++++++-- daft/catalog/__init__.py | 54 ++++++++++++++++++++++++----------- daft/catalog/__unity.py | 28 ++++++++++++++++-- tests/catalog/test_catalog.py | 23 +++++++++++++++ 4 files changed, 113 insertions(+), 22 deletions(-) create mode 100644 tests/catalog/test_catalog.py diff --git a/daft/catalog/__iceberg.py b/daft/catalog/__iceberg.py index 4bd5cf9216..ac675e74f6 100644 --- a/daft/catalog/__iceberg.py +++ b/daft/catalog/__iceberg.py @@ -4,12 +4,12 @@ from typing import TYPE_CHECKING +from pyiceberg.catalog import Catalog as InnerCatalog +from pyiceberg.table import Table as InnerTable + from daft.catalog import Catalog, Table if TYPE_CHECKING: - from pyiceberg.catalog import Catalog as InnerCatalog - from pyiceberg.table import Table as InnerTable - from daft.dataframe import DataFrame @@ -20,6 +20,18 @@ def __init__(self, pyiceberg_catalog: InnerCatalog): """DEPRECATED: Please use `Catalog.from_iceberg`.""" self._inner = pyiceberg_catalog + @staticmethod + def _try_from(obj: object) -> IcebergCatalog | None: + """Returns an IcebergCatalog instance if the given object can be adapted so.""" + if isinstance(obj, InnerCatalog): + return IcebergCatalog(obj) + return None + + @property + def inner(self) -> InnerCatalog: + """Returns the inner iceberg catalog.""" + return self._inner + ### # get_* ### @@ -42,6 +54,18 @@ class IcebergTable(Table): def __init__(self, inner: InnerTable): self._inner = inner + @staticmethod + def _try_from(obj: object) -> IcebergTable | None: + """Returns an IcebergTable if the given object can be adapted so.""" + if isinstance(obj, InnerTable): + return IcebergTable(obj) + return None + + @property + def inner(self) -> InnerTable: + """Returns the inner iceberg table.""" + return self._inner + def read(self) -> DataFrame: import daft diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py index abaeaa0b0f..487c5b2d10 100644 --- a/daft/catalog/__init__.py +++ b/daft/catalog/__init__.py @@ -134,29 +134,45 @@ def register_python_catalog(catalog: object, name: str | None = None) -> str: >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog") """ - # try from iceberg - try: - from daft.catalog.__iceberg import IcebergCatalog, InnerCatalog - - if isinstance(catalog, InnerCatalog): - return native_catalog.register_python_catalog(IcebergCatalog(catalog), name) - except ImportError: - pass - # try from unity - try: - from daft.catalog.__unity import UnityCatalog, InnerCatalog - - if isinstance(catalog, InnerCatalog): - return native_catalog.register_python_catalog(UnityCatalog(catalog), name) - except ImportError: - pass - # err! unknown + if (c := Catalog._try_from(catalog)) is not None: + return native_catalog.register_python_catalog(c, name) raise ValueError(f"Unsupported catalog type: {type(catalog)}") class Catalog(ABC): """Interface for python catalog implementations.""" + @property + def inner(self) -> object | None: + """Returns the inner catalog object if this is an adapter.""" + + @staticmethod + def _try_from(obj: object) -> Catalog | None: + for factory in (Catalog._try_from_iceberg, Catalog._try_from_unity): + if (c := factory(obj)) is not None: + return c + return None + + @staticmethod + def _try_from_iceberg(obj: object) -> Catalog | None: + """Returns a Daft Catalog instance from an Iceberg catalog.""" + try: + from daft.catalog.__iceberg import IcebergCatalog + + return IcebergCatalog._try_from(obj) + except ImportError: + return None + + @staticmethod + def _try_from_unity(obj: object) -> Catalog | None: + """Returns a Daft Catalog instance from a Unity catalog.""" + try: + from daft.catalog.__unity import UnityCatalog + + return UnityCatalog._try_from(obj) + except ImportError: + return None + ### # list_* ### @@ -237,6 +253,10 @@ def __repr__(self) -> str: class Table(ABC): """Interface for python table implementations.""" + @property + def inner(self) -> object | None: + """Returns the inner table object if this is an adapter.""" + # TODO deprecated catalog APIs #3819 def to_dataframe(self) -> DataFrame: """DEPRECATED: Please use `read()`.""" diff --git a/daft/catalog/__unity.py b/daft/catalog/__unity.py index 2b7fdcd4d6..286170a421 100644 --- a/daft/catalog/__unity.py +++ b/daft/catalog/__unity.py @@ -5,11 +5,11 @@ from typing import TYPE_CHECKING from daft.catalog import Catalog, Table +from daft.unity_catalog import UnityCatalog as InnerCatalog # noqa: TID253 +from daft.unity_catalog import UnityCatalogTable as InnerTable # noqa: TID253 if TYPE_CHECKING: from daft.dataframe import DataFrame - from daft.unity_catalog import UnityCatalog as InnerCatalog - from daft.unity_catalog import UnityCatalogTable as InnerTable class UnityCatalog(Catalog): @@ -19,6 +19,18 @@ def __init__(self, unity_catalog: InnerCatalog): """DEPRECATED: Please use `Catalog.from_unity`.""" self._inner = unity_catalog + @staticmethod + def _try_from(obj: object) -> UnityCatalog | None: + """Returns an UnityCatalog instance if the given object can be adapted so.""" + if isinstance(obj, InnerCatalog): + return UnityCatalog(obj) + return None + + @property + def inner(self) -> InnerCatalog: + """Returns the inner unity catalog.""" + return self._inner + ### # get_* ### @@ -57,6 +69,18 @@ class UnityTable(Table): def __init__(self, unity_table: InnerTable): self._inner = unity_table + @staticmethod + def _try_from(obj: object) -> UnityTable | None: + """Returns an UnityTable if the given object can be adapted so.""" + if isinstance(obj, InnerTable): + return UnityTable(obj) + return None + + @property + def inner(self) -> InnerTable: + """Returns the inner unity table.""" + return self._inner + def read(self) -> DataFrame: import daft diff --git a/tests/catalog/test_catalog.py b/tests/catalog/test_catalog.py new file mode 100644 index 0000000000..1713a21232 --- /dev/null +++ b/tests/catalog/test_catalog.py @@ -0,0 +1,23 @@ +from daft.catalog import Catalog + + +def test_try_from_iceberg(tmpdir): + from pyiceberg.catalog.sql import SqlCatalog + + pyiceberg_catalog = SqlCatalog( + "default", + **{ + "uri": f"sqlite:///{tmpdir}/pyiceberg_catalog.db", + "warehouse": f"file://{tmpdir}", + }, + ) + assert Catalog._try_from(pyiceberg_catalog) is not None + assert Catalog._try_from_iceberg(pyiceberg_catalog) is not None + + +def test_try_from_unity(): + from daft.unity_catalog import UnityCatalog + + unity_catalog = UnityCatalog("-", token="-") + assert Catalog._try_from(unity_catalog) is not None + assert Catalog._try_from_unity(unity_catalog) is not None From 36b6fe78f93611b2f67e77525cb577086116d408 Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Tue, 18 Feb 2025 09:29:02 -0800 Subject: [PATCH 4/6] Use unique file basename for catalog tests --- tests/sql/{test_catalog.py => test_sql_catalog.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/sql/{test_catalog.py => test_sql_catalog.py} (100%) diff --git a/tests/sql/test_catalog.py b/tests/sql/test_sql_catalog.py similarity index 100% rename from tests/sql/test_catalog.py rename to tests/sql/test_sql_catalog.py From f66cc8d20a446921359c5e4abb3cf8534096e048 Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Wed, 19 Feb 2025 12:18:35 -0800 Subject: [PATCH 5/6] Adds deprecation warnings --- daft/catalog/__iceberg.py | 11 ++++++++--- daft/catalog/__init__.py | 16 +++++++++++++--- daft/catalog/__unity.py | 7 ++++++- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/daft/catalog/__iceberg.py b/daft/catalog/__iceberg.py index ac675e74f6..e56a0c6547 100644 --- a/daft/catalog/__iceberg.py +++ b/daft/catalog/__iceberg.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from typing import TYPE_CHECKING from pyiceberg.catalog import Catalog as InnerCatalog @@ -17,7 +18,11 @@ class IcebergCatalog(Catalog): _inner: InnerCatalog def __init__(self, pyiceberg_catalog: InnerCatalog): - """DEPRECATED: Please use `Catalog.from_iceberg`.""" + """DEPRECATED: Please use `Catalog.from_iceberg`; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `Catalog.from_iceberg` instead.", + category=DeprecationWarning, + ) self._inner = pyiceberg_catalog @staticmethod @@ -44,8 +49,8 @@ def get_table(self, name: str) -> IcebergTable: ### def list_tables(self, pattern: str | None = None) -> list[str]: - namespace = pattern if pattern else "" # pyiceberg lists on namespaces - return [".".join(tup) for tup in self._inner.list_tables(namespace)] + """List tables under the given namespace (pattern) in the catalog, or all tables if no namespace is provided.""" + return [".".join(tup) for tup in self._inner.list_tables(pattern)] class IcebergTable(Table): diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py index 487c5b2d10..a097ca58b1 100644 --- a/daft/catalog/__init__.py +++ b/daft/catalog/__init__.py @@ -38,6 +38,8 @@ from __future__ import annotations +import warnings + from abc import ABC, abstractmethod from collections.abc import Sequence from daft.daft import catalog as native_catalog @@ -189,7 +191,11 @@ def get_table(self, name: str) -> Table: ... # TODO deprecated catalog APIs #3819 def load_table(self, name: str) -> Table: - """DEPRECATED: Please use get_table(name: str).""" + """DEPRECATED: Please use `get_table` instead; version=0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `get_table` instead.", + category=DeprecationWarning, + ) return self.get_table(name) @@ -259,7 +265,11 @@ def inner(self) -> object | None: # TODO deprecated catalog APIs #3819 def to_dataframe(self) -> DataFrame: - """DEPRECATED: Please use `read()`.""" + """DEPRECATED: Please use `read` instead; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `read` instead.", + category=DeprecationWarning, + ) return self.read() @abstractmethod @@ -272,4 +282,4 @@ def select(self, *columns: ColumnInputType) -> DataFrame: def show(self, n: int = 8) -> None: """Shows the first n rows from this table.""" - self.read().show(n) + return self.read().show(n) diff --git a/daft/catalog/__unity.py b/daft/catalog/__unity.py index 286170a421..54a24d20a0 100644 --- a/daft/catalog/__unity.py +++ b/daft/catalog/__unity.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from typing import TYPE_CHECKING from daft.catalog import Catalog, Table @@ -16,7 +17,11 @@ class UnityCatalog(Catalog): _inner: InnerCatalog def __init__(self, unity_catalog: InnerCatalog): - """DEPRECATED: Please use `Catalog.from_unity`.""" + """DEPRECATED: Please use `Catalog.from_unity`; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Catalog.from_unity` instead; version 0.5.0!", + category=DeprecationWarning, + ) self._inner = unity_catalog @staticmethod From 1b0ab2d8b3ff1481ff0033452d45158631c42c07 Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Wed, 19 Feb 2025 12:54:02 -0800 Subject: [PATCH 6/6] Adds public Catalog from_ methods --- daft/catalog/__iceberg.py | 8 +++++--- daft/catalog/__init__.py | 37 ++++++++++++++++++++--------------- daft/catalog/__unity.py | 8 +++++--- tests/catalog/test_catalog.py | 10 ++++++---- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/daft/catalog/__iceberg.py b/daft/catalog/__iceberg.py index e56a0c6547..04d0fc9815 100644 --- a/daft/catalog/__iceberg.py +++ b/daft/catalog/__iceberg.py @@ -26,11 +26,13 @@ def __init__(self, pyiceberg_catalog: InnerCatalog): self._inner = pyiceberg_catalog @staticmethod - def _try_from(obj: object) -> IcebergCatalog | None: + def _from_obj(obj: object) -> IcebergCatalog: """Returns an IcebergCatalog instance if the given object can be adapted so.""" if isinstance(obj, InnerCatalog): - return IcebergCatalog(obj) - return None + c = IcebergCatalog.__new__(IcebergCatalog) + c._inner = obj + return c + raise ValueError(f"Unsupported iceberg catalog type: {type(obj)}") @property def inner(self) -> InnerCatalog: diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py index a097ca58b1..ad0b8ab8d6 100644 --- a/daft/catalog/__init__.py +++ b/daft/catalog/__init__.py @@ -136,9 +136,7 @@ def register_python_catalog(catalog: object, name: str | None = None) -> str: >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog") """ - if (c := Catalog._try_from(catalog)) is not None: - return native_catalog.register_python_catalog(c, name) - raise ValueError(f"Unsupported catalog type: {type(catalog)}") + return native_catalog.register_python_catalog(Catalog._from_obj(catalog), name) class Catalog(ABC): @@ -149,31 +147,38 @@ def inner(self) -> object | None: """Returns the inner catalog object if this is an adapter.""" @staticmethod - def _try_from(obj: object) -> Catalog | None: - for factory in (Catalog._try_from_iceberg, Catalog._try_from_unity): - if (c := factory(obj)) is not None: - return c - return None - - @staticmethod - def _try_from_iceberg(obj: object) -> Catalog | None: + def from_iceberg(obj: object) -> Catalog: """Returns a Daft Catalog instance from an Iceberg catalog.""" try: from daft.catalog.__iceberg import IcebergCatalog - return IcebergCatalog._try_from(obj) + return IcebergCatalog._from_obj(obj) except ImportError: - return None + raise ImportError("Iceberg support not installed: pip install -U 'getdaft[iceberg]'") @staticmethod - def _try_from_unity(obj: object) -> Catalog | None: + def from_unity(obj: object) -> Catalog: """Returns a Daft Catalog instance from a Unity catalog.""" try: from daft.catalog.__unity import UnityCatalog - return UnityCatalog._try_from(obj) + return UnityCatalog._from_obj(obj) except ImportError: - return None + raise ImportError("Unity support not installed: pip install -U 'getdaft[unity]'") + + @staticmethod + def _from_obj(obj: object) -> Catalog: + """Returns a Daft Catalog from a supported object type or raises a ValueError.""" + for factory in (Catalog.from_iceberg, Catalog.from_unity): + try: + return factory(obj) + except ValueError: + pass + except ImportError: + pass + raise ValueError( + f"Unsupported catalog type: {type(obj)}; please ensure all required extra dependencies are installed." + ) ### # list_* diff --git a/daft/catalog/__unity.py b/daft/catalog/__unity.py index 54a24d20a0..c73a492d56 100644 --- a/daft/catalog/__unity.py +++ b/daft/catalog/__unity.py @@ -25,11 +25,13 @@ def __init__(self, unity_catalog: InnerCatalog): self._inner = unity_catalog @staticmethod - def _try_from(obj: object) -> UnityCatalog | None: + def _from_obj(obj: object) -> UnityCatalog: """Returns an UnityCatalog instance if the given object can be adapted so.""" if isinstance(obj, InnerCatalog): - return UnityCatalog(obj) - return None + c = UnityCatalog.__new__(UnityCatalog) + c._inner = obj + return c + raise ValueError(f"Unsupported unity catalog type: {type(obj)}") @property def inner(self) -> InnerCatalog: diff --git a/tests/catalog/test_catalog.py b/tests/catalog/test_catalog.py index 1713a21232..21b817a53d 100644 --- a/tests/catalog/test_catalog.py +++ b/tests/catalog/test_catalog.py @@ -11,13 +11,15 @@ def test_try_from_iceberg(tmpdir): "warehouse": f"file://{tmpdir}", }, ) - assert Catalog._try_from(pyiceberg_catalog) is not None - assert Catalog._try_from_iceberg(pyiceberg_catalog) is not None + # assert doesn't throw! + assert Catalog._from_obj(pyiceberg_catalog) is not None + assert Catalog.from_iceberg(pyiceberg_catalog) is not None def test_try_from_unity(): from daft.unity_catalog import UnityCatalog unity_catalog = UnityCatalog("-", token="-") - assert Catalog._try_from(unity_catalog) is not None - assert Catalog._try_from_unity(unity_catalog) is not None + # assert doesn't throw! + assert Catalog._from_obj(unity_catalog) is not None + assert Catalog.from_unity(unity_catalog) is not None