Specialize Thrift Decoding (~40% Faster) (#4891) (#4892)
* Specialize thrift (#4891)

* Review feedback
tustvold authored Oct 10, 2023
1 parent c6387c1 commit 538a7bf
Showing 15 changed files with 571 additions and 219 deletions.
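
The speedup comes from two related changes visible throughout the diff below: the generated thrift code is rewritten to implement a crate-local `TSerializable` trait whose methods are generic over the protocol instead of taking `&mut dyn TInputProtocol`, and a new `TCompactSliceInputProtocol` decodes directly from a byte slice instead of going through `std::io::Read`. A minimal standalone sketch of why the generic form is faster (toy types, not the crate's actual API):

```rust
// Toy illustration of the specialization: replacing trait-object arguments
// with generics lets the compiler monomorphize and inline per-field reads.
trait TInputProtocol {
    fn read_i32(&mut self) -> i32;
}

struct SliceProtocol<'a> {
    buf: &'a [u8],
}

impl<'a> TInputProtocol for SliceProtocol<'a> {
    fn read_i32(&mut self) -> i32 {
        let (head, rest) = self.buf.split_at(4);
        self.buf = rest;
        i32::from_le_bytes(head.try_into().unwrap())
    }
}

// Before: every read goes through a vtable and cannot be inlined.
fn read_dyn(i_prot: &mut dyn TInputProtocol) -> i32 {
    i_prot.read_i32()
}

// After: a concrete instantiation per protocol type, eligible for inlining.
fn read_generic<T: TInputProtocol>(i_prot: &mut T) -> i32 {
    i_prot.read_i32()
}

fn main() {
    let data = 42i32.to_le_bytes();
    assert_eq!(read_dyn(&mut SliceProtocol { buf: &data }), 42);
    assert_eq!(read_generic(&mut SliceProtocol { buf: &data }), 42);
}
```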
6 changes: 1 addition & 5 deletions parquet/CONTRIBUTING.md
@@ -62,10 +62,6 @@ To compile and view in the browser, run `cargo doc --no-deps --open`.
 
 ## Update Parquet Format
 
-To generate the parquet format (thrift definitions) code run from the repository root run
-
-```
-$ docker run -v $(pwd):/thrift/src -it archlinux pacman -Sy --noconfirm thrift && wget https://raw.githubusercontent.com/apache/parquet-format/apache-parquet-format-2.9.0/src/main/thrift/parquet.thrift -O /tmp/parquet.thrift && thrift --gen rs /tmp/parquet.thrift && sed -i '/use thrift::server::TProcessor;/d' parquet.rs && mv parquet.rs parquet/src/format.rs
-```
+To generate the parquet format (thrift definitions) code run [`./regen.sh`](./regen.sh).
 
 You may need to manually patch up doc comments that contain unescaped `[]`
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
@@ -173,5 +173,10 @@ name = "compression"
 required-features = ["experimental", "default"]
 harness = false
 
+
+[[bench]]
+name = "metadata"
+harness = false
+
 [lib]
 bench = false
42 changes: 42 additions & 0 deletions parquet/benches/metadata.rs
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use criterion::*;
use parquet::file::reader::SerializedFileReader;
use parquet::file::serialized_reader::ReadOptionsBuilder;

fn criterion_benchmark(c: &mut Criterion) {
    // Read file into memory to isolate filesystem performance
    let file = "../parquet-testing/data/alltypes_tiny_pages.parquet";
    let data = std::fs::read(file).unwrap();
    let data = Bytes::from(data);

    c.bench_function("open(default)", |b| {
        b.iter(|| SerializedFileReader::new(data.clone()).unwrap())
    });

    c.bench_function("open(page index)", |b| {
        b.iter(|| {
            let options = ReadOptionsBuilder::new().with_page_index().build();
            SerializedFileReader::new_with_options(data.clone(), options).unwrap()
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
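
The new benchmark can be run with `cargo bench --bench metadata` from the `parquet` directory; it reads `../parquet-testing/data/alltypes_tiny_pages.parquet`, so the `parquet-testing` submodule must be checked out first.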
35 changes: 35 additions & 0 deletions parquet/regen.sh
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

REVISION=aeae80660c1d0c97314e9da837de1abdebd49c37

SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"

docker run -v $SOURCE_DIR:/thrift/src -it archlinux pacman -Sy --noconfirm thrift && \
wget https://raw.githubusercontent.com/apache/parquet-format/$REVISION/src/main/thrift/parquet.thrift -O /tmp/parquet.thrift && \
thrift --gen rs /tmp/parquet.thrift && \
echo "Removing TProcessor" && \
sed -i '/use thrift::server::TProcessor;/d' parquet.rs && \
echo "Replacing TSerializable" && \
sed -i 's/impl TSerializable for/impl crate::thrift::TSerializable for/g' parquet.rs && \
echo "Rewriting write_to_out_protocol" && \
sed -i 's/fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol)/fn write_to_out_protocol<T: TOutputProtocol>(\&self, o_prot: \&mut T)/g' parquet.rs && \
echo "Rewriting read_from_in_protocol" && \
sed -i 's/fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol)/fn read_from_in_protocol<T: TInputProtocol>(i_prot: \&mut T)/g' parquet.rs && \
mv parquet.rs src/format.rs
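
The sed rewrites above retarget the generated impls from the `thrift` crate's `TSerializable` trait (whose methods take `&mut dyn` trait objects) to a crate-local trait with generic methods. Judging from the rewritten signatures, that trait presumably looks roughly like the following sketch (inferred from the sed patterns, not copied from the crate):

```rust
// Hedged sketch of the crate-local trait the regenerated code is rewritten
// to implement; the generic methods are what enable monomorphization.
use thrift::protocol::{TInputProtocol, TOutputProtocol};

pub trait TSerializable: Sized {
    fn read_from_in_protocol<T: TInputProtocol>(i_prot: &mut T) -> thrift::Result<Self>;
    fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()>;
}
```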
3 changes: 2 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -23,7 +23,7 @@ use std::iter::Peekable;
 use std::slice::Iter;
 use std::sync::{Arc, Mutex};
 use std::vec::IntoIter;
-use thrift::protocol::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TCompactOutputProtocol;
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::*;
@@ -50,6 +50,7 @@ use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::thrift::TSerializable;
 use levels::{calculate_array_levels, ArrayLevels};
 
 mod byte_array;
11 changes: 4 additions & 7 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -17,7 +17,7 @@
 
 use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, read_metadata};
+use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{
@@ -27,7 +27,6 @@ use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use std::future::Future;
-use std::io::Read;
 use std::ops::Range;
 
 /// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
@@ -95,16 +94,14 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         // Did not fetch the entire file metadata in the initial read, need to make a second request
         let (metadata, remainder) = if length > suffix_len - 8 {
             let metadata_start = file_size - length - 8;
-            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
-
-            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
-            (read_metadata(reader)?, None)
+            let meta = fetch.fetch(metadata_start..file_size - 8).await?;
+            (decode_metadata(&meta)?, None)
         } else {
             let metadata_start = file_size - length - 8 - footer_start;
 
             let slice = &suffix[metadata_start..suffix_len - 8];
             (
-                read_metadata(slice)?,
+                decode_metadata(slice)?,
                 Some((footer_start, suffix.slice(..metadata_start))),
             )
         };
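
Design note: rather than chaining the previously fetched suffix onto the second request through `Read`, the rewritten path fetches `metadata_start..file_size - 8` as one contiguous buffer and decodes it in place, re-reading at most `suffix_len - 8` already-fetched bytes in exchange for the faster slice-based decode.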
3 changes: 2 additions & 1 deletion parquet/src/bin/parquet-layout.rs
@@ -38,12 +38,13 @@ use std::io::Read;
 
 use clap::Parser;
 use serde::Serialize;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use thrift::protocol::TCompactInputProtocol;
 
 use parquet::basic::{Compression, Encoding};
 use parquet::errors::Result;
 use parquet::file::reader::ChunkReader;
 use parquet::format::PageHeader;
+use parquet::thrift::TSerializable;
 
 #[derive(Serialize, Debug)]
 struct ParquetFile {
15 changes: 5 additions & 10 deletions parquet/src/bloom_filter/mod.rs
@@ -26,13 +26,12 @@ use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
     SplitBlockAlgorithm, Uncompressed, XxHash,
 };
-use bytes::{Buf, Bytes};
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+use bytes::Bytes;
 use std::hash::Hasher;
 use std::io::Write;
 use std::sync::Arc;
-use thrift::protocol::{
-    TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
-};
+use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 use twox_hash::XxHash64;
 
 /// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
@@ -152,15 +151,11 @@ fn read_bloom_filter_header_and_length(
     buffer: Bytes,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
     let total_length = buffer.len();
-    let mut buf_reader = buffer.reader();
-    let mut prot = TCompactInputProtocol::new(&mut buf_reader);
+    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
     let header = BloomFilterHeader::read_from_in_protocol(&mut prot).map_err(|e| {
         ParquetError::General(format!("Could not read bloom filter header: {e}"))
     })?;
-    Ok((
-        header,
-        (total_length - buf_reader.into_inner().remaining()) as u64,
-    ))
+    Ok((header, (total_length - prot.as_slice().len()) as u64))
 }
 
 pub(crate) const BITSET_MIN_LENGTH: usize = 32;
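
The new length arithmetic works because a slice-based protocol consumes bytes from the front of its input, so "bytes read so far" is simply how much the remaining slice has shrunk. A toy illustration of the pattern (standalone, not the crate's types):

```rust
// A reader that shrinks its remaining slice as it consumes input: bytes
// consumed is total length minus what is left -- the same accounting as
// `(total_length - prot.as_slice().len()) as u64` above.
struct SliceReader<'a> {
    buf: &'a [u8],
}

impl<'a> SliceReader<'a> {
    fn read_byte(&mut self) -> Option<u8> {
        let (&first, rest) = self.buf.split_first()?;
        self.buf = rest;
        Some(first)
    }

    fn as_slice(&self) -> &'a [u8] {
        self.buf
    }
}

fn main() {
    let data = [1u8, 2, 3, 4, 5];
    let total_length = data.len();
    let mut reader = SliceReader { buf: &data };
    reader.read_byte();
    reader.read_byte();
    assert_eq!(total_length - reader.as_slice().len(), 2);
}
```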
13 changes: 4 additions & 9 deletions parquet/src/file/footer.rs
@@ -18,7 +18,7 @@
 use std::{io::Read, sync::Arc};
 
 use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 
 use crate::basic::ColumnOrder;
 
@@ -62,18 +62,13 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
     }
 
     let start = file_size - footer_metadata_len as u64;
-    read_metadata(chunk_reader.get_read(start)?)
+    decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
 }
 
 /// Decodes [`ParquetMetaData`] from the provided bytes
-pub fn decode_metadata(metadata_read: &[u8]) -> Result<ParquetMetaData> {
-    read_metadata(metadata_read)
-}
-
-/// Decodes [`ParquetMetaData`] from the provided [`Read`]
-pub(crate) fn read_metadata<R: Read>(read: R) -> Result<ParquetMetaData> {
+pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
     // TODO: row group filtering
-    let mut prot = TCompactInputProtocol::new(read);
+    let mut prot = TCompactSliceInputProtocol::new(buf);
     let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
         .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?;
     let schema = types::from_thrift(&t_file_metadata.schema)?;
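
With `decode_metadata` now taking `&[u8]`, callers that already hold a file in memory can decode the footer without a `Read` adapter. A hedged usage sketch (`metadata_from_bytes` is a hypothetical helper; it relies only on the standard Parquet footer layout of metadata, then a 4-byte little-endian length, then the `PAR1` magic):

```rust
use parquet::errors::Result;
use parquet::file::footer::decode_metadata;
use parquet::file::metadata::ParquetMetaData;

// Hypothetical helper: locate and decode the metadata of a fully in-memory file.
fn metadata_from_bytes(file: &[u8]) -> Result<ParquetMetaData> {
    let len = file.len();
    assert_eq!(&file[len - 4..], b"PAR1", "not a parquet file");
    let metadata_len =
        u32::from_le_bytes(file[len - 8..len - 4].try_into().unwrap()) as usize;
    decode_metadata(&file[len - 8 - metadata_len..len - 8])
}
```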
8 changes: 3 additions & 5 deletions parquet/src/file/page_index/index_reader.rs
@@ -24,9 +24,8 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::page_index::index::{Index, NativeIndex};
 use crate::file::reader::ChunkReader;
 use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
-use std::io::Cursor;
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use std::ops::Range;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
 
 /// Computes the covering range of two optional ranges
 ///
@@ -116,7 +115,7 @@ pub fn read_pages_locations<R: ChunkReader>(
 pub(crate) fn decode_offset_index(
     data: &[u8],
 ) -> Result<Vec<PageLocation>, ParquetError> {
-    let mut prot = TCompactInputProtocol::new(data);
+    let mut prot = TCompactSliceInputProtocol::new(data);
     let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
     Ok(offset.page_locations)
 }
@@ -125,8 +124,7 @@ pub(crate) fn decode_column_index(
     data: &[u8],
     column_type: Type,
 ) -> Result<Index, ParquetError> {
-    let mut d = Cursor::new(data);
-    let mut prot = TCompactInputProtocol::new(&mut d);
+    let mut prot = TCompactSliceInputProtocol::new(data);
 
     let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
 
12 changes: 6 additions & 6 deletions parquet/src/file/serialized_reader.rs
@@ -19,7 +19,6 @@
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
 use std::collections::VecDeque;
-use std::io::Cursor;
 use std::iter;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
@@ -40,8 +39,9 @@ use crate::format::{PageHeader, PageLocation, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use crate::util::memory::ByteBufferPtr;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use thrift::protocol::TCompactInputProtocol;
 
 impl TryFrom<File> for SerializedFileReader<File> {
     type Error = ParquetError;
@@ -661,11 +661,11 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
                 let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
 
-                let mut cursor = Cursor::new(buffer.as_ref());
-                let header = read_page_header(&mut cursor)?;
-                let offset = cursor.position();
+                let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
+                let header = PageHeader::read_from_in_protocol(&mut prot)?;
+                let offset = buffer.len() - prot.as_slice().len();
 
-                let bytes = buffer.slice(offset as usize..);
+                let bytes = buffer.slice(offset..);
                 decode_page(
                     header,
                     bytes.into(),
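
The `offset` computation here uses the same remaining-slice arithmetic illustrated after the bloom filter hunk above, replacing the old `Cursor::position` bookkeeping.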
3 changes: 2 additions & 1 deletion parquet/src/file/writer.rs
@@ -21,10 +21,11 @@
 use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use crate::thrift::TSerializable;
 use std::fmt::Debug;
 use std::io::{BufWriter, IoSlice, Read};
 use std::{io::Write, sync::Arc};
-use thrift::protocol::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TCompactOutputProtocol;
 
 use crate::column::writer::{
     get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl,