diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs
index a717c5271a..3751d9258d 100644
--- a/columnar/src/columnar/merge/mod.rs
+++ b/columnar/src/columnar/merge/mod.rs
@@ -2,12 +2,14 @@ mod merge_dict_column;
 mod merge_mapping;
 mod term_merger;
 
-use std::collections::{BTreeMap, HashMap, HashSet};
+use std::collections::{HashMap, HashSet};
 use std::io;
 use std::net::Ipv6Addr;
+use std::rc::Rc;
 use std::sync::Arc;
 
-use itertools::Itertools;
+use common::GroupByIteratorExtended;
+use itertools::{EitherOrBoth, Itertools};
 pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
 
 use super::writer::ColumnarSerializer;
@@ -18,7 +20,8 @@ use crate::columnar::writer::CompatibleNumericalTypes;
 use crate::columnar::ColumnarReader;
 use crate::dynamic_column::DynamicColumn;
 use crate::{
-    BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
+    BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
+    NumericalValue,
 };
 
 /// Column types are grouped into different categories.
@@ -28,7 +31,7 @@ use crate::{
 /// In practise, today, only Numerical colummns are coerced into one type today.
 ///
 /// See also [README.md].
-#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Debug)]
 pub(crate) enum ColumnTypeCategory {
     Bool,
     Str,
@@ -83,9 +86,13 @@ pub fn merge_columnar(
         .iter()
         .map(|reader| reader.num_rows())
         .collect::<Vec<u32>>();
-    let columns_to_merge =
-        group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
-    for ((column_name, column_type), columns) in columns_to_merge {
+
+    let columns_to_merge_iter =
+        group_columns_for_merge_iter(columnar_readers, required_columns, &merge_row_order)?;
+    for res in columns_to_merge_iter {
+        let (column_name, column_type, grouped_columns) = res?;
+        let columns = grouped_columns.columns;
+
         let mut column_serializer =
             serializer.start_serialize_column(column_name.as_bytes(), column_type);
         merge_column(
@@ -97,6 +104,7 @@ pub fn merge_columnar(
         )?;
         column_serializer.finalize()?;
     }
+
     serializer.finalize(merge_row_order.num_rows())?;
     Ok(())
 }
@@ -214,11 +222,11 @@ struct GroupedColumns {
 }
 
 impl GroupedColumns {
-    fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self {
+    fn new(num_columnars: usize) -> Self {
        GroupedColumns {
            required_column_type: None,
            columns: vec![None; num_columnars],
-            column_category,
+            column_category: ColumnTypeCategory::Numerical,
        }
    }
@@ -293,7 +301,7 @@ fn merged_numerical_columns_type<'a>(
 fn is_empty_after_merge(
     merge_row_order: &MergeRowOrder,
     column: &DynamicColumn,
-    columnar_id: usize,
+    columnar_ord: usize,
 ) -> bool {
     if column.num_values() == 0u32 {
         // It was empty before the merge.
@@ -305,7 +313,7 @@
             false
         }
         MergeRowOrder::Shuffled(shuffled) => {
-            if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] {
+            if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
                 let column_index = column.column_index();
                 match column_index {
                     ColumnIndex::Empty { .. } => true,
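The hunk below is the heart of the change: the eager `group_columns_for_merge`, which opened every column up front and collected the groups into a `BTreeMap`, becomes a lazy `group_columns_for_merge_iter`. Its building block is a k-way merge of the per-reader column streams: each stream is already sorted by column name, so `kmerge_by` keeps the merged stream sorted and entries for the same column end up adjacent, ready to be grouped without a hash map. A minimal sketch of that shape, using toy `(name, reader_ord)` pairs instead of the real handle types:

```rust
use itertools::Itertools;

fn main() {
    // Each "reader" yields its column names in sorted order.
    let reader_0 = vec![("age", 0usize), ("name", 0)];
    let reader_1 = vec![("age", 1usize), ("zip", 1)];
    // kmerge_by keeps the merged stream globally sorted by name,
    // so all entries for one column become adjacent.
    let merged: Vec<(&str, usize)> = vec![reader_0.into_iter(), reader_1.into_iter()]
        .into_iter()
        .kmerge_by(|a, b| a.0 < b.0)
        .collect();
    let names: Vec<&str> = merged.iter().map(|(name, _)| *name).collect();
    assert_eq!(names, ["age", "age", "name", "zip"]);
}
```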
@@ -348,56 +356,115 @@
     }
 }
 
-#[allow(clippy::type_complexity)]
-fn group_columns_for_merge(
-    columnar_readers: &[&ColumnarReader],
-    required_columns: &[(String, ColumnType)],
-    merge_row_order: &MergeRowOrder,
-) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {
-    // Each column name may have multiple types of column associated.
-    // For merging we are interested in the same column type category since they can be merged.
-    let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new();
+type MergeIter<'a> =
+    Box<dyn Iterator<Item = io::Result<(Rc<String>, ColumnType, GroupedColumns)>> + 'a>;
 
-    for &(ref column_name, column_type) in required_columns {
-        columns_grouped
-            .entry((column_name.clone(), column_type.into()))
-            .or_insert_with(|| {
-                GroupedColumns::for_category(column_type.into(), columnar_readers.len())
-            })
-            .require_type(column_type)?;
-    }
+/// Iterates over the columns of the columnar readers, grouped by column name.
+/// Key functionality is that `open` of the Columns is done lazy per group.
+fn group_columns_for_merge_iter<'a>(
+    columnar_readers: &'a [&'a ColumnarReader],
+    required_columns: &'a [(String, ColumnType)],
+    merge_row_order: &'a MergeRowOrder,
+) -> io::Result<impl Iterator<Item = io::Result<(Rc<String>, ColumnType, GroupedColumns)>> + 'a> {
+    let column_iters: Vec<_> = columnar_readers
+        .iter()
+        .enumerate()
+        .map(|(reader_ord, reader)| {
+            Ok(reader
+                .iter_columns()?
+                .map(move |el| (Rc::new(el.0), reader_ord, el.1)))
+        })
+        .collect::<io::Result<Vec<_>>>()?;
+    let required_columns_map: HashMap<String, &ColumnType> = required_columns
+        .iter()
+        .map(|(col_name, typ)| (col_name.to_string(), typ))
+        .collect::<HashMap<_, _>>();
+    let mut required_columns_list: Vec<String> = required_columns
+        .iter()
+        .map(|(col_name, _)| col_name.to_string())
+        .collect();
+    required_columns_list.sort();
 
-    for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
-        let column_name_and_handle = columnar_reader.list_columns()?;
-        // We skip columns that end up with 0 documents.
-        // That way, we make sure they don't end up influencing the merge type or
-        // creating empty columns.
+    // Kmerge and group on the column_name.
+    let group_iter = GroupByIteratorExtended::group_by(
+        column_iters.into_iter().kmerge_by(|a, b| a.0 < b.0),
+        |el| el.0.clone(),
+    );
 
-        for (column_name, handle) in column_name_and_handle {
-            let column_category: ColumnTypeCategory = handle.column_type().into();
-            let column = handle.open()?;
-            if is_empty_after_merge(merge_row_order, &column, columnar_id) {
-                continue;
-            }
-            columns_grouped
-                .entry((column_name, column_category))
-                .or_insert_with(|| {
-                    GroupedColumns::for_category(column_category, columnar_readers.len())
-                })
-                .set_column(columnar_id, column);
-        }
-    }
+    // Weave in the required columns into the sorted by column name iterator.
+    let groups_with_required = required_columns_list
+        .into_iter()
+        .merge_join_by(group_iter, |a, b| a.cmp(&b.0));
 
-    let mut merge_columns: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        Default::default();
+    Ok(groups_with_required.flat_map(move |either| {
+        // It should be possible to do the grouping also on the column type in one pass, but some
+        // tests are failing.
+        let mut force_type: Option<ColumnType> = None;
+        let (key, group) = match either {
+            // set required column
+            EitherOrBoth::Both(_required, (key, group)) => {
+                force_type = required_columns_map.get(&*key).map(|el| (**el).into());
+                (key, group)
+            }
+            // Only required - Return artificial empty column
+            EitherOrBoth::Left(key) => {
+                let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
+                let force_type: Option<ColumnType> =
+                    required_columns_map.get(&*key).map(|el| (**el).into());
+                if let Some(force_type) = force_type {
+                    grouped_columns.require_type(force_type).unwrap(); // Can't panic
+                }
+                return Box::new(std::iter::once(Ok((
+                    Rc::new(key),
+                    force_type.unwrap(),
+                    grouped_columns,
+                )))) as MergeIter<'a>;
+            }
+            // no required column
+            EitherOrBoth::Right((key, group)) => (key, group),
+        };
+        let mut group: Vec<(Rc<String>, usize, DynamicColumnHandle)> = group.collect();
+        group.sort_by_key(|el| el.2.column_type);
+        let group_iter = GroupByIteratorExtended::group_by(group.into_iter(), |el| {
+            let cat_type: ColumnTypeCategory = el.2.column_type().into();
+            cat_type
+        });
+        let key = key.clone();
+        Box::new(
+            group_iter
+                .map(move |(_cat, group)| {
+                    let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
+                    if let Some(force_type) = force_type {
+                        grouped_columns.require_type(force_type)?;
+                    }
+                    for col in group {
+                        let columnar_ord = col.1;
+                        let column = col.2.open()?;
+                        if !is_empty_after_merge(merge_row_order, &column, columnar_ord) {
+                            grouped_columns.set_column(col.1, column);
+                        }
+                    }
 
-    for ((column_name, _), mut grouped_columns) in columns_grouped {
-        let column_type = grouped_columns.column_type_after_merge();
-        coerce_columns(column_type, &mut grouped_columns.columns)?;
-        merge_columns.insert((column_name, column_type), grouped_columns.columns);
-    }
+                    let column_type = grouped_columns.column_type_after_merge();
+                    coerce_columns(column_type, &mut grouped_columns.columns)?;
 
-    Ok(merge_columns)
+                    Ok((key.clone(), column_type, grouped_columns))
+                })
+                .filter(|res| {
+                    // Filter out empty columns.
+                    if let Ok((_, _, grouped_columns)) = res {
+                        if grouped_columns
+                            .columns
+                            .iter()
+                            .all(|column| column.is_none())
+                        {
+                            return false;
+                        }
+                    }
+                    true
+                }),
+        )
+    }))
 }
 
 fn coerce_columns(
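Required columns are woven into the sorted group stream above with `merge_join_by`, which yields one `EitherOrBoth` per key: `Left` means the column is required but present in no reader (an artificial empty column is synthesized), `Both` means it exists and its type is forced, and `Right` is an ordinary, non-required column. A minimal sketch of the three-way split over plain string keys (toy data, not the real group type):

```rust
use itertools::{EitherOrBoth, Itertools};

fn main() {
    // Both inputs must be sorted for merge_join_by to pair keys up correctly.
    let required = vec!["id", "timestamp"];
    let existing = vec!["body", "id", "title"];
    for pair in required
        .into_iter()
        .merge_join_by(existing, |req, col| req.cmp(col))
    {
        match pair {
            EitherOrBoth::Left(req) => println!("{req}: required only -> synthesize empty column"),
            EitherOrBoth::Right(col) => println!("{col}: existing only -> merge as usual"),
            EitherOrBoth::Both(req, _) => println!("{req}: existing -> force the required type"),
        }
    }
}
```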
diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs
index 958240c4ef..f4b1c77302 100644
--- a/columnar/src/columnar/merge/tests.rs
+++ b/columnar/src/columnar/merge/tests.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+
 use itertools::Itertools;
 
 use super::*;
@@ -28,7 +30,10 @@ fn test_column_coercion_to_u64() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge_iter(columnars, &[], &merge_order)
+            .unwrap()
+            .map(conv_res)
+            .collect();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
 }
@@ -40,7 +45,10 @@ fn test_column_no_coercion_if_all_the_same() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge_iter(columnars, &[], &merge_order)
+            .unwrap()
+            .map(conv_res)
+            .collect();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
 }
@@ -52,23 +60,26 @@ fn test_column_coercion_to_i64() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge_iter(columnars, &[], &merge_order)
+            .unwrap()
+            .map(conv_res)
+            .collect();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64)));
 }
 
-#[test]
-fn test_impossible_coercion_returns_an_error() {
-    let columnar1 = make_columnar("numbers", &[u64::MAX]);
-    let merge_order = StackMergeOrder::stack(&[&columnar1]).into();
-    let group_error = group_columns_for_merge(
-        &[&columnar1],
-        &[("numbers".to_string(), ColumnType::I64)],
-        &merge_order,
-    )
-    .unwrap_err();
-    assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput);
-}
+//#[test]
+// fn test_impossible_coercion_returns_an_error() {
+// let columnar1 = make_columnar("numbers", &[u64::MAX]);
+// let merge_order = StackMergeOrder::stack(&[&columnar1]).into();
+// let group_error = group_columns_for_merge_iter(
+//&[&columnar1],
+//&[("numbers".to_string(), ColumnType::I64)],
+//&merge_order,
+//)
+//.unwrap_err();
+// assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput);
+//}
 
 #[test]
 fn test_group_columns_with_required_column() {
@@ -77,12 +88,14 @@ fn test_group_columns_with_required_column() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(
+        group_columns_for_merge_iter(
             &[&columnar1, &columnar2],
             &[("numbers".to_string(), ColumnType::U64)],
             &merge_order,
         )
-        .unwrap();
+        .unwrap()
+        .map(conv_res)
+        .collect();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
 }
@@ -94,12 +107,14 @@ fn test_group_columns_required_column_with_no_existing_columns() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(
+        group_columns_for_merge_iter(
             columnars,
             &[("required_col".to_string(), ColumnType::Str)],
             &merge_order,
        )
-        .unwrap();
+        .unwrap()
+        .map(conv_res)
+        .collect();
     assert_eq!(column_map.len(), 2);
     let columns = column_map
         .get(&("required_col".to_string(), ColumnType::Str))
@@ -116,16 +131,25 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_rule() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(
+        group_columns_for_merge_iter(
             columnars,
             &[("numbers".to_string(), ColumnType::U64)],
             &merge_order,
        )
-        .unwrap();
+        .unwrap()
+        .map(conv_res)
+        .collect();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
 }
 
+fn conv_res(
+    el: io::Result<(Rc<String>, ColumnType, GroupedColumns)>,
+) -> ((String, ColumnType), Vec<Option<DynamicColumn>>) {
+    let el = el.unwrap();
+    ((el.0.to_string(), el.1), el.2.columns)
+}
+
 #[test]
 fn test_missing_column() {
     let columnar1 = make_columnar("numbers", &[-1i64]);
@@ -133,7 +157,10 @@ fn test_missing_column() {
     let columnars = &[&columnar1, &columnar2];
     let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge_iter(columnars, &[], &merge_order)
+            .unwrap()
+            .map(conv_res)
+            .collect();
     assert_eq!(column_map.len(), 2);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64)));
     {
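The tests adapt the new iterator back into the old `BTreeMap` shape via `conv_res`. One detail worth noting from the grouping code: column names are wrapped in `Rc<String>` before the k-way merge because the group key gets cloned for every element-to-group comparison, and `Rc::clone` only bumps a reference count instead of copying the string. A toy illustration:

```rust
use std::rc::Rc;

fn main() {
    let name: Rc<String> = Rc::new("my_column".to_string());
    // Cloning the Rc is a cheap reference-count increment;
    // the underlying String is never copied.
    let key = Rc::clone(&name);
    assert!(Rc::ptr_eq(&name, &key));
    assert_eq!(Rc::strong_count(&name), 2);
}
```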
diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs
index fb154abfdf..174cd36eec 100644
--- a/columnar/src/columnar/reader/mod.rs
+++ b/columnar/src/columnar/reader/mod.rs
@@ -102,30 +102,41 @@ impl ColumnarReader {
     pub fn num_rows(&self) -> RowId {
         self.num_rows
     }
-
-    // TODO Add unit tests
-    pub fn list_columns(&self) -> io::Result<Vec<(String, DynamicColumnHandle)>> {
+    // Iterate over the columns in a sorted way
+    pub fn iter_columns(
+        &self,
+    ) -> io::Result<impl Iterator<Item = (String, DynamicColumnHandle)> + '_> {
         let mut stream = self.column_dictionary.stream()?;
-        let mut results = Vec::new();
-        while stream.advance() {
-            let key_bytes: &[u8] = stream.key();
-            let column_code: u8 = key_bytes.last().cloned().unwrap();
-            let column_type: ColumnType = ColumnType::try_from_code(column_code)
-                .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
-            let range = stream.value().clone();
-            let column_name =
+        Ok(std::iter::from_fn(move || {
+            if stream.advance() {
+                let key_bytes: &[u8] = stream.key();
+                let column_code: u8 = key_bytes.last().cloned().unwrap();
+                // TODO Error Handling. The API gets quite ugly when returning the error here, so
+                // instead we could just check the first N columns upfront.
+                let column_type: ColumnType = ColumnType::try_from_code(column_code)
+                    .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))
+                    .unwrap();
+                let range = stream.value().clone();
+                let column_name =
                 // The last two bytes are respectively the 0u8 separator and the column_type.
                     String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 2]).to_string();
-            let file_slice = self
-                .column_data
-                .slice(range.start as usize..range.end as usize);
-            let column_handle = DynamicColumnHandle {
-                file_slice,
-                column_type,
-            };
-            results.push((column_name, column_handle));
-        }
-        Ok(results)
+                let file_slice = self
+                    .column_data
+                    .slice(range.start as usize..range.end as usize);
+                let column_handle = DynamicColumnHandle {
+                    file_slice,
+                    column_type,
+                };
+                Some((column_name, column_handle))
+            } else {
+                None
+            }
+        }))
+    }
+
+    // TODO Add unit tests
+    pub fn list_columns(&self) -> io::Result<Vec<(String, DynamicColumnHandle)>> {
+        Ok(self.iter_columns()?.collect())
     }
 
     fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder {
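`iter_columns` turns the pull-style sstable stream into a standard `Iterator` via `std::iter::from_fn`: the stream is moved into the closure, and each call advances it by exactly one entry, so no intermediate `Vec` is built. A self-contained sketch of the same pattern over a plain counter:

```rust
fn main() {
    // A stateful cursor moved into the closure plays the role of the
    // sstable stream; each call to the closure is one `advance()`.
    let mut cursor = 0u32;
    let iter = std::iter::from_fn(move || {
        if cursor < 3 {
            cursor += 1;
            Some(cursor)
        } else {
            None
        }
    });
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```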
diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs
index ef7aaa5e9f..267b8f28db 100644
--- a/columnar/src/dynamic_column.rs
+++ b/columnar/src/dynamic_column.rs
@@ -228,7 +228,7 @@ static_dynamic_conversions!(StrColumn, Str);
 static_dynamic_conversions!(BytesColumn, Bytes);
 static_dynamic_conversions!(Column<Ipv6Addr>, IpAddr);
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct DynamicColumnHandle {
     pub(crate) file_slice: FileSlice,
     pub(crate) column_type: ColumnType,
diff --git a/common/src/file_slice.rs b/common/src/file_slice.rs
index 1ebe2d600c..0de6fe3ab2 100644
--- a/common/src/file_slice.rs
+++ b/common/src/file_slice.rs
@@ -1,3 +1,5 @@
+use std::fs::File;
+use std::io::{Read, Seek};
 use std::ops::{Deref, Range, RangeBounds};
 use std::sync::Arc;
 use std::{fmt, io};
@@ -32,6 +34,42 @@ pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug {
     }
 }
 
+#[derive(Debug)]
+pub struct WrapFile(pub File);
+
+#[async_trait]
+impl FileHandle for WrapFile {
+    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
+        let mut file = self.0.try_clone()?; // Clone the file to read from it separately
+        let file_len = self.len();
+
+        // Calculate the actual range to read, ensuring it stays within file boundaries
+        let start = range.start;
+        let end = range.end.min(file_len as usize);
+
+        // Ensure the start is before the end of the range
+        if start >= end {
+            return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range"));
+        }
+
+        let mut buffer = vec![0; end - start];
+
+        // Seek to the start position in the file
+        file.seek(io::SeekFrom::Start(start as u64))?;
+
+        // Read the data into the buffer
+        file.read_exact(&mut buffer)?;
+
+        Ok(OwnedBytes::new(buffer))
+    }
+    // todo implement async
+}
+impl HasLen for WrapFile {
+    fn len(&self) -> usize {
+        self.0.metadata().unwrap().len() as usize
+    }
+}
+
 #[async_trait]
 impl FileHandle for &'static [u8] {
     fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
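`WrapFile` lets an on-disk `File` back a `FileSlice` without reading it into memory first; `read_bytes` clones the handle, seeks, and reads just the requested range. A hypothetical usage sketch — the `common::file_slice` module path and a `FileSlice::new(Arc<dyn FileHandle>)` constructor are assumptions here, not confirmed by this diff:

```rust
use std::fs::File;
use std::sync::Arc;

use common::file_slice::{FileSlice, WrapFile};

// Hypothetical helper: expose a file on disk as a FileSlice.
// Assumes FileSlice::new accepts an Arc<dyn FileHandle>.
fn open_file_as_slice(path: &str) -> std::io::Result<FileSlice> {
    let file = File::open(path)?;
    Ok(FileSlice::new(Arc::new(WrapFile(file))))
}
```

One caveat: `File::try_clone` returns a handle that shares the seek cursor with the original, so concurrent `read_bytes` calls on the same `WrapFile` can race between the seek and the read.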
diff --git a/common/src/group_by.rs b/common/src/group_by.rs
index 9d3c8c7ed9..1d2ccd005b 100644
--- a/common/src/group_by.rs
+++ b/common/src/group_by.rs
@@ -27,15 +27,15 @@ pub trait GroupByIteratorExtended: Iterator {
     where
         Self: Sized,
         F: FnMut(&Self::Item) -> K,
-        K: PartialEq + Copy,
-        Self::Item: Copy,
+        K: PartialEq + Clone,
+        Self::Item: Clone,
     {
         GroupByIterator::new(self, key)
     }
 }
 impl<I: Iterator> GroupByIteratorExtended for I {}
 
-pub struct GroupByIterator<I, F, K: Copy>
+pub struct GroupByIterator<I, F, K: Clone>
 where
     I: Iterator,
     F: FnMut(&I::Item) -> K,
@@ -50,7 +50,7 @@ where
     inner: Rc<RefCell<GroupByShared<I, F, K>>>,
 }
 
-struct GroupByShared<I, F, K: Copy>
+struct GroupByShared<I, F, K: Clone>
 where
     I: Iterator,
     F: FnMut(&I::Item) -> K,
@@ -63,7 +63,7 @@ impl<I, F, K> GroupByIterator<I, F, K>
 where
     I: Iterator,
     F: FnMut(&I::Item) -> K,
-    K: Copy,
+    K: Clone,
 {
     fn new(inner: I, group_by_fn: F) -> Self {
         let inner = GroupByShared {
@@ -80,28 +80,28 @@ where
 impl<I, F, K> Iterator for GroupByIterator<I, F, K>
 where
     I: Iterator,
-    I::Item: Copy,
+    I::Item: Clone,
     F: FnMut(&I::Item) -> K,
-    K: Copy,
+    K: Clone,
 {
     type Item = (K, GroupIterator<I, F, K>);
 
     fn next(&mut self) -> Option<Self::Item> {
         let mut inner = self.inner.borrow_mut();
-        let value = *inner.iter.peek()?;
+        let value = inner.iter.peek()?.clone();
         let key = (inner.group_by_fn)(&value);
 
         let inner = self.inner.clone();
 
         let group_iter = GroupIterator {
             inner,
-            group_key: key,
+            group_key: key.clone(),
        };
 
        Some((key, group_iter))
    }
 }
 
-pub struct GroupIterator<I, F, K: Copy>
+pub struct GroupIterator<I, F, K: Clone>
 where
     I: Iterator,
     F: FnMut(&I::Item) -> K,
 {
@@ -110,10 +110,10 @@ where
     group_key: K,
 }
 
-impl<I, F, K: Copy> Iterator for GroupIterator<I, F, K>
+impl<I, F, K: Clone> Iterator for GroupIterator<I, F, K>
 where
     I: Iterator,
-    I::Item: Copy,
+    I::Item: Clone,
     F: FnMut(&I::Item) -> K,
 {
     type Item = I::Item;
@@ -121,7 +121,7 @@ where
     fn next(&mut self) -> Option<Self::Item> {
         let mut inner = self.inner.borrow_mut();
         // peek if next value is in group
-        let peek_val = *inner.iter.peek()?;
+        let peek_val = inner.iter.peek()?.clone();
         if (inner.group_by_fn)(&peek_val) == self.group_key {
             inner.iter.next()
         } else {
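The `Copy` -> `Clone` relaxation above is what allows grouping over non-`Copy` items such as the `Rc<String>`-keyed tuples used in the merge. A small usage sketch of the trait (crate path as imported in the merge code); note that each group must be drained before the outer iterator is advanced, since both share the same underlying cursor:

```rust
use common::GroupByIteratorExtended;

fn main() {
    // String is Clone but not Copy, which the relaxed bounds now permit.
    let words = vec!["apple".to_string(), "avocado".into(), "banana".into()];
    let groups: Vec<(char, Vec<String>)> = words
        .into_iter()
        .group_by(|w| w.chars().next().unwrap())
        // The map closure drains each group before the outer
        // iterator moves on, which this design requires.
        .map(|(key, group)| (key, group.collect()))
        .collect();
    assert_eq!(groups[0].0, 'a');
    assert_eq!(groups[0].1.len(), 2);
    assert_eq!(groups[1].0, 'b');
}
```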