@@ -16,227 +16,3 @@ pub use python::register_modules;
16
16
17
17
// TODO audit daft-catalog and daft-session errors.
18
18
pub mod error;
19
-
20
- // ----------------------------------
21
- // TODO deprecated catalog APIs #3819
22
- // ----------------------------------
23
-
24
- mod data_catalog;
25
- mod data_catalog_table;
26
-
27
- // Export public-facing traits
28
- use std:: { collections:: HashMap , default, sync:: Arc } ;
29
-
30
- use daft_logical_plan:: LogicalPlanBuilder ;
31
- pub use data_catalog:: DataCatalog ;
32
- pub use data_catalog_table:: DataCatalogTable ;
33
- use error:: { Error , Result } ;
34
-
35
- pub mod global_catalog {
36
- use std:: sync:: { Arc , RwLock } ;
37
-
38
- use lazy_static:: lazy_static;
39
-
40
- use crate :: { DaftCatalog , DataCatalog } ;
41
-
42
- lazy_static ! {
43
- pub ( crate ) static ref GLOBAL_DAFT_META_CATALOG : RwLock <DaftCatalog > =
44
- RwLock :: new( DaftCatalog :: new_from_env( ) ) ;
45
- }
46
-
47
- /// Register a DataCatalog with the global DaftMetaCatalog
48
- pub fn register_catalog ( catalog : Arc < dyn DataCatalog > , name : Option < & str > ) {
49
- GLOBAL_DAFT_META_CATALOG
50
- . write ( )
51
- . unwrap ( )
52
- . register_catalog ( catalog, name) ;
53
- }
54
-
55
- /// Unregisters a catalog with the global DaftMetaCatalog
56
- pub fn unregister_catalog ( name : Option < & str > ) -> bool {
57
- GLOBAL_DAFT_META_CATALOG
58
- . write ( )
59
- . unwrap ( )
60
- . unregister_catalog ( name)
61
- }
62
- }
63
-
64
/// Name under which the default catalog is stored.
static DEFAULT_CATALOG_NAME: &str = "default";
66
-
67
- /// The [`DaftMetaCatalog`] is a catalog of [`DataCatalog`] implementations
68
- ///
69
- /// Users of Daft can register various [`DataCatalog`] with Daft, enabling
70
- /// discovery of tables across various [`DataCatalog`] implementations.
71
- #[ derive( Debug , Clone , Default ) ]
72
- pub struct DaftCatalog {
73
- /// Map of catalog names to the DataCatalog impls.
74
- ///
75
- /// NOTE: The default catalog is always named "default"
76
- data_catalogs : HashMap < String , Arc < dyn DataCatalog > > ,
77
-
78
- /// LogicalPlans that were "named" and registered with Daft
79
- named_tables : HashMap < String , LogicalPlanBuilder > ,
80
- }
81
-
82
- impl DaftCatalog {
83
- /// Create a `DaftMetaCatalog` from the current environment
84
- pub fn new_from_env ( ) -> Self {
85
- // TODO: Parse a YAML file to produce the catalog
86
- DaftCatalog {
87
- data_catalogs : default:: Default :: default ( ) ,
88
- named_tables : default:: Default :: default ( ) ,
89
- }
90
- }
91
-
92
- /// Register a new [`DataCatalog`] with the `DaftMetaCatalog`.
93
- ///
94
- /// # Arguments
95
- ///
96
- /// * `catalog` - The [`DataCatalog`] to register.
97
- pub fn register_catalog ( & mut self , catalog : Arc < dyn DataCatalog > , name : Option < & str > ) {
98
- let name = name. unwrap_or ( DEFAULT_CATALOG_NAME ) ;
99
- self . data_catalogs . insert ( name. to_string ( ) , catalog) ;
100
- }
101
-
102
- /// Unregister a [`DataCatalog`] from the `DaftMetaCatalog`.
103
- ///
104
- /// # Arguments
105
- ///
106
- /// * `name` - The name of the catalog to unregister. If None, the default catalog will be unregistered.
107
- ///
108
- /// # Returns
109
- ///
110
- /// Returns `true` if a catalog was successfully unregistered, `false` otherwise.
111
- pub fn unregister_catalog ( & mut self , name : Option < & str > ) -> bool {
112
- let name = name. unwrap_or ( DEFAULT_CATALOG_NAME ) ;
113
- self . data_catalogs . remove ( name) . is_some ( )
114
- }
115
-
116
- /// Registers a LogicalPlan with a name in the DaftMetaCatalog
117
- pub fn register_table (
118
- & mut self ,
119
- name : & str ,
120
- view : impl Into < LogicalPlanBuilder > ,
121
- ) -> Result < ( ) > {
122
- // TODO this API is being removed, for now preserve the exact name as if it were delimited.
123
- self . named_tables . insert ( name. into ( ) , view. into ( ) ) ;
124
- Ok ( ( ) )
125
- }
126
-
127
- /// Check if a named table is registered in the DaftCatalog
128
- pub fn contains_table ( & self , name : & str ) -> bool {
129
- self . named_tables . contains_key ( name)
130
- }
131
-
132
- /// Provides high-level functionality for reading a table of data against a [`DaftMetaCatalog`]
133
- ///
134
- /// Resolves the provided table_identifier against the catalog:
135
- ///
136
- /// 1. If there is an exact match for the provided `table_identifier` in the catalog's registered named tables, immediately return the exact match
137
- /// 2. If the [`DaftMetaCatalog`] has a default catalog, we will attempt to resolve the `table_identifier` against the default catalog
138
- /// 3. If the `table_identifier` is hierarchical (delimited by "."), use the first component as the Data Catalog name and resolve the rest of the components against
139
- /// the selected Data Catalog
140
- pub fn read_table ( & self , table_identifier : & str ) -> error:: Result < LogicalPlanBuilder > {
141
- // If the name is an exact match with a registered view, return it.
142
- if let Some ( view) = self . named_tables . get ( table_identifier) {
143
- return Ok ( view. clone ( ) ) ;
144
- }
145
-
146
- let mut searched_catalog_name = "default" ;
147
- let mut searched_table_name = table_identifier;
148
-
149
- // Check the default catalog for a match
150
- if let Some ( default_data_catalog) = self . data_catalogs . get ( DEFAULT_CATALOG_NAME ) {
151
- if let Some ( tbl) = default_data_catalog. get_table ( table_identifier) ? {
152
- return tbl. as_ref ( ) . to_logical_plan_builder ( ) ;
153
- }
154
- }
155
-
156
- // Try to parse the catalog name from the provided table identifier by taking the first segment, split by '.'
157
- if let Some ( ( catalog_name, table_name) ) = table_identifier. split_once ( '.' ) {
158
- if let Some ( data_catalog) = self . data_catalogs . get ( catalog_name) {
159
- searched_catalog_name = catalog_name;
160
- searched_table_name = table_name;
161
- if let Some ( tbl) = data_catalog. get_table ( table_name) ? {
162
- return tbl. as_ref ( ) . to_logical_plan_builder ( ) ;
163
- }
164
- }
165
- }
166
-
167
- // Return the error containing the last catalog/table pairing that we attempted to search on
168
- Err ( Error :: TableNotFound {
169
- catalog_name : searched_catalog_name. to_string ( ) ,
170
- table_id : searched_table_name. to_string ( ) ,
171
- } )
172
- }
173
-
174
- /// Copy from another catalog, using tables from other in case of conflict
175
- pub fn copy_from ( & mut self , other : & Self ) {
176
- for ( name, plan) in & other. named_tables {
177
- self . named_tables . insert ( name. clone ( ) , plan. clone ( ) ) ;
178
- }
179
- for ( name, catalog) in & other. data_catalogs {
180
- self . data_catalogs . insert ( name. clone ( ) , catalog. clone ( ) ) ;
181
- }
182
- }
183
-
184
- /// TODO remove py register and read methods are moved to session
185
- /// I cannot remove DaftMetaCatalog until I invert the dependency
186
- /// so that the current register_ methods use the session rather than the catalog.
187
- pub fn into_catalog_map ( self ) -> HashMap < String , Arc < dyn DataCatalog > > {
188
- self . data_catalogs
189
- }
190
- }
191
-
192
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use daft_core::prelude::*;
    use daft_logical_plan::{
        ops::Source, source_info::PlaceHolderInfo, ClusteringSpec, LogicalPlan, LogicalPlanRef,
        SourceInfo,
    };

    use super::*;

    /// Builds a placeholder source plan with a two-column schema (`text`, `id`).
    fn mock_plan() -> LogicalPlanRef {
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("text", DataType::Utf8),
                Field::new("id", DataType::Int32),
            ])
            .unwrap(),
        );
        LogicalPlan::Source(Source::new(
            schema.clone(),
            Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
                source_schema: schema,
                clustering_spec: Arc::new(ClusteringSpec::unknown()),
                source_id: 0,
            })),
        ))
        .arced()
    }

    // NOTE: the catalog exposes no way to unregister a named table, so despite
    // its name this test only covers registration.
    #[test]
    fn test_register_and_unregister_named_table() {
        let mut catalog = DaftCatalog::new_from_env();
        let plan = LogicalPlanBuilder::from(mock_plan());

        // A fresh catalog must not know the table yet.
        assert!(!catalog.contains_table("test_table"));

        // Register a table and check that it is now visible.
        assert!(catalog.register_table("test_table", plan).is_ok());
        assert!(catalog.contains_table("test_table"));
    }

    #[test]
    fn test_read_registered_table() {
        let mut catalog = DaftCatalog::new_from_env();
        let plan = LogicalPlanBuilder::from(mock_plan());

        catalog.register_table("test_table", plan).unwrap();

        assert!(catalog.read_table("test_table").is_ok());
        assert!(catalog.read_table("non_existent_table").is_err());
    }
}
0 commit comments