Skip to content

Commit

Permalink
ref: move prepared files to ~/.cache dir (#781)
Browse files Browse the repository at this point in the history
Moves the prepared files directory to `~/.cache/kaskada/`. 

The other alternative is to, for now, create a temporary directory that
cleans up after use. While the `~/.cache` is generally used for
application-specific files (which is what the prepared files are), we
aren't utilizing them and won't until we have state. But, if it's not a
large concern to accumulate prepared files in the cache, we might as
well start with it here in anticipation of using them.

Closes #779
  • Loading branch information
jordanrfrazier authored Oct 5, 2023
1 parent 74fa425 commit 147642b
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions crates/sparrow-instructions/src/columnar_value.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use anyhow::anyhow;
use arrow::datatypes::*;
use arrow_array::cast::AsArray;
use arrow_array::{
Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray,
StructArray,
};
use owning_ref::ArcRef;
use sparrow_arrow::downcast::{
downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array,
downcast_struct_array,
};
use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue};

/// The input to an instruction.
Expand Down Expand Up @@ -52,14 +49,14 @@ impl ColumnarValue {
/// struct array.
pub fn struct_array(&self) -> anyhow::Result<ArcRef<dyn Array, StructArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_struct_array(a))
Ok(ArcRef::new(array).map(|a| a.as_struct()))
}

/// Specialized version of `array_ref` that downcasts the array to a
/// map array.
pub fn map_array(&self) -> anyhow::Result<ArcRef<dyn Array, MapArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_map_array(a))
Ok(ArcRef::new(array).map(|a| a.as_map()))
}

/// Specialized version of `array_ref` that downcasts the array to a
Expand All @@ -69,7 +66,7 @@ impl ColumnarValue {
T: OffsetSizeTrait,
{
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_string_array(a))
Ok(ArcRef::new(array).map(|a| a.as_string::<T>()))
}

/// Specialized version of `array_ref` that downcasts the array to a
Expand All @@ -79,14 +76,14 @@ impl ColumnarValue {
&self,
) -> anyhow::Result<ArcRef<dyn Array, PrimitiveArray<T>>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_primitive_array(a))
Ok(ArcRef::new(array).map(|a| a.as_primitive::<T>()))
}

/// Specialized version of `array_ref` that downcasts the array to a
/// boolean array.
pub fn boolean_array(&self) -> anyhow::Result<ArcRef<dyn Array, BooleanArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_boolean_array(a))
Ok(ArcRef::new(array).map(|a| a.as_boolean()))
}

/// Attempt to get the native value of a literal from this `ColumnarValue`.
Expand Down
61 changes: 40 additions & 21 deletions crates/sparrow-runtime/src/prepare/preparer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::compute::SortColumn;
Expand All @@ -11,13 +9,20 @@ use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use sparrow_api::kaskada::v1alpha::{PreparedFile, SourceData, TableConfig};
use std::sync::Arc;
use uuid::Uuid;

use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use crate::PreparedMetadata;

use super::{prepared_batches, write_parquet};

/// For now, this is a temporary location for the prepared files.
/// In the future, we'll want to move this path to a managed cache
/// so we can reuse state.
const KASKADA_PATH: &str = "kaskada";
const PREPARED_FILE_PREFIX: &str = "part";

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "batch missing required column '{_0}'")]
Expand Down Expand Up @@ -87,35 +92,29 @@ impl Preparer {
/// - This adds or casts columns as needed.
/// - This produces multiple parts if the input file is large.
/// - This produces metadata files alongside data files.
/// Parameters:
/// - `to_prepare`: The path to the parquet file to prepare.
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
path: &std::path::Path,
to_prepare: &str,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing

// Prepared files are stored in the following format:
// file:///<cwd>/tables/<table_uuid>/prepared/<uuid>/part-<n>.parquet
let cur_dir = std::env::current_dir().expect("current dir");
let cur_dir = cur_dir.to_string_lossy();

let uuid = Uuid::new_v4();
let output_path_prefix = format!(
"file:///{}/tables/{}/prepare/{uuid}/",
cur_dir, self.table_config.uuid
);
let output_file_prefix = "part";

let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(path.to_string_lossy().to_string()))?;
let output_url = self
.prepared_output_url_prefix(prepare_prefix)
.change_context_lazy(|| Error::InvalidUrl(prepare_prefix.to_string()))?;

let object_store = self
.object_stores
.object_store(&output_url)
.change_context(Error::Internal)?;

// TODO: support preparing from remote stores
let to_prepare = std::path::Path::new(to_prepare);
let source_data = SourceData {
source: Some(
SourceData::try_from_local(path)
SourceData::try_from_local(to_prepare)
.into_report()
.change_context(Error::Internal)?,
),
Expand All @@ -140,10 +139,10 @@ impl Preparer {
let (data, metadata) = next.change_context(Error::Internal)?;

let data_url = output_url
.join(&format!("{output_file_prefix}-{n}.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}.parquet"))
.change_context(Error::Internal)?;
let metadata_url = output_url
.join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}-metadata.parquet"))
.change_context(Error::Internal)?;

// Create the prepared file via PreparedMetadata.
Expand Down Expand Up @@ -185,6 +184,26 @@ impl Preparer {
self.time_multiplier.as_ref(),
)
}

/// Creates the output url prefix to use for prepared files.
///
/// e.g. for osx: file:///<dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
/// for s3: s3://<path>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
fn prepared_output_url_prefix(
&self,
prefix: &ObjectStoreUrl,
) -> error_stack::Result<ObjectStoreUrl, crate::stores::registry::Error> {
let uuid = Uuid::new_v4();
let url = prefix
.join(KASKADA_PATH)?
.join("tables")?
.join(&self.table_config.uuid.to_string())?
.join("prepared")?
.join(&uuid.to_string())?
.ensure_directory()?;

Ok(url)
}
}

pub fn prepare_batch(
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/stores.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod object_meta_ext;
mod object_store_key;
pub mod object_store_url;
mod registry;
pub mod registry;

pub use object_meta_ext::ObjectMetaExt;
pub use object_store_url::ObjectStoreUrl;
Expand Down
21 changes: 21 additions & 0 deletions crates/sparrow-runtime/src/stores/object_store_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ impl ObjectStoreUrl {
.change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))
}

/// Returns the URL, ensuring it is a directory.
pub fn ensure_directory(self) -> error_stack::Result<Self, Error> {
let mut url = self.url.to_string();
if !url.ends_with('/') {
url.push('/');
}

let url = Url::parse(&url)
.into_report()
.change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))?;
Ok(Self { url })
}

/// Constructs an [ObjectStoreUrl] from a local directory path.
pub fn from_directory_path(path: &std::path::Path) -> error_stack::Result<Self, Error> {
let url = Url::from_directory_path(path)
.map_err(|_| Error::InvalidUrl(path.display().to_string()))?;

Ok(Self { url })
}

pub fn url(&self) -> &Url {
&self.url
}
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" }
sparrow-syntax = { path = "../sparrow-syntax" }
sparrow-instructions = { path = "../sparrow-instructions" }
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
uuid.workspace = true
Expand Down
22 changes: 21 additions & 1 deletion crates/sparrow-session/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hashbrown::HashMap;
use sparrow_runtime::stores::ObjectStoreRegistry;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use std::borrow::Cow;
use std::sync::Arc;

Expand Down Expand Up @@ -34,17 +34,36 @@ pub struct Session {
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Default temporary path to prepare files to.
///
/// NOTE: we'll want to figure out how we're passing in the user-defined
/// destination url. It's possible it could be passed in as part of the
/// query, at which point we'd use that instead of this default temporary path.
prepare_prefix: ObjectStoreUrl,
/// Temporary directory for preparing files to in the local case.
///
/// Stored in the session to ensure it persists until the Session is dropped.
_temp_dir: tempfile::TempDir,
}

impl Default for Session {
fn default() -> Self {
// TODO: Support object stores
// Likely will need the option to pass in the destination url when executing the
// query or creating a table.

let temp_dir = tempfile::tempdir().expect("create temp dir");
let prepare_prefix =
ObjectStoreUrl::from_directory_path(temp_dir.path()).expect("valid path");
Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
prepare_prefix,
_temp_dir: temp_dir,
}
}
}
Expand Down Expand Up @@ -188,6 +207,7 @@ impl Session {
time_unit,
self.object_store_registry.clone(),
source,
self.prepare_prefix.clone(),
)
}

Expand Down
7 changes: 5 additions & 2 deletions crates/sparrow-session/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Table {
key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
source: Source,
registry: Arc<ObjectStoreRegistry>,
prepare_prefix: ObjectStoreUrl,
}

#[derive(Debug)]
Expand All @@ -43,6 +44,7 @@ impl Table {
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
source: Option<&str>,
prepare_prefix: ObjectStoreUrl,
) -> error_stack::Result<Self, Error> {
let prepared_fields: Fields = KEY_FIELDS
.iter()
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Table {
key_column: key_column + KEY_FIELDS.len(),
source,
registry: object_stores,
prepare_prefix: prepare_prefix.to_owned(),
})
}

Expand Down Expand Up @@ -134,7 +137,7 @@ impl Table {
Ok(())
}

pub async fn add_parquet(&self, path: &std::path::Path) -> error_stack::Result<(), Error> {
pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
Expand All @@ -145,7 +148,7 @@ impl Table {

let prepared = self
.preparer
.prepare_parquet(path)
.prepare_parquet(path, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

Expand Down
1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ impl Table {
fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> Result<&'py PyAny> {
let rust_table = self.rust_table.clone();
Ok(pyo3_asyncio::tokio::future_into_py(py, async move {
let path = std::path::Path::new(&path);
let result = rust_table.add_parquet(path).await;
let result = rust_table.add_parquet(&path).await;
Python::with_gil(|py| {
result.unwrap();
Ok(py.None())
Expand Down

0 comments on commit 147642b

Please sign in to comment.