diff --git a/daft/catalog/__iceberg.py b/daft/catalog/__iceberg.py new file mode 100644 index 0000000000..04d0fc9815 --- /dev/null +++ b/daft/catalog/__iceberg.py @@ -0,0 +1,79 @@ +"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg().""" + +from __future__ import annotations + +import warnings +from typing import TYPE_CHECKING + +from pyiceberg.catalog import Catalog as InnerCatalog +from pyiceberg.table import Table as InnerTable + +from daft.catalog import Catalog, Table + +if TYPE_CHECKING: + from daft.dataframe import DataFrame + + +class IcebergCatalog(Catalog): + _inner: InnerCatalog + + def __init__(self, pyiceberg_catalog: InnerCatalog): + """DEPRECATED: Please use `Catalog.from_iceberg`; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `Catalog.from_iceberg` instead.", + category=DeprecationWarning, + ) + self._inner = pyiceberg_catalog + + @staticmethod + def _from_obj(obj: object) -> IcebergCatalog: + """Returns an IcebergCatalog instance if the given object can be adapted so.""" + if isinstance(obj, InnerCatalog): + c = IcebergCatalog.__new__(IcebergCatalog) + c._inner = obj + return c + raise ValueError(f"Unsupported iceberg catalog type: {type(obj)}") + + @property + def inner(self) -> InnerCatalog: + """Returns the inner iceberg catalog.""" + return self._inner + + ### + # get_* + ### + + def get_table(self, name: str) -> IcebergTable: + return IcebergTable(self._inner.load_table(name)) + + ### + # list_* + ### + + def list_tables(self, pattern: str | None = None) -> list[str]: + """List tables under the given namespace (pattern) in the catalog, or all tables if no namespace is provided.""" + return [".".join(tup) for tup in self._inner.list_tables(pattern)] + + +class IcebergTable(Table): + _inner: InnerTable + + def __init__(self, inner: InnerTable): + self._inner = inner + + @staticmethod + def _try_from(obj: object) -> IcebergTable | None: + """Returns an IcebergTable if the given object can be adapted so.""" + if isinstance(obj, InnerTable): + return IcebergTable(obj) + return None + + @property + def inner(self) -> InnerTable: + """Returns the inner iceberg table.""" + return self._inner + + def read(self) -> DataFrame: + import daft + + return daft.read_iceberg(self._inner) diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py index dd1d923137..ad0b8ab8d6 100644 --- a/daft/catalog/__init__.py +++ b/daft/catalog/__init__.py @@ -1,15 +1,12 @@ -"""The `daft.catalog` module contains functionality for Data Catalogs. +"""The `daft.catalog` module contains functionality for Catalogs. -A Data Catalog can be understood as a system/service for users to discover, access and query their data. -Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog +A Catalog can be understood as a system/service for users to discover, access and query their data. +Most commonly, users' data is represented as a "table". Some more modern Catalogs such as Unity Catalog also expose other types of data including files, ML models, registered functions and more. -Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog. +Examples of Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog. -Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This -is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment. - -**Data Catalog** +**Catalog** Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided. @@ -20,13 +17,14 @@ **Named Tables** -Daft allows also the registration of named tables, which have no catalog associated with them. +Daft allows also the registration of temporary tables, which have no catalog associated with them. -Note that named tables take precedence over the default catalog's table names when resolving names. +Note that temporary tables take precedence over catalog tables when resolving unqualified names. ```python df = daft.from_pydict({"foo": [1, 2, 3]}) +# TODO deprecated catalog APIs #3819 daft.catalog.register_table( "my_table", df, @@ -40,8 +38,12 @@ from __future__ import annotations +import warnings + +from abc import ABC, abstractmethod from collections.abc import Sequence from daft.daft import catalog as native_catalog +from daft.daft import PyIdentifier from daft.logical.builder import LogicalPlanBuilder from daft.dataframe import DataFrame @@ -49,22 +51,25 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from pyiceberg.catalog import Catalog as PyIcebergCatalog - from daft.unity_catalog import UnityCatalog + from daft.dataframe.dataframe import ColumnInputType __all__ = [ + "Catalog", "Identifier", + "Table", + # TODO deprecated catalog APIs #3819 "read_table", "register_python_catalog", "register_table", "unregister_catalog", ] -# Forward imports from the native catalog which don't require Python wrappers +# TODO deprecated catalog APIs #3819 unregister_catalog = native_catalog.unregister_catalog +# TODO deprecated catalog APIs #3819 def read_table(name: str) -> DataFrame: """Finds a table with the specified name and reads it as a DataFrame. @@ -84,6 +89,7 @@ def read_table(name: str) -> DataFrame: return DataFrame(LogicalPlanBuilder(native_logical_plan_builder)) +# TODO deprecated catalog APIs #3819 def register_table(name: str, dataframe: DataFrame) -> str: """Register a DataFrame as a named table. @@ -105,7 +111,8 @@ def register_table(name: str, dataframe: DataFrame) -> str: return native_catalog.register_table(name, dataframe._builder._builder) -def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str: +# TODO deprecated catalog APIs #3819 +def register_python_catalog(catalog: object, name: str | None = None) -> str: """Registers a Python catalog with Daft. Currently supports: @@ -129,35 +136,72 @@ def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog") """ - _PYICEBERG_AVAILABLE = False - try: - from pyiceberg.catalog import Catalog as PyIcebergCatalog + return native_catalog.register_python_catalog(Catalog._from_obj(catalog), name) - _PYICEBERG_AVAILABLE = True - except ImportError: - pass - _UNITY_AVAILABLE = False - try: - from daft.unity_catalog import UnityCatalog +class Catalog(ABC): + """Interface for python catalog implementations.""" - _UNITY_AVAILABLE = True - except ImportError: - pass + @property + def inner(self) -> object | None: + """Returns the inner catalog object if this is an adapter.""" - python_catalog: PyIcebergCatalog - if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog): - from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor + @staticmethod + def from_iceberg(obj: object) -> Catalog: + """Returns a Daft Catalog instance from an Iceberg catalog.""" + try: + from daft.catalog.__iceberg import IcebergCatalog - python_catalog = PyIcebergCatalogAdaptor(catalog) - elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog): - from daft.catalog.unity import UnityCatalogAdaptor + return IcebergCatalog._from_obj(obj) + except ImportError: + raise ImportError("Iceberg support not installed: pip install -U 'getdaft[iceberg]'") - python_catalog = UnityCatalogAdaptor(catalog) - else: - raise ValueError(f"Unsupported catalog type: {type(catalog)}") + @staticmethod + def from_unity(obj: object) -> Catalog: + """Returns a Daft Catalog instance from a Unity catalog.""" + try: + from daft.catalog.__unity import UnityCatalog - return native_catalog.register_python_catalog(python_catalog, name) + return UnityCatalog._from_obj(obj) + except ImportError: + raise ImportError("Unity support not installed: pip install -U 'getdaft[unity]'") + + @staticmethod + def _from_obj(obj: object) -> Catalog: + """Returns a Daft Catalog from a supported object type or raises a ValueError.""" + for factory in (Catalog.from_iceberg, Catalog.from_unity): + try: + return factory(obj) + except ValueError: + pass + except ImportError: + pass + raise ValueError( + f"Unsupported catalog type: {type(obj)}; please ensure all required extra dependencies are installed." + ) + + ### + # list_* + ### + + @abstractmethod + def list_tables(self, pattern: str | None = None) -> list[str]: ... + + ### + # get_* + ### + + @abstractmethod + def get_table(self, name: str) -> Table: ... + + # TODO deprecated catalog APIs #3819 + def load_table(self, name: str) -> Table: + """DEPRECATED: Please use `get_table` instead; version=0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `get_table` instead.", + category=DeprecationWarning, + ) + return self.get_table(name) class Identifier(Sequence): @@ -168,28 +212,27 @@ class Identifier(Sequence): >>> assert len(id) == 2 """ - _identifier: native_catalog.PyIdentifier + _identifier: PyIdentifier def __init__(self, *parts: str): """Creates an Identifier from its parts. Example: - >>> Identifier("schema", "table") - >>> # + >>> Identifier("namespace", "table") Returns: Identifier: A new identifier. """ if len(parts) < 1: raise ValueError("Identifier requires at least one part.") - self._identifier = native_catalog.PyIdentifier(parts[:-1], parts[-1]) + self._identifier = PyIdentifier(parts[:-1], parts[-1]) @staticmethod def from_sql(input: str, normalize: bool = False) -> Identifier: """Parses an Identifier from an SQL string, normalizing to lowercase if specified. Example: - >>> Identifier.from_sql("schema.table") == Identifier("schema", "table") + >>> Identifier.from_sql("namespace.table") == Identifier("namespace", "table") >>> Identifier.from_sql('"a.b"') == Identifier('"a.b."') >>> Identifier.from_sql('ABC."xYz"', normalize=True) == Identifier("abc", "xYz") @@ -197,7 +240,7 @@ def from_sql(input: str, normalize: bool = False) -> Identifier: Identifier: A new identifier. """ i = Identifier.__new__(Identifier) - i._identifier = native_catalog.PyIdentifier.from_sql(input, normalize) + i._identifier = PyIdentifier.from_sql(input, normalize) return i def __eq__(self, other: object) -> bool: @@ -216,3 +259,32 @@ def __len__(self) -> int: def __repr__(self) -> str: return f"Identifier('{self._identifier.__repr__()}')" + + +class Table(ABC): + """Interface for python table implementations.""" + + @property + def inner(self) -> object | None: + """Returns the inner table object if this is an adapter.""" + + # TODO deprecated catalog APIs #3819 + def to_dataframe(self) -> DataFrame: + """DEPRECATED: Please use `read` instead; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please use `read` instead.", + category=DeprecationWarning, + ) + return self.read() + + @abstractmethod + def read(self) -> DataFrame: + """Returns a DataFrame from this table.""" + + def select(self, *columns: ColumnInputType) -> DataFrame: + """Returns a DataFrame from this table with the selected columns.""" + return self.read().select(*columns) + + def show(self, n: int = 8) -> None: + """Shows the first n rows from this table.""" + return self.read().show(n) diff --git a/daft/catalog/__unity.py b/daft/catalog/__unity.py new file mode 100644 index 0000000000..c73a492d56 --- /dev/null +++ b/daft/catalog/__unity.py @@ -0,0 +1,94 @@ +"""WARNING! These APIs are internal; please use Catalog.from_unity() and Table.from_unity().""" + +from __future__ import annotations + +import warnings +from typing import TYPE_CHECKING + +from daft.catalog import Catalog, Table +from daft.unity_catalog import UnityCatalog as InnerCatalog # noqa: TID253 +from daft.unity_catalog import UnityCatalogTable as InnerTable # noqa: TID253 + +if TYPE_CHECKING: + from daft.dataframe import DataFrame + + +class UnityCatalog(Catalog): + _inner: InnerCatalog + + def __init__(self, unity_catalog: InnerCatalog): + """DEPRECATED: Please use `Catalog.from_unity`; version 0.5.0!""" + warnings.warn( + "This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Catalog.from_unity` instead; version 0.5.0!", + category=DeprecationWarning, + ) + self._inner = unity_catalog + + @staticmethod + def _from_obj(obj: object) -> UnityCatalog: + """Returns an UnityCatalog instance if the given object can be adapted so.""" + if isinstance(obj, InnerCatalog): + c = UnityCatalog.__new__(UnityCatalog) + c._inner = obj + return c + raise ValueError(f"Unsupported unity catalog type: {type(obj)}") + + @property + def inner(self) -> InnerCatalog: + """Returns the inner unity catalog.""" + return self._inner + + ### + # get_* + ### + + def get_table(self, name: str) -> UnityTable: + return UnityTable(self._inner.load_table(name)) + + ### + # list_.* + ### + + def list_tables(self, pattern: str | None = None) -> list[str]: + if pattern is None or pattern == "": + return [ + tbl + for cat in self._inner.list_catalogs() + for schema in self._inner.list_schemas(cat) + for tbl in self._inner.list_tables(schema) + ] + num_namespaces = pattern.count(".") + if num_namespaces == 0: + catalog_name = pattern + return [tbl for schema in self._inner.list_schemas(catalog_name) for tbl in self._inner.list_tables(schema)] + elif num_namespaces == 1: + schema_name = pattern + return [tbl for tbl in self._inner.list_tables(schema_name)] + else: + raise ValueError( + f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {pattern}" + ) + + +class UnityTable(Table): + _inner: InnerTable + + def __init__(self, unity_table: InnerTable): + self._inner = unity_table + + @staticmethod + def _try_from(obj: object) -> UnityTable | None: + """Returns an UnityTable if the given object can be adapted so.""" + if isinstance(obj, InnerTable): + return UnityTable(obj) + return None + + @property + def inner(self) -> InnerTable: + """Returns the inner unity table.""" + return self._inner + + def read(self) -> DataFrame: + import daft + + return daft.read_deltalake(self._inner) diff --git a/daft/catalog/pyiceberg.py b/daft/catalog/pyiceberg.py index bde293a488..a3989e218a 100644 --- a/daft/catalog/pyiceberg.py +++ b/daft/catalog/pyiceberg.py @@ -1,32 +1,11 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from pyiceberg.catalog import Catalog as PyIcebergCatalog - from pyiceberg.table import Table as PyIcebergTable - - from daft.dataframe import DataFrame - -from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable - - -class PyIcebergCatalogAdaptor(PythonCatalog): - def __init__(self, pyiceberg_catalog: PyIcebergCatalog): - self._catalog = pyiceberg_catalog - - def list_tables(self, prefix: str) -> list[str]: - return [".".join(tup) for tup in self._catalog.list_tables(prefix)] - - def load_table(self, name: str) -> PyIcebergTableAdaptor: - return PyIcebergTableAdaptor(self._catalog.load_table(name)) +"""DEPRECATED: Please use `Catalog.from_iceberg`, these APIs will be removed in the next release.""" +from __future__ import annotations -class PyIcebergTableAdaptor(PythonCatalogTable): - def __init__(self, pyiceberg_table: PyIcebergTable): - self._table = pyiceberg_table +from daft.catalog.__iceberg import IcebergCatalog, IcebergTable - def to_dataframe(self) -> DataFrame: - import daft +# TODO deprecated catalog APIs #3819 +PyIcebergCatalogAdaptor = IcebergCatalog - return daft.read_iceberg(self._table) +# TODO deprecated catalog APIs #3819 +PyIcebergTableAdaptor = IcebergTable diff --git a/daft/catalog/python_catalog.py b/daft/catalog/python_catalog.py index 2a0f942eac..dc5f736aa0 100644 --- a/daft/catalog/python_catalog.py +++ b/daft/catalog/python_catalog.py @@ -1,24 +1,9 @@ -from __future__ import annotations +"""DEPRECATED: Please use `from daft.catalog import Catalog, Table`.""" -from abc import abstractmethod -from typing import TYPE_CHECKING +from daft.catalog import Catalog, Table -if TYPE_CHECKING: - from daft.dataframe import DataFrame +# TODO deprecated catalog APIs #3819 +PythonCatalog = Catalog - -class PythonCatalog: - """Wrapper class for various Python implementations of Data Catalogs.""" - - @abstractmethod - def list_tables(self, prefix: str) -> list[str]: ... - - @abstractmethod - def load_table(self, name: str) -> PythonCatalogTable: ... - - -class PythonCatalogTable: - """Wrapper class for various Python implementations of Data Catalog Tables.""" - - @abstractmethod - def to_dataframe(self) -> DataFrame: ... +# TODO deprecated catalog APIs #3819 +PythonCatalogTable = Table diff --git a/daft/catalog/unity.py b/daft/catalog/unity.py index aed0b0233e..ab6df14b45 100644 --- a/daft/catalog/unity.py +++ b/daft/catalog/unity.py @@ -1,49 +1,11 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from daft.dataframe import DataFrame - from daft.unity_catalog import UnityCatalog, UnityCatalogTable - -from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable - - -class UnityCatalogAdaptor(PythonCatalog): - def __init__(self, unity_catalog: UnityCatalog): - self._catalog = unity_catalog - - def list_tables(self, prefix: str) -> list[str]: - num_namespaces = prefix.count(".") - if prefix == "": - return [ - tbl - for cat in self._catalog.list_catalogs() - for schema in self._catalog.list_schemas(cat) - for tbl in self._catalog.list_tables(schema) - ] - elif num_namespaces == 0: - catalog_name = prefix - return [ - tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema) - ] - elif num_namespaces == 1: - schema_name = prefix - return [tbl for tbl in self._catalog.list_tables(schema_name)] - else: - raise ValueError( - f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}" - ) - - def load_table(self, name: str) -> UnityTableAdaptor: - return UnityTableAdaptor(self._catalog.load_table(name)) +"""DEPRECATED: Please use `Catalog.from_unity`, these APIs will be removed in the next release.""" +from __future__ import annotations -class UnityTableAdaptor(PythonCatalogTable): - def __init__(self, unity_table: UnityCatalogTable): - self._table = unity_table +from daft.catalog.__unity import UnityCatalog, UnityTable - def to_dataframe(self) -> DataFrame: - import daft +# TODO deprecated catalog APIs #3819 +UnityCatalogAdaptor = UnityCatalog - return daft.read_deltalake(self._table) +# TODO deprecated catalog APIs #3819 +UnityTableAdaptor = UnityTable diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index fe90e5222d..7d5d5d4b27 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1852,6 +1852,19 @@ class SystemInfo: def total_memory(self) -> int: ... def cpu_count(self) -> int | None: ... +### +# daft-catalog +### + +class PyIdentifier: + def __init__(self, namespace: tuple[str, ...], name: str): ... + @staticmethod + def from_sql(input: str, normalize: bool): ... + def eq(self, other: PyIdentifier) -> bool: ... + def getitem(self, index: int) -> str: ... + def __len__(self) -> int: ... + def __repr__(self) -> str: ... + ### # daft-session ### diff --git a/src/daft-catalog/src/python.rs b/src/daft-catalog/src/python.rs index 1bbea213c7..6826f8802d 100644 --- a/src/daft-catalog/src/python.rs +++ b/src/daft-catalog/src/python.rs @@ -144,8 +144,10 @@ impl From for PyIdentifier { /// Defines the python daft. pub fn register_modules<'py>(parent: &Bound<'py, PyModule>) -> PyResult> { + // daft.daft.PyIdentifier + parent.add_class::()?; + let module = PyModule::new(parent.py(), "catalog")?; - module.add_class::()?; module.add_wrapped(wrap_pyfunction!(py_read_table))?; module.add_wrapped(wrap_pyfunction!(py_register_table))?; module.add_wrapped(wrap_pyfunction!(py_unregister_catalog))?; diff --git a/tests/catalog/test_catalog.py b/tests/catalog/test_catalog.py new file mode 100644 index 0000000000..21b817a53d --- /dev/null +++ b/tests/catalog/test_catalog.py @@ -0,0 +1,25 @@ +from daft.catalog import Catalog + + +def test_try_from_iceberg(tmpdir): + from pyiceberg.catalog.sql import SqlCatalog + + pyiceberg_catalog = SqlCatalog( + "default", + **{ + "uri": f"sqlite:///{tmpdir}/pyiceberg_catalog.db", + "warehouse": f"file://{tmpdir}", + }, + ) + # assert doesn't throw! + assert Catalog._from_obj(pyiceberg_catalog) is not None + assert Catalog.from_iceberg(pyiceberg_catalog) is not None + + +def test_try_from_unity(): + from daft.unity_catalog import UnityCatalog + + unity_catalog = UnityCatalog("-", token="-") + # assert doesn't throw! + assert Catalog._from_obj(unity_catalog) is not None + assert Catalog.from_unity(unity_catalog) is not None diff --git a/tests/sql/test_catalog.py b/tests/sql/test_sql_catalog.py similarity index 100% rename from tests/sql/test_catalog.py rename to tests/sql/test_sql_catalog.py