feat(catalog): Prepare existing catalog APIs for integration [1/3] #3820

Merged 6 commits on Feb 20, 2025
Changes from 4 commits
72 changes: 72 additions & 0 deletions daft/catalog/__iceberg.py
@@ -0,0 +1,72 @@
+"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg()."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from pyiceberg.catalog import Catalog as InnerCatalog
+from pyiceberg.table import Table as InnerTable
+
+from daft.catalog import Catalog, Table
+
+if TYPE_CHECKING:
+    from daft.dataframe import DataFrame
[Codecov / codecov/patch] Added line #L13 was not covered by tests (daft/catalog/__iceberg.py#L13).
+
+
+class IcebergCatalog(Catalog):
+    _inner: InnerCatalog
+
+    def __init__(self, pyiceberg_catalog: InnerCatalog):
+        """DEPRECATED: Please use `Catalog.from_iceberg`."""
+        self._inner = pyiceberg_catalog
+
+    @staticmethod
+    def _try_from(obj: object) -> IcebergCatalog | None:
+        """Returns an IcebergCatalog instance if the given object can be adapted so."""
+        if isinstance(obj, InnerCatalog):
+            return IcebergCatalog(obj)
+        return None
+
+    @property
+    def inner(self) -> InnerCatalog:
+        """Returns the inner iceberg catalog."""
+        return self._inner
[Codecov / codecov/patch] Added line #L33 was not covered by tests (daft/catalog/__iceberg.py#L33).
+
+    ###
+    # get_*
+    ###
+
+    def get_table(self, name: str) -> IcebergTable:
+        return IcebergTable(self._inner.load_table(name))
[Codecov / codecov/patch] Added line #L40 was not covered by tests (daft/catalog/__iceberg.py#L40).
+
+    ###
+    # list_*
+    ###
+
+    def list_tables(self, pattern: str | None = None) -> list[str]:
+        namespace = pattern if pattern else ""  # pyiceberg lists on namespaces
+        return [".".join(tup) for tup in self._inner.list_tables(namespace)]
[Codecov / codecov/patch] Added lines #L47 - L48 were not covered by tests (daft/catalog/__iceberg.py#L47-L48).
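To make the return shape of `list_tables` concrete: the comprehension above joins each tuple of name parts into one dotted string. The following is a minimal sketch with hard-coded tuples standing in for what the wrapped catalog would return; `flatten_identifiers` and the sample names are hypothetical and not part of this PR.

```python
def flatten_identifiers(identifiers):
    """Join each tuple of name parts into a single dotted string."""
    return [".".join(parts) for parts in identifiers]


# Hypothetical listing result: one tuple of name parts per table.
listed = [("analytics", "events"), ("analytics", "staging", "users")]
names = flatten_identifiers(listed)
print(names)
```

Note that a part which itself contains a dot becomes ambiguous once joined, which is one argument for carrying a structured identifier rather than a plain string.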


+class IcebergTable(Table):
+    _inner: InnerTable
+
+    def __init__(self, inner: InnerTable):
+        self._inner = inner
[Codecov / codecov/patch] Added line #L55 was not covered by tests (daft/catalog/__iceberg.py#L55).
+
+    @staticmethod
+    def _try_from(obj: object) -> IcebergTable | None:
+        """Returns an IcebergTable if the given object can be adapted so."""
+        if isinstance(obj, InnerTable):
+            return IcebergTable(obj)
+        return None
[Codecov / codecov/patch] Added lines #L60 - L62 were not covered by tests (daft/catalog/__iceberg.py#L60-L62).
+
+    @property
+    def inner(self) -> InnerTable:
+        """Returns the inner iceberg table."""
+        return self._inner
[Codecov / codecov/patch] Added line #L67 was not covered by tests (daft/catalog/__iceberg.py#L67).
+
+    def read(self) -> DataFrame:
+        import daft
[Codecov / codecov/patch] Added line #L70 was not covered by tests (daft/catalog/__iceberg.py#L70).
+
+        return daft.read_iceberg(self._inner)
[Codecov / codecov/patch] Added line #L72 was not covered by tests (daft/catalog/__iceberg.py#L72).
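The `_try_from` / `inner` / `read` shape shared by both adapters in this file can be sketched without pyiceberg installed. Everything below (`VendorTable`, `VendorTableAdapter`, rows as a list of dicts) is a hypothetical stand-in chosen for illustration, not Daft's or pyiceberg's API.

```python
from typing import Optional


class VendorTable:
    """Hypothetical third-party table type (stands in for the wrapped library's table)."""

    def __init__(self, rows: list):
        self.rows = rows


class VendorTableAdapter:
    """Wraps a VendorTable behind a small, uniform interface."""

    def __init__(self, inner: VendorTable):
        self._inner = inner

    @staticmethod
    def _try_from(obj: object) -> Optional["VendorTableAdapter"]:
        # Adapt only objects of the wrapped type; anything else yields None
        # so that a caller can move on to the next candidate adapter.
        if isinstance(obj, VendorTable):
            return VendorTableAdapter(obj)
        return None

    @property
    def inner(self) -> VendorTable:
        # Escape hatch for callers that need the wrapped object itself.
        return self._inner

    def read(self) -> list:
        return list(self._inner.rows)


adapted = VendorTableAdapter._try_from(VendorTable([{"foo": 1}]))
rejected = VendorTableAdapter._try_from("not a table")
```

Returning `None` instead of raising keeps the "is this my type?" check cheap, and leaves the decision to raise with the caller that has tried every adapter.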
141 changes: 99 additions & 42 deletions daft/catalog/__init__.py
@@ -1,15 +1,12 @@
-"""The `daft.catalog` module contains functionality for Data Catalogs.
+"""The `daft.catalog` module contains functionality for Catalogs.

-A Data Catalog can be understood as a system/service for users to discover, access and query their data.
-Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog
+A Catalog can be understood as a system/service for users to discover, access and query their data.
+Most commonly, users' data is represented as a "table". Some more modern Catalogs such as Unity Catalog
 also expose other types of data including files, ML models, registered functions and more.

-Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.
+Examples of Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.

-Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This
-is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment.
-
-**Data Catalog**
+**Catalog**

 Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided.

@@ -20,13 +17,14 @@

 **Named Tables**

-Daft allows also the registration of named tables, which have no catalog associated with them.
+Daft allows also the registration of temporary tables, which have no catalog associated with them.

-Note that named tables take precedence over the default catalog's table names when resolving names.
+Note that temporary tables take precedence over catalog tables when resolving unqualified names.

 ```python
 df = daft.from_pydict({"foo": [1, 2, 3]})

+# TODO deprecated catalog APIs #3819
 daft.catalog.register_table(
     "my_table",
     df,
@@ -40,31 +38,36 @@

 from __future__ import annotations

+from abc import ABC, abstractmethod
 from collections.abc import Sequence
 from daft.daft import catalog as native_catalog
+from daft.daft import PyIdentifier
 from daft.logical.builder import LogicalPlanBuilder

 from daft.dataframe import DataFrame

 from typing import TYPE_CHECKING

 if TYPE_CHECKING:
-    from pyiceberg.catalog import Catalog as PyIcebergCatalog
-    from daft.unity_catalog import UnityCatalog
+    from daft.dataframe.dataframe import ColumnInputType
[Codecov / codecov/patch] Added line #L52 was not covered by tests (daft/catalog/__init__.py#L52).


 __all__ = [
+    "Catalog",
     "Identifier",
+    "Table",
+    # TODO deprecated catalog APIs #3819
     "read_table",
     "register_python_catalog",
     "register_table",
     "unregister_catalog",
 ]

-# Forward imports from the native catalog which don't require Python wrappers
+# TODO deprecated catalog APIs #3819
 unregister_catalog = native_catalog.unregister_catalog


+# TODO deprecated catalog APIs #3819
 def read_table(name: str) -> DataFrame:
     """Finds a table with the specified name and reads it as a DataFrame.

@@ -84,6 +87,7 @@
     return DataFrame(LogicalPlanBuilder(native_logical_plan_builder))


+# TODO deprecated catalog APIs #3819
 def register_table(name: str, dataframe: DataFrame) -> str:
     """Register a DataFrame as a named table.

@@ -105,7 +109,8 @@
     return native_catalog.register_table(name, dataframe._builder._builder)


-def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str:
+# TODO deprecated catalog APIs #3819
+def register_python_catalog(catalog: object, name: str | None = None) -> str:
     """Registers a Python catalog with Daft.

     Currently supports:
@@ -129,35 +134,63 @@
     >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog")

     """
-    _PYICEBERG_AVAILABLE = False
-    try:
-        from pyiceberg.catalog import Catalog as PyIcebergCatalog
+    if (c := Catalog._try_from(catalog)) is not None:
+        return native_catalog.register_python_catalog(c, name)
+    raise ValueError(f"Unsupported catalog type: {type(catalog)}")
[Codecov / codecov/patch] Added lines #L137 - L139 were not covered by tests (daft/catalog/__init__.py#L137-L139).
+
+
+class Catalog(ABC):
+    """Interface for python catalog implementations."""

-        _PYICEBERG_AVAILABLE = True
-    except ImportError:
-        pass
+    @property
+    def inner(self) -> object | None:
+        """Returns the inner catalog object if this is an adapter."""
+
+    @staticmethod
+    def _try_from(obj: object) -> Catalog | None:
+        for factory in (Catalog._try_from_iceberg, Catalog._try_from_unity):
+            if (c := factory(obj)) is not None:
+                return c
+        return None
[Codecov / codecov/patch] Added line #L154 was not covered by tests (daft/catalog/__init__.py#L154).

-    _UNITY_AVAILABLE = False
-    try:
-        from daft.unity_catalog import UnityCatalog
+    @staticmethod
+    def _try_from_iceberg(obj: object) -> Catalog | None:
Review comment (Contributor): I think most accurately, `from_pyiceberg`
Reply (Contributor Author): True. I went for consistency with `daft.read_iceberg`, but could go either way.
+        """Returns a Daft Catalog instance from an Iceberg catalog."""
+        try:
+            from daft.catalog.__iceberg import IcebergCatalog
+
+            return IcebergCatalog._try_from(obj)
+        except ImportError:
+            return None
[Codecov / codecov/patch] Added lines #L163 - L164 were not covered by tests (daft/catalog/__init__.py#L163-L164).
+
+    @staticmethod
+    def _try_from_unity(obj: object) -> Catalog | None:
+        """Returns a Daft Catalog instance from a Unity catalog."""
+        try:
+            from daft.catalog.__unity import UnityCatalog

-        _UNITY_AVAILABLE = True
-    except ImportError:
-        pass
+            return UnityCatalog._try_from(obj)
+        except ImportError:
+            return None
[Codecov / codecov/patch] Added lines #L173 - L174 were not covered by tests (daft/catalog/__init__.py#L173-L174).
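The dispatch introduced here (try each factory in order, treat a missing optional dependency as "no match", raise only after every factory has declined) can be sketched in isolation. This is a sketch under stated assumptions: the factory names, the made-up module `hypothetical_backend_that_is_not_installed`, and the use of the stdlib `fractions` module as the "installed backend" are all illustrative and not part of Daft.

```python
from fractions import Fraction  # used only to build the demo input below
from typing import Optional


def _try_from_missing_backend(obj: object) -> Optional[str]:
    """Factory whose optional dependency is absent (the module name is made up)."""
    try:
        import hypothetical_backend_that_is_not_installed  # noqa: F401
    except ImportError:
        return None  # backend unavailable, so this factory cannot match
    return f"missing-backend:{obj!r}"


def _try_from_fraction(obj: object) -> Optional[str]:
    """Factory whose dependency is importable (stdlib `fractions` as a stand-in)."""
    try:
        from fractions import Fraction as Backend  # lazy import, as in the diff
    except ImportError:
        return None
    if isinstance(obj, Backend):
        return f"fraction:{obj}"
    return None


FACTORIES = (_try_from_missing_backend, _try_from_fraction)


def try_from(obj: object) -> Optional[str]:
    # The first factory that returns a non-None adapter wins.
    for factory in FACTORIES:
        if (adapted := factory(obj)) is not None:
            return adapted
    return None


def register(obj: object) -> str:
    if (adapted := try_from(obj)) is not None:
        return adapted
    raise ValueError(f"Unsupported catalog type: {type(obj)}")


adapted = try_from(Fraction(1, 2))
```

One difference from the diff is deliberate: the sketch keeps only the import inside `try`, so an `ImportError` raised later, while adapting the object, would surface instead of being reported as "no match".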

-    python_catalog: PyIcebergCatalog
-    if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
-        from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor
+    ###
+    # list_*
+    ###

-        python_catalog = PyIcebergCatalogAdaptor(catalog)
-    elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
-        from daft.catalog.unity import UnityCatalogAdaptor
+    @abstractmethod
+    def list_tables(self, pattern: str | None = None) -> list[str]: ...

-        python_catalog = UnityCatalogAdaptor(catalog)
-    else:
-        raise ValueError(f"Unsupported catalog type: {type(catalog)}")
+    ###
+    # get_*
+    ###

-    return native_catalog.register_python_catalog(python_catalog, name)
+    @abstractmethod
+    def get_table(self, name: str) -> Table: ...
+
+    # TODO deprecated catalog APIs #3819
+    def load_table(self, name: str) -> Table:
+        """DEPRECATED: Please use get_table(name: str)."""
+        return self.get_table(name)
[Codecov / codecov/patch] Added line #L193 was not covered by tests (daft/catalog/__init__.py#L193).
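`load_table` is kept as a thin alias that forwards to the new abstract `get_table`. A minimal sketch of that pattern follows, with one addition that is not in this PR: the alias emits a `DeprecationWarning`. `CatalogSketch`, `InMemoryCatalog`, and the string "table objects" are hypothetical stand-ins.

```python
import warnings
from abc import ABC, abstractmethod


class CatalogSketch(ABC):
    @abstractmethod
    def get_table(self, name: str) -> str:
        """Subclasses implement the new method only."""

    def load_table(self, name: str) -> str:
        """Deprecated alias that forwards to get_table."""
        # Assumption for illustration: the alias in the diff forwards silently.
        warnings.warn(
            "load_table is deprecated; use get_table",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.get_table(name)


class InMemoryCatalog(CatalogSketch):
    def __init__(self, tables: dict):
        self._tables = dict(tables)

    def get_table(self, name: str) -> str:
        return self._tables[name]


catalog = InMemoryCatalog({"my_table": "table-object"})
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = catalog.load_table("my_table")
```

Because the alias lives on the base class, adapters only implement `get_table` and inherit the old name for free.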


 class Identifier(Sequence):
@@ -168,36 +201,35 @@
     >>> assert len(id) == 2
     """

-    _identifier: native_catalog.PyIdentifier
+    _identifier: PyIdentifier

     def __init__(self, *parts: str):
         """Creates an Identifier from its parts.

         Example:
-        >>> Identifier("schema", "table")
-        >>> #
+        >>> Identifier("namespace", "table")

         Returns:
             Identifier: A new identifier.
         """
         if len(parts) < 1:
             raise ValueError("Identifier requires at least one part.")
-        self._identifier = native_catalog.PyIdentifier(parts[:-1], parts[-1])
+        self._identifier = PyIdentifier(parts[:-1], parts[-1])

     @staticmethod
     def from_sql(input: str, normalize: bool = False) -> Identifier:
         """Parses an Identifier from an SQL string, normalizing to lowercase if specified.

         Example:
-        >>> Identifier.from_sql("schema.table") == Identifier("schema", "table")
+        >>> Identifier.from_sql("namespace.table") == Identifier("namespace", "table")
         >>> Identifier.from_sql('"a.b"') == Identifier('"a.b."')
         >>> Identifier.from_sql('ABC."xYz"', normalize=True) == Identifier("abc", "xYz")

         Returns:
             Identifier: A new identifier.
         """
         i = Identifier.__new__(Identifier)
-        i._identifier = native_catalog.PyIdentifier.from_sql(input, normalize)
+        i._identifier = PyIdentifier.from_sql(input, normalize)
         return i

     def __eq__(self, other: object) -> bool:
@@ -216,3 +248,28 @@

     def __repr__(self) -> str:
         return f"Identifier('{self._identifier.__repr__()}')"
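`Identifier` splits its parts into a namespace (`parts[:-1]`) and a name (`parts[-1]`) and delegates storage to the native `PyIdentifier`. A pure-Python sketch of the same `Sequence` contract is shown below; `IdentifierSketch` is a hypothetical stand-in that stores the parts in a tuple, and it does not reproduce `from_sql` parsing or the native type.

```python
from collections.abc import Sequence


class IdentifierSketch(Sequence):
    """Pure-Python stand-in: namespace parts plus a final name."""

    def __init__(self, *parts: str):
        if len(parts) < 1:
            raise ValueError("Identifier requires at least one part.")
        self._namespace = tuple(parts[:-1])
        self._name = parts[-1]

    # Sequence needs only __getitem__ and __len__; iteration, `in`,
    # index() and count() are supplied by the mixin.
    def __getitem__(self, index):
        return (self._namespace + (self._name,))[index]

    def __len__(self) -> int:
        return len(self._namespace) + 1

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, IdentifierSketch):
            return False
        return tuple(self) == tuple(other)

    def __hash__(self) -> int:
        return hash(tuple(self))

    def __repr__(self) -> str:
        return f"IdentifierSketch({'.'.join(self)})"


ident = IdentifierSketch("namespace", "table")
```

Keeping the parts structured, rather than pre-joined, lets callers index or iterate them without re-parsing a dotted string.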


+class Table(ABC):
+    """Interface for python table implementations."""
+
+    @property
+    def inner(self) -> object | None:
+        """Returns the inner table object if this is an adapter."""
+
+    # TODO deprecated catalog APIs #3819
+    def to_dataframe(self) -> DataFrame:
+        """DEPRECATED: Please use `read()`."""
+        return self.read()
[Codecov / codecov/patch] Added line #L263 was not covered by tests (daft/catalog/__init__.py#L263).
+
+    @abstractmethod
+    def read(self) -> DataFrame:
+        """Returns a DataFrame from this table."""
+
+    def select(self, *columns: ColumnInputType) -> DataFrame:
+        """Returns a DataFrame from this table with the selected columns."""
+        return self.read().select(*columns)
[Codecov / codecov/patch] Added line #L271 was not covered by tests (daft/catalog/__init__.py#L271).
Review comment (Contributor): Do we need this?
Reply (Contributor Author): No, but I do like the UX and don't think `read()` should take any arguments.
+
+    def show(self, n: int = 8) -> None:
+        """Shows the first n rows from this table."""
+        self.read().show(n)
[Codecov / codecov/patch] Added line #L275 was not covered by tests (daft/catalog/__init__.py#L275).
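The review thread above concerns whether `select` belongs on `Table` at all. The design in the diff is a small template-method arrangement: `read()` is the only abstract method and takes no arguments, while `select` and `show` are conveniences built on top of it. A minimal sketch of that arrangement, assuming rows are plain dicts instead of a Daft `DataFrame`; `TableSketch` and `InMemoryTable` are hypothetical names.

```python
from abc import ABC, abstractmethod


class TableSketch(ABC):
    @abstractmethod
    def read(self) -> list:
        """Returns all rows; the only method an implementation must provide."""

    def select(self, *columns: str) -> list:
        # Convenience built on read(): keep only the requested columns.
        return [{c: row[c] for c in columns} for row in self.read()]

    def show(self, n: int = 8) -> None:
        # Convenience built on read(): print the first n rows.
        for row in self.read()[:n]:
            print(row)


class InMemoryTable(TableSketch):
    def __init__(self, rows: list):
        self._rows = list(rows)

    def read(self) -> list:
        return list(self._rows)


table = InMemoryTable([{"foo": 1, "bar": "a"}, {"foo": 2, "bar": "b"}])
selected = table.select("foo")
```

With this split, a new backend implements one method and inherits the rest, which is the UX argument made in the reply.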