Skip to content

Commit b60e0b6

Browse files
committed
Defines the catalog trait/abc across both sides
1 parent 0e84849 commit b60e0b6

20 files changed

+336
-177
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

daft/__init__.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ def refresh_logger() -> None:
5858
# Daft top-level imports
5959
###
6060

61-
from daft.catalog import Catalog, Identifier, load_catalog
61+
from daft.catalog import (
62+
Catalog,
63+
Identifier,
64+
Table,
65+
load_catalog,
66+
)
6267
from daft.convert import (
6368
from_arrow,
6469
from_dask_dataframe,
@@ -113,6 +118,7 @@ def refresh_logger() -> None:
113118
"Schema",
114119
"Series",
115120
"Session",
121+
"Table",
116122
"TimeUnit",
117123
"coalesce",
118124
"col",

daft/catalog/__init__.py

+55-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
"""Add catalog documentation..
2-
"""
1+
"""The daft-catalog module documentation..."""
32

43
from __future__ import annotations
54
from abc import ABC, abstractmethod
65
from typing import Sequence
76
from collections.abc import Sequence
87
from daft.daft import PyCatalog, PyIdentifier
9-
from daft.table import Source, Table
8+
from daft.dataframe import DataFrame
9+
from daft.expressions import Expression
10+
from daft.logical.schema import Schema
11+
from daft.recordbatch import MicroPartition
12+
1013

1114
def load_catalog(name: str, options: object | None = None) -> Catalog:
1215
"""Loads a new catalog from the configuration options or creates an in-memory catalog if none given."""
@@ -15,15 +18,18 @@ def load_catalog(name: str, options: object | None = None) -> Catalog:
1518
else:
1619
return Catalog._from_some(name, options)
1720

21+
1822
class Catalog(ABC):
23+
"""Catalog documentation..."""
1924

2025
def __repr__(self) -> str:
2126
return f"Catalog('{self.name()}')"
2227

2328
@staticmethod
2429
def _from_none(name: str):
25-
from daft.catalog.__memory import MemoryCatalog
26-
return MemoryCatalog(name)
30+
from daft.catalog.__temp import TempCatalog
31+
32+
return TempCatalog(name)
2733

2834
@staticmethod
2935
def _from_some(name: str, options: object) -> Catalog:
@@ -140,14 +146,57 @@ def __len__(self) -> int:
140146
def __repr__(self) -> str:
141147
return f"Identifier('{self._identifier.__repr__()}')"
142148

149+
143150
# TODO make a sequence
144151
Namespace = tuple[str]
145152

153+
154+
# TODO for future sources, consider https://github.com/Eventual-Inc/Daft/pull/2864
155+
# pandas/arrow/arrow_record_batches/pydict
156+
TableSource = Schema | DataFrame | str | None
157+
158+
159+
class Table(ABC):
    """Abstract interface for a named table that can be read as a DataFrame.

    Implementations must provide `name`, `schema`, `read` and `show`.
    """

    def __repr__(self) -> str:
        # Use the abstract accessor instead of a private `_name` attribute:
        # this base class never defines `_name`, so relying on it would raise
        # AttributeError for implementations that only provide `name()`.
        return f"Table('{self.name()}')"

    @abstractmethod
    def name(self) -> str:
        """Returns the table name."""

    @abstractmethod
    def schema(self) -> Schema:
        """Returns the table schema."""

    ###
    # Creation Methods
    ###

    @staticmethod
    def _from_source(name: str, source: TableSource = None) -> Table:
        """Creates a table named `name` from `source` by delegating to `TempTable._from_source`."""
        # Imported lazily inside the function; the `__temp` module itself imports from this module.
        from daft.catalog.__temp import TempTable

        return TempTable._from_source(name, source)

    ###
    # DataFrame Methods
    ###

    @abstractmethod
    def read(self) -> DataFrame:
        """Returns a DataFrame from this table."""

    @abstractmethod
    def show(self, n: int = 8) -> None:
        """Shows the first n rows from this table."""
193+
194+
146195
__all__ = [
147196
"Catalog",
148197
"Identifier",
149198
"Namespace",
150-
"Source",
151199
"Table",
200+
"TableSource",
152201
"load_catalog",
153202
]

daft/catalog/__memory.py

-14
This file was deleted.

daft/table/__init__.py daft/catalog/__temp.py

+20-18
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
11
from __future__ import annotations
22

3-
from daft.dataframe import DataFrame
4-
from daft.expressions import Expression
3+
from daft.catalog import Catalog, Table, TableSource
4+
from daft.dataframe.dataframe import DataFrame
55
from daft.logical.schema import Schema
6-
from daft.recordbatch import MicroPartition
6+
from daft.recordbatch.micropartition import MicroPartition
77

8-
# for future sources, consider https://github.com/Eventual-Inc/Daft/pull/2864
9-
# pandas/arrow/arrow_record_batches/pydict
10-
Source = Schema | DataFrame | str | None
118

12-
class Table:
9+
class TempCatalog(Catalog):
    """A temporary catalog scoped to a given session."""

    def __init__(self, name: str):
        # The catalog name is the only state held by this class.
        self._name: str = name

    def name(self) -> str:
        """Returns the name this catalog was created with."""
        return self._name

    def __repr__(self) -> str:
        return "TempCatalog('{}')".format(self._name)
20+
21+
22+
class TempTable(Table):
23+
"""A temp table holds a reference to an existing dataframe."""
1324

1425
def __init__(self) -> Table:
15-
raise NotImplementedError("Creating a Table via __init__ is not supported")
26+
raise NotImplementedError("Creating a TempTable via __init__ is not supported")
1627

1728
def name(self) -> str:
1829
return self._name
@@ -23,12 +34,8 @@ def schema(self) -> Schema:
2334
def __repr__(self) -> str:
2435
return f"table('{self._name}')"
2536

26-
###
27-
# Creation Methods
28-
###
29-
3037
@staticmethod
31-
def _from_source(name: str, source: Source = None) -> Table:
38+
def _from_source(name: str, source: TableSource = None) -> Table:
3239
if source is None:
3340
return Table._from_none(name)
3441
elif isinstance(source, DataFrame):
@@ -74,8 +81,3 @@ def read(self) -> DataFrame:
7481

7582
def show(self, n: int = 8) -> None:
7683
return self._inner.show(n)
77-
78-
79-
__all__ = [
80-
"Table",
81-
]

daft/session.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
from __future__ import annotations
22

3-
from daft.daft import PySession
43
from daft.dataframe import DataFrame
54
from daft.catalog import Catalog, Identifier, Namespace, Table, TableSource
65
from daft.daft import PySession
76

8-
97
class Session:
108
"""Session holds a connection's state and orchestrates execution of DataFrame and SQL queries against catalogs."""
119

@@ -68,9 +66,9 @@ def create_catalog(self, name: str) -> Catalog:
6866
"""Create a new catalog scoped to this session."""
6967
return self._session.create_catalog(name)
7068

71-
# def create_namespace(self, name: str) -> Namespace:
72-
# """Create a new namespace scope to this session's current catalog."""
73-
# return self._session.create_namespace(name)
69+
def create_namespace(self, name: str) -> Namespace:
70+
"""Create a new namespace scoped to this session's current catalog."""
71+
return self._session.create_namespace(name)
7472

7573
def create_table(self, name: str, source: TableSource = None) -> Table:
7674
"""Creates a new table scoped to this session's current catalog and namespace."""
@@ -90,9 +88,9 @@ def current_catalog(self) -> Catalog:
9088
"""Returns the session's current catalog."""
9189
return self._session.current_catalog()
9290

93-
# def current_namespace(self) -> Namespace:
94-
# """Returns the session's current namespace."""
95-
# return self._session.current_namespace()
91+
def current_namespace(self) -> Namespace:
92+
"""Returns the session's current namespace."""
93+
return self._session.current_namespace()
9694

9795
###
9896
# get_*

src/daft-catalog/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
common-error = {path = "../common/error", default-features = false}
33
daft-core = {path = "../daft-core", default-features = false}
44
daft-logical-plan = {path = "../daft-logical-plan", default-features = false}
5-
lazy_static = {workspace = true}
65
pyo3 = {workspace = true, optional = true}
76
sqlparser = {workspace = true}
87
snafu.workspace = true

src/daft-catalog/src/bindings.rs

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::collections::HashMap;
2+
3+
/// Bindings is a lightweight collection of named objects.
///
/// Notes:
/// - Currently using Clone because all references are arcs.
/// - This is intentionally lightweight and everything is exact-case.
/// - All APIs are non-fallible because callers determine what is an error.
/// - It does not necessarily have map semantics.
/// - Callers are responsible for case-normalization hence String, &str.
/// - Intentionally using String and &str rather than Into and AsRef.
#[derive(Debug)]
pub struct Bindings<T>(HashMap<String, T>);

impl<T> Bindings<T> {
    /// Creates an empty collection of bindings.
    pub fn empty() -> Self {
        Self(HashMap::new())
    }

    /// Inserts a new binding with ownership, replacing any binding with the same name.
    pub fn insert(&mut self, name: String, object: T) {
        self.0.insert(name, object);
    }

    /// Removes the binding if it exists; does nothing otherwise.
    pub fn remove(&mut self, name: &str) {
        self.0.remove(name);
    }

    /// Returns true if a binding with exactly this name exists.
    pub fn exists(&self, name: &str) -> bool {
        self.0.contains_key(name)
    }

    /// Get an object reference by name.
    pub fn get(&self, name: &str) -> Option<&T> {
        self.0.get(name)
    }

    /// Lists the names of all bindings whose name contains `pattern` as a substring.
    ///
    /// Every name is returned when `pattern` is `None`. The order of the
    /// returned names is unspecified because it follows `HashMap` iteration order.
    pub fn list(&self, pattern: Option<&str>) -> Vec<String> {
        self.0
            .keys()
            .filter(|name| pattern.map_or(true, |p| name.contains(p)))
            .cloned()
            .collect()
    }

    /// Returns true iff there are no bindings.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

src/daft-catalog/src/catalog.rs

+8-57
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,12 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::sync::Arc;
22

3-
use pyo3::ffi::PyObject;
3+
use crate::{bindings::Bindings, error::Result, Identifier, Table};
44

5-
use crate::{error::Result, Identifier, Table};
5+
/// Catalog implementation reference.
6+
pub type CatalogRef = Arc<dyn Catalog>;
67

7-
/// Catalogs is a collection of referenceable catalogs (glorified map).
8-
///
9-
/// Notes:
10-
/// - This is intentionally lightweight and everything is exact-case.
11-
/// - All APIs are non-fallible because callers determine what is an error.
12-
/// - It does not necessarily have map semantics.
13-
/// - Callers are responsible for case-normalization hence String, &str.
14-
/// - Intentionally using String and &str rather than Into and AsRef.
15-
#[derive(Debug)]
16-
pub struct Catalogs(HashMap<String, Arc<dyn Catalog>>);
17-
18-
impl Catalogs {
19-
/// Creates an empty catalogs collection
20-
pub fn empty() -> Catalogs {
21-
Self(HashMap::new())
22-
}
23-
24-
/// Attaches a catalog to this catalog collection.
25-
pub fn attach(&mut self, name: String, catalog: Arc<dyn Catalog>) {
26-
self.0.insert(name, catalog);
27-
}
28-
29-
/// Detaches a catalog from this catalog collection.
30-
pub fn detach(&mut self, name: &str) {
31-
self.0.remove(name);
32-
}
33-
34-
/// Returns true iff a catalog with the given name exists (exact-case).
35-
pub fn exists(&self, name: &str) -> bool {
36-
self.0.contains_key(name)
37-
}
38-
39-
/// Get the catalog by name.
40-
pub fn get(&self, name: &str) -> Option<Arc<dyn Catalog>> {
41-
self.0.get(name).map(Arc::clone)
42-
}
43-
44-
/// Lists the catalogs
45-
pub fn list(&self, pattern: Option<&str>) -> Vec<String> {
46-
self.0
47-
.keys()
48-
.map(|k| k.as_str())
49-
.filter(|k| pattern.is_none() || k.contains(pattern.unwrap_or("")))
50-
.map(|k| k.to_string())
51-
.collect()
52-
}
53-
54-
/// Returns true iff there are no catalogs in this collection.
55-
pub fn is_empty(&self) -> bool {
56-
self.0.is_empty()
57-
}
58-
}
8+
/// CatalogProvider is a collection of referenceable catalogs.
9+
pub type CatalogProvider = Bindings<CatalogRef>;
5910

6011
/// A catalog provides object metadata such as namespaces, tables, and functions.
6112
pub trait Catalog: Sync + Send + std::fmt::Debug {
@@ -65,9 +16,9 @@ pub trait Catalog: Sync + Send + std::fmt::Debug {
6516
/// Returns the given table if it exists.
6617
fn get_table(&self, name: &Identifier) -> Result<Option<Box<dyn Table>>>;
6718

68-
/// Leverage dynamic dispatch to return the inner object for a PyObjectImpl (use generics?).
19+
/// Leverage dynamic dispatch to return the inner object for a PyCatalogImpl (generics?)
6920
#[cfg(feature = "python")]
7021
fn to_py(&self, _: pyo3::Python<'_>) -> pyo3::PyObject {
71-
panic!("missing to_py implementation; consider PyCatalog(self) as the blanket implementation")
22+
panic!("missing to_py implementation, consider PyCatalog(self) as the blanket implementation")
7223
}
7324
}

src/daft-catalog/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
mod bindings;
12
mod catalog;
23
pub mod error;
34
mod identifier;
45
mod table;
56

7+
pub use bindings::*;
68
pub use catalog::*;
79
pub use identifier::*;
810
pub use table::*;

0 commit comments

Comments
 (0)