General clean up / docs
jordanrfrazier committed Oct 27, 2023
1 parent 1178804 commit fafbd49
Showing 4 changed files with 68 additions and 30 deletions.
2 changes: 1 addition & 1 deletion crates/sparrow-batch/src/batch.rs
@@ -730,8 +730,8 @@ mod tests {
     use crate::testing::arb_arrays::arb_batch;
     use crate::{Batch, RowTime};
     use arrow_array::cast::AsArray;
-    use arrow_array::new_empty_array;
     use arrow_array::types::{ArrowPrimitiveType, TimestampNanosecondType, UInt64Type};
+    use arrow_array::{new_empty_array, UInt64Array};
     use itertools::Itertools;
     use proptest::prelude::*;

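As context for the import change above, a minimal, self-contained sketch of how `new_empty_array` and `UInt64Array` are commonly used together; this assumes the arrow_array and arrow_schema crates are available, and the values and assertions are illustrative rather than taken from the test in this commit:

use std::sync::Arc;

use arrow_array::{new_empty_array, Array, ArrayRef, UInt64Array};
use arrow_schema::DataType;

fn main() {
    // An empty array of a given type, handy as a placeholder in tests.
    let empty: ArrayRef = new_empty_array(&DataType::UInt64);
    assert_eq!(empty.len(), 0);

    // A concrete UInt64Array built from plain values.
    let keys: ArrayRef = Arc::new(UInt64Array::from(vec![1u64, 2, 3]));
    assert_eq!(keys.len(), 3);
}
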
11 changes: 10 additions & 1 deletion crates/sparrow-merge/src/gather.rs
@@ -42,6 +42,15 @@ impl Gatherer {
         self.active.peek().map(|min| min.index)
     }
 
+    /// Returns a boolean indicating whether the gatherer can produce a batch.
+    pub fn can_produce(&self) -> bool {
+        if let Some(top) = self.active.peek() {
+            top.up_to_time > self.emitted_up_to_time
+        } else {
+            false
+        }
+    }
+
     /// Adds a batch to the pending set for the given index.
     ///
     /// Returns `true` if the gatherer is ready to produce output.
@@ -307,7 +316,7 @@ impl GatheredBatches {
         self.batches
             .iter()
             .map(|batches| -> error_stack::Result<Option<Batch>, Error> {
-                if batches.len() > 0 {
+                if !batches.is_empty() {
                     let batches = batches.to_vec();
                     Ok(Some(
                         Batch::concat(batches, self.up_to_time)
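
The new `can_produce` above peeks at the least-progressed input and compares it against what has already been emitted. The following is a self-contained sketch of that check; the heap entries and the `emitted_up_to_time` field are simplified stand-ins, not the actual sparrow-merge types, and only the `can_produce` decision is modeled:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Simplified stand-in for one input tracked by the gatherer.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Entry {
    /// The time this input is complete up to.
    up_to_time: i64,
    /// Which input this entry describes.
    index: usize,
}

struct Gatherer {
    /// Min-heap over inputs, ordered by how far each has progressed.
    active: BinaryHeap<Reverse<Entry>>,
    /// The time output has already been emitted up to.
    emitted_up_to_time: i64,
}

impl Gatherer {
    /// A batch can be produced only if the least-progressed input has
    /// advanced past what was already emitted.
    fn can_produce(&self) -> bool {
        match self.active.peek() {
            Some(Reverse(min)) => min.up_to_time > self.emitted_up_to_time,
            None => false,
        }
    }
}

fn main() {
    let gatherer = Gatherer {
        active: [
            Reverse(Entry { up_to_time: 10, index: 0 }),
            Reverse(Entry { up_to_time: 25, index: 1 }),
        ]
        .into_iter()
        .collect(),
        emitted_up_to_time: 5,
    };
    // The slowest input is complete up to t=10 > 5, so output can be produced.
    assert!(gatherer.can_produce());
}
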
9 changes: 7 additions & 2 deletions crates/sparrow-merge/src/merge/heterogeneous_merge.rs
@@ -64,6 +64,11 @@ impl HeterogeneousMerge {
         self.gatherer.add_batch(input, batch)
     }
 
+    /// Returns a boolean indicating if the merge can produce a batch.
+    pub fn can_produce(&self) -> bool {
+        self.gatherer.can_produce()
+    }
+
     /// Merges the next batch.
     ///
     /// Note that this method can only be called on the active index,
@@ -244,8 +249,8 @@ impl HeterogeneousMerge {
     ) -> error_stack::Result<Arc<StructArray>, Error> {
         // The result type of the merge does not flatten the structs.
         //
-        // e.g. have X.a + Y.a then we have left: { a: i64 } and right: { a: i64 }.
-        // If we flatten that to {a: i64} we can no longer perform the arithmetic.
+        // e.g. we have [X.a + Y.a] then we have left: { a: i64 } and right: { a: i64 }.
+        // If we flatten that to { a: i64 } we can no longer perform the arithmetic.
         // Instead, we want {left: {a: i64}, right: {a: i64 }} so we can do
         // merged.left.a + merged.right.
         let fields: Vec<(FieldRef, ArrayRef)> = match &self.result_type {
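
The comment about keeping the merge result nested rather than flattened can be made concrete with a small arrow-rs sketch. This is not the project's actual merge code; it assumes the arrow_array and arrow_schema crates, and the field names and values are illustrative:

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, StructArray};
use arrow_schema::{DataType, Field};

fn main() {
    // left: { a: i64 } and right: { a: i64 }
    let left_a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let right_a: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30]));

    let a_field = Arc::new(Field::new("a", DataType::Int64, true));
    let left = StructArray::from(vec![(a_field.clone(), left_a)]);
    let right = StructArray::from(vec![(a_field, right_a)]);

    // Keep the two sides nested rather than flattened, so an expression like
    // `merged.left.a + merged.right.a` stays expressible.
    let merged = StructArray::from(vec![
        (
            Arc::new(Field::new("left", left.data_type().clone(), true)),
            Arc::new(left) as ArrayRef,
        ),
        (
            Arc::new(Field::new("right", right.data_type().clone(), true)),
            Arc::new(right) as ArrayRef,
        ),
    ]);
    assert_eq!(merged.num_columns(), 2);
}
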
76 changes: 50 additions & 26 deletions crates/sparrow-merge/src/merge_pipeline.rs
@@ -53,12 +53,12 @@ struct MergePartition {
     ///
     /// This vec should contains one element for each input.
     txs: Vec<tokio::sync::mpsc::UnboundedSender<Batch>>,
-    // Keeps track of the entities seen by this partition
-    // TODO: key_hash_index: KeyHashIndex,
     /// Handles the merge logic.
     ///
     /// All objects that require locking should be within this handler.
     handler: Mutex<MergePartitionHandler>,
+    // Keeps track of the entities seen by this partition
+    // TODO: key_hash_index: KeyHashIndex,
 }
 
 impl MergePartition {
@@ -76,7 +76,7 @@ impl MergePartition {
 ///
 /// Separated from the [MergePartition] to avoid multiple locks.
 struct MergePartitionHandler {
-    /// The batch receivers for each input.
+    /// The receivers for each input.
     ///
     /// This vec should contains one element for each input.
     rxs: Vec<tokio::sync::mpsc::UnboundedReceiver<Batch>>,
@@ -100,7 +100,12 @@ impl MergePipeline {
     /// Create a new merge pipeline.
     ///
     /// Args:
-    /// consumers: The `InputHandles` to output the result of the transform to.
+    /// - `input_l`: The left input.
+    /// - `input_r`: The right input.
+    /// - `datatype_l`: The data type for the left input.
+    /// - `datatype_r`: The data type for the right input.
+    /// - `result_type`: The result type of this merge.
+    /// - `consumers`: The consumers for this merge.
     pub fn try_new(
         input_l: StepId,
         input_r: StepId,
@@ -127,23 +132,38 @@ impl MergePipeline {
             result_type: result_type.clone(),
         })
     }
+
+    fn complete_partition(
+        &self,
+        input_partition: Partition,
+        merge_partition: &MergePartition,
+        scheduler: &mut dyn Scheduler,
+    ) -> error_stack::Result<(), PipelineError> {
+        tracing::info!(
+            "Input is closed and empty. Closing consumers and finishing partition {}.",
+            input_partition
+        );
+        self.consumers.close_input(input_partition, scheduler)?;
+        merge_partition.task.complete();
+        Ok(())
+    }
 }
 
 impl Pipeline for MergePipeline {
     fn initialize(&mut self, tasks: Partitioned<TaskRef>) {
-        // TODO: FRAZ - need to create the channels here.
         self.partitions = tasks
             .into_iter()
            .map(|task| {
+                let (tx_l, rx_l) = tokio::sync::mpsc::unbounded_channel();
+                let (tx_r, rx_r) = tokio::sync::mpsc::unbounded_channel();
                 MergePartition {
                     task,
                     is_closed: AtomicBool::new(false),
-                    txs: Vec::new(),
-                    // TODO: Error handling
+                    txs: vec![tx_l, tx_r],
                     // TODO: Interpolation
-                    // Current impl uses unlatched spread (`Null` interpolation), meaning discrete behavior.
+                    // Current impl uses unlatched spread (`Null` interpolation).
                     handler: Mutex::new(MergePartitionHandler {
-                        rxs: Vec::new(), // TODO: create channels
+                        rxs: vec![rx_l, rx_r],
                         merger: HeterogeneousMerge::new(
                             &self.result_type,
                             &self.left.datatype,
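
To illustrate the channel wiring introduced in `initialize`, here is a reduced sketch that keeps only the shape of the pattern: one unbounded channel per input, senders left unlocked, receivers moved behind a single lock next to the state that consumes them. It assumes the tokio crate (sync feature) and uses `std::sync::Mutex` and `u64` payloads as stand-ins for the pipeline's own `Mutex` and `Batch` types:

use std::sync::Mutex;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

/// Stand-in for MergePartition: senders are lock-free to write to,
/// receivers live behind one lock together with the consuming state.
struct Partition {
    txs: Vec<UnboundedSender<u64>>,
    handler: Mutex<Handler>,
}

/// Stand-in for MergePartitionHandler.
struct Handler {
    rxs: Vec<UnboundedReceiver<u64>>,
}

fn new_partition() -> Partition {
    // One channel per input; two inputs here (left and right).
    let (tx_l, rx_l) = unbounded_channel();
    let (tx_r, rx_r) = unbounded_channel();
    Partition {
        txs: vec![tx_l, tx_r],
        handler: Mutex::new(Handler {
            rxs: vec![rx_l, rx_r],
        }),
    }
}

fn main() {
    let partition = new_partition();
    // Unbounded senders and try_recv work without a running tokio runtime.
    partition.txs[0].send(42).unwrap();
    let mut handler = partition.handler.lock().unwrap();
    assert_eq!(handler.rxs[0].try_recv().unwrap(), 42);
}

Grouping the receivers with the merge state behind one lock mirrors the doc comment on MergePartitionHandler above ("separated from the MergePartition to avoid multiple locks").
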
@@ -273,33 +293,37 @@ impl Pipeline for MergePipeline {
                     }
                 }
                 Err(TryRecvError::Empty) => {
-                    // TODO: This is uhh an invalid state? We called do_work, but have
-                    // no work to do
-                    todo!("?")
+                    error_stack::ensure!(
+                        partition.is_closed(),
+                        PipelineError::illegal_state("scheduled without work")
+                    );
+
+                    // The channel is empty and the partition is closed, but the input may
+                    // not have disconnected from the channel yet. This is okay -- if the partition
+                    // is verified closed, we can continue to close our consumers here.
+                    return self.complete_partition(input_partition, partition, scheduler);
                 }
                 Err(TryRecvError::Disconnected) => {
-                    tracing::info!(
-                        "Input is closed and empty. Closing consumers and finishing partition {}.",
-                        input_partition
-                    );
-                    self.consumers.close_input(input_partition, scheduler)?;
-                    partition.task.complete();
-                    return Ok(());
+                    return self.complete_partition(input_partition, partition, scheduler)
                 }
             }
         } else {
             // Though all inputs are closed, the merger may have batches remaining to flush.
             tracing::info!("Inputs are closed. Flushing merger.");
             assert!(handler.merger.all_closed());
 
-            let last_batch = handler
-                .merger
-                .merge()
-                .change_context(PipelineError::Execution)?;
-            if !last_batch.is_empty() {
-                self.consumers
-                    .add_input(input_partition, last_batch, scheduler)
+            // Check whether we need to flush the leftovers
+            if handler.merger.can_produce() {
+                let last_batch = handler
+                    .merger
+                    .merge()
                     .change_context(PipelineError::Execution)?;
+                if !last_batch.is_empty() {
+                    self.consumers
+                        .add_input(input_partition, last_batch, scheduler)
+                        .change_context(PipelineError::Execution)?;
+                }
+                assert!(!handler.merger.can_produce(), "expected only one batch");
             }
 
             tracing::info!(
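
Finally, a standalone sketch of the `try_recv` control flow that the rewritten `do_work` relies on — a received batch, an empty-but-not-yet-disconnected channel, and a disconnected channel are the three cases — with the pipeline types replaced by plain stand-ins (tokio assumed, `u64` in place of `Batch`):

use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::unbounded_channel;

fn main() {
    let (tx, mut rx) = unbounded_channel::<u64>();
    let is_closed = true; // stand-in for `partition.is_closed()`

    tx.send(1).unwrap();
    drop(tx); // the upstream input disconnects once it has sent everything

    loop {
        match rx.try_recv() {
            Ok(batch) => {
                // Normal case: hand the batch to the merger / downstream consumers.
                println!("received batch {batch}");
            }
            Err(TryRecvError::Empty) => {
                // Scheduled without work: only legal if the partition is already
                // closed; then the partition can be completed early.
                assert!(is_closed, "scheduled without work");
                println!("closed and empty; completing partition");
                break;
            }
            Err(TryRecvError::Disconnected) => {
                // Sender gone and channel drained: complete the partition.
                println!("input disconnected; completing partition");
                break;
            }
        }
    }
}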
