Commit 344d3dd

Add concurrent file sets
jordanrfrazier committed Sep 12, 2023
1 parent 4f5a75d commit 344d3dd
Showing 8 changed files with 111 additions and 78 deletions.
42 changes: 34 additions & 8 deletions crates/sparrow-compiler/src/data_context.rs
@@ -1,8 +1,10 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use anyhow::Context;
use arrow::datatypes::{DataType, SchemaRef};
use sparrow_api::kaskada::v1alpha::compute_table::FileSet;
use sparrow_api::kaskada::v1alpha::slice_plan::Slice;
use sparrow_api::kaskada::v1alpha::{compute_table, ComputeTable, PreparedFile, TableConfig};
use sparrow_core::context_code;
@@ -303,6 +305,28 @@ pub struct GroupInfo {
key_type: DataType,
}

#[derive(Debug, Default, Clone)]
pub struct ConcurrentFileSets {
file_sets: Arc<Mutex<Vec<compute_table::FileSet>>>,
}

impl ConcurrentFileSets {
pub fn new(file_sets: Vec<compute_table::FileSet>) -> Self {
Self {
file_sets: Arc::new(Mutex::new(file_sets)),
}
}
pub fn push(&mut self, file_set: compute_table::FileSet) {
self.file_sets.lock().unwrap().push(file_set);
}

// TODO: Cloning is bad, but since it's locked behind a mutex, it's necessary.
// Can we do better?
pub fn get(&self) -> Vec<compute_table::FileSet> {
self.file_sets.lock().unwrap().clone()
}
}

/// Information about tables.
#[derive(Debug, Clone)]
pub struct TableInfo {
@@ -316,7 +340,7 @@ pub struct TableInfo {
/// slice configuration.
/// TODO: Make optional?
/// wrap both these in a Source class?
pub file_sets: Arc<Vec<compute_table::FileSet>>,
pub file_sets: ConcurrentFileSets,
/// An in-memory record batch for the contents of the table.
pub in_memory: Option<Arc<InMemoryBatches>>,
}
@@ -339,7 +363,7 @@ impl TableInfo {
group_id,
schema,
config,
file_sets: Arc::new(file_sets),
file_sets: ConcurrentFileSets::new(file_sets),
in_memory: None,
})
}
@@ -368,17 +392,18 @@ impl TableInfo {
&self.config
}

pub fn file_sets(&self) -> &[compute_table::FileSet] {
&self.file_sets
pub fn file_sets(&self) -> Vec<compute_table::FileSet> {
self.file_sets.get()
}

pub fn prepared_files_for_slice(
&self,
requested_slice: &Option<Slice>,
) -> anyhow::Result<&[PreparedFile]> {
) -> anyhow::Result<Vec<PreparedFile>> {
let file_set = self
.file_sets
.iter()
.get()
.into_iter()
.find(|set| {
set.slice_plan
.iter()
@@ -393,11 +418,12 @@
)
})?;

Ok(&file_set.prepared_files)
Ok(file_set.prepared_files)
}

pub fn metadata_for_files(&self) -> Vec<String> {
self.file_sets
.get()
.iter()
.flat_map(|set| {
set.prepared_files
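
The new ConcurrentFileSets type shares one vector of file sets across clones via Arc<Mutex<...>>, so a file set pushed through one handle becomes visible to the TableInfo that holds another clone. A minimal standalone sketch of that sharing pattern (not the commit's code; SharedVec and the path strings are illustrative):

use std::sync::{Arc, Mutex};

// Clones share the same Arc<Mutex<Vec<T>>>, so a push through one clone
// is observable through get() on any other clone.
#[derive(Debug, Default, Clone)]
struct SharedVec<T> {
    inner: Arc<Mutex<Vec<T>>>,
}

impl<T: Clone> SharedVec<T> {
    fn push(&self, item: T) {
        self.inner.lock().unwrap().push(item);
    }

    // Returns a snapshot; cloning is the cost of handing data out from
    // behind the mutex (the trade-off noted in the TODO above).
    fn get(&self) -> Vec<T> {
        self.inner.lock().unwrap().clone()
    }
}

fn main() {
    let compiler_view = SharedVec::<String>::default();
    let table_handle = compiler_view.clone(); // same underlying vector
    table_handle.push("prepared/part-0000.parquet".to_owned());
    assert_eq!(compiler_view.get().len(), 1); // visible through the other clone
}

Interior mutability is what lets one side keep appending prepared files while the other reads a consistent snapshot through get().
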
48 changes: 15 additions & 33 deletions crates/sparrow-runtime/src/prepare/preparer.rs
@@ -8,12 +8,14 @@ use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use sparrow_api::kaskada::v1alpha::{PreparedFile, Source, SourceData, TableConfig};
use uuid::Uuid;

use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use crate::PreparedMetadata;

use super::prepared_batches;
use super::{prepared_batches, write_parquet};

#[derive(derive_more::Display, Debug)]
pub enum Error {
@@ -52,27 +54,6 @@ pub struct Preparer {

impl Preparer {
/// Create a new preparer to produce data with the given schema.
// pub fn new(
// time_column_name: String,
// subsort_column_name: Option<String>,
// key_column_name: String,
// prepared_schema: SchemaRef,
// prepare_hash: u64,
// time_unit: Option<&str>,
// object_stores: Arc<ObjectStoreRegistry>,
// ) -> error_stack::Result<Self, Error> {
// let time_multiplier = time_multiplier(time_unit)?;
// Ok(Self {
// prepared_schema,
// time_column_name,
// subsort_column_name,
// next_subsort: prepare_hash,
// key_column_name,
// time_multiplier,
// object_stores,
// })
// }

pub fn new(
table_config: Arc<TableConfig>,
prepared_schema: SchemaRef,
@@ -100,19 +81,19 @@ impl Preparer {
/// Prepare a parquet file.
/// TODO: docs
pub async fn prepare_parquet(
&mut self,
&self,
path: &str,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO:
// * Support Slicing
// TODO: Support Slicing

// TODO: What is the output url?
// TODO: Output URL
// in wren it is prepared/prep_<version_id>/<sliceplanhash>/file_id (a uuid)/
// file_id is persisted in wren. Don't know if that matters.
let output_path_prefix = "file://prepared/";
let uuid = Uuid::new_v4();
let output_path_prefix = format!("file://prepared/{uuid}/");
let output_file_prefix = "part";

let output_url = ObjectStoreUrl::from_str(output_path_prefix)
let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(path.to_owned()))?;

let object_store = self
@@ -129,6 +110,7 @@
),
};

// TODO: Slicing
let mut prepare_stream =
prepared_batches(&self.object_stores, &source_data, &self.table_config, &None)
.await
@@ -165,7 +147,7 @@
}

// Wait for the uploads.
while let Some(upload) = uploads.try_next().await? {
while let Some(upload) = uploads.try_next().await.change_context(Error::Internal)? {
tracing::info!("Finished uploading {upload}");
}

@@ -181,9 +163,9 @@
/// Self is mutated as necessary to ensure the `subsort` column is increasing, if
/// it is added.
pub fn prepare_batch(&mut self, batch: RecordBatch) -> error_stack::Result<RecordBatch, Error> {
let time_column_name = self.table_config.time_column_name;
let subsort_column_name = self.table_config.subsort_column_name;
let key_column_name = self.table_config.group_column_name;
let time_column_name = self.table_config.time_column_name.clone();
let subsort_column_name = self.table_config.subsort_column_name.clone();
let key_column_name = self.table_config.group_column_name.clone();

let time = get_required_column(&batch, &time_column_name)?;
let time = cast_to_timestamp(time, self.time_multiplier)?;
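
The rewritten prepare_parquet drains its uploads through a FuturesUnordered, now mapping stream errors into the crate's Error via change_context. A standalone sketch of that draining pattern (stub futures only, not the commit's upload logic):

use futures::stream::FuturesUnordered;
use futures::TryStreamExt;

async fn wait_for_uploads() -> Result<(), std::io::Error> {
    // Each pushed future stands in for one parquet part upload.
    let mut uploads = FuturesUnordered::new();
    for part in 0..3 {
        uploads.push(async move { Ok::<String, std::io::Error>(format!("part-{part:04}.parquet")) });
    }

    // Drain in completion order; the first error short-circuits with `?`.
    while let Some(finished) = uploads.try_next().await? {
        println!("finished uploading {finished}");
    }
    Ok(())
}

fn main() {
    futures::executor::block_on(wait_for_uploads()).unwrap();
}
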
14 changes: 12 additions & 2 deletions crates/sparrow-session/src/error.rs
@@ -18,8 +18,8 @@ pub enum Error {
Errors(Vec<String>),
#[display(fmt = "failed to prepare batch")]
Prepare,
#[display(fmt = "internal error")]
Internal,
#[display(fmt = "internal error: {_0}")]
Internal(String),
#[display(fmt = "compile query")]
Compile,
#[display(fmt = "execute query")]
@@ -29,3 +29,13 @@
}

impl error_stack::Context for Error {}

impl Error {
pub fn internal() -> Self {
Error::Internal("no additional context".to_owned())
}

pub fn internal_msg(msg: String) -> Self {
Error::Internal(msg)
}
}
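
With Internal now carrying a message, call sites can attach context at the point of failure instead of a bare "internal error". A self-contained sketch of how the variant composes with error_stack (Display is hand-written here rather than derived with derive_more, and expect_parquet is an illustrative caller, not this crate's API):

use std::fmt;

#[derive(Debug)]
enum Error {
    Internal(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Internal(msg) => write!(f, "internal error: {msg}"),
        }
    }
}

impl error_stack::Context for Error {}

impl Error {
    fn internal_msg(msg: String) -> Self {
        Error::Internal(msg)
    }
}

fn expect_parquet(kind: &str) -> error_stack::Result<(), Error> {
    error_stack::ensure!(
        kind == "parquet",
        Error::internal_msg(format!("expected parquet data source, saw {kind}"))
    );
    Ok(())
}

fn main() {
    // The report's context renders as "internal error: expected parquet data source, saw in_memory".
    if let Err(report) = expect_parquet("in_memory") {
        println!("{report:?}");
    }
}
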
6 changes: 3 additions & 3 deletions crates/sparrow-session/src/session.rs
@@ -421,7 +421,7 @@ impl Session {
)
.into_report()
.change_context(Error::Compile)?;
error_stack::ensure!(diagnostics.num_errors() == 0, Error::Internal);
error_stack::ensure!(diagnostics.num_errors() == 0, Error::internal());

// Extract the necessary subset of the DFG as an expression.
// This will allow us to operate without mutating things.
@@ -556,8 +556,8 @@ impl ExecutionOptions {
}

fn result_schema(expr: &Expr, key_type: &DataType) -> error_stack::Result<SchemaRef, Error> {
let DataType::Struct(fields) = expr.0.value_type().arrow_type().ok_or(Error::Internal)? else {
error_stack::bail!(Error::Internal)
let DataType::Struct(fields) = expr.0.value_type().arrow_type().ok_or(Error::internal())? else {
error_stack::bail!(Error::internal())
};

let fields: Fields = super::table::KEY_FIELDS
53 changes: 26 additions & 27 deletions crates/sparrow-session/src/table.rs
@@ -7,7 +7,7 @@ use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use error_stack::ResultExt;
use sparrow_api::kaskada::v1alpha::compute_table::FileSet;
use sparrow_api::kaskada::v1alpha::{compute_table, PreparedFile};
use sparrow_compiler::TableInfo;
use sparrow_compiler::{ConcurrentFileSets, TableInfo};
use sparrow_merge::InMemoryBatches;
use sparrow_runtime::preparer::Preparer;
use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::ObjectStoreRegistry};
@@ -35,9 +35,10 @@ pub struct Table {
// Batch, Paths, Streams, per type?
}

#[derive(Debug)]
enum Source {
InMemoryBatches(Arc<InMemoryBatches>),
Parquet(Arc<Vec<compute_table::FileSet>>),
Parquet(ConcurrentFileSets),
}

impl Table {
@@ -62,22 +63,19 @@
// Filesets and in_memory should initially be empty.
// From python, we create the table with no inputs, then add data.
// TODO: Make these a single "Source" type in TableInfo?
error_stack::ensure!(table_info.file_sets().is_empty());
error_stack::ensure!(table_info.in_memory.is_none());
error_stack::ensure!(table_info.file_sets().is_empty(), Error::internal());
error_stack::ensure!(table_info.in_memory.is_none(), Error::internal());

// TODO: Slicing
let file_set = Arc::new(compute_table::FileSet {
slice_plan: None,
prepared_files: vec![],
});
let concurrent_file_sets = ConcurrentFileSets::default();

// Clone into the table_info, so that any modifications to our
// original reference are reflected within the table_info.
table_info.file_sets = Some(file_set.clone());
table_info.file_sets = concurrent_file_sets.clone();

let preparer = Preparer::new(
table_info.config().clone(),
prepared_schema,
prepared_schema.clone(),
prepare_hash,
time_unit,
object_stores,
Expand All @@ -91,16 +89,11 @@ impl Table {
// in my table.rs add_parquet file
// 2. See how Scan is reading in_memory and skipping normal execution, and emulate that
// for parquet files
// 3. Differentiate sources to ensure we can't add_data for parquet tables
// or add_parquet for inmemorybatch tables?
// 4. Other shit?

// TODO: Support other sources
let source = match source {
Some("parquet") => Source::Parquet(Arc::new(vec![])),
_ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new(
querable, // TODO: fraz
prepared_schema.clone(),
))),
Some("parquet") => Source::Parquet(concurrent_file_sets),
_ => Source::InMemoryBatches(Arc::new(InMemoryBatches::new(prepared_schema))),
};

Ok(Self {
@@ -116,10 +109,13 @@
self.preparer.schema()
}

pub async fn add_data(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
let source = match self.source {
Source::InMemoryBatches(in_memory) => in_memory,
other => error_stack::bail!("expected in memory data source, saw {}", other),
pub async fn add_data(&mut self, batch: RecordBatch) -> error_stack::Result<(), Error> {
let source = match &self.source {
Source::InMemoryBatches(in_memory) => in_memory.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
"expected in-memory data source, saw {:?}",
other
))),
};

let prepared = self
@@ -141,15 +137,18 @@
Ok(())
}

pub async fn add_parquet(&mut self, path: &str) -> error_stack::Result<(), Error> {
let source = match self.source {
Source::Parquet(file_sets) => file_sets,
other => error_stack::bail!("expected parquet data source, saw {}", other),
pub async fn add_parquet(&self, path: String) -> error_stack::Result<(), Error> {
let mut source = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
"expected parquet data source, saw {:?}",
other
))),
};

let prepared = self
.preparer
.prepare_parquet(path)
.prepare_parquet(&path)
.await
.change_context(Error::Prepare)?;

4 changes: 3 additions & 1 deletion python/pysrc/kaskada/_ffi.pyi
@@ -57,11 +57,13 @@ class Table:
subsort_column: Optional[str],
grouping_name: Optional[str],
time_unit: Optional[str],
source: Optional[str],
) -> None: ...
@property
def name(self) -> str: ...
async def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
def expr(self) -> Expr: ...
async def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
async def add_parquet(self, path: str) -> None: ...

class Udf(object):
def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ...
2 changes: 1 addition & 1 deletion python/pysrc/kaskada/sources/arrow.py
@@ -485,4 +485,4 @@ async def add_file(self, path: str) -> None:
)
for batch in table.to_batches():
await self._ffi_table.add_pyarrow(batch)
# self._ffi_table.add_parquet(path)
# await self._ffi_table.add_parquet(path)