diff --git a/Cargo.lock b/Cargo.lock
index 98b1b48c2..9563d0e1e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4824,6 +4824,7 @@ dependencies = [
  "sparrow-runtime",
  "sparrow-syntax",
  "static_init",
+ "tempfile",
  "tokio",
  "tokio-stream",
  "uuid 1.4.1",
diff --git a/crates/sparrow-instructions/src/columnar_value.rs b/crates/sparrow-instructions/src/columnar_value.rs
index 0ac15124c..fb3c12638 100644
--- a/crates/sparrow-instructions/src/columnar_value.rs
+++ b/crates/sparrow-instructions/src/columnar_value.rs
@@ -1,14 +1,11 @@
 use anyhow::anyhow;
 use arrow::datatypes::*;
+use arrow_array::cast::AsArray;
 use arrow_array::{
     Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray,
     StructArray,
 };
 use owning_ref::ArcRef;
-use sparrow_arrow::downcast::{
-    downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array,
-    downcast_struct_array,
-};
 use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue};
 
 /// The input to an instruction.
@@ -52,14 +49,14 @@ impl ColumnarValue {
     /// struct array.
     pub fn struct_array(&self) -> anyhow::Result<ArcRef<dyn Array, StructArray>> {
         let array = self.array_ref()?;
-        ArcRef::new(array).try_map(|a| downcast_struct_array(a))
+        Ok(ArcRef::new(array).map(|a| a.as_struct()))
     }
 
     /// Specialized version of `array_ref` that downcasts the array to a
     /// map array.
     pub fn map_array(&self) -> anyhow::Result<ArcRef<dyn Array, MapArray>> {
         let array = self.array_ref()?;
-        ArcRef::new(array).try_map(|a| downcast_map_array(a))
+        Ok(ArcRef::new(array).map(|a| a.as_map()))
     }
 
     /// Specialized version of `array_ref` that downcasts the array to a
@@ -69,7 +66,7 @@
         T: OffsetSizeTrait,
     {
         let array = self.array_ref()?;
-        ArcRef::new(array).try_map(|a| downcast_string_array(a))
+        Ok(ArcRef::new(array).map(|a| a.as_string::<T>()))
     }
 
     /// Specialized version of `array_ref` that downcasts the array to a
@@ -79,14 +76,14 @@
         &self,
     ) -> anyhow::Result<ArcRef<dyn Array, PrimitiveArray<T>>> {
         let array = self.array_ref()?;
-        ArcRef::new(array).try_map(|a| downcast_primitive_array(a))
+        Ok(ArcRef::new(array).map(|a| a.as_primitive::<T>()))
     }
 
     /// Specialized version of `array_ref` that downcasts the array to a
     /// boolean array.
     pub fn boolean_array(&self) -> anyhow::Result<ArcRef<dyn Array, BooleanArray>> {
         let array = self.array_ref()?;
-        ArcRef::new(array).try_map(|a| downcast_boolean_array(a))
+        Ok(ArcRef::new(array).map(|a| a.as_boolean()))
     }
 
     /// Attempt to get the native value of a literal from this `ColumnarValue`.
diff --git a/crates/sparrow-runtime/src/prepare/preparer.rs b/crates/sparrow-runtime/src/prepare/preparer.rs
index c9421783a..ca1676f22 100644
--- a/crates/sparrow-runtime/src/prepare/preparer.rs
+++ b/crates/sparrow-runtime/src/prepare/preparer.rs
@@ -1,6 +1,4 @@
-use std::str::FromStr;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
 
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::compute::SortColumn;
@@ -11,6 +9,7 @@ use error_stack::{IntoReport, IntoReportCompat, ResultExt};
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
 use sparrow_api::kaskada::v1alpha::{PreparedFile, SourceData, TableConfig};
+use std::sync::Arc;
 use uuid::Uuid;
 
 use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
@@ -18,6 +17,12 @@ use crate::PreparedMetadata;
 
 use super::{prepared_batches, write_parquet};
 
+/// For now, this is a temporary location for the prepared files.
+/// In the future, we'll want to move this path to a managed cache
+/// so we can reuse state.
+const KASKADA_PATH: &str = "kaskada";
+const PREPARED_FILE_PREFIX: &str = "part";
+
 #[derive(derive_more::Display, Debug)]
 pub enum Error {
     #[display(fmt = "batch missing required column '{_0}'")]
@@ -87,35 +92,29 @@ impl Preparer {
     /// - This adds or casts columns as needed.
     /// - This produces multiple parts if the input file is large.
     /// - This produces metadata files alongside data files.
+    /// Parameters:
+    /// - `to_prepare`: The path to the parquet file to prepare.
+    /// - `prepare_prefix`: The prefix to write prepared files to.
     pub async fn prepare_parquet(
         &self,
-        path: &std::path::Path,
+        to_prepare: &str,
+        prepare_prefix: &ObjectStoreUrl,
     ) -> error_stack::Result<Vec<PreparedFile>, Error> {
         // TODO: Support Slicing
-
-        // Prepared files are stored in the following format:
-        // file:///<cur_dir>/tables/<table_uuid>/prepared/<uuid>/part-<n>.parquet
-        let cur_dir = std::env::current_dir().expect("current dir");
-        let cur_dir = cur_dir.to_string_lossy();
-
-        let uuid = Uuid::new_v4();
-        let output_path_prefix = format!(
-            "file:///{}/tables/{}/prepare/{uuid}/",
-            cur_dir, self.table_config.uuid
-        );
-        let output_file_prefix = "part";
-
-        let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
-            .change_context_lazy(|| Error::InvalidUrl(path.to_string_lossy().to_string()))?;
+        let output_url = self
+            .prepared_output_url_prefix(prepare_prefix)
+            .change_context_lazy(|| Error::InvalidUrl(prepare_prefix.to_string()))?;
 
         let object_store = self
             .object_stores
             .object_store(&output_url)
             .change_context(Error::Internal)?;
 
+        // TODO: support preparing from remote stores
+        let to_prepare = std::path::Path::new(to_prepare);
         let source_data = SourceData {
             source: Some(
-                SourceData::try_from_local(path)
+                SourceData::try_from_local(to_prepare)
                     .into_report()
                     .change_context(Error::Internal)?,
             ),
@@ -140,10 +139,10 @@
             let (data, metadata) = next.change_context(Error::Internal)?;
 
             let data_url = output_url
-                .join(&format!("{output_file_prefix}-{n}.parquet"))
+                .join(&format!("{PREPARED_FILE_PREFIX}-{n}.parquet"))
                 .change_context(Error::Internal)?;
             let metadata_url = output_url
-                .join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
+                .join(&format!("{PREPARED_FILE_PREFIX}-{n}-metadata.parquet"))
                 .change_context(Error::Internal)?;
 
             // Create the prepared file via PreparedMetadata.
@@ -185,6 +184,26 @@
             self.time_multiplier.as_ref(),
         )
     }
+
+    /// Creates the output url prefix to use for prepared files.
+    ///
+    /// e.g. for osx: file:///<dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
+    ///      for s3: s3://<bucket>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
+    fn prepared_output_url_prefix(
+        &self,
+        prefix: &ObjectStoreUrl,
+    ) -> error_stack::Result<ObjectStoreUrl, Error> {
+        let uuid = Uuid::new_v4();
+        let url = prefix
+            .join(KASKADA_PATH)?
+            .join("tables")?
+            .join(&self.table_config.uuid.to_string())?
+            .join("prepared")?
+            .join(&uuid.to_string())?
+            .ensure_directory()?;
+
+        Ok(url)
+    }
 }
 
 pub fn prepare_batch(
diff --git a/crates/sparrow-runtime/src/stores.rs b/crates/sparrow-runtime/src/stores.rs
index b85a58432..f030a8b55 100644
--- a/crates/sparrow-runtime/src/stores.rs
+++ b/crates/sparrow-runtime/src/stores.rs
@@ -1,7 +1,7 @@
 mod object_meta_ext;
 mod object_store_key;
 pub mod object_store_url;
-mod registry;
+pub mod registry;
 
 pub use object_meta_ext::ObjectMetaExt;
 pub use object_store_url::ObjectStoreUrl;
diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs
index a1ee68753..758a6186f 100644
--- a/crates/sparrow-runtime/src/stores/object_store_url.rs
+++ b/crates/sparrow-runtime/src/stores/object_store_url.rs
@@ -21,6 +21,27 @@ impl ObjectStoreUrl {
             .change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))
     }
 
+    /// Returns the URL, ensuring it is a directory.
+    pub fn ensure_directory(self) -> error_stack::Result<Self, Error> {
+        let mut url = self.url.to_string();
+        if !url.ends_with('/') {
+            url.push('/');
+        }
+
+        let url = Url::parse(&url)
+            .into_report()
+            .change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))?;
+        Ok(Self { url })
+    }
+
+    /// Constructs an [ObjectStoreUrl] from a local directory path.
+    pub fn from_directory_path(path: &std::path::Path) -> error_stack::Result<Self, Error> {
+        let url = Url::from_directory_path(path)
+            .map_err(|_| Error::InvalidUrl(path.display().to_string()))?;
+
+        Ok(Self { url })
+    }
+
     pub fn url(&self) -> &Url {
         &self.url
     }
diff --git a/crates/sparrow-session/Cargo.toml b/crates/sparrow-session/Cargo.toml
index 117bcab16..67d125d4a 100644
--- a/crates/sparrow-session/Cargo.toml
+++ b/crates/sparrow-session/Cargo.toml
@@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" }
 sparrow-syntax = { path = "../sparrow-syntax" }
 sparrow-instructions = { path = "../sparrow-instructions" }
 static_init.workspace = true
+tempfile.workspace = true
 tokio.workspace = true
 tokio-stream.workspace = true
 uuid.workspace = true
diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs
index 5e9d8cbdc..fe95682f5 100644
--- a/crates/sparrow-session/src/session.rs
+++ b/crates/sparrow-session/src/session.rs
@@ -1,5 +1,5 @@
 use hashbrown::HashMap;
-use sparrow_runtime::stores::ObjectStoreRegistry;
+use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
 use std::borrow::Cow;
 use std::sync::Arc;
 
@@ -34,10 +34,27 @@ pub struct Session {
     udfs: HashMap<Uuid, Arc<dyn Udf>>,
     object_store_registry: Arc<ObjectStoreRegistry>,
     rt: tokio::runtime::Runtime,
+    /// Default temporary path to prepare files to.
+    ///
+    /// NOTE: we'll want to figure out how we're passing in the user-defined
+    /// destination url. It's possible it could be passed in as part of the
+    /// query, at which point we'd use that instead of this default temporary path.
+    prepare_prefix: ObjectStoreUrl,
+    /// Temporary directory for preparing files to in the local case.
+    ///
+    /// Stored in the session to ensure it persists until the Session is dropped.
+    _temp_dir: tempfile::TempDir,
 }
 
 impl Default for Session {
     fn default() -> Self {
+        // TODO: Support object stores
+        // Likely will need the option to pass in the destination url when executing the
+        // query or creating a table.
+
+        let temp_dir = tempfile::tempdir().expect("create temp dir");
+        let prepare_prefix =
+            ObjectStoreUrl::from_directory_path(temp_dir.path()).expect("valid path");
         Self {
             data_context: Default::default(),
             dfg: Default::default(),
@@ -45,6 +62,8 @@
             udfs: Default::default(),
             object_store_registry: Default::default(),
             rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
+            prepare_prefix,
+            _temp_dir: temp_dir,
         }
     }
 }
@@ -188,6 +207,7 @@
             time_unit,
             self.object_store_registry.clone(),
             source,
+            self.prepare_prefix.clone(),
         )
     }
 
diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs
index debbdc627..1d3a0267f 100644
--- a/crates/sparrow-session/src/table.rs
+++ b/crates/sparrow-session/src/table.rs
@@ -24,6 +24,7 @@ pub struct Table {
     key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
     source: Source,
     registry: Arc<ObjectStoreRegistry>,
+    prepare_prefix: ObjectStoreUrl,
 }
 
 #[derive(Debug)]
@@ -43,6 +44,7 @@ impl Table {
         time_unit: Option<&str>,
         object_stores: Arc<ObjectStoreRegistry>,
         source: Option<&str>,
+        prepare_prefix: ObjectStoreUrl,
     ) -> error_stack::Result<Self, Error> {
         let prepared_fields: Fields = KEY_FIELDS
             .iter()
@@ -99,6 +101,7 @@
             key_column: key_column + KEY_FIELDS.len(),
             source,
             registry: object_stores,
+            prepare_prefix: prepare_prefix.to_owned(),
         })
     }
 
@@ -134,7 +137,7 @@
         Ok(())
     }
 
-    pub async fn add_parquet(&self, path: &std::path::Path) -> error_stack::Result<(), Error> {
+    pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
         let file_sets = match &self.source {
             Source::Parquet(file_sets) => file_sets.clone(),
             other => error_stack::bail!(Error::internal_msg(format!(
@@ -145,7 +148,7 @@
 
         let prepared = self
             .preparer
-            .prepare_parquet(path)
+            .prepare_parquet(path, &self.prepare_prefix)
             .await
             .change_context(Error::Prepare)?;
 
diff --git a/python/Cargo.lock b/python/Cargo.lock
index 1a17ce41a..6803337c6 100644
--- a/python/Cargo.lock
+++ b/python/Cargo.lock
@@ -3988,6 +3988,7 @@ dependencies = [
  "sparrow-runtime",
  "sparrow-syntax",
  "static_init",
+ "tempfile",
  "tokio",
  "tokio-stream",
  "uuid 1.4.1",
diff --git a/python/src/table.rs b/python/src/table.rs
index bd478a360..44640ffa0 100644
--- a/python/src/table.rs
+++ b/python/src/table.rs
@@ -89,8 +89,7 @@ impl Table {
     fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> Result<&'py PyAny> {
         let rust_table = self.rust_table.clone();
         Ok(pyo3_asyncio::tokio::future_into_py(py, async move {
-            let path = std::path::Path::new(&path);
-            let result = rust_table.add_parquet(path).await;
+            let result = rust_table.add_parquet(&path).await;
             Python::with_gil(|py| {
                 result.unwrap();
                 Ok(py.None())
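Note on the `columnar_value.rs` hunks: replacing the `sparrow_arrow::downcast` helpers (which returned `anyhow::Result`) with arrow-rs `AsArray` casts changes failure behavior, since the `as_*` methods panic on a type mismatch; that is why each body is now wrapped in `Ok(...)`. A minimal sketch of the cast API, assuming only the `arrow_array` crate (the names below are local to the example, not kaskada code):

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{ArrayRef, Int64Array};

fn main() {
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));

    // `as_primitive` yields `&PrimitiveArray<Int64Type>` directly...
    let ints = array.as_primitive::<Int64Type>();
    assert_eq!(ints.value(2), 3);

    // ...but panics on a mismatched type, unlike the old downcast helpers,
    // which surfaced the mismatch as an error the caller could handle.
    // let _ = array.as_boolean(); // would panic: not a BooleanArray
}
```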
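Note on `prepared_output_url_prefix` and `ensure_directory`: the trailing slash matters because `Url::join` replaces the last path segment of a base URL that does not end in `/`. A runnable sketch of that invariant using only the `url` crate (the paths here are illustrative):

```rust
use url::Url;

fn main() {
    // `Url::from_directory_path` requires an absolute path and always
    // produces a trailing slash, so subsequent joins append segments.
    let base = Url::from_directory_path("/tmp/prepare").expect("absolute path");
    assert_eq!(base.as_str(), "file:///tmp/prepare/");

    // With the trailing slash, each join appends a new segment.
    let table = base.join("kaskada/tables/").expect("join");
    let part = table.join("part-0.parquet").expect("join");
    assert_eq!(
        part.as_str(),
        "file:///tmp/prepare/kaskada/tables/part-0.parquet"
    );

    // Without it, `join` would replace the last segment instead.
    let no_slash = Url::parse("file:///tmp/prepare").expect("parse");
    assert_eq!(
        no_slash.join("part-0.parquet").unwrap().as_str(),
        "file:///tmp/part-0.parquet"
    );
}
```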
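Note on `Session::_temp_dir`: a `tempfile::TempDir` deletes its directory when dropped, which is why the session keeps the handle alive alongside `prepare_prefix` rather than holding only the path. A small sketch of that drop behavior (assumes the `tempfile` crate):

```rust
use std::path::PathBuf;

fn main() {
    let path: PathBuf;
    {
        let temp = tempfile::tempdir().expect("create temp dir");
        path = temp.path().to_path_buf();
        assert!(path.exists());
    } // `temp` dropped here; its directory is removed.
    assert!(!path.exists());
}
```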