Skip to content

Commit

Permalink
revert merge changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 10, 2023
1 parent 594bddb commit 4124488
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 105 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/sparrow-merge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ testing = ["arrow-csv", "proptest"]

[dependencies]
anyhow.workspace = true
arrow.workspace = true
arrow-arith.workspace = true
arrow-array.workspace = true
arrow-csv = { workspace = true, optional = true }
Expand All @@ -31,7 +30,6 @@ proptest = { workspace = true, optional = true }
smallvec.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-core = { path = "../sparrow-core" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
tokio.workspace = true
tracing.workspace = true

Expand Down
158 changes: 158 additions & 0 deletions crates/sparrow-merge/src/in_memory_batches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::sync::RwLock;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use error_stack::{IntoReportCompat, ResultExt};
use futures::Stream;

use crate::old::homogeneous_merge;

/// Errors produced while managing in-memory batches.
#[derive(derive_more::Display, Debug)]
pub enum Error {
/// A batch could not be added: either merging it into the current
/// in-memory batch failed, or the lock guarding the current state was
/// poisoned.
#[display(fmt = "failed to add in-memory batch")]
Add,
/// A subscriber's receiver reported an overflow, meaning it fell behind
/// the sender and missed batches.
#[display(fmt = "receiver lagged")]
ReceiverLagged,
}

// Allows `Error` to be used as the context of an `error_stack::Report`.
impl error_stack::Context for Error {}

/// Struct for managing in-memory batches.
///
/// Holds the merged in-memory batch (when `queryable`) together with a
/// broadcast channel used to publish each newly added batch, tagged with its
/// version, to subscribers.
#[derive(Debug)]
pub struct InMemoryBatches {
/// Whether rows added will be available for interactive queries.
/// If False, rows will be discarded after being sent to any active
/// materializations.
queryable: bool,
/// The merged batch and its version, guarded by a lock so that adding a
/// batch and taking a snapshot do not interleave.
current: RwLock<Current>,
/// Broadcasts `(version, batch)` for every non-empty batch that is added.
sender: async_broadcast::Sender<(usize, RecordBatch)>,
/// A subscriber that is never used -- it exists only to keep the sender
/// alive.
_receiver: async_broadcast::InactiveReceiver<(usize, RecordBatch)>,
}

/// The current in-memory state: the merged batch and the version it
/// corresponds to.
#[derive(Debug)]
struct Current {
/// Schema passed to `homogeneous_merge` when combining batches.
schema: SchemaRef,
/// Starts at 0; `InMemoryBatches::add_batch` increments it once for every
/// non-empty batch added.
version: usize,
/// The merged batch. Empty (zero rows) until a batch has been merged in.
batch: RecordBatch,
}

impl Current {
    /// Creates the initial state for `schema`: version 0 and an empty batch.
    pub fn new(schema: SchemaRef) -> Self {
        Self {
            batch: RecordBatch::new_empty(schema.clone()),
            schema,
            version: 0,
        }
    }

    /// Folds `batch` into the stored batch.
    ///
    /// While the stored batch has no rows, `batch` simply replaces it.
    /// Otherwise the stored batch and `batch` are combined with
    /// `homogeneous_merge` using the stored schema.
    ///
    /// This does not touch `version`; the caller is responsible for that.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Add`] if `homogeneous_merge` fails.
    pub fn add_batch(&mut self, batch: &RecordBatch) -> error_stack::Result<(), Error> {
        // Nothing stored yet: adopt the incoming batch without merging.
        if self.batch.num_rows() == 0 {
            self.batch = batch.clone();
            return Ok(());
        }

        // This assumes that cloning the old batch is cheap.
        // If it isn't, we could replace it with an empty batch (`std::mem::replace`),
        // put it in an option, or allow `homogeneous_merge` to take `&RecordBatch`.
        let inputs = vec![self.batch.clone(), batch.clone()];
        let merged = homogeneous_merge(&self.schema, inputs)
            .into_report()
            .change_context(Error::Add)?;
        self.batch = merged;
        Ok(())
    }
}

impl InMemoryBatches {
    /// Creates an empty set of in-memory batches for `schema`.
    ///
    /// If `queryable` is false, added batches are only published to
    /// subscribers and are not merged into the in-memory batch.
    pub fn new(queryable: bool, schema: SchemaRef) -> Self {
        let (mut sender, receiver) = async_broadcast::broadcast(10);

        // Don't wait for a receiver. If no-one receives, `send` will fail.
        sender.set_await_active(false);

        let current = RwLock::new(Current::new(schema));
        Self {
            queryable,
            current,
            sender,
            _receiver: receiver.deactivate(),
        }
    }

    /// Add a batch, merging it into the in-memory version.
    ///
    /// Publishes the new batch to the subscribers. Empty batches are ignored:
    /// they neither bump the version nor get published.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Add`] if the lock guarding the current state is
    /// poisoned or if merging the batch fails.
    ///
    /// # Panics
    ///
    /// Panics if broadcasting fails while the channel is closed.
    pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Keep the write guard in its own scope so it is released before the
        // `.await` below.
        let new_version = {
            let mut write = self.current.write().map_err(|_| Error::Add)?;
            if self.queryable {
                write.add_batch(&batch)?;
            }
            write.version += 1;
            write.version
        };

        let send_result = self.sender.broadcast((new_version, batch)).await;
        if send_result.is_err() {
            assert!(!self.sender.is_closed());
            tracing::info!("No-one subscribed for new batch");
        }
        Ok(())
    }

    /// Create a stream subscribed to the batches.
    ///
    /// The first batch will be the in-memory merged batch (if it has any
    /// rows), and batches will be added as they arrive. The stream yields
    /// [`Error::ReceiverLagged`] if the receiver reports an overflow, and
    /// ends when the sender is closed.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the current state is poisoned.
    pub fn subscribe(
        &self,
    ) -> impl Stream<Item = error_stack::Result<RecordBatch, Error>> + 'static {
        // Register the receiver *before* taking the snapshot. A new receiver
        // only sees messages broadcast after it was created, so creating it
        // after the snapshot would lose any batch added in between: it would
        // be in neither the snapshot nor the receiver's queue. Batches that
        // end up in both are dropped by the version check below.
        let mut recv = self.sender.new_receiver();

        let (mut version, merged) = {
            let read = self.current.read().unwrap();
            (read.version, read.batch.clone())
        };

        async_stream::try_stream! {
            tracing::info!("Starting subscriber with version {version}");
            if merged.num_rows() > 0 {
                yield merged;
            }

            loop {
                match recv.recv().await {
                    Ok((recv_version, batch)) => {
                        if version < recv_version {
                            tracing::info!("Received version {recv_version}");
                            yield batch;
                            version = recv_version;
                        } else {
                            // Already covered by the snapshot (or an earlier
                            // message).
                            tracing::warn!("Ignoring old version {recv_version}");
                        }
                    }
                    Err(async_broadcast::RecvError::Closed) => {
                        tracing::info!("Sender closed.");
                        break;
                    },
                    Err(async_broadcast::RecvError::Overflowed(_)) => {
                        Err(Error::ReceiverLagged)?;
                    }
                }
            }
        }
    }

    /// Retrieve the current in-memory batch.
    ///
    /// Returns `None` while the in-memory batch has no rows.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the current state is poisoned.
    pub fn current(&self) -> Option<RecordBatch> {
        let batch = self.current.read().unwrap().batch.clone();
        (batch.num_rows() > 0).then_some(batch)
    }
}
3 changes: 3 additions & 0 deletions crates/sparrow-merge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@
clippy::undocumented_unsafe_blocks
)]

mod in_memory_batches;
pub mod old;

pub use in_memory_batches::*;
101 changes: 0 additions & 101 deletions crates/sparrow-merge/src/old/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,6 @@ pub trait InputItem: Sized {
fn split_at(self, split_time: i64) -> anyhow::Result<(Option<Self>, Option<Self>)>;
}

impl InputItem for Batch {
    /// The time of the batch's lower bound.
    fn min_time(&self) -> i64 {
        self.lower_bound.time
    }

    /// The time of the batch's upper bound.
    fn max_time(&self) -> i64 {
        self.upper_bound.time
    }

    /// Splits the batch into rows with `time < split_time` and rows with
    /// `time >= split_time`.
    ///
    /// Either half is `None` when it would contain no rows. An empty batch
    /// yields `(None, None)`.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the batch's times or from
    /// constructing either resulting batch.
    fn split_at(self, split_time: i64) -> anyhow::Result<(Option<Self>, Option<Self>)> {
        if self.is_empty() {
            return Ok((None, None));
        } else if split_time <= self.min_time() {
            return Ok((None, Some(self)));
        } else if split_time > self.max_time() {
            return Ok((Some(self), None));
        }

        // Index of the first row whose time is `>= split_time`. This relies on
        // the times being in ascending order, as the binary search it replaces
        // did, and stays logarithmic when many rows share `split_time`.
        let split_point = self.times()?.partition_point(|time| *time < split_time);
        let num_rows = self.num_rows();

        let lt = if split_point > 0 {
            let lt = self.data.slice(0, split_point);
            Some(Batch::try_new_from_batch(lt)?)
        } else {
            None
        };

        let gte = if split_point < num_rows {
            let gte = self.data.slice(split_point, num_rows - split_point);
            Some(Batch::try_new_from_batch(gte)?)
        } else {
            None
        };
        Ok((lt, gte))
    }
}

/// A collection of zero or more `InputItems` ordered by time.
#[derive(Debug, PartialEq, Eq)]
pub struct OrderedInputs<T> {
Expand Down Expand Up @@ -289,15 +242,6 @@ impl<T: InputItem> std::ops::Index<usize> for OrderedInputs<T> {
mod tests {
use super::*;
use std::ops::RangeInclusive;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};
use arrow_array::RecordBatch;
use itertools::Itertools;
use proptest::prelude::*;
use sparrow_core::TableSchema;

use crate::old::testing::{arb_i64_array, arb_key_triples};

fn ranges(
ranges: impl IntoIterator<Item = RangeInclusive<i64>>,
Expand Down Expand Up @@ -405,51 +349,6 @@ mod tests {
assert_eq!(inputs.max_time, 10);
}

/// Strategy producing record batches with between 1 and `max_len - 1` rows.
///
/// Each batch has the three key columns produced by `arb_key_triples`
/// followed by a nullable `Int64` column named `"data"`.
fn arb_batch(max_len: usize) -> impl Strategy<Value = RecordBatch> {
    let columns = (1..max_len).prop_flat_map(|len| (arb_key_triples(len), arb_i64_array(len)));

    columns.prop_map(|((time, subsort, key_hash), values)| {
        let data_field = Arc::new(Field::new("data", DataType::Int64, true));
        let table_schema = TableSchema::from_data_fields([data_field]).unwrap();
        let schema = table_schema.schema_ref();

        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(time),
                Arc::new(subsort),
                Arc::new(key_hash),
                Arc::new(values),
            ],
        )
        .unwrap()
    })
}

proptest! {
    #[test]
    fn test_splitting(batch in arb_batch(1000)) {
        // For every time value in the batch, try splitting there and make sure
        // the ordering constraints are satisfied.
        let input = Batch::try_new_from_batch(batch).unwrap();
        let times = input.times().unwrap();

        for split_time in times.iter().dedup() {
            let (lt, gte) = input.clone().split_at(*split_time).unwrap();

            // The results of `all` must be asserted; otherwise the test
            // passes regardless of what `split_at` returns.
            if let Some(lt) = lt {
                prop_assert!(lt.times().unwrap().iter().all(|t| *t < *split_time));
            }
            if let Some(gte) = gte {
                prop_assert!(gte.times().unwrap().iter().all(|t| *t >= *split_time));
            }
        }
    }
}

// TODO: proptests for the OrderedInputs?
// These should focus on the correctness conditions, so that
// the heuristics can be adjusted.
Expand Down

0 comments on commit 4124488

Please sign in to comment.