From ff8699e042981adf41a83894df3823bc44daeb97 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 28 Sep 2023 09:49:55 -0700 Subject: [PATCH 1/9] move prepared files to ~/.cache dir --- Cargo.lock | 28 ++++++++++++++ Cargo.toml | 1 + crates/sparrow-runtime/Cargo.toml | 1 + .../sparrow-runtime/src/prepare/preparer.rs | 37 +++++++++++-------- python/Cargo.lock | 28 ++++++++++++++ 5 files changed, 80 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98b1b48c2..2025bb210 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1432,6 +1432,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -1442,6 +1451,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2999,6 +3020,12 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "2.10.0" @@ -4739,6 +4766,7 @@ dependencies = [ "dashmap", "data-encoding", "derive_more", + "dirs", "enum-map", "erased-serde", "error-stack", diff --git a/Cargo.toml b/Cargo.toml index baed629a3..d88f0b7ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ dashmap = "5.4.0" data-encoding = "2.3.3" decorum = "0.3.1" derive_more = "0.99.17" +dirs = "5.0.1" edit-distance = "2.1.0" egg = "0.9.3" enum-as-inner = "0.6.0" diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index fee2c4a66..ac76d1dbf 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -34,6 +34,7 @@ clap.workspace = true dashmap.workspace = true data-encoding.workspace = true derive_more.workspace = true +dirs.workspace = true enum-map.workspace = true erased-serde.workspace = true error-stack.workspace = true diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index c9421783a..da58c0d96 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -18,6 +18,9 @@ use crate::PreparedMetadata; use super::{prepared_batches, write_parquet}; +const KASKADA_PATH: &str = ".cache/kaskada"; +const PREPARED_FILE_PREFIX: &str = "part"; + #[derive(derive_more::Display, Debug)] pub enum Error { #[display(fmt = "batch missing required column '{_0}'")] @@ -93,20 +96,9 @@ impl Preparer { ) -> error_stack::Result, Error> { // TODO: Support Slicing - // Prepared files are stored in the following format: - // file:////tables//prepared//part-.parquet - let cur_dir = std::env::current_dir().expect("current dir"); - let cur_dir = cur_dir.to_string_lossy(); - - let uuid = Uuid::new_v4(); - let output_path_prefix = format!( - "file:///{}/tables/{}/prepare/{uuid}/", - cur_dir, self.table_config.uuid - ); - let output_file_prefix = "part"; - + let output_path_prefix = self.prepared_output_prefix()?; let output_url = ObjectStoreUrl::from_str(&output_path_prefix) - .change_context_lazy(|| Error::InvalidUrl(path.to_string_lossy().to_string()))?; + .change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?; let object_store = self .object_stores @@ -140,10 +132,10 @@ impl Preparer { let (data, metadata) = next.change_context(Error::Internal)?; let data_url = output_url - .join(&format!("{output_file_prefix}-{n}.parquet")) + .join(&format!("{PREPARED_FILE_PREFIX}-{n}.parquet")) .change_context(Error::Internal)?; let metadata_url = output_url - .join(&format!("{output_file_prefix}-{n}-metadata.parquet")) + .join(&format!("{PREPARED_FILE_PREFIX}-{n}-metadata.parquet")) .change_context(Error::Internal)?; // Create the prepared file via PreparedMetadata. @@ -185,6 +177,21 @@ impl Preparer { self.time_multiplier.as_ref(), ) } + // Prepared files are stored in the following format: + // file://///tables//prepared//part-.parquet + pub fn prepared_output_prefix(&self) -> error_stack::Result { + let uuid = Uuid::new_v4(); + let home_dir = dirs::home_dir(); + if let Some(home_dir) = home_dir.map(|p| p.display().to_string()) { + Ok(format!( + "file:///{}/{}/tables/{}/prepare/{uuid}/", + home_dir, KASKADA_PATH, self.table_config.uuid + )) + } else { + tracing::error!("Failed to get home directory"); + error_stack::bail!(Error::Internal) + } + } } pub fn prepare_batch( diff --git a/python/Cargo.lock b/python/Cargo.lock index 1a17ce41a..1ae66e179 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -1224,6 +1224,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -1234,6 +1243,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2560,6 +2581,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "2.10.0" @@ -3924,6 +3951,7 @@ dependencies = [ "dashmap", "data-encoding", "derive_more", + "dirs", "enum-map", "erased-serde", "error-stack", From 584d31e70923a84d3b1b5e3a2e71bce9f2ed539d Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 28 Sep 2023 12:36:55 -0700 Subject: [PATCH 2/9] use temp dir instead of cache --- Cargo.lock | 28 ------------------- Cargo.toml | 1 - crates/sparrow-runtime/Cargo.toml | 1 - .../sparrow-runtime/src/prepare/preparer.rs | 28 +++++++++---------- python/Cargo.lock | 28 ------------------- 5 files changed, 14 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2025bb210..98b1b48c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1432,15 +1432,6 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "dirs" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" -dependencies = [ - "dirs-sys", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -1451,18 +1442,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys 0.48.0", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -3020,12 +2999,6 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "ordered-float" version = "2.10.0" @@ -4766,7 +4739,6 @@ dependencies = [ "dashmap", "data-encoding", "derive_more", - "dirs", "enum-map", "erased-serde", "error-stack", diff --git a/Cargo.toml b/Cargo.toml index d88f0b7ef..baed629a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ dashmap = "5.4.0" data-encoding = "2.3.3" decorum = "0.3.1" derive_more = "0.99.17" -dirs = "5.0.1" edit-distance = "2.1.0" egg = "0.9.3" enum-as-inner = "0.6.0" diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index ac76d1dbf..fee2c4a66 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -34,7 +34,6 @@ clap.workspace = true dashmap.workspace = true data-encoding.workspace = true derive_more.workspace = true -dirs.workspace = true enum-map.workspace = true erased-serde.workspace = true error-stack.workspace = true diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index da58c0d96..41408a7dc 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -18,7 +18,10 @@ use crate::PreparedMetadata; use super::{prepared_batches, write_parquet}; -const KASKADA_PATH: &str = ".cache/kaskada"; +/// For now, this is a temporary location for the prepared files. +/// In the future, we'll want to move this path to a managed cache +/// so we can reuse state. +const KASKADA_PATH: &str = "kaskada"; const PREPARED_FILE_PREFIX: &str = "part"; #[derive(derive_more::Display, Debug)] @@ -96,7 +99,7 @@ impl Preparer { ) -> error_stack::Result, Error> { // TODO: Support Slicing - let output_path_prefix = self.prepared_output_prefix()?; + let output_path_prefix = self.prepared_output_prefix(); let output_url = ObjectStoreUrl::from_str(&output_path_prefix) .change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?; @@ -178,19 +181,16 @@ impl Preparer { ) } // Prepared files are stored in the following format: - // file://///tables//prepared//part-.parquet - pub fn prepared_output_prefix(&self) -> error_stack::Result { + // file://///tables//prepared//part-.parquet + pub fn prepared_output_prefix(&self) -> String { let uuid = Uuid::new_v4(); - let home_dir = dirs::home_dir(); - if let Some(home_dir) = home_dir.map(|p| p.display().to_string()) { - Ok(format!( - "file:///{}/{}/tables/{}/prepare/{uuid}/", - home_dir, KASKADA_PATH, self.table_config.uuid - )) - } else { - tracing::error!("Failed to get home directory"); - error_stack::bail!(Error::Internal) - } + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + format!( + "file:///{}/{}/tables/{}/prepare/{uuid}/", + temp_dir.path().display(), + KASKADA_PATH, + self.table_config.uuid + ) } } diff --git a/python/Cargo.lock b/python/Cargo.lock index 1ae66e179..1a17ce41a 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -1224,15 +1224,6 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "dirs" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" -dependencies = [ - "dirs-sys", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -1243,18 +1234,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2581,12 +2560,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "ordered-float" version = "2.10.0" @@ -3951,7 +3924,6 @@ dependencies = [ "dashmap", "data-encoding", "derive_more", - "dirs", "enum-map", "erased-serde", "error-stack", From c3cb4d76191410f54192ad5be6d6e3ff50dea205 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Tue, 3 Oct 2023 09:19:09 -0700 Subject: [PATCH 3/9] Create prepared dir in session --- Cargo.lock | 1 + .../sparrow-runtime/src/prepare/preparer.rs | 35 +++++++++++-------- crates/sparrow-session/Cargo.toml | 1 + crates/sparrow-session/src/session.rs | 10 +++++- crates/sparrow-session/src/table.rs | 9 +++-- 5 files changed, 38 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98b1b48c2..9563d0e1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4824,6 +4824,7 @@ dependencies = [ "sparrow-runtime", "sparrow-syntax", "static_init", + "tempfile", "tokio", "tokio-stream", "uuid 1.4.1", diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index 41408a7dc..bbf1f3ea3 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -93,13 +93,16 @@ impl Preparer { /// - This adds or casts columns as needed. /// - This produces multiple parts if the input file is large. /// - This produces metadata files alongside data files. + /// Parameters: + /// - `to_prepare`: The path to the parquet file to prepare. + /// - `prepared_dir`: The directory to write the prepared files to. pub async fn prepare_parquet( &self, - path: &std::path::Path, + to_prepare: &std::path::Path, + prepared_dir: &std::path::Path, ) -> error_stack::Result, Error> { // TODO: Support Slicing - - let output_path_prefix = self.prepared_output_prefix(); + let output_path_prefix = self.prepared_output_prefix(prepared_dir); let output_url = ObjectStoreUrl::from_str(&output_path_prefix) .change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?; @@ -110,7 +113,7 @@ impl Preparer { let source_data = SourceData { source: Some( - SourceData::try_from_local(path) + SourceData::try_from_local(to_prepare) .into_report() .change_context(Error::Internal)?, ), @@ -180,17 +183,21 @@ impl Preparer { self.time_multiplier.as_ref(), ) } - // Prepared files are stored in the following format: - // file://///tables//prepared//part-.parquet - pub fn prepared_output_prefix(&self) -> String { + + /// Creates the local output prefix to use for prepared files. + /// + /// e.g. for osx: file://///tables//prepared/ + pub fn prepared_output_prefix(&self, dir: &std::path::Path) -> String { let uuid = Uuid::new_v4(); - let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); - format!( - "file:///{}/{}/tables/{}/prepare/{uuid}/", - temp_dir.path().display(), - KASKADA_PATH, - self.table_config.uuid - ) + + // Construct the path using PathBuf to handle platform-specific path separators. + let mut buf = std::path::PathBuf::new(); + buf.push(dir); + buf.push(KASKADA_PATH); + buf.push("tables"); + buf.push("prepare"); + buf.push(uuid.to_string()); + buf.to_string_lossy().to_string() } } diff --git a/crates/sparrow-session/Cargo.toml b/crates/sparrow-session/Cargo.toml index 117bcab16..67d125d4a 100644 --- a/crates/sparrow-session/Cargo.toml +++ b/crates/sparrow-session/Cargo.toml @@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" } sparrow-syntax = { path = "../sparrow-syntax" } sparrow-instructions = { path = "../sparrow-instructions" } static_init.workspace = true +tempfile.workspace = true tokio.workspace = true tokio-stream.workspace = true uuid.workspace = true diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 5e9d8cbdc..d592a402b 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -34,10 +34,16 @@ pub struct Session { udfs: HashMap>, object_store_registry: Arc, rt: tokio::runtime::Runtime, + /// Temporary directory to hold prepared files for this session. + /// + /// Creating in the session ensures it won't be dropped until the + /// session is closed. + prepared_dir: tempfile::TempDir, } impl Default for Session { fn default() -> Self { + let prepared_dir = tempfile::tempdir().expect("failed to create temp dir"); Self { data_context: Default::default(), dfg: Default::default(), @@ -45,6 +51,7 @@ impl Default for Session { udfs: Default::default(), object_store_registry: Default::default(), rt: tokio::runtime::Runtime::new().expect("tokio runtime"), + prepared_dir, } } } @@ -125,7 +132,7 @@ impl Session { grouping_name: Option<&str>, time_unit: Option<&str>, source: Option<&str>, - ) -> error_stack::Result { + ) -> error_stack::Result, Error> { let uuid = Uuid::new_v4(); let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref()) .into_report() @@ -188,6 +195,7 @@ impl Session { time_unit, self.object_store_registry.clone(), source, + self.prepared_dir.path(), ) } diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index debbdc627..0dcc5674f 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs @@ -17,13 +17,14 @@ use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::Object use crate::{Error, Expr}; -pub struct Table { +pub struct Table<'a> { pub expr: Expr, preparer: Preparer, key_column: usize, key_hash_inverse: Arc, source: Source, registry: Arc, + prepared_dir: &'a std::path::Path, } #[derive(Debug)] @@ -32,7 +33,7 @@ enum Source { Parquet(Arc), } -impl Table { +impl<'a> Table<'a> { #[allow(clippy::too_many_arguments)] pub(crate) fn new( table_info: &mut TableInfo, @@ -43,6 +44,7 @@ impl Table { time_unit: Option<&str>, object_stores: Arc, source: Option<&str>, + prepared_dir: &'a std::path::Path, ) -> error_stack::Result { let prepared_fields: Fields = KEY_FIELDS .iter() @@ -99,6 +101,7 @@ impl Table { key_column: key_column + KEY_FIELDS.len(), source, registry: object_stores, + prepared_dir, }) } @@ -145,7 +148,7 @@ impl Table { let prepared = self .preparer - .prepare_parquet(path) + .prepare_parquet(path, self.prepared_dir) .await .change_context(Error::Prepare)?; From 0d2c6678c0b225a2683a790c3065414dd7e6e155 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Tue, 3 Oct 2023 15:21:47 -0700 Subject: [PATCH 4/9] use urls --- .../sparrow-runtime/src/prepare/preparer.rs | 41 +++++++++++-------- crates/sparrow-session/src/session.rs | 32 +++++++++++---- crates/sparrow-session/src/table.rs | 14 +++---- 3 files changed, 54 insertions(+), 33 deletions(-) diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index bbf1f3ea3..da247a9bb 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -95,22 +94,22 @@ impl Preparer { /// - This produces metadata files alongside data files. /// Parameters: /// - `to_prepare`: The path to the parquet file to prepare. - /// - `prepared_dir`: The directory to write the prepared files to. + /// - `prepare_prefix`: The prefix to write prepared files to. pub async fn prepare_parquet( &self, - to_prepare: &std::path::Path, - prepared_dir: &std::path::Path, + to_prepare: &str, + prepare_prefix: &ObjectStoreUrl, ) -> error_stack::Result, Error> { // TODO: Support Slicing - let output_path_prefix = self.prepared_output_prefix(prepared_dir); - let output_url = ObjectStoreUrl::from_str(&output_path_prefix) - .change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?; + let output_url = self.prepared_output_url_prefix(prepare_prefix)?; let object_store = self .object_stores .object_store(&output_url) .change_context(Error::Internal)?; + // TODO: support preparing from remote stores + let to_prepare = std::path::Path::new(to_prepare); let source_data = SourceData { source: Some( SourceData::try_from_local(to_prepare) @@ -184,20 +183,28 @@ impl Preparer { ) } - /// Creates the local output prefix to use for prepared files. + /// Creates the output url prefix to use for prepared files. /// /// e.g. for osx: file://///tables//prepared/ - pub fn prepared_output_prefix(&self, dir: &std::path::Path) -> String { + /// for s3: s3:////tables//prepared/ + pub fn prepared_output_url_prefix( + &self, + prefix: &ObjectStoreUrl, + ) -> error_stack::Result { + let error = || Error::InvalidUrl(prefix.to_string()); let uuid = Uuid::new_v4(); - // Construct the path using PathBuf to handle platform-specific path separators. - let mut buf = std::path::PathBuf::new(); - buf.push(dir); - buf.push(KASKADA_PATH); - buf.push("tables"); - buf.push("prepare"); - buf.push(uuid.to_string()); - buf.to_string_lossy().to_string() + let url = prefix + .join(KASKADA_PATH) + .change_context_lazy(error)? + .join("tables") + .change_context_lazy(error)? + .join("prepare") + .change_context_lazy(error)? + .join(&uuid.to_string()) + .change_context_lazy(error)?; + + Ok(url) } } diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index d592a402b..11ca059b4 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -1,6 +1,7 @@ use hashbrown::HashMap; -use sparrow_runtime::stores::ObjectStoreRegistry; +use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl}; use std::borrow::Cow; +use std::str::FromStr; use std::sync::Arc; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; @@ -34,16 +35,28 @@ pub struct Session { udfs: HashMap>, object_store_registry: Arc, rt: tokio::runtime::Runtime, - /// Temporary directory to hold prepared files for this session. + /// Url to prepare files to. /// - /// Creating in the session ensures it won't be dropped until the - /// session is closed. - prepared_dir: tempfile::TempDir, + /// May be a path to a temporary directory, remote object store, etc. + /// Note: Creating in the Session ensures temporary files won't be dropped + /// until the session is closed. + prepare_prefix: ObjectStoreUrl, + /// Temporary directory for preparing files to in the local case. + /// + /// Stored in the session to ensure it persists until the Session is dropped. + _temp_dir: tempfile::TempDir, } impl Default for Session { fn default() -> Self { - let prepared_dir = tempfile::tempdir().expect("failed to create temp dir"); + // TODO: Support object stores + // Likely will need the option to pass in the destination url when executing the + // query or creating a table. + let temp_dir = tempfile::tempdir().expect("create temp dir"); + let mut url = "file://".to_owned(); + url.push_str(temp_dir.path().to_str().expect("valid string")); + let prepare_prefix = ObjectStoreUrl::from_str(&url).expect("object store url"); + Self { data_context: Default::default(), dfg: Default::default(), @@ -51,7 +64,8 @@ impl Default for Session { udfs: Default::default(), object_store_registry: Default::default(), rt: tokio::runtime::Runtime::new().expect("tokio runtime"), - prepared_dir, + prepare_prefix, + _temp_dir: temp_dir, } } } @@ -132,7 +146,7 @@ impl Session { grouping_name: Option<&str>, time_unit: Option<&str>, source: Option<&str>, - ) -> error_stack::Result, Error> { + ) -> error_stack::Result { let uuid = Uuid::new_v4(); let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref()) .into_report() @@ -195,7 +209,7 @@ impl Session { time_unit, self.object_store_registry.clone(), source, - self.prepared_dir.path(), + self.prepare_prefix.clone(), ) } diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index 0dcc5674f..1d3a0267f 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs @@ -17,14 +17,14 @@ use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::Object use crate::{Error, Expr}; -pub struct Table<'a> { +pub struct Table { pub expr: Expr, preparer: Preparer, key_column: usize, key_hash_inverse: Arc, source: Source, registry: Arc, - prepared_dir: &'a std::path::Path, + prepare_prefix: ObjectStoreUrl, } #[derive(Debug)] @@ -33,7 +33,7 @@ enum Source { Parquet(Arc), } -impl<'a> Table<'a> { +impl Table { #[allow(clippy::too_many_arguments)] pub(crate) fn new( table_info: &mut TableInfo, @@ -44,7 +44,7 @@ impl<'a> Table<'a> { time_unit: Option<&str>, object_stores: Arc, source: Option<&str>, - prepared_dir: &'a std::path::Path, + prepare_prefix: ObjectStoreUrl, ) -> error_stack::Result { let prepared_fields: Fields = KEY_FIELDS .iter() @@ -101,7 +101,7 @@ impl<'a> Table<'a> { key_column: key_column + KEY_FIELDS.len(), source, registry: object_stores, - prepared_dir, + prepare_prefix: prepare_prefix.to_owned(), }) } @@ -137,7 +137,7 @@ impl<'a> Table<'a> { Ok(()) } - pub async fn add_parquet(&self, path: &std::path::Path) -> error_stack::Result<(), Error> { + pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> { let file_sets = match &self.source { Source::Parquet(file_sets) => file_sets.clone(), other => error_stack::bail!(Error::internal_msg(format!( @@ -148,7 +148,7 @@ impl<'a> Table<'a> { let prepared = self .preparer - .prepare_parquet(path, self.prepared_dir) + .prepare_parquet(path, &self.prepare_prefix) .await .change_context(Error::Prepare)?; From 81deaee5a30114da5e5c8c65777003b202939ae8 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Wed, 4 Oct 2023 08:58:25 -0700 Subject: [PATCH 5/9] path -> string --- python/src/table.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/src/table.rs b/python/src/table.rs index bd478a360..44640ffa0 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -89,8 +89,7 @@ impl Table { fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> Result<&'py PyAny> { let rust_table = self.rust_table.clone(); Ok(pyo3_asyncio::tokio::future_into_py(py, async move { - let path = std::path::Path::new(&path); - let result = rust_table.add_parquet(path).await; + let result = rust_table.add_parquet(&path).await; Python::with_gil(|py| { result.unwrap(); Ok(py.None()) From ca4b9bd4df0842d29aed2879f6a7645b353309ab Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Wed, 4 Oct 2023 10:27:23 -0700 Subject: [PATCH 6/9] fix url --- .../sparrow-runtime/src/prepare/preparer.rs | 48 ++++++++++++++----- .../src/stores/object_store_url.rs | 10 ++++ python/Cargo.lock | 1 + 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index da247a9bb..5242f3610 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -1,3 +1,4 @@ +use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -192,19 +193,42 @@ impl Preparer { prefix: &ObjectStoreUrl, ) -> error_stack::Result { let error = || Error::InvalidUrl(prefix.to_string()); - let uuid = Uuid::new_v4(); - - let url = prefix - .join(KASKADA_PATH) - .change_context_lazy(error)? - .join("tables") - .change_context_lazy(error)? - .join("prepare") - .change_context_lazy(error)? - .join(&uuid.to_string()) - .change_context_lazy(error)?; - Ok(url) + // Append the trailing slash to make it a directory + let uuid = Uuid::new_v4(); + let uuid = uuid.to_string() + "/"; + + if prefix.is_local() { + // The temp directory doesn't have a trailing slash, so we can't join on the existing URL. + let prefix = prefix.to_string(); + let url = prefix + + "/" + + KASKADA_PATH + + "/tables/" + + &self.table_config.uuid.to_string() + + "/prepared/" + + &uuid; + + let url = ObjectStoreUrl::from_str(&url).change_context_lazy(error)?; + Ok(url) + } else if prefix.is_s3() { + // TODO: This requires the s3 path has a trailing slash as-is. + // Should we check that and append it if we need to? + let url = prefix + .join(KASKADA_PATH) + .change_context_lazy(error)? + .join("tables") + .change_context_lazy(error)? + .join(&self.table_config.uuid.to_string()) + .change_context_lazy(error)? + .join("prepared") + .change_context_lazy(error)? + .join(&uuid.to_string()) + .change_context_lazy(error)?; + Ok(url) + } else { + error_stack::bail!(Error::Internal) + } } } diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs index a1ee68753..8c5b424e5 100644 --- a/crates/sparrow-runtime/src/stores/object_store_url.rs +++ b/crates/sparrow-runtime/src/stores/object_store_url.rs @@ -51,6 +51,16 @@ impl ObjectStoreUrl { self.url.make_relative(&url.url) } + pub fn is_local(&self) -> bool { + self.url.scheme() == "file" + } + + // TODO: Currently only supports the "s3" scheme + // Several others are supported by ObjectStore: https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.with_url + pub fn is_s3(&self) -> bool { + self.url.scheme() == "s3" + } + /// Return the local path, if this is a local file. pub fn local_path(&self) -> Option<&Path> { if self.url.scheme() == "file" { diff --git a/python/Cargo.lock b/python/Cargo.lock index 1a17ce41a..6803337c6 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3988,6 +3988,7 @@ dependencies = [ "sparrow-runtime", "sparrow-syntax", "static_init", + "tempfile", "tokio", "tokio-stream", "uuid 1.4.1", From 643009c18a369a5546e3f44ebe8ba48355a0ec2f Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Wed, 4 Oct 2023 15:35:09 -0700 Subject: [PATCH 7/9] clean up --- .../sparrow-runtime/src/prepare/preparer.rs | 56 +++++++------------ .../src/stores/object_store_url.rs | 28 ++++++---- crates/sparrow-session/src/session.rs | 16 +++--- 3 files changed, 44 insertions(+), 56 deletions(-) diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index 5242f3610..011b69813 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -186,49 +185,32 @@ impl Preparer { /// Creates the output url prefix to use for prepared files. /// - /// e.g. for osx: file://///tables//prepared/ - /// for s3: s3:////tables//prepared/ + /// e.g. for osx: file://///tables//prepared// + /// for s3: s3:////tables//prepared// pub fn prepared_output_url_prefix( &self, prefix: &ObjectStoreUrl, ) -> error_stack::Result { let error = || Error::InvalidUrl(prefix.to_string()); - // Append the trailing slash to make it a directory let uuid = Uuid::new_v4(); - let uuid = uuid.to_string() + "/"; - - if prefix.is_local() { - // The temp directory doesn't have a trailing slash, so we can't join on the existing URL. - let prefix = prefix.to_string(); - let url = prefix - + "/" - + KASKADA_PATH - + "/tables/" - + &self.table_config.uuid.to_string() - + "/prepared/" - + &uuid; - - let url = ObjectStoreUrl::from_str(&url).change_context_lazy(error)?; - Ok(url) - } else if prefix.is_s3() { - // TODO: This requires the s3 path has a trailing slash as-is. - // Should we check that and append it if we need to? - let url = prefix - .join(KASKADA_PATH) - .change_context_lazy(error)? - .join("tables") - .change_context_lazy(error)? - .join(&self.table_config.uuid.to_string()) - .change_context_lazy(error)? - .join("prepared") - .change_context_lazy(error)? - .join(&uuid.to_string()) - .change_context_lazy(error)?; - Ok(url) - } else { - error_stack::bail!(Error::Internal) - } + + let url = prefix + .join(KASKADA_PATH) + .change_context_lazy(error)? + .join("tables") + .change_context_lazy(error)? + .join(&self.table_config.uuid.to_string()) + .change_context_lazy(error)? + .join("prepared") + .change_context_lazy(error)? + .join(&uuid.to_string()) + .change_context_lazy(error)?; + + // Ensure it's treated as a directory + let path = url.std_path().change_context_lazy(error)?; + let url = ObjectStoreUrl::from_directory_path(&path).unwrap(); + Ok(url) } } diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs index 8c5b424e5..27d207631 100644 --- a/crates/sparrow-runtime/src/stores/object_store_url.rs +++ b/crates/sparrow-runtime/src/stores/object_store_url.rs @@ -21,6 +21,24 @@ impl ObjectStoreUrl { .change_context_lazy(|| Error::InvalidUrl(self.url.to_string())) } + /// Return the [std::path::Path] corresponding to this URL. + pub fn std_path(&self) -> error_stack::Result { + let path = self + .url + .to_file_path() + .map_err(|_| Error::InvalidUrl(self.url.to_string()))?; + Ok(path) + } + + /// Convert a directory name as [std::path::Path] into an URL in the `file` scheme. + /// + /// This ensures the path has a trailing slash. + pub fn from_directory_path(path: &std::path::Path) -> error_stack::Result { + let url = Url::from_directory_path(path) + .map_err(|_| Error::InvalidUrl(path.to_string_lossy().to_string()))?; + Ok(Self { url }) + } + pub fn url(&self) -> &Url { &self.url } @@ -51,16 +69,6 @@ impl ObjectStoreUrl { self.url.make_relative(&url.url) } - pub fn is_local(&self) -> bool { - self.url.scheme() == "file" - } - - // TODO: Currently only supports the "s3" scheme - // Several others are supported by ObjectStore: https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.with_url - pub fn is_s3(&self) -> bool { - self.url.scheme() == "s3" - } - /// Return the local path, if this is a local file. pub fn local_path(&self) -> Option<&Path> { if self.url.scheme() == "file" { diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index 11ca059b4..fe95682f5 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -1,7 +1,6 @@ use hashbrown::HashMap; use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl}; use std::borrow::Cow; -use std::str::FromStr; use std::sync::Arc; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; @@ -35,11 +34,11 @@ pub struct Session { udfs: HashMap>, object_store_registry: Arc, rt: tokio::runtime::Runtime, - /// Url to prepare files to. + /// Default temporary path to prepare files to. /// - /// May be a path to a temporary directory, remote object store, etc. - /// Note: Creating in the Session ensures temporary files won't be dropped - /// until the session is closed. + /// NOTE: we'll want to figure out how we're passing in the user-defined + /// destination url. It's possible it could be passed in as part of the + /// query, at which point we'd use that instead of this default temporary path. prepare_prefix: ObjectStoreUrl, /// Temporary directory for preparing files to in the local case. /// @@ -52,11 +51,10 @@ impl Default for Session { // TODO: Support object stores // Likely will need the option to pass in the destination url when executing the // query or creating a table. - let temp_dir = tempfile::tempdir().expect("create temp dir"); - let mut url = "file://".to_owned(); - url.push_str(temp_dir.path().to_str().expect("valid string")); - let prepare_prefix = ObjectStoreUrl::from_str(&url).expect("object store url"); + let temp_dir = tempfile::tempdir().expect("create temp dir"); + let prepare_prefix = + ObjectStoreUrl::from_directory_path(temp_dir.path()).expect("valid path"); Self { data_context: Default::default(), dfg: Default::default(), From d1a862e9a93b13d90853704baf36a63a51211d0b Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 5 Oct 2023 10:28:11 -0700 Subject: [PATCH 8/9] Clean up error handling --- .../sparrow-runtime/src/prepare/preparer.rs | 32 +++++++------------ crates/sparrow-runtime/src/stores.rs | 2 +- .../src/stores/object_store_url.rs | 25 ++++++++------- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs index 011b69813..ca1676f22 100644 --- a/crates/sparrow-runtime/src/prepare/preparer.rs +++ b/crates/sparrow-runtime/src/prepare/preparer.rs @@ -1,5 +1,4 @@ use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; use arrow::array::{ArrayRef, UInt64Array}; use arrow::compute::SortColumn; @@ -10,6 +9,7 @@ use error_stack::{IntoReport, IntoReportCompat, ResultExt}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; use sparrow_api::kaskada::v1alpha::{PreparedFile, SourceData, TableConfig}; +use std::sync::Arc; use uuid::Uuid; use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl}; @@ -101,7 +101,9 @@ impl Preparer { prepare_prefix: &ObjectStoreUrl, ) -> error_stack::Result, Error> { // TODO: Support Slicing - let output_url = self.prepared_output_url_prefix(prepare_prefix)?; + let output_url = self + .prepared_output_url_prefix(prepare_prefix) + .change_context_lazy(|| Error::InvalidUrl(prepare_prefix.to_string()))?; let object_store = self .object_stores @@ -187,29 +189,19 @@ impl Preparer { /// /// e.g. for osx: file://///tables//prepared// /// for s3: s3:////tables//prepared// - pub fn prepared_output_url_prefix( + fn prepared_output_url_prefix( &self, prefix: &ObjectStoreUrl, - ) -> error_stack::Result { - let error = || Error::InvalidUrl(prefix.to_string()); - + ) -> error_stack::Result { let uuid = Uuid::new_v4(); - let url = prefix - .join(KASKADA_PATH) - .change_context_lazy(error)? - .join("tables") - .change_context_lazy(error)? - .join(&self.table_config.uuid.to_string()) - .change_context_lazy(error)? - .join("prepared") - .change_context_lazy(error)? - .join(&uuid.to_string()) - .change_context_lazy(error)?; + .join(KASKADA_PATH)? + .join("tables")? + .join(&self.table_config.uuid.to_string())? + .join("prepared")? + .join(&uuid.to_string())? + .ensure_directory()?; - // Ensure it's treated as a directory - let path = url.std_path().change_context_lazy(error)?; - let url = ObjectStoreUrl::from_directory_path(&path).unwrap(); Ok(url) } } diff --git a/crates/sparrow-runtime/src/stores.rs b/crates/sparrow-runtime/src/stores.rs index b85a58432..f030a8b55 100644 --- a/crates/sparrow-runtime/src/stores.rs +++ b/crates/sparrow-runtime/src/stores.rs @@ -1,7 +1,7 @@ mod object_meta_ext; mod object_store_key; pub mod object_store_url; -mod registry; +pub mod registry; pub use object_meta_ext::ObjectMetaExt; pub use object_store_url::ObjectStoreUrl; diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs index 27d207631..758a6186f 100644 --- a/crates/sparrow-runtime/src/stores/object_store_url.rs +++ b/crates/sparrow-runtime/src/stores/object_store_url.rs @@ -21,21 +21,24 @@ impl ObjectStoreUrl { .change_context_lazy(|| Error::InvalidUrl(self.url.to_string())) } - /// Return the [std::path::Path] corresponding to this URL. - pub fn std_path(&self) -> error_stack::Result { - let path = self - .url - .to_file_path() - .map_err(|_| Error::InvalidUrl(self.url.to_string()))?; - Ok(path) + /// Returns the URL, ensuring it is a directory. + pub fn ensure_directory(self) -> error_stack::Result { + let mut url = self.url.to_string(); + if !url.ends_with('/') { + url.push('/'); + } + + let url = Url::parse(&url) + .into_report() + .change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))?; + Ok(Self { url }) } - /// Convert a directory name as [std::path::Path] into an URL in the `file` scheme. - /// - /// This ensures the path has a trailing slash. + /// Constructs an [ObjectStoreUrl] from a local directory path. pub fn from_directory_path(path: &std::path::Path) -> error_stack::Result { let url = Url::from_directory_path(path) - .map_err(|_| Error::InvalidUrl(path.to_string_lossy().to_string()))?; + .map_err(|_| Error::InvalidUrl(path.display().to_string()))?; + Ok(Self { url }) } From 7262d716a1aa0f3ffdea7d25434e5048a126a994 Mon Sep 17 00:00:00 2001 From: Jordan Frazier Date: Thu, 5 Oct 2023 12:01:30 -0700 Subject: [PATCH 9/9] new clippy things --- crates/sparrow-instructions/src/columnar_value.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/sparrow-instructions/src/columnar_value.rs b/crates/sparrow-instructions/src/columnar_value.rs index 0ac15124c..fb3c12638 100644 --- a/crates/sparrow-instructions/src/columnar_value.rs +++ b/crates/sparrow-instructions/src/columnar_value.rs @@ -1,14 +1,11 @@ use anyhow::anyhow; use arrow::datatypes::*; +use arrow_array::cast::AsArray; use arrow_array::{ Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray, StructArray, }; use owning_ref::ArcRef; -use sparrow_arrow::downcast::{ - downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array, - downcast_struct_array, -}; use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue}; /// The input to an instruction. @@ -52,14 +49,14 @@ impl ColumnarValue { /// struct array. pub fn struct_array(&self) -> anyhow::Result> { let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_struct_array(a)) + Ok(ArcRef::new(array).map(|a| a.as_struct())) } /// Specialized version of `array_ref` that downcasts the array to a /// map array. pub fn map_array(&self) -> anyhow::Result> { let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_map_array(a)) + Ok(ArcRef::new(array).map(|a| a.as_map())) } /// Specialized version of `array_ref` that downcasts the array to a @@ -69,7 +66,7 @@ impl ColumnarValue { T: OffsetSizeTrait, { let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_string_array(a)) + Ok(ArcRef::new(array).map(|a| a.as_string::())) } /// Specialized version of `array_ref` that downcasts the array to a @@ -79,14 +76,14 @@ impl ColumnarValue { &self, ) -> anyhow::Result>> { let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_primitive_array(a)) + Ok(ArcRef::new(array).map(|a| a.as_primitive::())) } /// Specialized version of `array_ref` that downcasts the array to a /// boolean array. pub fn boolean_array(&self) -> anyhow::Result> { let array = self.array_ref()?; - ArcRef::new(array).try_map(|a| downcast_boolean_array(a)) + Ok(ArcRef::new(array).map(|a| a.as_boolean())) } /// Attempt to get the native value of a literal from this `ColumnarValue`.