diff --git a/crates/sparrow-compiler/src/data_context.rs b/crates/sparrow-compiler/src/data_context.rs index 8380ecc95..50557fb4e 100644 --- a/crates/sparrow-compiler/src/data_context.rs +++ b/crates/sparrow-compiler/src/data_context.rs @@ -314,10 +314,8 @@ pub struct TableInfo { /// /// Each file set corresponds to the files for the table with a specific /// slice configuration. - /// TODO: FRAZ I do need to update these -- if that's how scan is reading them. - /// Inmemorybatches is mutating them in the session/table and seeing those changes - /// reflected here (and taht's how scan reads them). - /// // TODO: Should be an option? + /// TODO: Make optional? + /// wrap both these in a Source class? pub file_sets: Arc>, /// An in-memory record batch for the contents of the table. pub in_memory: Option>, diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 1f9aa8157..11acd2115 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -94,6 +94,7 @@ impl Session { key_column_name: &str, grouping_name: Option<&str>, time_unit: Option<&str>, + source: Option<&str>, ) -> error_stack::Result { let uuid = Uuid::new_v4(); let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref()) @@ -162,6 +163,7 @@ impl Session { queryable, time_unit, self.object_store_registry.clone(), + source, ) } @@ -604,6 +606,7 @@ mod tests { "key", Some("user"), None, + None, ) .unwrap(); diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index 665531497..f7d608fd7 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs @@ -17,10 +17,11 @@ use crate::{Error, Expr}; pub struct Table { pub expr: Expr, preparer: Preparer, - in_memory_batches: Arc, + // in_memory_batches: Arc, key_column: usize, key_hash_inverse: Arc, - files: Arc>, + // files: Arc>, + source: Source, // TODO: FRAZ: How is tableinfo created? // Answer: DataContext.add_table (ComputeTable holds the filesets) // ComputeTable is created via session.add_table(). With no filesets nor source @@ -34,6 +35,11 @@ pub struct Table { // Batch, Paths, Streams, per type? } +enum Source { + InMemoryBatches(Arc), + Parquet(Arc>), +} + impl Table { pub(crate) fn new( table_info: &mut TableInfo, @@ -43,6 +49,7 @@ impl Table { queryable: bool, time_unit: Option<&str>, object_stores: Arc, + source: Option<&str>, ) -> error_stack::Result { let prepared_fields: Fields = KEY_FIELDS .iter() @@ -52,25 +59,23 @@ impl Table { let prepared_schema = Arc::new(Schema::new(prepared_fields)); let prepare_hash = 0; - assert!(table_info.in_memory.is_none()); - - let in_memory_batches = Arc::new(InMemoryBatches::new(queryable, prepared_schema.clone())); - table_info.in_memory = Some(in_memory_batches.clone()); + // Filesets and in_memory should initially be empty. + // From python, we create the table with no inputs, then add data. + // TODO: Make these a single "Source" type in TableInfo? + error_stack::ensure!(table_info.file_sets().is_empty()); + error_stack::ensure!(table_info.in_memory.is_none()); - let prepared_files: Vec = Vec::new(); - // The table info here should be empty. We create with empty source and empty filesets. // TODO: Slicing let file_set = Arc::new(compute_table::FileSet { slice_plan: None, - prepared_files: prepared_files.clone(), + prepared_files: vec![], }); + + // Clone into the table_info, so that any modifications to our + // original reference are reflected within the table_info. table_info.file_sets = Some(file_set.clone()); - // TODO: FRAZ - Preparer can hold the ObjectStRegistry. Needs it, to read parquet files. let preparer = Preparer::new( - // table_info.config().time_column_name.clone(), - // table_info.config().subsort_column_name.clone(), - // table_info.config().group_column_name.clone(), table_info.config().clone(), prepared_schema, prepare_hash, @@ -81,13 +86,29 @@ impl Table { name: table_info.name().to_owned(), })?; + // TODO: FRAZ NExt steps: + // 1. Async thing -- look in exeuction.rs and copy that pyarrow async method + // in my table.rs add_parquet file + // 2. See how Scan is reading in_memory and skipping normal execution, and emulate that + // for parquet files + // 3. Differentiate sources to ensure we can't add_data for parquet tables + // or add_parquet for inmemorybatch tables? + // 4. Other shit? + + let source = match source { + Some("parquet") => Source::Parquet(Arc::new(vec![])), + _ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new( + querable, // TODO: fraz + prepared_schema.clone(), + ))), + }; + Ok(Self { expr, preparer, - in_memory_batches, key_hash_inverse, key_column: key_column + KEY_FIELDS.len(), - files: file_set, + source, }) } @@ -96,6 +117,11 @@ impl Table { } pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> { + let source = match self.source { + Source::InMemoryBatches(in_memory) => in_memory, + other => error_stack::bail!("expected in memory data source, saw {}", other), + }; + let prepared = self .preparer .prepare_batch(batch) @@ -108,7 +134,7 @@ impl Table { .await .change_context(Error::Prepare)?; - self.in_memory_batches + source .add_batch(prepared) .await .change_context(Error::Prepare)?; @@ -116,15 +142,21 @@ impl Table { } pub async fn add_parquet(&mut self, path: &str) -> error_stack::Result<(), Error> { + let source = match self.source { + Source::Parquet(file_sets) => file_sets, + other => error_stack::bail!("expected parquet data source, saw {}", other), + }; + let prepared = self .preparer .prepare_parquet(path) .await .change_context(Error::Prepare)?; - self.files.push(prepared); - // TODO: Also add files to the session's table info? - // could pass in session to add_parquet method, then mutate datacontext.table info from there? + source.push(FileSet { + slice_plan: None, + prepared_files: prepared, + }); Ok(()) } diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 484cfd3cd..f4587e107 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -430,6 +430,7 @@ def __init__( subsort_column=subsort_column, grouping_name=grouping_name, time_unit=time_unit, + source="parquet" ) @staticmethod diff --git a/python/pysrc/kaskada/sources/source.py b/python/pysrc/kaskada/sources/source.py index 2c3a7344d..78cac5dba 100644 --- a/python/pysrc/kaskada/sources/source.py +++ b/python/pysrc/kaskada/sources/source.py @@ -29,6 +29,7 @@ def __init__( subsort_column: Optional[str] = None, grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, + source: Optional[str] = None, ): """Create a new source.""" assert isinstance(schema, pa.Schema) @@ -75,6 +76,7 @@ def fix_field(field: pa.Field) -> pa.Field: subsort_column, grouping_name, time_unit, + source, ) super().__init__(ffi_table.expr()) self._schema = schema diff --git a/python/src/table.rs b/python/src/table.rs index d5d8f9166..f47f68ad7 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -22,7 +22,7 @@ pub(crate) struct Table { impl Table { /// Create a new table. #[new] - #[pyo3(signature = (session, name, time_column, key_column, schema, queryable, subsort_column, grouping_name, time_unit))] + #[pyo3(signature = (session, name, time_column, key_column, schema, queryable, subsort_column, grouping_name, time_unit, source))] #[allow(clippy::too_many_arguments)] fn new( session: Session, @@ -34,6 +34,7 @@ impl Table { subsort_column: Option<&str>, grouping_name: Option<&str>, time_unit: Option<&str>, + source: Option<&str>, ) -> Result { let raw_schema = Arc::new(schema.0); @@ -46,6 +47,7 @@ impl Table { key_column, grouping_name, time_unit, + source, )?; let table = Table { name, rust_table: Arc::new(rust_table), session };