diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs index 06ddfe69f4..03b8e5278a 100644 --- a/rust/lance-encoding-datafusion/src/zone.rs +++ b/rust/lance-encoding-datafusion/src/zone.rs @@ -393,6 +393,8 @@ impl ZoneMapsFieldScheduler { &FilterExpression::no_filter(), Arc::::default(), /*should_validate= */ false, + LanceFileVersion::default(), + None, ) .await?; diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 14e777a101..e43a7c634e 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -69,4 +69,4 @@ name = "buffer" harness = false [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 4aba5142a4..922f6636f8 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -6,6 +6,8 @@ use arrow_array::{RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use criterion::{criterion_group, criterion_main, Criterion}; +use lance_core::cache::FileMetadataCache; +use lance_datagen::ArrayGeneratorExt; use lance_encoding::{ decoder::{DecoderPlugins, FilterExpression}, encoder::{default_encoding_strategy, encode_batch, EncodingOptions}, @@ -41,22 +43,10 @@ const PRIMITIVE_TYPES: &[DataType] = &[ // Some types are supported by the encoder/decoder but Lance // schema doesn't yet parse them in the context of a fixed size list. -const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[ - DataType::Int8, - DataType::Int16, - DataType::Int32, - DataType::Int64, - DataType::UInt8, - DataType::UInt16, - DataType::UInt32, - DataType::UInt64, - DataType::Float16, - DataType::Float32, - DataType::Float64, -]; +const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[DataType::Int8, DataType::Float32]; const ENCODING_OPTIONS: EncodingOptions = EncodingOptions { - cache_bytes_per_column: 1024 * 1024, + cache_bytes_per_column: 8 * 1024 * 1024, max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, @@ -65,26 +55,28 @@ const ENCODING_OPTIONS: EncodingOptions = EncodingOptions { fn bench_decode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); + const NUM_BYTES: u64 = 1024 * 1024 * 128; + group.throughput(criterion::Throughput::Bytes(NUM_BYTES)); for data_type in PRIMITIVE_TYPES { - let data = lance_datagen::gen() - .anon_col(lance_datagen::array::rand_type(data_type)) - .into_batch_rows(lance_datagen::RowCount::from(1024 * 1024 * 1024)) - .unwrap(); - let lance_schema = - Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); - let input_bytes = data.get_array_memory_size(); - group.throughput(criterion::Throughput::Bytes(input_bytes as u64)); - let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); - let encoded = rt - .block_on(encode_batch( - &data, - lance_schema, - encoding_strategy.as_ref(), - &ENCODING_OPTIONS, - )) - .unwrap(); let func_name = format!("{:?}", data_type).to_lowercase(); + let num_rows = NUM_BYTES / data_type.primitive_width().unwrap() as u64; group.bench_function(func_name, |b| { + let data = lance_datagen::gen() + .anon_col(lance_datagen::array::rand_type(data_type)) + .into_batch_rows(lance_datagen::RowCount::from(num_rows)) + .unwrap(); + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &ENCODING_OPTIONS, + )) + .unwrap(); + b.iter(|| { let batch = rt .block_on(lance_encoding::decoder::decode_batch( @@ -92,6 +84,8 @@ fn bench_decode(c: &mut Criterion) { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + Some(Arc::new(FileMetadataCache::no_cache())), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -102,48 +96,76 @@ fn bench_decode(c: &mut Criterion) { fn bench_decode_fsl(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); - let mut group = c.benchmark_group("decode_primitive_fsl"); - for data_type in PRIMITIVE_TYPES_FOR_FSL { - let data = lance_datagen::gen() - .anon_col(lance_datagen::array::rand_type(&DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Int32, true)), - 1024, - ))) - .into_batch_rows(lance_datagen::RowCount::from(1024)) - .unwrap(); - let lance_schema = - Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); - let input_bytes = data.get_array_memory_size(); - group.throughput(criterion::Throughput::Bytes(input_bytes as u64)); - let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); - let encoded = rt - .block_on(encode_batch( - &data, - lance_schema, - encoding_strategy.as_ref(), - &ENCODING_OPTIONS, - )) - .unwrap(); - let func_name = format!("{:?}", data_type).to_lowercase(); - group.bench_function(func_name, |b| { - b.iter(|| { - let batch = rt - .block_on(lance_encoding::decoder::decode_batch( - &encoded, - &FilterExpression::no_filter(), - Arc::::default(), - false, - )) - .unwrap(); - assert_eq!(data.num_rows(), batch.num_rows()); - }) - }); + let mut group = c.benchmark_group("decode_fsl"); + const NUM_BYTES: u64 = 1024 * 1024 * 128; + for version in [LanceFileVersion::V2_0, LanceFileVersion::V2_1] { + for data_type in PRIMITIVE_TYPES_FOR_FSL { + for dimension in [4, 16, 32, 64, 128] { + let nullable_choices: &[bool] = if version == LanceFileVersion::V2_0 { + &[false] + } else { + &[false, true] + }; + for nullable in nullable_choices { + let func_name = format!( + "{:?}_{}_v{}_null{}", + data_type, dimension, version, nullable + ) + .to_lowercase(); + group.throughput(criterion::Throughput::Bytes(NUM_BYTES)); + group.bench_function(func_name, |b| { + let num_rows = + NUM_BYTES / (dimension * data_type.primitive_width().unwrap() as u64); + let mut arraygen = + lance_datagen::array::rand_type(&DataType::FixedSizeList( + Arc::new(Field::new("item", data_type.clone(), true)), + dimension as i32, + )); + if *nullable { + arraygen = arraygen.with_random_nulls(0.5); + } + let data = lance_datagen::gen() + .anon_col(arraygen) + .into_batch_rows(lance_datagen::RowCount::from(num_rows)) + .unwrap(); + let lance_schema = Arc::new( + lance_core::datatypes::Schema::try_from(data.schema().as_ref()) + .unwrap(), + ); + let encoding_strategy = default_encoding_strategy(version); + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &ENCODING_OPTIONS, + )) + .unwrap(); + b.iter(|| { + let batch = rt + .block_on(lance_encoding::decoder::decode_batch( + &encoded, + &FilterExpression::no_filter(), + Arc::::default(), + false, + version, + Some(Arc::new(FileMetadataCache::no_cache())), + )) + .unwrap(); + assert_eq!(data.num_rows(), batch.num_rows()); + }) + }); + } + } + } } } fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); + const NUM_ROWS: u64 = 100000; + let data_type = DataType::Utf8; // generate string column with 20 rows let string_data = lance_datagen::gen() @@ -151,38 +173,40 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { .into_batch_rows(lance_datagen::RowCount::from(20)) .unwrap(); - let string_array = string_data.column(0); - - // generate random int column with 100000 rows - let mut rng = rand::thread_rng(); - let integer_arr: Vec = (0..100_000).map(|_| rng.gen_range(0..20)).collect(); - let integer_array = UInt32Array::from(integer_arr); - - let mapped_strings = take(string_array, &integer_array, None).unwrap(); - - let schema = Arc::new(Schema::new(vec![Field::new( - "string", - DataType::Utf8, - false, - )])); - - let data = RecordBatch::try_new(schema, vec![Arc::new(mapped_strings)]).unwrap(); - - let lance_schema = - Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); - let input_bytes = data.get_array_memory_size(); - group.throughput(criterion::Throughput::Bytes(input_bytes as u64)); - let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); - let encoded = rt - .block_on(encode_batch( - &data, - lance_schema, - encoding_strategy.as_ref(), - &ENCODING_OPTIONS, - )) - .unwrap(); + group.throughput(criterion::Throughput::Bytes( + NUM_ROWS * std::mem::size_of::() as u64 + string_data.get_array_memory_size() as u64, + )); + let func_name = format!("{:?}", data_type).to_lowercase(); group.bench_function(func_name, |b| { + let string_array = string_data.column(0); + + // generate random int column with 100000 rows + let mut rng = rand::thread_rng(); + let integer_arr: Vec = (0..100_000).map(|_| rng.gen_range(0..20)).collect(); + let integer_array = UInt32Array::from(integer_arr); + + let mapped_strings = take(string_array, &integer_array, None).unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "string", + DataType::Utf8, + false, + )])); + + let data = RecordBatch::try_new(schema, vec![Arc::new(mapped_strings)]).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &ENCODING_OPTIONS, + )) + .unwrap(); b.iter(|| { let batch = rt .block_on(lance_encoding::decoder::decode_batch( @@ -190,6 +214,8 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + Some(Arc::new(FileMetadataCache::no_cache())), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -201,58 +227,62 @@ fn bench_decode_packed_struct(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); - let fields = vec![ - Arc::new(Field::new("int_field", DataType::Int32, false)), - Arc::new(Field::new("float_field", DataType::Float32, false)), - Arc::new(Field::new( - "fsl_field", - DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5), - false, - )), - ] - .into(); - - // generate struct column with 1M rows - let data = lance_datagen::gen() - .anon_col(lance_datagen::array::rand_type(&DataType::Struct(fields))) - .into_batch_rows(lance_datagen::RowCount::from(10000)) - .unwrap(); - - let schema = data.schema(); - let new_fields: Vec> = schema - .fields() - .iter() - .map(|field| { - if matches!(field.data_type(), &DataType::Struct(_)) { - let mut metadata = HashMap::new(); - metadata.insert("packed".to_string(), "true".to_string()); - let field = - Field::new(field.name(), field.data_type().clone(), field.is_nullable()); - Arc::new(field.with_metadata(metadata)) - } else { - field.clone() - } - }) - .collect(); - - let new_schema = Schema::new(new_fields); - let data = RecordBatch::try_new(Arc::new(new_schema.clone()), data.columns().to_vec()).unwrap(); - - let lance_schema = Arc::new(lance_core::datatypes::Schema::try_from(&new_schema).unwrap()); - let input_bytes = data.get_array_memory_size(); - group.throughput(criterion::Throughput::Bytes(input_bytes as u64)); - let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); - let encoded = rt - .block_on(encode_batch( - &data, - lance_schema, - encoding_strategy.as_ref(), - &ENCODING_OPTIONS, - )) - .unwrap(); + const NUM_ROWS: u64 = 10000; + let size_bytes = + ((6 * std::mem::size_of::() as u64) + std::mem::size_of::() as u64) * NUM_ROWS; + group.throughput(criterion::Throughput::Bytes(size_bytes)); let func_name = "struct"; group.bench_function(func_name, |b| { + let fields = vec![ + Arc::new(Field::new("int_field", DataType::Int32, false)), + Arc::new(Field::new("float_field", DataType::Float32, false)), + Arc::new(Field::new( + "fsl_field", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5), + false, + )), + ] + .into(); + + // generate struct column with 1M rows + let data = lance_datagen::gen() + .anon_col(lance_datagen::array::rand_type(&DataType::Struct(fields))) + .into_batch_rows(lance_datagen::RowCount::from(NUM_ROWS)) + .unwrap(); + + let schema = data.schema(); + let new_fields: Vec> = schema + .fields() + .iter() + .map(|field| { + if matches!(field.data_type(), &DataType::Struct(_)) { + let mut metadata = HashMap::new(); + metadata.insert("packed".to_string(), "true".to_string()); + let field = + Field::new(field.name(), field.data_type().clone(), field.is_nullable()); + Arc::new(field.with_metadata(metadata)) + } else { + field.clone() + } + }) + .collect(); + + let new_schema = Schema::new(new_fields); + let data = + RecordBatch::try_new(Arc::new(new_schema.clone()), data.columns().to_vec()).unwrap(); + + let lance_schema = Arc::new(lance_core::datatypes::Schema::try_from(&new_schema).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &ENCODING_OPTIONS, + )) + .unwrap(); + b.iter(|| { let batch = rt .block_on(lance_encoding::decoder::decode_batch( @@ -260,6 +290,8 @@ fn bench_decode_packed_struct(c: &mut Criterion) { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + Some(Arc::new(FileMetadataCache::no_cache())), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -267,41 +299,44 @@ fn bench_decode_packed_struct(c: &mut Criterion) { }); } -#[allow(dead_code)] fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); - // generate string column with 10k rows - // Currently the generator generates fixed size strings by default - // This function will need to be updated once that changes. - let string_data = lance_datagen::gen() - .anon_col(lance_datagen::array::rand_type(&DataType::Utf8)) - .into_batch_rows(lance_datagen::RowCount::from(10000)) - .unwrap(); - let schema = Arc::new(Schema::new(vec![Field::new( - "string", - DataType::Utf8, - false, - )])); - - let data = RecordBatch::try_new(schema, string_data.columns().to_vec()).unwrap(); - - let lance_schema = - Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); - let input_bytes = data.get_array_memory_size(); - group.throughput(criterion::Throughput::Bytes(input_bytes as u64)); - let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); - let encoded = rt - .block_on(encode_batch( - &data, - lance_schema, - encoding_strategy.as_ref(), - &ENCODING_OPTIONS, - )) - .unwrap(); + const NUM_ROWS: u64 = 10000; + // Randomly generated strings are always 12 characters (at the moment) + const NUM_BYTES: u64 = NUM_ROWS * 16; + group.throughput(criterion::Throughput::Bytes(NUM_BYTES)); + let func_name = "fixed-utf8".to_string(); group.bench_function(func_name, |b| { + // generate string column with 10k rows + // Currently the generator generates fixed size strings by default + // This function will need to be updated once that changes. + let string_data = lance_datagen::gen() + .anon_col(lance_datagen::array::rand_type(&DataType::Utf8)) + .into_batch_rows(lance_datagen::RowCount::from(10000)) + .unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "string", + DataType::Utf8, + false, + )])); + + let data = RecordBatch::try_new(schema, string_data.columns().to_vec()).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::default()); + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &ENCODING_OPTIONS, + )) + .unwrap(); b.iter(|| { let batch = rt .block_on(lance_encoding::decoder::decode_batch( @@ -309,6 +344,8 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + Some(Arc::new(FileMetadataCache::no_cache())), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 949f9ad9d3..ff5e48f791 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -253,6 +253,7 @@ use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor} use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; use crate::repdef::{LevelBuffer, RepDefUnraveler}; +use crate::version::LanceFileVersion; use crate::{BufferScheduler, EncodingsIo}; // If users are getting batches over 10MiB large then it's time to reduce the batch size @@ -2364,16 +2365,20 @@ pub async fn decode_batch( filter: &FilterExpression, decoder_plugins: Arc, should_validate: bool, + version: LanceFileVersion, + cache: Option>, ) -> Result { // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be // polled twice. let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc; - let cache = Arc::new(FileMetadataCache::with_capacity( - 128 * 1024 * 1024, - CapacityMode::Bytes, - )); + let cache = cache.unwrap_or_else(|| { + Arc::new(FileMetadataCache::with_capacity( + 128 * 1024 * 1024, + CapacityMode::Bytes, + )) + }); let mut decode_scheduler = DecodeBatchScheduler::try_new( batch.schema.as_ref(), &batch.top_level_columns, @@ -2388,7 +2393,7 @@ pub async fn decode_batch( .await?; let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); - let is_structural = false; + let is_structural = version >= LanceFileVersion::V2_1; let mut decode_stream = create_decode_stream( &batch.schema, batch.num_rows, diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 4dfad26c3a..172bc6f41c 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -1315,6 +1315,8 @@ pub mod tests { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + None, ) .await .unwrap(); @@ -1331,6 +1333,8 @@ pub mod tests { &FilterExpression::no_filter(), Arc::::default(), false, + LanceFileVersion::default(), + None, ) .await .unwrap();