Skip to content

Commit

Permalink
skip reading unused columns (#133)
Browse files Browse the repository at this point in the history
* skip reading unused columns

* fix lint

* supports complex data types

---------

Co-authored-by: zhangli20 <[email protected]>
  • Loading branch information
richox and zhangli20 authored Oct 24, 2024
1 parent d3fb27b commit b15df36
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/stripe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::{collections::HashMap, io::Read, sync::Arc};

use bytes::Bytes;
Expand Down Expand Up @@ -139,23 +140,23 @@ impl Stripe {
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);

let columns = projected_data_type
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
let column_ids = collect_required_column_ids(&columns);

let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
let kind = stream.kind();
let data = Column::read_stream(reader, stream_offset, length)?;

// TODO(weny): filter out unused streams.
stream_map.insert((column_id, kind), data);

if column_ids.contains(&column_id) {
let kind = stream.kind();
let data = Column::read_stream(reader, stream_offset, length)?;
stream_map.insert((column_id, kind), data);
}
stream_offset += length;
}

Expand Down Expand Up @@ -192,22 +193,23 @@ impl Stripe {
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);

let columns = projected_data_type
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
let column_ids = collect_required_column_ids(&columns);

let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
let kind = stream.kind();
let data = Column::read_stream_async(reader, stream_offset, length).await?;

// TODO(weny): filter out unused streams.
stream_map.insert((column_id, kind), data);
if column_ids.contains(&column_id) {
let kind = stream.kind();
let data = Column::read_stream_async(reader, stream_offset, length).await?;
stream_map.insert((column_id, kind), data);
}

stream_offset += length;
}
Expand Down Expand Up @@ -282,3 +284,12 @@ fn deserialize_stripe_footer(
.context(error::IoSnafu)?;
StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
}

fn collect_required_column_ids(columns: &[Column]) -> HashSet<u32> {
let mut set = HashSet::new();
for column in columns {
set.insert(column.column_id());
set.extend(collect_required_column_ids(&column.children()));
}
set
}

0 comments on commit b15df36

Please sign in to comment.