Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: move prepared files to ~/.cache dir #781

Merged
merged 9 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions crates/sparrow-instructions/src/columnar_value.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use anyhow::anyhow;
use arrow::datatypes::*;
use arrow_array::cast::AsArray;
use arrow_array::{
Array, ArrayRef, BooleanArray, GenericStringArray, MapArray, OffsetSizeTrait, PrimitiveArray,
StructArray,
};
use owning_ref::ArcRef;
use sparrow_arrow::downcast::{
downcast_boolean_array, downcast_map_array, downcast_primitive_array, downcast_string_array,
downcast_struct_array,
};
use sparrow_arrow::scalar_value::{NativeFromScalar, ScalarValue};

/// The input to an instruction.
Expand Down Expand Up @@ -52,14 +49,14 @@ impl ColumnarValue {
/// struct array.
pub fn struct_array(&self) -> anyhow::Result<ArcRef<dyn Array, StructArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_struct_array(a))
Ok(ArcRef::new(array).map(|a| a.as_struct()))
}

/// Specialized version of `array_ref` that downcasts the array to a
/// map array.
pub fn map_array(&self) -> anyhow::Result<ArcRef<dyn Array, MapArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_map_array(a))
Ok(ArcRef::new(array).map(|a| a.as_map()))
}

/// Specialized version of `array_ref` that downcasts the array to a
Expand All @@ -69,7 +66,7 @@ impl ColumnarValue {
T: OffsetSizeTrait,
{
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_string_array(a))
Ok(ArcRef::new(array).map(|a| a.as_string::<T>()))
}

/// Specialized version of `array_ref` that downcasts the array to a
Expand All @@ -79,14 +76,14 @@ impl ColumnarValue {
&self,
) -> anyhow::Result<ArcRef<dyn Array, PrimitiveArray<T>>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_primitive_array(a))
Ok(ArcRef::new(array).map(|a| a.as_primitive::<T>()))
}

/// Specialized version of `array_ref` that downcasts the array to a
/// boolean array.
pub fn boolean_array(&self) -> anyhow::Result<ArcRef<dyn Array, BooleanArray>> {
let array = self.array_ref()?;
ArcRef::new(array).try_map(|a| downcast_boolean_array(a))
Ok(ArcRef::new(array).map(|a| a.as_boolean()))
}

/// Attempt to get the native value of a literal from this `ColumnarValue`.
Expand Down
61 changes: 40 additions & 21 deletions crates/sparrow-runtime/src/prepare/preparer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::compute::SortColumn;
Expand All @@ -11,13 +9,20 @@ use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use sparrow_api::kaskada::v1alpha::{PreparedFile, SourceData, TableConfig};
use std::sync::Arc;
use uuid::Uuid;

use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use crate::PreparedMetadata;

use super::{prepared_batches, write_parquet};

/// For now, this is a temporary location for the prepared files.
/// In the future, we'll want to move this path to a managed cache
/// so we can reuse state.
const KASKADA_PATH: &str = "kaskada";
const PREPARED_FILE_PREFIX: &str = "part";

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "batch missing required column '{_0}'")]
Expand Down Expand Up @@ -87,35 +92,29 @@ impl Preparer {
/// - This adds or casts columns as needed.
/// - This produces multiple parts if the input file is large.
/// - This produces metadata files alongside data files.
/// Parameters:
/// - `to_prepare`: The path to the parquet file to prepare.
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
path: &std::path::Path,
to_prepare: &str,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing

// Prepared files are stored in the following format:
// file:///<cwd>/tables/<table_uuid>/prepared/<uuid>/part-<n>.parquet
let cur_dir = std::env::current_dir().expect("current dir");
let cur_dir = cur_dir.to_string_lossy();

let uuid = Uuid::new_v4();
let output_path_prefix = format!(
"file:///{}/tables/{}/prepare/{uuid}/",
cur_dir, self.table_config.uuid
);
let output_file_prefix = "part";

let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(path.to_string_lossy().to_string()))?;
let output_url = self
.prepared_output_url_prefix(prepare_prefix)
.change_context_lazy(|| Error::InvalidUrl(prepare_prefix.to_string()))?;

let object_store = self
.object_stores
.object_store(&output_url)
.change_context(Error::Internal)?;

// TODO: support preparing from remote stores
let to_prepare = std::path::Path::new(to_prepare);
let source_data = SourceData {
source: Some(
SourceData::try_from_local(path)
SourceData::try_from_local(to_prepare)
.into_report()
.change_context(Error::Internal)?,
),
Expand All @@ -140,10 +139,10 @@ impl Preparer {
let (data, metadata) = next.change_context(Error::Internal)?;

let data_url = output_url
.join(&format!("{output_file_prefix}-{n}.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}.parquet"))
.change_context(Error::Internal)?;
let metadata_url = output_url
.join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}-metadata.parquet"))
.change_context(Error::Internal)?;

// Create the prepared file via PreparedMetadata.
Expand Down Expand Up @@ -185,6 +184,26 @@ impl Preparer {
self.time_multiplier.as_ref(),
)
}

/// Creates the output url prefix to use for prepared files.
///
/// e.g. for osx: file:///<dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
/// for s3: s3://<path>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/
fn prepared_output_url_prefix(
&self,
prefix: &ObjectStoreUrl,
) -> error_stack::Result<ObjectStoreUrl, crate::stores::registry::Error> {
let uuid = Uuid::new_v4();
let url = prefix
.join(KASKADA_PATH)?
jordanrfrazier marked this conversation as resolved.
Show resolved Hide resolved
.join("tables")?
.join(&self.table_config.uuid.to_string())?
.join("prepared")?
.join(&uuid.to_string())?
.ensure_directory()?;

Ok(url)
}
}

pub fn prepare_batch(
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/stores.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod object_meta_ext;
mod object_store_key;
pub mod object_store_url;
mod registry;
pub mod registry;

pub use object_meta_ext::ObjectMetaExt;
pub use object_store_url::ObjectStoreUrl;
Expand Down
21 changes: 21 additions & 0 deletions crates/sparrow-runtime/src/stores/object_store_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ impl ObjectStoreUrl {
.change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))
}

/// Returns the URL, ensuring it is a directory.
pub fn ensure_directory(self) -> error_stack::Result<Self, Error> {
let mut url = self.url.to_string();
if !url.ends_with('/') {
url.push('/');
}

let url = Url::parse(&url)
.into_report()
.change_context_lazy(|| Error::InvalidUrl(self.url.to_string()))?;
Ok(Self { url })
}

/// Constructs an [ObjectStoreUrl] from a local directory path.
pub fn from_directory_path(path: &std::path::Path) -> error_stack::Result<Self, Error> {
let url = Url::from_directory_path(path)
.map_err(|_| Error::InvalidUrl(path.display().to_string()))?;

Ok(Self { url })
}

pub fn url(&self) -> &Url {
&self.url
}
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" }
sparrow-syntax = { path = "../sparrow-syntax" }
sparrow-instructions = { path = "../sparrow-instructions" }
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
uuid.workspace = true
Expand Down
22 changes: 21 additions & 1 deletion crates/sparrow-session/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hashbrown::HashMap;
use sparrow_runtime::stores::ObjectStoreRegistry;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use std::borrow::Cow;
use std::sync::Arc;

Expand Down Expand Up @@ -34,17 +34,36 @@ pub struct Session {
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Default temporary path to prepare files to.
///
/// NOTE: we'll want to figure out how we're passing in the user-defined
/// destination url. It's possible it could be passed in as part of the
/// query, at which point we'd use that instead of this default temporary path.
prepare_prefix: ObjectStoreUrl,
/// Temporary directory for preparing files to in the local case.
///
/// Stored in the session to ensure it persists until the Session is dropped.
_temp_dir: tempfile::TempDir,
}

impl Default for Session {
fn default() -> Self {
// TODO: Support object stores
// Likely will need the option to pass in the destination url when executing the
// query or creating a table.

let temp_dir = tempfile::tempdir().expect("create temp dir");
let prepare_prefix =
ObjectStoreUrl::from_directory_path(temp_dir.path()).expect("valid path");
Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
prepare_prefix,
_temp_dir: temp_dir,
}
}
}
Expand Down Expand Up @@ -188,6 +207,7 @@ impl Session {
time_unit,
self.object_store_registry.clone(),
source,
self.prepare_prefix.clone(),
)
}

Expand Down
7 changes: 5 additions & 2 deletions crates/sparrow-session/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Table {
key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
source: Source,
registry: Arc<ObjectStoreRegistry>,
prepare_prefix: ObjectStoreUrl,
}

#[derive(Debug)]
Expand All @@ -43,6 +44,7 @@ impl Table {
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
source: Option<&str>,
prepare_prefix: ObjectStoreUrl,
) -> error_stack::Result<Self, Error> {
let prepared_fields: Fields = KEY_FIELDS
.iter()
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Table {
key_column: key_column + KEY_FIELDS.len(),
source,
registry: object_stores,
prepare_prefix: prepare_prefix.to_owned(),
})
}

Expand Down Expand Up @@ -134,7 +137,7 @@ impl Table {
Ok(())
}

pub async fn add_parquet(&self, path: &std::path::Path) -> error_stack::Result<(), Error> {
pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
jordanrfrazier marked this conversation as resolved.
Show resolved Hide resolved
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
Expand All @@ -145,7 +148,7 @@ impl Table {

let prepared = self
.preparer
.prepare_parquet(path)
.prepare_parquet(path, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

Expand Down
1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ impl Table {
fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> Result<&'py PyAny> {
let rust_table = self.rust_table.clone();
Ok(pyo3_asyncio::tokio::future_into_py(py, async move {
let path = std::path::Path::new(&path);
let result = rust_table.add_parquet(path).await;
let result = rust_table.add_parquet(&path).await;
Python::with_gil(|py| {
result.unwrap();
Ok(py.None())
Expand Down
Loading