ref: move prepared files to ~/.cache dir #781

Merged · 9 commits · Oct 5, 2023
Changes from 6 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

83 changes: 64 additions & 19 deletions crates/sparrow-runtime/src/prepare/preparer.rs
@@ -18,6 +18,12 @@ use crate::PreparedMetadata;

use super::{prepared_batches, write_parquet};

/// For now, this is a temporary location for the prepared files.
/// In the future, we'll want to move this path to a managed cache
/// so we can reuse state.
const KASKADA_PATH: &str = "kaskada";
const PREPARED_FILE_PREFIX: &str = "part";

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "batch missing required column '{_0}'")]
@@ -87,35 +93,27 @@ impl Preparer
/// - This adds or casts columns as needed.
/// - This produces multiple parts if the input file is large.
/// - This produces metadata files alongside data files.
/// Parameters:
/// - `to_prepare`: The path to the parquet file to prepare.
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
path: &std::path::Path,
to_prepare: &str,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing

// Prepared files are stored in the following format:
// file:///<cwd>/tables/<table_uuid>/prepared/<uuid>/part-<n>.parquet
let cur_dir = std::env::current_dir().expect("current dir");
let cur_dir = cur_dir.to_string_lossy();

let uuid = Uuid::new_v4();
let output_path_prefix = format!(
"file:///{}/tables/{}/prepare/{uuid}/",
cur_dir, self.table_config.uuid
);
let output_file_prefix = "part";

let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(path.to_string_lossy().to_string()))?;
let output_url = self.prepared_output_url_prefix(prepare_prefix)?;

let object_store = self
.object_stores
.object_store(&output_url)
.change_context(Error::Internal)?;

// TODO: support preparing from remote stores
let to_prepare = std::path::Path::new(to_prepare);
let source_data = SourceData {
source: Some(
SourceData::try_from_local(path)
SourceData::try_from_local(to_prepare)
.into_report()
.change_context(Error::Internal)?,
),
@@ -140,10 +138,10 @@
let (data, metadata) = next.change_context(Error::Internal)?;

let data_url = output_url
.join(&format!("{output_file_prefix}-{n}.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}.parquet"))
.change_context(Error::Internal)?;
let metadata_url = output_url
.join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
.join(&format!("{PREPARED_FILE_PREFIX}-{n}-metadata.parquet"))
.change_context(Error::Internal)?;

// Create the prepared file via PreparedMetadata.
@@ -185,6 +183,53 @@
self.time_multiplier.as_ref(),
)
}

/// Creates the output url prefix to use for prepared files.
///
/// e.g. for osx: file:///<dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>
/// for s3: s3://<path>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>
pub fn prepared_output_url_prefix(
&self,
prefix: &ObjectStoreUrl,
) -> error_stack::Result<ObjectStoreUrl, Error> {
let error = || Error::InvalidUrl(prefix.to_string());

// Append the trailing slash to make it a directory
let uuid = Uuid::new_v4();
let uuid = uuid.to_string() + "/";

if prefix.is_local() {
// The temp directory doesn't have a trailing slash, so we can't join on the existing URL.
let prefix = prefix.to_string();
let url = prefix
+ "/"
+ KASKADA_PATH
+ "/tables/"
+ &self.table_config.uuid.to_string()
+ "/prepared/"
+ &uuid;

let url = ObjectStoreUrl::from_str(&url).change_context_lazy(error)?;
Ok(url)
} else if prefix.is_s3() {
// TODO: This requires the s3 path has a trailing slash as-is.
// Should we check that and append it if we need to?
let url = prefix
.join(KASKADA_PATH)
.change_context_lazy(error)?
.join("tables")
.change_context_lazy(error)?
.join(&self.table_config.uuid.to_string())
.change_context_lazy(error)?
.join("prepared")
.change_context_lazy(error)?
.join(&uuid.to_string())
.change_context_lazy(error)?;
Ok(url)
} else {
error_stack::bail!(Error::Internal)
}
}
}

pub fn prepare_batch(
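Aside (not part of this diff): the local branch of prepared_output_url_prefix above builds the URL by string concatenation rather than URL joining because a base URL without a trailing slash makes join() replace the last path segment instead of extending it. A minimal, self-contained sketch of that behavior, assuming the url crate (which ObjectStoreUrl appears to wrap); the paths are made up for illustration:

use url::Url;

fn main() {
    // Without a trailing slash, join() treats the last segment as a file name
    // and replaces it, which is why plain concatenation is used above.
    let no_slash = Url::parse("file:///tmp/prepare-xyz").unwrap();
    assert_eq!(
        no_slash.join("tables").unwrap().as_str(),
        "file:///tmp/tables"
    );

    // With a trailing slash, join() appends a new path segment as intended.
    let with_slash = Url::parse("file:///tmp/prepare-xyz/").unwrap();
    assert_eq!(
        with_slash.join("tables").unwrap().as_str(),
        "file:///tmp/prepare-xyz/tables"
    );
}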
10 changes: 10 additions & 0 deletions crates/sparrow-runtime/src/stores/object_store_url.rs
@@ -51,6 +51,16 @@ impl ObjectStoreUrl {
self.url.make_relative(&url.url)
}

pub fn is_local(&self) -> bool {
self.url.scheme() == "file"
}

// TODO: Currently only supports the "s3" scheme
// Several others are supported by ObjectStore: https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.with_url
pub fn is_s3(&self) -> bool {
self.url.scheme() == "s3"
}

/// Return the local path, if this is a local file.
pub fn local_path(&self) -> Option<&Path> {
if self.url.scheme() == "file" {
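Aside (not part of this diff): a hedged, test-style sketch of the two new scheme helpers, assuming it lives where ObjectStoreUrl is in scope (for example a unit test inside sparrow-runtime); the bucket and paths are made up. Per the TODO above, only the "file" and "s3" schemes are recognized, so other object_store schemes would fall through both checks.

#[test]
fn scheme_helpers_sketch() {
    use std::str::FromStr;

    let local = ObjectStoreUrl::from_str("file:///tmp/prepared/").unwrap();
    assert!(local.is_local());
    assert!(!local.is_s3());

    let remote = ObjectStoreUrl::from_str("s3://my-bucket/prepared/").unwrap();
    assert!(remote.is_s3());
    assert!(!remote.is_local());
}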
1 change: 1 addition & 0 deletions crates/sparrow-session/Cargo.toml
@@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" }
sparrow-syntax = { path = "../sparrow-syntax" }
sparrow-instructions = { path = "../sparrow-instructions" }
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
uuid.workspace = true
24 changes: 23 additions & 1 deletion crates/sparrow-session/src/session.rs
@@ -1,6 +1,7 @@
use hashbrown::HashMap;
use sparrow_runtime::stores::ObjectStoreRegistry;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
@@ -34,17 +35,37 @@ pub struct Session {
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Url to prepare files to.
///
/// May be a path to a temporary directory, remote object store, etc.
/// Note: Creating in the Session ensures temporary files won't be dropped
/// until the session is closed.
prepare_prefix: ObjectStoreUrl,
/// Temporary directory for preparing files to in the local case.
///
/// Stored in the session to ensure it persists until the Session is dropped.
_temp_dir: tempfile::TempDir,
}

impl Default for Session {
fn default() -> Self {
// TODO: Support object stores
// Likely will need the option to pass in the destination url when executing the
// query or creating a table.
let temp_dir = tempfile::tempdir().expect("create temp dir");
let mut url = "file://".to_owned();
url.push_str(temp_dir.path().to_str().expect("valid string"));
let prepare_prefix = ObjectStoreUrl::from_str(&url).expect("object store url");

Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
prepare_prefix,
_temp_dir: temp_dir,
}
}
}
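
Aside (not part of this diff): a hedged usage sketch of the lifetime guarantee described in the field comments above. Because the default Session owns _temp_dir alongside prepare_prefix, anything prepared under the prefix stays on disk exactly as long as the Session value does (tempfile removes the directory when the TempDir is dropped).

let session = Session::default();
// ... create tables and prepare parquet files under the session's prepare_prefix ...
drop(session); // the backing temp dir, and any prepared files inside it, are deleted here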
@@ -188,6 +209,7 @@ impl Session {
time_unit,
self.object_store_registry.clone(),
source,
self.prepare_prefix.clone(),
)
}

7 changes: 5 additions & 2 deletions crates/sparrow-session/src/table.rs
@@ -24,6 +24,7 @@ pub struct Table {
key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
source: Source,
registry: Arc<ObjectStoreRegistry>,
prepare_prefix: ObjectStoreUrl,
}

#[derive(Debug)]
@@ -43,6 +44,7 @@ impl Table {
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
source: Option<&str>,
prepare_prefix: ObjectStoreUrl,
) -> error_stack::Result<Self, Error> {
let prepared_fields: Fields = KEY_FIELDS
.iter()
@@ -99,6 +101,7 @@ impl Table {
key_column: key_column + KEY_FIELDS.len(),
source,
registry: object_stores,
prepare_prefix: prepare_prefix.to_owned(),
})
}

@@ -134,7 +137,7 @@ impl Table {
Ok(())
}

pub async fn add_parquet(&self, path: &std::path::Path) -> error_stack::Result<(), Error> {
pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
Expand All @@ -145,7 +148,7 @@ impl Table {

let prepared = self
.preparer
.prepare_parquet(path)
.prepare_parquet(path, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

1 change: 1 addition & 0 deletions python/Cargo.lock

(Generated file; diff not rendered.)

3 changes: 1 addition & 2 deletions python/src/table.rs
@@ -89,8 +89,7 @@ impl Table {
fn add_parquet<'py>(&mut self, py: Python<'py>, path: String) -> Result<&'py PyAny> {
let rust_table = self.rust_table.clone();
Ok(pyo3_asyncio::tokio::future_into_py(py, async move {
let path = std::path::Path::new(&path);
let result = rust_table.add_parquet(path).await;
let result = rust_table.add_parquet(&path).await;
Python::with_gil(|py| {
result.unwrap();
Ok(py.None())