Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(catalog): Prepare existing catalog APIs for integration [1/3] #3820

Merged
merged 6 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions daft/catalog/__iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg()."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from pyiceberg.catalog import Catalog as InnerCatalog
from pyiceberg.table import Table as InnerTable

from daft.catalog import Catalog, Table

if TYPE_CHECKING:
from daft.dataframe import DataFrame

Check warning on line 14 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L14

Added line #L14 was not covered by tests


class IcebergCatalog(Catalog):
_inner: InnerCatalog

def __init__(self, pyiceberg_catalog: InnerCatalog):
"""DEPRECATED: Please use `Catalog.from_iceberg`; version 0.5.0!"""
warnings.warn(

Check warning on line 22 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L22

Added line #L22 was not covered by tests
"This is deprecated and will be removed in daft >= 0.5.0, please use `Catalog.from_iceberg` instead.",
category=DeprecationWarning,
)
self._inner = pyiceberg_catalog

Check warning on line 26 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L26

Added line #L26 was not covered by tests

@staticmethod
def _from_obj(obj: object) -> IcebergCatalog:
"""Returns an IcebergCatalog instance if the given object can be adapted so."""
if isinstance(obj, InnerCatalog):
c = IcebergCatalog.__new__(IcebergCatalog)
c._inner = obj
return c
raise ValueError(f"Unsupported iceberg catalog type: {type(obj)}")

@property
def inner(self) -> InnerCatalog:
"""Returns the inner iceberg catalog."""
return self._inner

Check warning on line 40 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L40

Added line #L40 was not covered by tests

###
# get_*
###

def get_table(self, name: str) -> IcebergTable:
return IcebergTable(self._inner.load_table(name))

Check warning on line 47 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L47

Added line #L47 was not covered by tests

###
# list_*
###

def list_tables(self, pattern: str | None = None) -> list[str]:
"""List tables under the given namespace (pattern) in the catalog, or all tables if no namespace is provided."""
return [".".join(tup) for tup in self._inner.list_tables(pattern)]

Check warning on line 55 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L55

Added line #L55 was not covered by tests


class IcebergTable(Table):
_inner: InnerTable

def __init__(self, inner: InnerTable):
self._inner = inner

Check warning on line 62 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L62

Added line #L62 was not covered by tests

@staticmethod
def _try_from(obj: object) -> IcebergTable | None:
"""Returns an IcebergTable if the given object can be adapted so."""
if isinstance(obj, InnerTable):
return IcebergTable(obj)
return None

Check warning on line 69 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L67-L69

Added lines #L67 - L69 were not covered by tests

@property
def inner(self) -> InnerTable:
"""Returns the inner iceberg table."""
return self._inner

Check warning on line 74 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L74

Added line #L74 was not covered by tests

def read(self) -> DataFrame:
import daft

Check warning on line 77 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L77

Added line #L77 was not covered by tests

return daft.read_iceberg(self._inner)

Check warning on line 79 in daft/catalog/__iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__iceberg.py#L79

Added line #L79 was not covered by tests
156 changes: 114 additions & 42 deletions daft/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
"""The `daft.catalog` module contains functionality for Data Catalogs.
"""The `daft.catalog` module contains functionality for Catalogs.

A Data Catalog can be understood as a system/service for users to discover, access and query their data.
Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog
A Catalog can be understood as a system/service for users to discover, access and query their data.
Most commonly, users' data is represented as a "table". Some more modern Catalogs such as Unity Catalog
also expose other types of data including files, ML models, registered functions and more.

Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.
Examples of Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.

Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This
is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment.

**Data Catalog**
**Catalog**

Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided.

Expand All @@ -20,13 +17,14 @@

**Named Tables**

Daft allows also the registration of named tables, which have no catalog associated with them.
Daft allows also the registration of temporary tables, which have no catalog associated with them.

Note that named tables take precedence over the default catalog's table names when resolving names.
Note that temporary tables take precedence over catalog tables when resolving unqualified names.

```python
df = daft.from_pydict({"foo": [1, 2, 3]})

# TODO deprecated catalog APIs #3819
daft.catalog.register_table(
"my_table",
df,
Expand All @@ -40,31 +38,38 @@

from __future__ import annotations

import warnings

from abc import ABC, abstractmethod
from collections.abc import Sequence
from daft.daft import catalog as native_catalog
from daft.daft import PyIdentifier
from daft.logical.builder import LogicalPlanBuilder

from daft.dataframe import DataFrame

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pyiceberg.catalog import Catalog as PyIcebergCatalog
from daft.unity_catalog import UnityCatalog
from daft.dataframe.dataframe import ColumnInputType

Check warning on line 54 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L54

Added line #L54 was not covered by tests


__all__ = [
"Catalog",
"Identifier",
"Table",
# TODO deprecated catalog APIs #3819
"read_table",
"register_python_catalog",
"register_table",
"unregister_catalog",
]

# Forward imports from the native catalog which don't require Python wrappers
# TODO deprecated catalog APIs #3819
unregister_catalog = native_catalog.unregister_catalog


# TODO deprecated catalog APIs #3819
def read_table(name: str) -> DataFrame:
"""Finds a table with the specified name and reads it as a DataFrame.

Expand All @@ -84,6 +89,7 @@
return DataFrame(LogicalPlanBuilder(native_logical_plan_builder))


# TODO deprecated catalog APIs #3819
def register_table(name: str, dataframe: DataFrame) -> str:
"""Register a DataFrame as a named table.

Expand All @@ -105,7 +111,8 @@
return native_catalog.register_table(name, dataframe._builder._builder)


def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str:
# TODO deprecated catalog APIs #3819
def register_python_catalog(catalog: object, name: str | None = None) -> str:
"""Registers a Python catalog with Daft.

Currently supports:
Expand All @@ -129,35 +136,72 @@
>>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog")

"""
_PYICEBERG_AVAILABLE = False
try:
from pyiceberg.catalog import Catalog as PyIcebergCatalog
return native_catalog.register_python_catalog(Catalog._from_obj(catalog), name)

Check warning on line 139 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L139

Added line #L139 was not covered by tests

_PYICEBERG_AVAILABLE = True
except ImportError:
pass

_UNITY_AVAILABLE = False
try:
from daft.unity_catalog import UnityCatalog
class Catalog(ABC):
"""Interface for python catalog implementations."""

_UNITY_AVAILABLE = True
except ImportError:
pass
@property
def inner(self) -> object | None:
"""Returns the inner catalog object if this is an adapter."""

python_catalog: PyIcebergCatalog
if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor
@staticmethod
def from_iceberg(obj: object) -> Catalog:
"""Returns a Daft Catalog instance from an Iceberg catalog."""
try:
from daft.catalog.__iceberg import IcebergCatalog

python_catalog = PyIcebergCatalogAdaptor(catalog)
elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
from daft.catalog.unity import UnityCatalogAdaptor
return IcebergCatalog._from_obj(obj)
except ImportError:
raise ImportError("Iceberg support not installed: pip install -U 'getdaft[iceberg]'")

Check warning on line 157 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L157

Added line #L157 was not covered by tests

python_catalog = UnityCatalogAdaptor(catalog)
else:
raise ValueError(f"Unsupported catalog type: {type(catalog)}")
@staticmethod
def from_unity(obj: object) -> Catalog:
"""Returns a Daft Catalog instance from a Unity catalog."""
try:
from daft.catalog.__unity import UnityCatalog

return native_catalog.register_python_catalog(python_catalog, name)
return UnityCatalog._from_obj(obj)
except ImportError:
raise ImportError("Unity support not installed: pip install -U 'getdaft[unity]'")

Check warning on line 167 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L166-L167

Added lines #L166 - L167 were not covered by tests

@staticmethod
def _from_obj(obj: object) -> Catalog:
"""Returns a Daft Catalog from a supported object type or raises a ValueError."""
for factory in (Catalog.from_iceberg, Catalog.from_unity):
try:
return factory(obj)
except ValueError:
pass
except ImportError:
pass
raise ValueError(

Check warning on line 179 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L177-L179

Added lines #L177 - L179 were not covered by tests
f"Unsupported catalog type: {type(obj)}; please ensure all required extra dependencies are installed."
)

###
# list_*
###

@abstractmethod
def list_tables(self, pattern: str | None = None) -> list[str]: ...

###
# get_*
###

@abstractmethod
def get_table(self, name: str) -> Table: ...

# TODO deprecated catalog APIs #3819
def load_table(self, name: str) -> Table:
"""DEPRECATED: Please use `get_table` instead; version=0.5.0!"""
warnings.warn(

Check warning on line 200 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L200

Added line #L200 was not covered by tests
"This is deprecated and will be removed in daft >= 0.5.0, please use `get_table` instead.",
category=DeprecationWarning,
)
return self.get_table(name)

Check warning on line 204 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L204

Added line #L204 was not covered by tests


class Identifier(Sequence):
Expand All @@ -168,36 +212,35 @@
>>> assert len(id) == 2
"""

_identifier: native_catalog.PyIdentifier
_identifier: PyIdentifier

def __init__(self, *parts: str):
"""Creates an Identifier from its parts.

Example:
>>> Identifier("schema", "table")
>>> #
>>> Identifier("namespace", "table")

Returns:
Identifier: A new identifier.
"""
if len(parts) < 1:
raise ValueError("Identifier requires at least one part.")
self._identifier = native_catalog.PyIdentifier(parts[:-1], parts[-1])
self._identifier = PyIdentifier(parts[:-1], parts[-1])

@staticmethod
def from_sql(input: str, normalize: bool = False) -> Identifier:
"""Parses an Identifier from an SQL string, normalizing to lowercase if specified.

Example:
>>> Identifier.from_sql("schema.table") == Identifier("schema", "table")
>>> Identifier.from_sql("namespace.table") == Identifier("namespace", "table")
>>> Identifier.from_sql('"a.b"') == Identifier('"a.b."')
>>> Identifier.from_sql('ABC."xYz"', normalize=True) == Identifier("abc", "xYz")

Returns:
Identifier: A new identifier.
"""
i = Identifier.__new__(Identifier)
i._identifier = native_catalog.PyIdentifier.from_sql(input, normalize)
i._identifier = PyIdentifier.from_sql(input, normalize)
return i

def __eq__(self, other: object) -> bool:
Expand All @@ -216,3 +259,32 @@

def __repr__(self) -> str:
return f"Identifier('{self._identifier.__repr__()}')"


class Table(ABC):
"""Interface for python table implementations."""

@property
def inner(self) -> object | None:
"""Returns the inner table object if this is an adapter."""

# TODO deprecated catalog APIs #3819
def to_dataframe(self) -> DataFrame:
"""DEPRECATED: Please use `read` instead; version 0.5.0!"""
warnings.warn(

Check warning on line 274 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L274

Added line #L274 was not covered by tests
"This is deprecated and will be removed in daft >= 0.5.0, please use `read` instead.",
category=DeprecationWarning,
)
return self.read()

Check warning on line 278 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L278

Added line #L278 was not covered by tests

@abstractmethod
def read(self) -> DataFrame:
"""Returns a DataFrame from this table."""

def select(self, *columns: ColumnInputType) -> DataFrame:
"""Returns a DataFrame from this table with the selected columns."""
return self.read().select(*columns)

Check warning on line 286 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L286

Added line #L286 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but I do like the UX and don't think read() should take any arguments.


def show(self, n: int = 8) -> None:
"""Shows the first n rows from this table."""
return self.read().show(n)

Check warning on line 290 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L290

Added line #L290 was not covered by tests
Loading
Loading