pass source through table initialization
jordanrfrazier committed Sep 12, 2023
1 parent 2b4e7e5 commit 4f5a75d
Showing 6 changed files with 62 additions and 24 deletions.
6 changes: 2 additions & 4 deletions crates/sparrow-compiler/src/data_context.rs
@@ -314,10 +314,8 @@ pub struct TableInfo {
///
/// Each file set corresponds to the files for the table with a specific
/// slice configuration.
/// TODO: FRAZ I do need to update these -- if that's how scan is reading them.
/// InMemoryBatches is mutated in the session/table and those changes are
/// reflected here (and that's how scan reads them).
/// // TODO: Should be an option?
/// TODO: Make optional?
/// wrap both these in a Source class?
pub file_sets: Arc<Vec<compute_table::FileSet>>,
/// An in-memory record batch for the contents of the table.
pub in_memory: Option<Arc<InMemoryBatches>>,
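The TODO above asks whether file_sets and in_memory could be wrapped in a single source type on TableInfo. A minimal sketch of one possible shape -- purely illustrative and not part of this commit; the TableSource name and the source field are hypothetical:

// Hypothetical sketch: collapse the two storage fields on TableInfo into one
// enum, so a table is either file-backed or in-memory (or has no data yet).
pub enum TableSource {
    /// Prepared parquet file sets, one per slice configuration.
    FileSets(Arc<Vec<compute_table::FileSet>>),
    /// An in-memory record batch for the contents of the table.
    InMemory(Arc<InMemoryBatches>),
}

pub struct TableInfo {
    // ...existing fields (name, config, schema, ...) unchanged...
    /// The table's data source, if any data has been registered yet.
    pub source: Option<TableSource>,
}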
3 changes: 3 additions & 0 deletions crates/sparrow-session/src/session.rs
@@ -94,6 +94,7 @@ impl Session {
key_column_name: &str,
grouping_name: Option<&str>,
time_unit: Option<&str>,
source: Option<&str>,
) -> error_stack::Result<Table, Error> {
let uuid = Uuid::new_v4();
let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref())
@@ -162,6 +163,7 @@
queryable,
time_unit,
self.object_store_registry.clone(),
source,
)
}

@@ -604,6 +606,7 @@ mod tests {
"key",
Some("user"),
None,
None,
)
.unwrap();

70 changes: 51 additions & 19 deletions crates/sparrow-session/src/table.rs
@@ -17,10 +17,11 @@ use crate::{Error, Expr};
pub struct Table {
pub expr: Expr,
preparer: Preparer,
in_memory_batches: Arc<InMemoryBatches>,
// in_memory_batches: Arc<InMemoryBatches>,
key_column: usize,
key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
files: Arc<Vec<compute_table::FileSet>>,
// files: Arc<Vec<compute_table::FileSet>>,
source: Source,
// TODO: FRAZ: How is TableInfo created?
// Answer: DataContext.add_table (ComputeTable holds the filesets)
// ComputeTable is created via session.add_table(). With no filesets nor source
@@ -34,6 +35,11 @@ pub struct Table {
// Batch, Paths, Streams, per type?
}

enum Source {
InMemoryBatches(Arc<InMemoryBatches>),
Parquet(Arc<Vec<compute_table::FileSet>>),
}

impl Table {
pub(crate) fn new(
table_info: &mut TableInfo,
@@ -43,6 +49,7 @@
queryable: bool,
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
source: Option<&str>,
) -> error_stack::Result<Self, Error> {
let prepared_fields: Fields = KEY_FIELDS
.iter()
@@ -52,25 +59,23 @@
let prepared_schema = Arc::new(Schema::new(prepared_fields));
let prepare_hash = 0;

assert!(table_info.in_memory.is_none());

let in_memory_batches = Arc::new(InMemoryBatches::new(queryable, prepared_schema.clone()));
table_info.in_memory = Some(in_memory_batches.clone());
// Filesets and in_memory should initially be empty.
// From python, we create the table with no inputs, then add data.
// TODO: Make these a single "Source" type in TableInfo?
error_stack::ensure!(table_info.file_sets().is_empty());
error_stack::ensure!(table_info.in_memory.is_none());

let prepared_files: Vec<PreparedFile> = Vec::new();
// The table info here should be empty. We create with empty source and empty filesets.
// TODO: Slicing
let file_set = Arc::new(compute_table::FileSet {
slice_plan: None,
prepared_files: prepared_files.clone(),
prepared_files: vec![],
});

// Clone into the table_info, so that any modifications to our
// original reference are reflected within the table_info.
table_info.file_sets = Some(file_set.clone());

// TODO: FRAZ - Preparer can hold the ObjectStoreRegistry. It needs it to read parquet files.
let preparer = Preparer::new(
// table_info.config().time_column_name.clone(),
// table_info.config().subsort_column_name.clone(),
// table_info.config().group_column_name.clone(),
table_info.config().clone(),
prepared_schema,
prepare_hash,
@@ -81,13 +86,29 @@
name: table_info.name().to_owned(),
})?;

// TODO: FRAZ Next steps:
// 1. Async thing -- look in execution.rs and copy that pyarrow async method
// in my table.rs add_parquet file
// 2. See how Scan is reading in_memory and skipping normal execution, and emulate that
// for parquet files
// 3. Differentiate sources to ensure we can't add_data for parquet tables
// or add_parquet for inmemorybatch tables?
// 4. Other shit?

let source = match source {
Some("parquet") => Source::Parquet(Arc::new(vec![])),
_ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new(
queryable, // TODO: fraz
prepared_schema.clone(),
))),
};

Ok(Self {
expr,
preparer,
in_memory_batches,
key_hash_inverse,
key_column: key_column + KEY_FIELDS.len(),
files: file_set,
source,
})
}

@@ -96,6 +117,11 @@
}

pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
let source = match self.source {
Source::InMemoryBatches(in_memory) => in_memory,
other => error_stack::bail!("expected in memory data source, saw {}", other),
};

let prepared = self
.preparer
.prepare_batch(batch)
@@ -108,23 +134,29 @@
.await
.change_context(Error::Prepare)?;

self.in_memory_batches
source
.add_batch(prepared)
.await
.change_context(Error::Prepare)?;
Ok(())
}

pub async fn add_parquet(&mut self, path: &str) -> error_stack::Result<(), Error> {
let source = match self.source {
Source::Parquet(file_sets) => file_sets,
other => error_stack::bail!("expected parquet data source, saw {}", other),
};

let prepared = self
.preparer
.prepare_parquet(path)
.await
.change_context(Error::Prepare)?;
self.files.push(prepared);

// TODO: Also add files to the session's table info?
// could pass in session to add_parquet method, then mutate datacontext.table info from there?
source.push(FileSet {
slice_plan: None,
prepared_files: prepared,
});

Ok(())
}
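As written, the matches in add_data and add_parquet appear to move self.source out of a shared reference and to format a Source value with {}, which the enum does not implement, so those error arms are unlikely to compile as-is. A minimal sketch of a borrowing dispatch -- a hypothetical helper, not part of this commit, which reuses Error::Prepare only because that is the variant visible in this diff:

impl Table {
    // Hypothetical helper: borrow the source instead of moving it, and fail
    // early when the table is not backed by in-memory batches.
    fn in_memory_source(&self) -> error_stack::Result<Arc<InMemoryBatches>, Error> {
        match &self.source {
            Source::InMemoryBatches(in_memory) => Ok(in_memory.clone()),
            // A dedicated error variant would be clearer than Error::Prepare.
            Source::Parquet(_) => error_stack::bail!(Error::Prepare),
        }
    }
}

// add_data would then start with:
//   let in_memory = self.in_memory_source()?;
// ...prepare the batch, then call in_memory.add_batch(prepared).await...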
1 change: 1 addition & 0 deletions python/pysrc/kaskada/sources/arrow.py
@@ -430,6 +430,7 @@ def __init__(
subsort_column=subsort_column,
grouping_name=grouping_name,
time_unit=time_unit,
source="parquet"
)

@staticmethod
2 changes: 2 additions & 0 deletions python/pysrc/kaskada/sources/source.py
@@ -29,6 +29,7 @@ def __init__(
subsort_column: Optional[str] = None,
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
source: Optional[str] = None,
):
"""Create a new source."""
assert isinstance(schema, pa.Schema)
@@ -75,6 +76,7 @@ def fix_field(field: pa.Field) -> pa.Field:
subsort_column,
grouping_name,
time_unit,
source,
)
super().__init__(ffi_table.expr())
self._schema = schema
4 changes: 3 additions & 1 deletion python/src/table.rs
@@ -22,7 +22,7 @@ pub(crate) struct Table {
impl Table {
/// Create a new table.
#[new]
#[pyo3(signature = (session, name, time_column, key_column, schema, queryable, subsort_column, grouping_name, time_unit))]
#[pyo3(signature = (session, name, time_column, key_column, schema, queryable, subsort_column, grouping_name, time_unit, source))]
#[allow(clippy::too_many_arguments)]
fn new(
session: Session,
@@ -34,6 +34,7 @@
subsort_column: Option<&str>,
grouping_name: Option<&str>,
time_unit: Option<&str>,
source: Option<&str>,
) -> Result<Self> {
let raw_schema = Arc::new(schema.0);

@@ -46,6 +47,7 @@
key_column,
grouping_name,
time_unit,
source,
)?;

let table = Table { name, rust_table: Arc::new(rust_table), session };
