-
Notifications
You must be signed in to change notification settings - Fork 190
/
Copy path__iceberg.py
79 lines (58 loc) · 2.29 KB
/
__iceberg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg()."""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
from pyiceberg.catalog import Catalog as InnerCatalog
from pyiceberg.table import Table as InnerTable
from daft.catalog import Catalog, Table
if TYPE_CHECKING:
from daft.dataframe import DataFrame
class IcebergCatalog(Catalog):
    """Daft `Catalog` adapter that wraps a pyiceberg catalog.

    This class is internal; `Catalog.from_iceberg` is the supported entry point.
    """

    _inner: InnerCatalog

    def __init__(self, pyiceberg_catalog: InnerCatalog):
        """DEPRECATED: Please use `Catalog.from_iceberg`; version 0.5.0!"""
        warnings.warn(
            "This is deprecated and will be removed in daft >= 0.5.0, please use `Catalog.from_iceberg` instead.",
            category=DeprecationWarning,
            # Attribute the warning to the code that called the constructor, not to
            # this module; otherwise the default warning filters hide it from users.
            stacklevel=2,
        )
        self._inner = pyiceberg_catalog

    @staticmethod
    def _from_obj(obj: object) -> IcebergCatalog:
        """Returns an IcebergCatalog instance if the given object can be adapted so.

        Raises:
            ValueError: If `obj` is not a pyiceberg catalog instance.
        """
        if isinstance(obj, InnerCatalog):
            # Build the instance through __new__ so that __init__ (and the
            # deprecation warning it emits) is skipped.
            c = IcebergCatalog.__new__(IcebergCatalog)
            c._inner = obj
            return c
        raise ValueError(f"Unsupported iceberg catalog type: {type(obj)}")

    @property
    def inner(self) -> InnerCatalog:
        """Returns the inner iceberg catalog."""
        return self._inner

    ###
    # get_*
    ###

    def get_table(self, name: str) -> IcebergTable:
        """Load the table `name` from the inner catalog and wrap it in an IcebergTable."""
        return IcebergTable(self._inner.load_table(name))

    ###
    # list_*
    ###

    def list_tables(self, pattern: str | None = None) -> list[str]:
        """List tables under the given namespace (pattern) in the catalog, or all tables if no namespace is provided.

        Each identifier returned by the inner catalog is joined with "." into a single string.
        """
        # NOTE(review): `pattern` is forwarded unchanged, so the behavior for None
        # depends on the inner catalog's `list_tables` -- confirm it accepts None.
        return [".".join(tup) for tup in self._inner.list_tables(pattern)]
class IcebergTable(Table):
    """Daft `Table` adapter that wraps a pyiceberg table."""

    _inner: InnerTable

    def __init__(self, inner: InnerTable):
        """Store the pyiceberg table that this adapter wraps."""
        self._inner = inner

    @staticmethod
    def _try_from(obj: object) -> IcebergTable | None:
        """Adapt `obj` into an IcebergTable, or return None if it is not a pyiceberg table."""
        if not isinstance(obj, InnerTable):
            return None
        return IcebergTable(obj)

    @property
    def inner(self) -> InnerTable:
        """The wrapped pyiceberg table."""
        return self._inner

    def read(self) -> DataFrame:
        """Read the wrapped table into a DataFrame via `daft.read_iceberg`."""
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- confirm).
        import daft

        wrapped = self._inner
        return daft.read_iceberg(wrapped)