diff --git a/src/stripe.rs b/src/stripe.rs index d43e9cee..c73ed8e8 100644 --- a/src/stripe.rs +++ b/src/stripe.rs @@ -16,6 +16,7 @@ // under the License. use std::{collections::HashMap, io::Read, sync::Arc}; +use std::collections::HashSet; use bytes::Bytes; use prost::Message; @@ -139,23 +140,23 @@ impl Stripe { .context(IoSnafu)?; let footer = Arc::new(deserialize_stripe_footer(footer, compression)?); - let columns = projected_data_type + let columns: Vec = projected_data_type .children() .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); + let column_ids: HashSet = columns.iter().map(|c| c.column_id()).collect(); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); for stream in &footer.streams { let length = stream.length(); let column_id = stream.column(); - let kind = stream.kind(); - let data = Column::read_stream(reader, stream_offset, length)?; - - // TODO(weny): filter out unused streams. - stream_map.insert((column_id, kind), data); - + if column_ids.contains(&column_id) { + let kind = stream.kind(); + let data = Column::read_stream(reader, stream_offset, length)?; + stream_map.insert((column_id, kind), data); + } stream_offset += length; } @@ -192,22 +193,23 @@ impl Stripe { .context(IoSnafu)?; let footer = Arc::new(deserialize_stripe_footer(footer, compression)?); - let columns = projected_data_type + let columns: Vec = projected_data_type .children() .iter() .map(|col| Column::new(col.name(), col.data_type(), &footer)) .collect(); + let column_ids: HashSet = columns.iter().map(|c| c.column_id()).collect(); let mut stream_map = HashMap::new(); let mut stream_offset = info.offset(); for stream in &footer.streams { let length = stream.length(); let column_id = stream.column(); - let kind = stream.kind(); - let data = Column::read_stream_async(reader, stream_offset, length).await?; - - // TODO(weny): filter out unused streams. - stream_map.insert((column_id, kind), data); + if column_ids.contains(&column_id) { + let kind = stream.kind(); + let data = Column::read_stream_async(reader, stream_offset, length).await?; + stream_map.insert((column_id, kind), data); + } stream_offset += length; }