Skip to content

Commit

Permalink
feat: Update key hash inverses as new data arrives (#639)
Browse files Browse the repository at this point in the history
Previously, we froze the key hash inverse based on the data that was
available when a query started (by computing it from the merged data).
Now, we compute it as data is added and maintain it.

Note: This does mean that if a `with_key` runs with the same grouping as
an existing table, any keys it adds will be added to the in-memory key
hash inverse.
  • Loading branch information
bjchambers authored Aug 9, 2023
1 parent 720524b commit ae98c60
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 125 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sparrow-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pulsar = ["dep:pulsar", "avro", "lz4"]
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow-array.workspace = true
arrow-select.workspace = true
async-once-cell.workspace = true
async-stream.workspace = true
async-trait.workspace = true
Expand Down
81 changes: 52 additions & 29 deletions crates/sparrow-runtime/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ use sparrow_arrow::scalar_value::ScalarValue;
use sparrow_compiler::{hash_compute_plan_proto, DataContext};
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;

use crate::execute::compute_store_guard::ComputeStoreGuard;
use crate::execute::error::Error;
use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::execute::operation::OperationContext;
use crate::execute::output::Destination;
use crate::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::stores::ObjectStoreRegistry;
use crate::RuntimeOptions;

mod checkpoints;
mod compute_executor;
mod compute_store_guard;
pub mod error;
pub(crate) mod key_hash_inverse;
pub(crate) mod operation;
pub mod output;
mod progress_reporter;
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn execute(

// let output_at_time = request.final_result_time;

execute_new(plan, destination, data_context, options).await
execute_new(plan, destination, data_context, options, None).await
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -173,24 +173,12 @@ impl ExecutionOptions {
}
}

pub async fn execute_new(
plan: ComputePlan,
destination: Destination,
mut data_context: DataContext,
options: ExecutionOptions,
) -> error_stack::Result<impl Stream<Item = error_stack::Result<ExecuteResponse, Error>>, Error> {
let object_stores = Arc::new(ObjectStoreRegistry::default());

let plan_hash = hash_compute_plan_proto(&plan);

let compute_store = options
.compute_store(
object_stores.as_ref(),
plan.per_entity_behavior(),
&plan_hash,
)
.await?;

async fn load_key_hash_inverse(
plan: &ComputePlan,
data_context: &mut DataContext,
compute_store: &Option<ComputeStoreGuard>,
object_stores: &ObjectStoreRegistry,
) -> error_stack::Result<Arc<ThreadSafeKeyHashInverse>, Error> {
let primary_grouping_key_type = plan
.primary_grouping_key_type
.to_owned()
Expand All @@ -200,23 +188,58 @@ pub async fn execute_new(
.into_report()
.change_context(Error::internal_msg("decode primary_grouping_key_type"))?;

let mut key_hash_inverse = KeyHashInverse::from_data_type(primary_grouping_key_type.clone());
let primary_group_id = data_context
.get_or_create_group_id(&plan.primary_grouping, &primary_grouping_key_type)
.into_report()
.change_context(Error::internal_msg("get primary grouping ID"))?;

let mut key_hash_inverse = KeyHashInverse::from_data_type(&primary_grouping_key_type.clone());
if let Some(compute_store) = &compute_store {
if let Ok(restored) = KeyHashInverse::restore_from(compute_store.store_ref()) {
key_hash_inverse = restored
}
}

let primary_group_id = data_context
.get_or_create_group_id(&plan.primary_grouping, &primary_grouping_key_type)
.into_report()
.change_context(Error::internal_msg("get primary grouping ID"))?;

key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.add_from_data_context(data_context, primary_group_id, object_stores)
.await
.change_context(Error::internal_msg("initialize key hash inverse"))?;
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));
Ok(key_hash_inverse)
}

/// Execute a given query based on the options.
///
/// Parameters
/// ----------
/// - key_hash_inverse: If set, specifies the key hash inverses to use. If None, the
/// key hashes will be created.
pub async fn execute_new(
plan: ComputePlan,
destination: Destination,
mut data_context: DataContext,
options: ExecutionOptions,
key_hash_inverses: Option<Arc<ThreadSafeKeyHashInverse>>,
) -> error_stack::Result<impl Stream<Item = error_stack::Result<ExecuteResponse, Error>>, Error> {
let object_stores = Arc::new(ObjectStoreRegistry::default());

let plan_hash = hash_compute_plan_proto(&plan);

let compute_store = options
.compute_store(
object_stores.as_ref(),
plan.per_entity_behavior(),
&plan_hash,
)
.await?;

let key_hash_inverse = if let Some(key_hash_inverse) = key_hash_inverses {
key_hash_inverse
} else {
load_key_hash_inverse(&plan, &mut data_context, &compute_store, &object_stores)
.await
.change_context(Error::internal_msg("load key hash inverse"))?
};

// Channel for the output stats.
let (progress_updates_tx, progress_updates_rx) =
Expand Down Expand Up @@ -303,5 +326,5 @@ pub async fn materialize(
// TODO: the `execute_with_progress` method contains a lot of additional logic that is theoretically not needed,
// as the materialization does not exit, and should not need to handle cleanup tasks that regular
// queries do. We should likely refactor this to use a separate `materialize_with_progress` method.
execute_new(plan, destination, data_context, options).await
execute_new(plan, destination, data_context, options, None).await
}
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ use self::scan::ScanOperation;
use self::select::SelectOperation;
use self::tick::TickOperation;
use self::with_key::WithKeyOperation;
use crate::execute::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::execute::operation::expression_executor::{ExpressionExecutor, InputColumn};
use crate::execute::operation::shift_until::ShiftUntilOperation;
use crate::execute::Error;
use crate::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::stores::ObjectStoreRegistry;
use crate::Batch;

Expand Down
5 changes: 2 additions & 3 deletions crates/sparrow-runtime/src/execute/operation/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ mod tests {
use sparrow_compiler::DataContext;
use uuid::Uuid;

use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::execute::operation::testing::batches_to_csv;
use crate::execute::operation::{OperationContext, OperationExecutor};
use crate::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::read::testing::write_parquet_file;
use crate::stores::ObjectStoreRegistry;

Expand Down Expand Up @@ -486,8 +486,7 @@ mod tests {
})),
};

let key_hash_inverse = KeyHashInverse::from_data_type(DataType::Utf8);
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::from_data_type(&DataType::Utf8));

let (max_event_tx, mut max_event_rx) = tokio::sync::mpsc::unbounded_channel();
let (sender, receiver) = tokio::sync::mpsc::channel(10);
Expand Down
8 changes: 3 additions & 5 deletions crates/sparrow-runtime/src/execute/operation/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use itertools::Itertools;
use sparrow_api::kaskada::v1alpha::{ComputePlan, OperationPlan};
use sparrow_compiler::DataContext;

use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::execute::operation::{OperationContext, OperationExecutor};
use crate::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::stores::ObjectStoreRegistry;
use crate::Batch;

Expand Down Expand Up @@ -173,8 +173,7 @@ pub(super) async fn run_operation(
// Channel for the output stats.
let (progress_updates_tx, _) = tokio::sync::mpsc::channel(29);

let key_hash_inverse = KeyHashInverse::from_data_type(DataType::Utf8);
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::from_data_type(&DataType::Utf8));

let mut context = OperationContext {
plan: ComputePlan {
Expand Down Expand Up @@ -223,8 +222,7 @@ pub(super) async fn run_operation_json(
inputs.push(receiver);
}

let key_hash_inverse = KeyHashInverse::from_data_type(DataType::Utf8);
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::from_data_type(&DataType::Utf8));

let (max_event_tx, mut max_event_rx) = tokio::sync::mpsc::unbounded_channel();

Expand Down
7 changes: 4 additions & 3 deletions crates/sparrow-runtime/src/execute/operation/with_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::sync::Arc;

use super::BoxedOperation;
use crate::execute::error::{invalid_operation, Error};
use crate::execute::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::execute::operation::expression_executor::InputColumn;
use crate::execute::operation::single_consumer_helper::SingleConsumerHelper;
use crate::execute::operation::{InputBatch, Operation, OperationContext};
use crate::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::Batch;
use anyhow::Context;
use async_trait::async_trait;
Expand Down Expand Up @@ -115,8 +115,9 @@ impl WithKeyOperation {
// primary grouping to produce the key hash inverse for output.
if self.is_primary_grouping {
self.key_hash_inverse
.add(new_keys.to_owned(), &new_key_hashes)
.await?;
.add(new_keys.as_ref(), &new_key_hashes)
.await
.map_err(|e| e.into_error())?;
}

// Get the take indices, which will allow us to get the requested columns from
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{data_type, ObjectStoreDestination, PulsarDestination};
use sparrow_arrow::downcast::{downcast_primitive_array, downcast_struct_array};

use crate::execute::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::execute::operation::OperationContext;
use crate::execute::progress_reporter::ProgressUpdate;
use crate::key_hash_inverse::ThreadSafeKeyHashInverse;
use crate::Batch;

mod object_store;
Expand Down
Loading

0 comments on commit ae98c60

Please sign in to comment.