From 344d3dd510652c2a70621361f757c5517f4e6474 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Mon, 11 Sep 2023 08:38:19 -0700 Subject: [PATCH] Add concurrent file sets --- crates/sparrow-compiler/src/data_context.rs | 42 ++++++++++++--- .../sparrow-runtime/src/prepare/preparer.rs | 48 ++++++----------- crates/sparrow-session/src/error.rs | 14 ++++- crates/sparrow-session/src/session.rs | 6 +-- crates/sparrow-session/src/table.rs | 53 +++++++++---------- python/pysrc/kaskada/_ffi.pyi | 4 +- python/pysrc/kaskada/sources/arrow.py | 2 +- python/src/table.rs | 20 +++++-- 8 files changed, 111 insertions(+), 78 deletions(-) diff --git a/crates/sparrow-compiler/src/data_context.rs b/crates/sparrow-compiler/src/data_context.rs index 50557fb4e..4e73d9ef6 100644 --- a/crates/sparrow-compiler/src/data_context.rs +++ b/crates/sparrow-compiler/src/data_context.rs @@ -1,8 +1,10 @@ +use std::cell::RefCell; use std::collections::BTreeMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::Context; use arrow::datatypes::{DataType, SchemaRef}; +use sparrow_api::kaskada::v1alpha::compute_table::FileSet; use sparrow_api::kaskada::v1alpha::slice_plan::Slice; use sparrow_api::kaskada::v1alpha::{compute_table, ComputeTable, PreparedFile, TableConfig}; use sparrow_core::context_code; @@ -303,6 +305,28 @@ pub struct GroupInfo { key_type: DataType, } +#[derive(Debug, Default, Clone)] +pub struct ConcurrentFileSets { + file_sets: Arc>>, +} + +impl ConcurrentFileSets { + pub fn new(file_sets: Vec) -> Self { + Self { + file_sets: Arc::new(Mutex::new(file_sets)), + } + } + pub fn push(&mut self, file_set: compute_table::FileSet) { + self.file_sets.lock().unwrap().push(file_set); + } + + // TODO: Cloning is bad, but since it's locked behind a mutex, it's necessary. + // Can we do better? + pub fn get(&self) -> Vec { + self.file_sets.lock().unwrap().clone() + } +} + /// Information about tables. #[derive(Debug, Clone)] pub struct TableInfo { @@ -316,7 +340,7 @@ pub struct TableInfo { /// slice configuration. /// TODO: Make optional? /// wrap both these in a Source class? - pub file_sets: Arc>, + pub file_sets: ConcurrentFileSets, /// An in-memory record batch for the contents of the table. pub in_memory: Option>, } @@ -339,7 +363,7 @@ impl TableInfo { group_id, schema, config, - file_sets: Arc::new(file_sets), + file_sets: ConcurrentFileSets::new(file_sets), in_memory: None, }) } @@ -368,17 +392,18 @@ impl TableInfo { &self.config } - pub fn file_sets(&self) -> &[compute_table::FileSet] { - &self.file_sets + pub fn file_sets(&self) -> Vec { + self.file_sets.get() } pub fn prepared_files_for_slice( &self, requested_slice: &Option, - ) -> anyhow::Result<&[PreparedFile]> { + ) -> anyhow::Result> { let file_set = self .file_sets - .iter() + .get() + .into_iter() .find(|set| { set.slice_plan .iter() @@ -393,11 +418,12 @@ impl TableInfo { ) })?; - Ok(&file_set.prepared_files) + Ok(file_set.prepared_files) } pub fn metadata_for_files(&self) -> Vec { self.file_sets + .get() .iter() .flat_map(|set| { set.prepared_files diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index 94fda4f8f..1da82b0cf 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -8,12 +8,14 @@ use arrow::record_batch::RecordBatch; use arrow_array::Array; use error_stack::{IntoReport, IntoReportCompat, ResultExt}; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use sparrow_api::kaskada::v1alpha::{PreparedFile, Source, SourceData, TableConfig}; +use uuid::Uuid; use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl}; +use crate::PreparedMetadata; -use super::prepared_batches; +use super::{prepared_batches, write_parquet}; #[derive(derive_more::Display, Debug)] pub enum Error { @@ -52,27 +54,6 @@ pub struct Preparer { impl Preparer { /// Create a new prepare produce data with the given schema. - // pub fn new( - // time_column_name: String, - // subsort_column_name: Option, - // key_column_name: String, - // prepared_schema: SchemaRef, - // prepare_hash: u64, - // time_unit: Option<&str>, - // object_stores: Arc, - // ) -> error_stack::Result { - // let time_multiplier = time_multiplier(time_unit)?; - // Ok(Self { - // prepared_schema, - // time_column_name, - // subsort_column_name, - // next_subsort: prepare_hash, - // key_column_name, - // time_multiplier, - // object_stores, - // }) - // } - pub fn new( table_config: Arc, prepared_schema: SchemaRef, @@ -100,19 +81,19 @@ impl Preparer { /// Prepare a parquet file. /// todo; docs pub async fn prepare_parquet( - &mut self, + &self, path: &str, ) -> error_stack::Result, Error> { - // TODO: - // * Support Slicing + // TODO: Support Slicing - // TODO: What is the output url? + // TODO: Output URL // in wren it is prepared/prep_//file_id (a uuid)/ // file_id is persisted in wren. Don't know if that matters. - let output_path_prefix = "file://prepared/"; + let uuid = Uuid::new_v4(); + let output_path_prefix = format!("file://prepared/{uuid}/"); let output_file_prefix = "part"; - let output_url = ObjectStoreUrl::from_str(output_path_prefix) + let output_url = ObjectStoreUrl::from_str(&output_path_prefix) .change_context_lazy(|| Error::InvalidUrl(path.to_owned()))?; let object_store = self @@ -129,6 +110,7 @@ impl Preparer { ), }; + // TODO: Slicing let mut prepare_stream = prepared_batches(&self.object_stores, &source_data, &self.table_config, &None) .await @@ -165,7 +147,7 @@ impl Preparer { } // Wait for the uploads. - while let Some(upload) = uploads.try_next().await? { + while let Some(upload) = uploads.try_next().await.change_context(Error::Internal)? { tracing::info!("Finished uploading {upload}"); } @@ -181,9 +163,9 @@ impl Preparer { /// Self is mutated as necessary to ensure the `subsort` column is increasing, if /// it is added. pub fn prepare_batch(&mut self, batch: RecordBatch) -> error_stack::Result { - let time_column_name = self.table_config.time_column_name; - let subsort_column_name = self.table_config.subsort_column_name; - let key_column_name = self.table_config.group_column_name; + let time_column_name = self.table_config.time_column_name.clone(); + let subsort_column_name = self.table_config.subsort_column_name.clone(); + let key_column_name = self.table_config.group_column_name.clone(); let time = get_required_column(&batch, &time_column_name)?; let time = cast_to_timestamp(time, self.time_multiplier)?; diff --git a/crates/sparrow-session/src/error.rs b/crates/sparrow-session/src/error.rs index 5589d2d8e..fedddcf12 100644 --- a/crates/sparrow-session/src/error.rs +++ b/crates/sparrow-session/src/error.rs @@ -18,8 +18,8 @@ pub enum Error { Errors(Vec), #[display(fmt = "failed to prepare batch")] Prepare, - #[display(fmt = "internal error")] - Internal, + #[display(fmt = "internal error: {_0}")] + Internal(String), #[display(fmt = "compile query")] Compile, #[display(fmt = "execute query")] @@ -29,3 +29,13 @@ pub enum Error { } impl error_stack::Context for Error {} + +impl Error { + pub fn internal() -> Self { + Error::Internal("no additional context".to_owned()) + } + + pub fn internal_msg(msg: String) -> Self { + Error::Internal(msg) + } +} diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 11acd2115..89f6621c6 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -421,7 +421,7 @@ impl Session { ) .into_report() .change_context(Error::Compile)?; - error_stack::ensure!(diagnostics.num_errors() == 0, Error::Internal); + error_stack::ensure!(diagnostics.num_errors() == 0, Error::internal()); // Extract the necessary subset of the DFG as an expression. // This will allow us to operate without mutating things. @@ -556,8 +556,8 @@ impl ExecutionOptions { } fn result_schema(expr: &Expr, key_type: &DataType) -> error_stack::Result { - let DataType::Struct(fields) = expr.0.value_type().arrow_type().ok_or(Error::Internal)? else { - error_stack::bail!(Error::Internal) + let DataType::Struct(fields) = expr.0.value_type().arrow_type().ok_or(Error::internal())? else { + error_stack::bail!(Error::internal()) }; let fields: Fields = super::table::KEY_FIELDS diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index f7d608fd7..eb18f4011 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs @@ -7,7 +7,7 @@ use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; use error_stack::ResultExt; use sparrow_api::kaskada::v1alpha::compute_table::FileSet; use sparrow_api::kaskada::v1alpha::{compute_table, PreparedFile}; -use sparrow_compiler::TableInfo; +use sparrow_compiler::{ConcurrentFileSets, TableInfo}; use sparrow_merge::InMemoryBatches; use sparrow_runtime::preparer::Preparer; use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::ObjectStoreRegistry}; @@ -35,9 +35,10 @@ pub struct Table { // Batch, Paths, Streams, per type? } +#[derive(Debug)] enum Source { InMemoryBatches(Arc), - Parquet(Arc>), + Parquet(ConcurrentFileSets), } impl Table { @@ -62,22 +63,19 @@ impl Table { // Filesets and in_memory should initially be empty. // From python, we create the table with no inputs, then add data. // TODO: Make these a single "Source" type in TableInfo? - error_stack::ensure!(table_info.file_sets().is_empty()); - error_stack::ensure!(table_info.in_memory.is_none()); + error_stack::ensure!(table_info.file_sets().is_empty(), Error::internal()); + error_stack::ensure!(table_info.in_memory.is_none(), Error::internal()); // TODO: Slicing - let file_set = Arc::new(compute_table::FileSet { - slice_plan: None, - prepared_files: vec![], - }); + let concurrent_file_sets = ConcurrentFileSets::default(); // Clone into the table_info, so that any modifications to our // original reference are reflected within the table_info. - table_info.file_sets = Some(file_set.clone()); + table_info.file_sets = concurrent_file_sets.clone(); let preparer = Preparer::new( table_info.config().clone(), - prepared_schema, + prepared_schema.clone(), prepare_hash, time_unit, object_stores, @@ -91,16 +89,11 @@ impl Table { // in my table.rs add_parquet file // 2. See how Scan is reading in_memory and skipping normal execution, and emulate that // for parquet files - // 3. Differentiate sources to ensure we can't add_data for parquet tables - // or add_parquet for inmemorybatch tables? - // 4. Other shit? + // TODO: Support other sources let source = match source { - Some("parquet") => Source::Parquet(Arc::new(vec![])), - _ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new( - querable, // TODO: fraz - prepared_schema.clone(), - ))), + Some("parquet") => Source::Parquet(concurrent_file_sets), + _ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new(prepared_schema))), }; Ok(Self { @@ -116,10 +109,13 @@ impl Table { self.preparer.schema() } - pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> { - let source = match self.source { - Source::InMemoryBatches(in_memory) => in_memory, - other => error_stack::bail!("expected in memory data source, saw {}", other), + pub async fn add_data(&mut self, batch: RecordBatch) -> error_stack::Result<(), Error> { + let source = match &self.source { + Source::InMemoryBatches(in_memory) => in_memory.clone(), + other => error_stack::bail!(Error::internal_msg(format!( + "expected in-memory data source, saw {:?}", + other + ))), }; let prepared = self @@ -141,15 +137,18 @@ impl Table { Ok(()) } - pub async fn add_parquet(&mut self, path: &str) -> error_stack::Result<(), Error> { - let source = match self.source { - Source::Parquet(file_sets) => file_sets, - other => error_stack::bail!("expected parquet data source, saw {}", other), + pub async fn add_parquet(&self, path: String) -> error_stack::Result<(), Error> { + let mut source = match &self.source { + Source::Parquet(file_sets) => file_sets.clone(), + other => error_stack::bail!(Error::internal_msg(format!( + "expected parquet data source, saw {:?}", + other + ))), }; let prepared = self .preparer - .prepare_parquet(path) + .prepare_parquet(&path) .await .change_context(Error::Prepare)?; diff --git a/python/pysrc/kaskada/_ffi.pyi b/python/pysrc/kaskada/_ffi.pyi index 4a8a239f9..3055f12cd 100644 --- a/python/pysrc/kaskada/_ffi.pyi +++ b/python/pysrc/kaskada/_ffi.pyi @@ -57,11 +57,13 @@ class Table: subsort_column: Optional[str], grouping_name: Optional[str], time_unit: Optional[str], + source: Optional[str], ) -> None: ... @property def name(self) -> str: ... - async def add_pyarrow(self, data: pa.RecordBatch) -> None: ... def expr(self) -> Expr: ... + async def add_pyarrow(self, data: pa.RecordBatch) -> None: ... + async def add_parquet(self, path: str) -> None: ... class Udf(object): def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ... diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index f4587e107..99cb74c81 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -485,4 +485,4 @@ async def add_file(self, path: str) -> None: ) for batch in table.to_batches(): await self._ffi_table.add_pyarrow(batch) - # self._ffi_table.add_parquet(path) + # await self._ffi_table.add_parquet(path) diff --git a/python/src/table.rs b/python/src/table.rs index f47f68ad7..2bd3d4f10 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -9,6 +9,7 @@ use sparrow_session::Table as RustTable; use crate::error::Result; use crate::expr::Expr; use crate::session::Session; +use tokio::sync::{Mutex, MutexGuard, OwnedMutexGuard}; #[pyclass] pub(crate) struct Table { @@ -79,9 +80,22 @@ impl Table { })?) } - fn add_parquet(&mut self, path: &str) -> Result<()> { - // TODO: This is an async function. Is that possible? - self.rust_table.add_parquet(path)?; + // fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> PyResult<&'py PyAny> { + // pyo3_asyncio::tokio::future_into_py(py, async move { + // self.rust_table.add_parquet(path).await.unwrap(); + // Python::with_gil(|py| { + // Ok(py.None()) + // }) + // }) + // } + + fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> PyResult<()> { + let rust_table = self.rust_table.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + rust_table.add_parquet(path).await.unwrap(); + Python::with_gil(|py| Ok(py.None())) + })?; + Ok(()) } }