Skip to content

Commit

Permalink
skip reading unused columns
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Oct 23, 2024
1 parent be8e1f1 commit be3c0aa
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions src/stripe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::{collections::HashMap, io::Read, sync::Arc};
+use std::collections::HashSet;

use bytes::Bytes;
use prost::Message;
Expand Down Expand Up @@ -139,23 +140,23 @@ impl Stripe {
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);

-let columns = projected_data_type
+let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
+let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect();

let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
-let kind = stream.kind();
-let data = Column::read_stream(reader, stream_offset, length)?;
-
-// TODO(weny): filter out unused streams.
-stream_map.insert((column_id, kind), data);
-
+if column_ids.contains(&column_id) {
+let kind = stream.kind();
+let data = Column::read_stream(reader, stream_offset, length)?;
+stream_map.insert((column_id, kind), data);
+}
stream_offset += length;
}

Expand Down Expand Up @@ -192,22 +193,23 @@ impl Stripe {
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);

-let columns = projected_data_type
+let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
+let column_ids: HashSet<u32> = columns.iter().map(|c| c.column_id()).collect();

let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
-let kind = stream.kind();
-let data = Column::read_stream_async(reader, stream_offset, length).await?;
-
-// TODO(weny): filter out unused streams.
-stream_map.insert((column_id, kind), data);
+if column_ids.contains(&column_id) {
+let kind = stream.kind();
+let data = Column::read_stream_async(reader, stream_offset, length).await?;
+stream_map.insert((column_id, kind), data);
+}

stream_offset += length;
}
Expand Down

0 comments on commit be3c0aa

Please sign in to comment.