diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 917146391b08..144874984bc4 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -902,6 +902,47 @@ mod tests { .await; } + #[tokio::test] + async fn test_merge_many_duplicates() { + let mut builder = MergeReaderBuilder::new(); + builder.batch_size(3); + for i in 0..10 { + let batches: Vec<_> = (0..8) + .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100])) + .collect(); + let reader = VecBatchReader::new(&batches); + builder.push_batch_reader(Box::new(reader)); + } + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch( + b"k1", + &[0, 1, 2], + &[9, 9, 9], + &[OpType::Put, OpType::Put, OpType::Put], + &[100, 100, 100], + ), + new_batch( + b"k1", + &[3, 4, 5], + &[9, 9, 9], + &[OpType::Put, OpType::Put, OpType::Put], + &[100, 100, 100], + ), + new_batch( + b"k1", + &[6, 7], + &[9, 9], + &[OpType::Put, OpType::Put], + &[100, 100], + ), + ], + ) + .await; + } + #[tokio::test] async fn test_merge_more_than_batch_size() { let batches: Vec<_> = (0..MIN_BATCH_SIZE as i64 * 2)