Skip to content

Commit 5d70408

Browse files
committed
Defines the catalog trait/abc across both sides
1 parent 60feaaf commit 5d70408

20 files changed

+345
-180
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

daft/__init__.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ def refresh_logger() -> None:
5858
# Daft top-level imports
5959
###
6060

61-
from daft.catalog import Catalog, Identifier, load_catalog
61+
from daft.catalog import (
62+
Catalog,
63+
Identifier,
64+
Table,
65+
load_catalog,
66+
)
6267
from daft.convert import (
6368
from_arrow,
6469
from_dask_dataframe,
@@ -113,6 +118,7 @@ def refresh_logger() -> None:
113118
"Schema",
114119
"Series",
115120
"Session",
121+
"Table",
116122
"TimeUnit",
117123
"coalesce",
118124
"col",

daft/catalog/__init__.py

+55-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
"""Add catalog documentation..
2-
"""
1+
"""The daft-catalog module documentation..."""
32

43
from __future__ import annotations
54
from abc import ABC, abstractmethod
65
from typing import Sequence
76
from collections.abc import Sequence
87
from daft.daft import PyCatalog, PyIdentifier
9-
from daft.table import Source, Table
8+
from daft.dataframe import DataFrame
9+
from daft.expressions import Expression
10+
from daft.logical.schema import Schema
11+
from daft.recordbatch import MicroPartition
12+
1013

1114
def load_catalog(name: str, options: object | None = None) -> Catalog:
1215
"""Loads a new catalog from the configuration options or creates an in-memory catalog if none given."""
@@ -15,15 +18,18 @@ def load_catalog(name: str, options: object | None = None) -> Catalog:
1518
else:
1619
return Catalog._from_some(name, options)
1720

21+
1822
class Catalog(ABC):
23+
"""Catalog documentation..."""
1924

2025
def __repr__(self) -> str:
2126
return f"Catalog('{self.name()}')"
2227

2328
@staticmethod
2429
def _from_none(name: str):
25-
from daft.catalog.__memory import MemoryCatalog
26-
return MemoryCatalog(name)
30+
from daft.catalog.__temp import TempCatalog
31+
32+
return TempCatalog(name)
2733

2834
@staticmethod
2935
def _from_some(name: str, options: object) -> Catalog:
@@ -140,14 +146,57 @@ def __len__(self) -> int:
140146
def __repr__(self) -> str:
141147
return f"Identifier('{self._identifier.__repr__()}')"
142148

149+
143150
# TODO make a sequence
144151
Namespace = tuple[str]
145152

153+
154+
# TODO for future sources, consider https://github.com/Eventual-Inc/Daft/pull/2864
155+
# pandas/arrow/arrow_record_batches/pydict
156+
TableSource = Schema | DataFrame | str | None
157+
158+
159+
class Table(ABC):
160+
"""Table documentation..."""
161+
162+
def __repr__(self) -> str:
163+
return f"Table('{self._name}')"
164+
165+
@abstractmethod
166+
def name(self) -> str:
167+
"""Returns the table name."""
168+
169+
@abstractmethod
170+
def schema(self) -> Schema:
171+
"""Returns the table schema."""
172+
173+
###
174+
# Creation Methods
175+
###
176+
177+
@staticmethod
178+
def _from_source(name: str, source: TableSource = None) -> Table:
179+
from daft.catalog.__temp import TempTable
180+
return TempTable._from_source(name, source)
181+
182+
###
183+
# DataFrame Methods
184+
###
185+
186+
@abstractmethod
187+
def read(self) -> DataFrame:
188+
"""Returns a DataFrame from this table."""
189+
190+
@abstractmethod
191+
def show(self, n: int = 8) -> None:
192+
"""Shows the first n rows from this table."""
193+
194+
146195
__all__ = [
147196
"Catalog",
148197
"Identifier",
149198
"Namespace",
150-
"Source",
151199
"Table",
200+
"TableSource",
152201
"load_catalog",
153202
]

daft/catalog/__memory.py

-14
This file was deleted.

daft/table/__init__.py daft/catalog/__temp.py

+20-18
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
11
from __future__ import annotations
22

3-
from daft.dataframe import DataFrame
4-
from daft.expressions import Expression
3+
from daft.catalog import Catalog, Table, TableSource
4+
from daft.dataframe.dataframe import DataFrame
55
from daft.logical.schema import Schema
6-
from daft.recordbatch import MicroPartition
6+
from daft.recordbatch.micropartition import MicroPartition
77

8-
# for future sources, consider https://github.com/Eventual-Inc/Daft/pull/2864
9-
# pandas/arrow/arrow_record_batches/pydict
10-
Source = Schema | DataFrame | str | None
118

12-
class Table:
9+
class TempCatalog(Catalog):
10+
"""A temporary catalog scoped to a given session."""
11+
12+
def __init__(self, name: str):
13+
self._name: str = name
14+
15+
def __repr__(self) -> str:
16+
return f"TempCatalog('{self._name}')"
17+
18+
def name(self) -> str:
19+
return self._name
20+
21+
22+
class TempTable(Table):
23+
"""A temp table holds a reference to an existing dataframe."""
1324

1425
def __init__(self) -> Table:
15-
raise NotImplementedError("Creating a Table via __init__ is not supported")
26+
raise NotImplementedError("Creating a TempTable via __init__ is not supported")
1627

1728
def name(self) -> str:
1829
return self._name
@@ -23,12 +34,8 @@ def schema(self) -> Schema:
2334
def __repr__(self) -> str:
2435
return f"table('{self._name}')"
2536

26-
###
27-
# Creation Methods
28-
###
29-
3037
@staticmethod
31-
def _from_source(name: str, source: Source = None) -> Table:
38+
def _from_source(name: str, source: TableSource = None) -> Table:
3239
if source is None:
3340
return Table._from_none(name)
3441
elif isinstance(source, DataFrame):
@@ -74,8 +81,3 @@ def read(self) -> DataFrame:
7481

7582
def show(self, n: int = 8) -> None:
7683
return self._inner.show(n)
77-
78-
79-
__all__ = [
80-
"Table",
81-
]

daft/session.py

+18-11
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
from daft.context import DaftContext, get_context
44
from daft.dataframe import DataFrame
5-
from daft.catalog import Catalog, Namespace
5+
from daft.catalog import Catalog, Identifier, Namespace, Table, TableSource
66
from daft.daft import PySession
7-
from daft.table import Source, Table
87

98
class Session:
109

@@ -57,14 +56,20 @@ def create_catalog(self, name: str) -> Catalog:
5756
"""Create a new catalog scoped to this session."""
5857
return self._session.create_catalog(name)
5958

60-
# def create_namespace(self, name: str) -> Namespace:
61-
# """Create a new namespace scope to this session's current catalog."""
62-
# return self._session.create_namespace(name)
59+
def create_namespace(self, name: str) -> Namespace:
60+
"""Create a new namespace scoped to this session's current catalog."""
61+
return self._session.create_namespace(name)
6362

64-
def create_table(self, name: str, source: Source = None) -> Table:
63+
def create_table(self, name: str, source: TableSource = None) -> Table:
6564
"""Creates a new table scoped to this session's current catalog and namespace."""
6665
return self._session.create_table(name, source)
6766

67+
def create_temp_table(self, name: str, source: TableSource = None) -> Table:
68+
"""Creates a temp table scoped to this session's lifetime."""
69+
# TODO implement TableSource on the rust side.
70+
source = Table._from_source(name, source)
71+
return self._session.create_temp_table(name, source)
72+
6873
###
6974
# session state
7075
###
@@ -73,9 +78,9 @@ def current_catalog(self) -> Catalog:
7378
"""Returns the session's current catalog."""
7479
return self._session.current_catalog()
7580

76-
# def current_namespace(self) -> Namespace:
77-
# """Returns the session's current namespace."""
78-
# return self._session.current_namespace()
81+
def current_namespace(self) -> Namespace:
82+
"""Returns the session's current namespace."""
83+
return self._session.current_namespace()
7984

8085
###
8186
# get_*
@@ -89,8 +94,10 @@ def get_namespace(self, name: str) -> Namespace:
8994
"""Returns the namespace or raises an exception if it does not exist."""
9095
return self._session.get_namespace(name)
9196

92-
def get_table(self, name: str) -> Table:
93-
"""Returns the table ."""
97+
def get_table(self, name: str | Identifier) -> Table:
98+
"""Returns the table or raises an exception if it does not exist."""
99+
if isinstance(name, str):
100+
name = Identifier(*name.split("."))
94101
return self._session.get_table(name)
95102

96103
###

src/daft-catalog/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
common-error = {path = "../common/error", default-features = false}
33
daft-core = {path = "../daft-core", default-features = false}
44
daft-logical-plan = {path = "../daft-logical-plan", default-features = false}
5-
lazy_static = {workspace = true}
65
pyo3 = {workspace = true, optional = true}
76
sqlparser = {workspace = true}
87
snafu.workspace = true

src/daft-catalog/src/bindings.rs

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::collections::HashMap;
2+
3+
/// Bindings
4+
///
5+
/// Notes:
6+
/// - Currently using Clone because all references are arcs.
7+
/// - This is intentionally lightweight and everything is exact-case.
8+
/// - All APIs are non-fallible because callers determine what is an error.
9+
/// - It does not necessarily have map semantics.
10+
/// - Callers are responsible for case-normalization hence String, &str.
11+
/// - Intentionally using String and &str rather than Into and AsRef.
12+
#[derive(Debug)]
13+
pub struct Bindings<T>(HashMap<String, T>);
14+
15+
impl<T> Bindings<T> {
16+
/// Creates an empty set of bindings.
17+
pub fn empty() -> Self {
18+
Self(HashMap::new())
19+
}
20+
21+
/// Inserts a new binding with ownership.
22+
pub fn insert(&mut self, name: String, object: T) {
23+
self.0.insert(name, object);
24+
}
25+
26+
/// Removes the binding if it exists.
27+
pub fn remove(&mut self, name: &str) {
28+
self.0.remove(name);
29+
}
30+
31+
/// Returns true if the binding exists.
32+
pub fn exists(&self, name: &str) -> bool {
33+
self.0.contains_key(name)
34+
}
35+
36+
/// Get an object reference by name.
37+
pub fn get(&self, name: &str) -> Option<&T> {
38+
self.0.get(name)
39+
}
40+
41+
/// List all objects matching the pattern.
42+
pub fn list(&self, pattern: Option<&str>) -> Vec<String> {
43+
self.0
44+
.keys()
45+
.map(|k| k.as_str())
46+
.filter(|k| pattern.is_none() || k.contains(pattern.unwrap_or("")))
47+
.map(|k| k.to_string())
48+
.collect()
49+
}
50+
51+
/// Returns true iff there are no bindings.
52+
pub fn is_empty(&self) -> bool {
53+
self.0.is_empty()
54+
}
55+
}

0 commit comments

Comments
 (0)