Skip to content

Commit

Permalink
Add in-memory batch unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 18, 2023
1 parent 1a0adcb commit e2b35ef
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sparrow-sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ sparrow-interfaces = { path = "../sparrow-interfaces" }
tracing.workspace = true

[dev-dependencies]
tokio.workspace = true

[lib]
bench = false
Expand Down
187 changes: 183 additions & 4 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::{Arc, RwLock};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use error_stack::{IntoReportCompat, ResultExt};
use futures::{Stream, StreamExt, TryStreamExt};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};

use sparrow_batch::Batch;
use sparrow_interfaces::source::{Source, SourceError};
Expand Down Expand Up @@ -148,6 +149,8 @@ impl InMemoryBatches {
/// Add a batch, merging it into the in-memory version.
///
/// Publishes the new batch to the subscribers.
///
/// Note: This assumes the batch has been prepared, and will likely panic if not.
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> {
if batch.num_rows() == 0 {
return Ok(());
Expand All @@ -174,9 +177,7 @@ impl InMemoryBatches {
///
/// The first batch will be the in-memory merged batch, and batches will be
/// added as they arrive.
pub fn subscribe(
&self,
) -> impl Stream<Item = error_stack::Result<RecordBatch, SourceError>> + 'static {
pub fn subscribe(&self) -> BoxStream<'static, error_stack::Result<RecordBatch, SourceError>> {
let (mut version, merged) = {
let read = self.current.read().unwrap();
(read.version, read.batch.clone())
Expand Down Expand Up @@ -210,6 +211,7 @@ impl InMemoryBatches {
}
}
}
.boxed()
}

/// Retrieve the current in-memory batch.
Expand All @@ -222,3 +224,180 @@ impl InMemoryBatches {
}
}
}

#[cfg(test)]
mod tests {
use arrow_array::{
types::{ArrowPrimitiveType, TimestampNanosecondType},
ArrayRef, Int32Array, TimestampNanosecondArray, UInt64Array,
};
use arrow_schema::{Field, Schema};

use super::*;

#[tokio::test]
async fn test_subscribe_to_batches() {
let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();

let in_mem = InMemoryBatches::new(true, schema.clone());

let mut s1 = in_mem.subscribe();
let mut s2 = in_mem.subscribe();

// Add the first batch
in_mem.add_batch(batch1.clone()).await.unwrap();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);
let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s2);

// Add the second batch
in_mem.add_batch(batch2.clone()).await.unwrap();
let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);
let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s2);
}

#[tokio::test]
async fn test_subscribe_to_multiple_batches() {
// Sends multiple batches before reading

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();

let in_mem = InMemoryBatches::new(true, schema.clone());

let mut s1 = in_mem.subscribe();
let mut s2 = in_mem.subscribe();

// Add both batches before reading
in_mem.add_batch(batch1.clone()).await.unwrap();
in_mem.add_batch(batch2.clone()).await.unwrap();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);
let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s2);

let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);
let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s2);
}

#[tokio::test]
async fn test_late_subscription_receives_merged_batch() {
// Verify later subscription gets the full merged batch

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", a.data_type().clone(), false),
Field::new("b", b.data_type().clone(), true),
]);
let schema = Arc::new(schema);
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![time, subsort.clone(), key.clone(), a, b],
)
.unwrap();

let time2: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let a2: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b2: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
// Can reuse the subsort/key arrays
let batch2 =
RecordBatch::try_new(schema.clone(), vec![time2, subsort, key, a2, b2]).unwrap();

let in_mem = InMemoryBatches::new(true, schema.clone());

// Subscribe first stream
let mut s1 = in_mem.subscribe();

// Send both batches. In-memory should have merged them.
in_mem.add_batch(batch1.clone()).await.unwrap();
in_mem.add_batch(batch2.clone()).await.unwrap();

// Subscribe second stream
let mut s2 = in_mem.subscribe();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);

let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);

let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3, 4, 5]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2, 0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0, 0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![
None,
Some(1),
Some(8),
Some(10),
None,
None,
]));
let merged_batch =
RecordBatch::try_new(schema.clone(), vec![time, subsort, key, a, b]).unwrap();

let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(merged_batch, b2_s2);
}
}

0 comments on commit e2b35ef

Please sign in to comment.