diff --git a/Cargo.lock b/Cargo.lock index c368d0712..1616d24bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4824,6 +4824,7 @@ dependencies = [ "sparrow-runtime", "sparrow-syntax", "static_init", + "tempfile", "tokio", "tokio-stream", "uuid 1.4.1", diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index 41408a7dc..bbf1f3ea3 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -93,13 +93,16 @@ impl Preparer { /// - This adds or casts columns as needed. /// - This produces multiple parts if the input file is large. /// - This produces metadata files alongside data files. + /// Parameters: + /// - `to_prepare`: The path to the parquet file to prepare. + /// - `prepared_dir`: The directory to write the prepared files to. pub async fn prepare_parquet( &self, - path: &std::path::Path, + to_prepare: &std::path::Path, + prepared_dir: &std::path::Path, ) -> error_stack::Result, Error> { // TODO: Support Slicing - - let output_path_prefix = self.prepared_output_prefix(); + let output_path_prefix = self.prepared_output_prefix(prepared_dir); let output_url = ObjectStoreUrl::from_str(&output_path_prefix) .change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?; @@ -110,7 +113,7 @@ impl Preparer { let source_data = SourceData { source: Some( - SourceData::try_from_local(path) + SourceData::try_from_local(to_prepare) .into_report() .change_context(Error::Internal)?, ), @@ -180,17 +183,21 @@ impl Preparer { self.time_multiplier.as_ref(), ) } - // Prepared files are stored in the following format: - // file://///tables//prepared//part-.parquet - pub fn prepared_output_prefix(&self) -> String { + + /// Creates the local output prefix to use for prepared files. + /// + /// e.g. for osx: file://///tables//prepared/ + pub fn prepared_output_prefix(&self, dir: &std::path::Path) -> String { let uuid = Uuid::new_v4(); - let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); - format!( - "file:///{}/{}/tables/{}/prepare/{uuid}/", - temp_dir.path().display(), - KASKADA_PATH, - self.table_config.uuid - ) + + // Construct the path using PathBuf to handle platform-specific path separators. + let mut buf = std::path::PathBuf::new(); + buf.push(dir); + buf.push(KASKADA_PATH); + buf.push("tables"); + buf.push("prepare"); + buf.push(uuid.to_string()); + buf.to_string_lossy().to_string() } } diff --git a/crates/sparrow-session/Cargo.toml b/crates/sparrow-session/Cargo.toml index 117bcab16..67d125d4a 100644 --- a/crates/sparrow-session/Cargo.toml +++ b/crates/sparrow-session/Cargo.toml @@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" } sparrow-syntax = { path = "../sparrow-syntax" } sparrow-instructions = { path = "../sparrow-instructions" } static_init.workspace = true +tempfile.workspace = true tokio.workspace = true tokio-stream.workspace = true uuid.workspace = true diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 5e9d8cbdc..d592a402b 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -34,10 +34,16 @@ pub struct Session { udfs: HashMap>, object_store_registry: Arc, rt: tokio::runtime::Runtime, + /// Temporary directory to hold prepared files for this session. + /// + /// Creating in the session ensures it won't be dropped until the + /// session is closed. + prepared_dir: tempfile::TempDir, } impl Default for Session { fn default() -> Self { + let prepared_dir = tempfile::tempdir().expect("failed to create temp dir"); Self { data_context: Default::default(), dfg: Default::default(), @@ -45,6 +51,7 @@ impl Default for Session { udfs: Default::default(), object_store_registry: Default::default(), rt: tokio::runtime::Runtime::new().expect("tokio runtime"), + prepared_dir, } } } @@ -125,7 +132,7 @@ impl Session { grouping_name: Option<&str>, time_unit: Option<&str>, source: Option<&str>, - ) -> error_stack::Result { + ) -> error_stack::Result, Error> { let uuid = Uuid::new_v4(); let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref()) .into_report() @@ -188,6 +195,7 @@ impl Session { time_unit, self.object_store_registry.clone(), source, + self.prepared_dir.path(), ) } diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index debbdc627..0dcc5674f 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs @@ -17,13 +17,14 @@ use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::Object use crate::{Error, Expr}; -pub struct Table { +pub struct Table<'a> { pub expr: Expr, preparer: Preparer, key_column: usize, key_hash_inverse: Arc, source: Source, registry: Arc, + prepared_dir: &'a std::path::Path, } #[derive(Debug)] @@ -32,7 +33,7 @@ enum Source { Parquet(Arc), } -impl Table { +impl<'a> Table<'a> { #[allow(clippy::too_many_arguments)] pub(crate) fn new( table_info: &mut TableInfo, @@ -43,6 +44,7 @@ impl Table { time_unit: Option<&str>, object_stores: Arc, source: Option<&str>, + prepared_dir: &'a std::path::Path, ) -> error_stack::Result { let prepared_fields: Fields = KEY_FIELDS .iter() @@ -99,6 +101,7 @@ impl Table { key_column: key_column + KEY_FIELDS.len(), source, registry: object_stores, + prepared_dir, }) } @@ -145,7 +148,7 @@ impl Table { let prepared = self .preparer - .prepare_parquet(path) + .prepare_parquet(path, self.prepared_dir) .await .change_context(Error::Prepare)?;