diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 96db44882cfa..e27e035dac95 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -30,8 +30,10 @@ edition = "2018" [dependencies] # update note: pin `parquet-format` to specific version until it does not break at minor -# version, see ARROW-11187. -parquet-format = "~2.6.1" +# version, see ARROW-11187. update: since this comment, it is now pinned at ~4.0.0 and +# upstream arrow-rs parquet vendors it +parquet-format = "~4.0.0" +aes-gcm = "0.10.3" byteorder = "1" thrift = "0.13" snap = { version = "1.0", optional = true } @@ -45,7 +47,9 @@ arrow = { path = "../arrow", version = "5.0.0", optional = true } base64 = { version = "0.13", optional = true } clap = { version = "2.33.3", optional = true } serde_json = { version = "1.0", features = ["preserve_order"], optional = true } +serde = { version = "1.0.115", features = ["derive"] } rand = "0.8" +sha3 = "0.10.8" [dev-dependencies] criterion = "0.3" diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 631257e0ed1d..198b6a382173 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -250,6 +250,15 @@ pub enum Encoding { /// /// The ids are encoded using the RLE encoding. RLE_DICTIONARY, + + /// Encoding for floating-point data. + /// + /// K byte-streams are created where K is the size in bytes of the data type. + /// The individual bytes of an FP value are scattered to the corresponding stream and + /// the streams are concatenated. + /// This itself does not reduce the size of the data but can lead to better compression + /// afterwards. + BYTE_STREAM_SPLIT, } // ---------------------------------------------------------------------- @@ -701,6 +710,7 @@ impl convert::From for Encoding { parquet::Encoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY, parquet::Encoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY, parquet::Encoding::RleDictionary => Encoding::RLE_DICTIONARY, + parquet::Encoding::ByteStreamSplit => Encoding::BYTE_STREAM_SPLIT, } } } @@ -716,6 +726,7 @@ impl convert::From for parquet::Encoding { Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DeltaLengthByteArray, Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DeltaByteArray, Encoding::RLE_DICTIONARY => parquet::Encoding::RleDictionary, + Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::ByteStreamSplit, } } } diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index b75d3b5028bb..c046651486d0 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -139,11 +139,14 @@ impl CompressedPage { self.uncompressed_size } - /// Returns compressed size in bytes. + /// Returns compressed size (but not encrypted size) in bytes. /// - /// Note that it is assumed that buffer is compressed, but it may not be. In this - /// case compressed size will be equal to uncompressed size. - pub fn compressed_size(&self) -> usize { + /// Note that it is assumed that buffer is compressed, but it may not be. In this case + /// compressed size will be equal to uncompressed size. + /// + /// Other so-called "(total_)?compressed_size" fields include encryption overhead, when + /// applicable, which this does not. + pub fn compressed_unencrypted_size(&self) -> usize { self.compressed_page.buffer().len() } @@ -206,7 +209,7 @@ pub trait PageWriter { /// /// This method is called for every compressed page we write into underlying buffer, /// either data page or dictionary page. 
- fn write_page(&mut self, page: CompressedPage) -> Result; + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result; /// Writes column chunk metadata into the output stream/sink. /// @@ -299,7 +302,7 @@ mod tests { assert_eq!(cpage.page_type(), PageType::DATA_PAGE); assert_eq!(cpage.uncompressed_size(), 5); - assert_eq!(cpage.compressed_size(), 3); + assert_eq!(cpage.compressed_unencrypted_size(), 3); assert_eq!(cpage.num_values(), 10); assert_eq!(cpage.encoding(), Encoding::PLAIN); assert_eq!(cpage.data(), &[0, 1, 2]); diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 910a9ed5dcaf..d20fa9f7d774 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -168,6 +168,7 @@ pub struct ColumnWriterImpl { descr: ColumnDescPtr, props: WriterPropertiesPtr, page_writer: Box, + page_ordinal: usize, has_dictionary: bool, dict_encoder: Option>, encoder: Box>, @@ -185,6 +186,8 @@ pub struct ColumnWriterImpl { total_bytes_written: u64, total_rows_written: u64, total_uncompressed_size: u64, + // Includes encryption overhead -- the thrift definition field includes encryption overhead, and + // we keep its name here. total_compressed_size: u64, total_num_values: u64, dictionary_page_offset: Option, @@ -231,10 +234,14 @@ impl ColumnWriterImpl { ) .unwrap(); + // We start counting pages from zero. + let page_ordinal: usize = 0; + Self { descr, props, page_writer, + page_ordinal, has_dictionary, dict_encoder, encoder: fallback_encoder, @@ -824,7 +831,10 @@ impl ColumnWriterImpl { /// Writes compressed data page into underlying sink and updates global metrics. #[inline] fn write_data_page(&mut self, page: CompressedPage) -> Result<()> { - let page_spec = self.page_writer.write_page(page)?; + let page_ordinal = self.page_ordinal; + let aad_page_ordinal: Option = Some(page_ordinal as u16); + self.page_ordinal += 1; + let page_spec = self.page_writer.write_page(page, aad_page_ordinal)?; self.update_metrics_for_page(page_spec); Ok(()) } @@ -858,7 +868,7 @@ impl ColumnWriterImpl { CompressedPage::new(dict_page, uncompressed_size) }; - let page_spec = self.page_writer.write_page(compressed_page)?; + let page_spec = self.page_writer.write_page(compressed_page, None)?; self.update_metrics_for_page(page_spec); Ok(()) } @@ -1026,10 +1036,10 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool { mod tests { use rand::distributions::uniform::SampleUniform; - use crate::column::{ + use crate::{column::{ page::PageReader, reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl}, - }; + }, file::encryption::USUAL_ENCRYPTION_OVERHEAD}; use crate::file::{ properties::WriterProperties, reader::SerializedPageReader, writer::SerializedPageWriter, @@ -1642,13 +1652,16 @@ mod tests { ); } + const TEST_ROW_GROUP_ORDINAL: i16 = 1234; + const TEST_COLUMN_ORDINAL: u16 = 135; + #[test] fn test_column_writer_add_data_pages_with_dict() { // ARROW-5129: Test verifies that we add data page in case of dictionary encoding // and no fallback occurred so far. 
let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]); let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL)); let props = Arc::new( WriterProperties::builder() .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes @@ -1656,7 +1669,7 @@ mod tests { .build(), ); let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props.clone()); writer.write_batch(data, None, None).unwrap(); let (bytes_written, _, _) = writer.close().unwrap(); @@ -1665,8 +1678,12 @@ mod tests { let mut page_reader = Box::new( SerializedPageReader::new( source, + None, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, data.len() as i64, Compression::UNCOMPRESSED, + props.dictionary_enabled(&ColumnPath::from("col")), Int32Type::get_physical_type(), ) .unwrap(), @@ -1803,7 +1820,7 @@ mod tests { ) { let file = get_temp_file(file_name, &[]); let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL)); let max_def_level = match def_levels { Some(buf) => *buf.iter().max().unwrap_or(&0i16), @@ -1823,11 +1840,12 @@ mod tests { max_batch_size = cmp::max(max_batch_size, levels.len()); } + let props = Arc::new(props); let mut writer = get_test_column_writer::( page_writer, max_def_level, max_rep_level, - Arc::new(props), + props.clone(), ); let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap(); @@ -1838,8 +1856,12 @@ mod tests { let page_reader = Box::new( SerializedPageReader::new( source, + None, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, column_metadata.num_values(), column_metadata.compression(), + props.dictionary_enabled(&ColumnPath::from("col")), T::get_physical_type(), ) .unwrap(), @@ -1977,20 +1999,39 @@ mod tests { /// Returns page writer that collects pages without serializing them. fn get_test_page_writer() -> Box { - Box::new(TestPageWriter {}) + Box::new(TestPageWriter {simulate_encrypted: false, last_page_ordinal: None}) } - struct TestPageWriter {} + struct TestPageWriter { + /// Always false, currently -- enabling would just affect return values that get fed into + /// test assertions. + simulate_encrypted: bool, + last_page_ordinal: Option, + } impl PageWriter for TestPageWriter { - fn write_page(&mut self, page: CompressedPage) -> Result { + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result { + // We're a bit loose in this assertion -- the caller could write or not write a dictionary page. + match aad_page_ordinal { + Some(n) if n != 0 => { + assert_eq!(self.last_page_ordinal, Some(n - 1)); + } + _ => { + assert_eq!(None, self.last_page_ordinal); + } + } + self.last_page_ordinal = aad_page_ordinal; + + // Note, the normal PageWriteSpec result would include PageMetaData overhead, and these + // values are thus not perfectly faked, but the only thing that looks at them are test + // assertions. 
let mut res = PageWriteSpec::new(); res.page_type = page.page_type(); res.uncompressed_size = page.uncompressed_size(); - res.compressed_size = page.compressed_size(); + res.compressed_size = self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.compressed_unencrypted_size(); res.num_values = page.num_values(); res.offset = 0; - res.bytes_written = page.data().len() as u64; + res.bytes_written = (self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.data().len()) as u64; Ok(res) } diff --git a/parquet/src/file/encryption.rs b/parquet/src/file/encryption.rs new file mode 100644 index 000000000000..3680db28ba3f --- /dev/null +++ b/parquet/src/file/encryption.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::{Cursor, Read, Write}; +use std::convert::TryFrom; + +use aes_gcm::aead::AeadMutInPlace; +use aes_gcm::{KeySizeUser, Tag}; +use aes_gcm::{AeadCore as _, Aes256Gcm, KeyInit as _, Nonce}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use rand::{rngs::OsRng, RngCore as _}; +use serde::{Deserialize, Serialize}; +use sha3::{Digest, Sha3_224}; + +use crate::errors::{ParquetError, Result}; + +use crate::file::{PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE}; + +pub type ParquetEncryptionKeyId = String; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ParquetEncryptionKeyInfo { + pub key_id: ParquetEncryptionKeyId, + pub key: ParquetEncryptionKey, +} + +/// Describes general parquet encryption configuration -- new files are encrypted with the +/// write_key(), but old files can be decrypted with any of the valid read keys. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ParquetEncryptionConfig { + // The last key is the write key, and all the keys are valid read keys. + keys: Vec, +} + +impl ParquetEncryptionConfig { + pub fn new(keys: Vec) -> Option { + if keys.is_empty() { + None + } else { + Some(ParquetEncryptionConfig{ keys }) + } + } + + pub fn write_key(&self) -> &ParquetEncryptionKeyInfo { + self.keys.last().unwrap() + } + + pub fn read_keys(&self) -> &[ParquetEncryptionKeyInfo] { + self.keys.as_slice() + } +} + +// Since keys are 32 bytes (being 256 bits), we use 28-byte hashes to avoid mistaking a key for a +// key hash. +pub const PARQUET_KEY_HASH_LENGTH: usize = 28; +pub const PARQUET_KEY_SIZE: usize = 32; // Aes256Gcm, hence 32 bytes + +/// Describes how we encrypt or encrypted the Parquet files. Right now (in this implementation) +/// files can only be encrypted in "encrypted footer mode" with the footer and columns all encrypted +/// with the same key. + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct ParquetEncryptionKey { + /// The key we use for all parts and components of the Parquet files. 
+    pub key: [u8; PARQUET_KEY_SIZE]
+}
+
+impl ParquetEncryptionKey {
+    pub fn default() -> ParquetEncryptionKey {
+        ParquetEncryptionKey{key: Default::default()}
+    }
+
+    pub fn key_size() -> usize {
+        Aes256Gcm::key_size()
+    }
+
+    pub fn generate_key() -> ParquetEncryptionKey {
+        let key = Aes256Gcm::generate_key(OsRng);
+        let mut result = ParquetEncryptionKey::default();
+        result.key.copy_from_slice(&key);
+        result
+    }
+
+    pub fn to_aes256_gcm_key(&self) -> aes_gcm::Key<Aes256Gcm> {
+        let mut result = aes_gcm::Key::<Aes256Gcm>::default();
+        let r: &mut [u8] = &mut result;
+        r.copy_from_slice(&self.key);
+        result
+    }
+
+    pub fn compute_key_hash(&self) -> [u8; PARQUET_KEY_HASH_LENGTH] {
+        let mut hasher = Sha3_224::new();
+        hasher.update(&self.key);
+        let result = hasher.finalize();
+        result.into()
+    }
+}
+
+pub const AAD_FILE_UNIQUE_SIZE: usize = 20;
+pub type RandomFileIdentifier = [u8; AAD_FILE_UNIQUE_SIZE];
+
+const NONCE_SIZE: usize = 12;
+const TAG_SIZE: usize = 16;
+/// The value is 32. A 4-byte length field, 12-byte nonce, 16-byte tag.
+pub const USUAL_ENCRYPTION_OVERHEAD: usize = 4 + NONCE_SIZE + TAG_SIZE;
+
+pub fn generate_random_file_identifier() -> RandomFileIdentifier {
+    let mut v = [0u8; AAD_FILE_UNIQUE_SIZE];
+    OsRng.fill_bytes(&mut v);
+    v
+}
+
+/// Returns the magic to use at the beginning and end of the file (depending on whether we use footer encryption)
+pub fn parquet_magic(is_footer_encrypted: bool) -> [u8; 4] {
+    // For now ParquetEncryptionKey only allows footer encryption mode. And we use a custom "PARC"
+    // magic until we have checked that we're exactly following the format spec defined with "PARE".
+    if !is_footer_encrypted { PARQUET_MAGIC } else { PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE }
+}
+
+// TODO: Could return fixed length array or some flat array,size pair instead of allocating.
+pub fn parquet_aad_suffix(file_identifier: &RandomFileIdentifier, aad_module_type: u8, row_group_ordinal: i16, column_ordinal: u16, page_ordinal: Option<u16>) -> Vec<u8> {
+    let mut aad = Vec::<u8>::new();
+    aad.extend_from_slice(file_identifier);
+    aad.push(aad_module_type);
+    let _ = aad.write_i16::<LittleEndian>(row_group_ordinal);
+    let _ = aad.write_u16::<LittleEndian>(column_ordinal);
+    if let Some(page_ordinal) = page_ordinal {
+        let _ = aad.write_u16::<LittleEndian>(page_ordinal);
+    }
+    aad
+}
+
+/// PrepaddedPlaintext simply carries a buf with 16 empty bytes at the front. Then you can append
+/// plaintext to it and pass it to encrypt_module, and it can then encrypt in-place and pass to the
+/// Write with a single call.
+pub struct PrepaddedPlaintext {
+    buf: Vec<u8>,
+}
+
+impl PrepaddedPlaintext {
+    /// Constructs a buf for appending with plaintext and passing to encrypt_module. It is
+    /// recommended that you use the result of self.buf_mut() as a `Write` to append the plaintext.
+    pub fn new() -> PrepaddedPlaintext {
+        PrepaddedPlaintext{buf: vec![0u8; 16]}
+    }
+    pub fn buf_mut(&mut self) -> &mut Vec<u8> {
+        &mut self.buf
+    }
+}
+
+/// Writes "length (4 bytes) nonce (12 bytes) ciphertext (length - 28 bytes) tag (16 bytes)"
+pub fn encrypt_module<W: Write>(what: &str, w: &mut W, encryption_key: &ParquetEncryptionKey, mut prepadded: PrepaddedPlaintext, aad: &[u8]) -> Result<()> {
+    let mut cipher = Aes256Gcm::new(&encryption_key.to_aes256_gcm_key());
+    let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
+
+    let buf = prepadded.buf_mut();
+    let buflen = buf.len();
+    let tag: Tag<_>;
+    {
+        let (front, plaintext) = buf.split_at_mut(4 + NONCE_SIZE);
+
+        let written_len = u32::try_from(buflen - 4 + TAG_SIZE)
+            .map_err(|_| general_err!("Error encrypting {}. 
Module is too large", what))?; + front[..4].copy_from_slice(&u32::to_le_bytes(written_len)); + front[4..].copy_from_slice(&nonce); + + tag = cipher.encrypt_in_place_detached(&nonce, aad, plaintext) + .map_err(|_| general_err!("Error encrypting {}", what))?; + } + + buf.extend_from_slice(&tag); + + w.write_all(buf)?; + Ok(()) +} + +pub fn decrypt_module(what: &str, mut r: R, encryption_key: &ParquetEncryptionKey, aad: &[u8]) -> Result>> { + let mut cipher = Aes256Gcm::new(&encryption_key.to_aes256_gcm_key()); + + let buflen = r.read_u32::()?; + let buflen = buflen as usize; + if buflen < NONCE_SIZE + TAG_SIZE { + return Err(general_err!("Invalid Parquet file. Encrypted buffer length too short")); + } + let mut buf = vec![0u8; buflen]; + r.read_exact(&mut buf)?; + + let nonce = *Nonce::from_slice(&buf[..NONCE_SIZE]); + let tag = *Tag::from_slice(&buf[buflen - TAG_SIZE..]); + + cipher.decrypt_in_place_detached(&nonce, aad, &mut buf[NONCE_SIZE..buflen - TAG_SIZE], &tag) + .map_err(|_| general_err!("Error decrypting {}", what))?; + + // Now trim the buf of its trailing tag, and return a Cursor that skips past the nonce. + // And just to prevent any weirdness, zero out the nonce. + buf.truncate(buflen - TAG_SIZE); + buf[..NONCE_SIZE].fill(0); + + let mut cursor = Cursor::new(buf); + cursor.set_position(NONCE_SIZE as u64); + + Ok(cursor) +} diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 1e2d95fc1c9f..90a167615608 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -22,7 +22,7 @@ use std::{ }; use byteorder::{ByteOrder, LittleEndian}; -use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use parquet_format::{ColumnOrder as TColumnOrder, FileCryptoMetaData as TFileCryptoMetaData, FileMetaData as TFileMetaData}; use thrift::protocol::TCompactInputProtocol; use crate::basic::ColumnOrder; @@ -35,6 +35,27 @@ use crate::file::{ use crate::schema::types::{self, SchemaDescriptor}; +use crate::file::{encryption::{decrypt_module, parquet_magic, ParquetEncryptionConfig, PARQUET_KEY_HASH_LENGTH, ParquetEncryptionKey, ParquetEncryptionKeyInfo, RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE}, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE, PARQUET_MAGIC_UNSUPPORTED_PARE}; + +fn select_key(encryption_config: &ParquetEncryptionConfig, key_metadata: &Option>) -> Result { + if let Some(key_id) = key_metadata { + if key_id.len() != PARQUET_KEY_HASH_LENGTH { + return Err(general_err!("Unsupported Parquet file. key_metadata field length is not supported")); + } + let mut key_id_arr = [0u8; PARQUET_KEY_HASH_LENGTH]; + key_id_arr.copy_from_slice(&key_id); + let read_keys: &[ParquetEncryptionKeyInfo] = encryption_config.read_keys(); + for key_info in read_keys { + if key_info.key.compute_key_hash() == key_id_arr { + return Ok(key_info.key) + } + } + return Err(general_err!("Parquet file is encrypted with an unknown or out-of-rotation key")); + } else { + return Err(general_err!("Unsupported Parquet file. Expecting key_metadata field to be used")); + } +} + /// Layout of Parquet file /// +---------------------------+-----+---+ /// | Rest of file | B | A | @@ -43,7 +64,7 @@ use crate::schema::types::{self, SchemaDescriptor}; /// /// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. /// If it is not enough according to the length indicated in the footer, it reads more bytes. 
-pub fn parse_metadata(chunk_reader: &R) -> Result { +pub fn parse_metadata(chunk_reader: &R, encryption_config: &Option) -> Result<(ParquetMetaData, Option)> { // check file is large enough to hold footer let file_size = chunk_reader.len(); if file_size < (FOOTER_SIZE as u64) { @@ -60,8 +81,19 @@ pub fn parse_metadata(chunk_reader: &R) -> Result(chunk_reader: &R) -> Result; + + let mut metadata_read: Box; if footer_metadata_len > file_size as usize { return Err(general_err!( "Invalid Parquet file. Metadata start is less than zero ({})", @@ -86,21 +121,71 @@ pub fn parse_metadata(chunk_reader: &R) -> Result; + + let random_file_identifier: Option; + if let Some(encryption_config) = encryption_config { + let file_crypto_metadata = { + let mut prot = TCompactInputProtocol::new(&mut metadata_read); + TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {}", e)))? + }; + + let encryption_key = select_key(encryption_config, &file_crypto_metadata.key_metadata)?; + + let mut aad_file_unique: RandomFileIdentifier; + // TODO: What's to stop somebody from switching out aad_file_unique with their own value and then swapping components between files? + match file_crypto_metadata.encryption_algorithm { + parquet_format::EncryptionAlgorithm::AESGCMV1(gcmv1) => { + if gcmv1.aad_prefix.is_some() || gcmv1.supply_aad_prefix.is_some() { + return Err(general_err!("Unsupported Parquet file. Use of aad_prefix is not expected")); + } + if let Some(afu) = gcmv1.aad_file_unique { + if afu.len() != AAD_FILE_UNIQUE_SIZE { + return Err(general_err!("Unsupported Parquet file. aad_file_unique is not of the expected size")); + } + aad_file_unique = [0u8; AAD_FILE_UNIQUE_SIZE]; + aad_file_unique.copy_from_slice(&afu); + } else { + return Err(general_err!("Unsupported Parquet file. aad_file_unique must be set")); + } + }, + parquet_format::EncryptionAlgorithm::AESGCMCTRV1(_) => { + return Err(general_err!("Unsupported Parquet file. AES_GCM_CTR_V1 mode is not expected")); + } + } + + let no_aad = &[]; + let plaintext_cursor = decrypt_module("footer", metadata_read, &encryption_key, no_aad)?; + + metadata_read = Box::new(plaintext_cursor); + + returned_encryption_key = Some(encryption_key); + random_file_identifier = Some(aad_file_unique); + } else { + returned_encryption_key = None; + random_file_identifier = None; + } + // TODO: row group filtering let mut prot = TCompactInputProtocol::new(metadata_read); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?; + if t_file_metadata.encryption_algorithm.is_some() || t_file_metadata.footer_signing_key_metadata.is_some() { + return Err(general_err!("Unsupported Parquet file. 
Plaintext footer mode is not supported")); + } + let schema = types::from_thrift(&t_file_metadata.schema)?; let schema_descr = Arc::new(SchemaDescriptor::new(schema)); let mut row_groups = Vec::new(); @@ -116,12 +201,13 @@ pub fn parse_metadata(chunk_reader: &R) -> Result>, schema_descr: SchemaDescPtr, column_orders: Option>, + random_file_identifier: Option, } impl FileMetaData { @@ -124,6 +127,7 @@ impl FileMetaData { key_value_metadata: Option>, schema_descr: SchemaDescPtr, column_orders: Option>, + random_file_identifier: Option, ) -> Self { FileMetaData { version, @@ -132,6 +136,7 @@ impl FileMetaData { key_value_metadata, schema_descr, column_orders, + random_file_identifier, } } @@ -196,6 +201,10 @@ impl FileMetaData { .map(|data| data[i]) .unwrap_or(ColumnOrder::UNDEFINED) } + + pub fn random_file_identifier(&self) -> &Option { + &self.random_file_identifier + } } /// Reference counted pointer for [`RowGroupMetaData`]. @@ -208,12 +217,14 @@ pub struct RowGroupMetaData { num_rows: i64, total_byte_size: i64, schema_descr: SchemaDescPtr, + /// Ordinal position of this row group in file + ordinal: Option, } impl RowGroupMetaData { /// Returns builer for row group metadata. - pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder { - RowGroupMetaDataBuilder::new(schema_descr) + pub fn builder(schema_descr: SchemaDescPtr, ordinal: i16) -> RowGroupMetaDataBuilder { + RowGroupMetaDataBuilder::new(schema_descr, ordinal) } /// Number of columns in this row group. @@ -269,21 +280,33 @@ impl RowGroupMetaData { let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?; columns.push(cc); } + // Notably, the function to_thrift, below, doesn't write these fields, and RowGroupMetadata doesn't have them. + if rg.file_offset.is_some() { + return Err(ParquetError::NYI("Parsing RowGroup file_offset fields is not yet implemented".to_string())); + } + if rg.total_compressed_size.is_some() { + return Err(ParquetError::NYI("Parsing RowGroup total_compressed_size fields is not yet implemented".to_string())); + } Ok(RowGroupMetaData { columns, num_rows, total_byte_size, schema_descr, + ordinal: rg.ordinal, }) } /// Method to convert to Thrift. pub fn to_thrift(&self) -> RowGroup { + // TODO: Understand file_offset and total_compressed_size fields. RowGroup { columns: self.columns().iter().map(|v| v.to_thrift()).collect(), total_byte_size: self.total_byte_size, num_rows: self.num_rows, sorting_columns: None, + file_offset: None, + total_compressed_size: None, + ordinal: self.ordinal, } } } @@ -294,16 +317,18 @@ pub struct RowGroupMetaDataBuilder { schema_descr: SchemaDescPtr, num_rows: i64, total_byte_size: i64, + ordinal: Option, } impl RowGroupMetaDataBuilder { /// Creates new builder from schema descriptor. - fn new(schema_descr: SchemaDescPtr) -> Self { + fn new(schema_descr: SchemaDescPtr, ordinal: i16) -> Self { Self { columns: Vec::with_capacity(schema_descr.num_columns()), schema_descr, num_rows: 0, total_byte_size: 0, + ordinal: Some(ordinal), } } @@ -325,6 +350,12 @@ impl RowGroupMetaDataBuilder { self } + /// Sets ordinal for this row group. + pub fn set_ordinal(mut self, value: i16) -> Self { + self.ordinal = Some(value); + self + } + /// Builds row group metadata. 
pub fn build(self) -> Result { if self.schema_descr.num_columns() != self.columns.len() { @@ -340,6 +371,7 @@ impl RowGroupMetaDataBuilder { num_rows: self.num_rows, total_byte_size: self.total_byte_size, schema_descr: self.schema_descr, + ordinal: self.ordinal, }) } } @@ -497,6 +529,9 @@ impl ColumnChunkMetaData { let index_page_offset = col_metadata.index_page_offset; let dictionary_page_offset = col_metadata.dictionary_page_offset; let statistics = statistics::from_thrift(column_type, col_metadata.statistics); + if col_metadata.bloom_filter_offset.is_some() { + return Err(ParquetError::NYI("Parsing ColumnMetaData bloom_filter_offset fields is not yet implemented".to_string())) + } let result = ColumnChunkMetaData { column_type, column_path, @@ -532,6 +567,7 @@ impl ColumnChunkMetaData { dictionary_page_offset: self.dictionary_page_offset, statistics: statistics::to_thrift(self.statistics.as_ref()), encoding_stats: None, + bloom_filter_offset: None, }; ColumnChunk { @@ -542,6 +578,8 @@ impl ColumnChunkMetaData { offset_index_length: None, column_index_offset: None, column_index_length: None, + crypto_metadata: None, + encrypted_column_metadata: None, } } } @@ -672,6 +710,8 @@ impl ColumnChunkMetaDataBuilder { mod tests { use super::*; + const TEST_ROW_GROUP_ORDINAL: i16 = 124; + #[test] fn test_row_group_metadata_thrift_conversion() { let schema_descr = get_test_schema_descr(); @@ -681,7 +721,7 @@ mod tests { let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap(); columns.push(column); } - let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()) + let row_group_meta = RowGroupMetaData::builder(schema_descr.clone(), TEST_ROW_GROUP_ORDINAL) .set_num_rows(1000) .set_total_byte_size(2000) .set_column_metadata(columns) @@ -701,7 +741,7 @@ mod tests { fn test_row_group_metadata_thrift_conversion_empty() { let schema_descr = get_test_schema_descr(); - let row_group_meta = RowGroupMetaData::builder(schema_descr).build(); + let row_group_meta = RowGroupMetaData::builder(schema_descr, TEST_ROW_GROUP_ORDINAL).build(); assert!(row_group_meta.is_err()); if let Err(e) = row_group_meta { @@ -769,7 +809,7 @@ mod tests { .unwrap(); columns.push(column); } - let row_group_meta = RowGroupMetaData::builder(schema_descr) + let row_group_meta = RowGroupMetaData::builder(schema_descr, TEST_ROW_GROUP_ORDINAL) .set_num_rows(1000) .set_column_metadata(columns) .build() diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index f85de98ccab6..b3913016b69e 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -102,9 +102,14 @@ pub mod reader; pub mod serialized_reader; pub mod statistics; pub mod writer; +pub mod encryption; const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +/// Parquet uses PARE for encrypted footer mode, not PARC -- once we take care to check that we obey +/// the Parquet encryption spec in exact detail, this can be PARE. 
+const PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE: [u8; 4] = [b'P', b'A', b'R', b'C']; +const PARQUET_MAGIC_UNSUPPORTED_PARE: [u8; 4] = [b'P', b'A', b'R', b'E']; /// The number of bytes read at the end of the parquet file on first read const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024; diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index c48e4e7a07b0..90dd95a36bc4 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -51,6 +51,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::basic::{Compression, Encoding}; +use crate::file::encryption::{ParquetEncryptionKeyInfo, RandomFileIdentifier}; use crate::file::metadata::KeyValue; use crate::schema::types::ColumnPath; @@ -102,6 +103,7 @@ pub struct WriterProperties { pub(crate) key_value_metadata: Option>, default_column_properties: ColumnProperties, column_properties: HashMap, + pub(crate) encryption_info: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>, } impl WriterProperties { @@ -228,6 +230,7 @@ pub struct WriterPropertiesBuilder { key_value_metadata: Option>, default_column_properties: ColumnProperties, column_properties: HashMap, + encryption_info: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>, } impl WriterPropertiesBuilder { @@ -243,6 +246,7 @@ impl WriterPropertiesBuilder { key_value_metadata: None, default_column_properties: ColumnProperties::new(), column_properties: HashMap::new(), + encryption_info: None, } } @@ -258,6 +262,7 @@ impl WriterPropertiesBuilder { key_value_metadata: self.key_value_metadata, default_column_properties: self.default_column_properties, column_properties: self.column_properties, + encryption_info: self.encryption_info, } } @@ -307,6 +312,12 @@ impl WriterPropertiesBuilder { self } + /// Sets "encryption key" property. + pub fn set_encryption_info(mut self, value: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>) -> Self { + self.encryption_info = value; + self + } + // ---------------------------------------------------------------------- // Setters for any column (global) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index d0158852d938..d95bb76c1b09 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -18,6 +18,7 @@ //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader //! 
Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM) +use std::io::Cursor; use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc}; use parquet_format::{PageHeader, PageType}; @@ -27,6 +28,7 @@ use crate::basic::{Compression, Encoding, Type}; use crate::column::page::{Page, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; +use crate::file::encryption::{decrypt_module, parquet_aad_suffix, ParquetEncryptionKey, RandomFileIdentifier, USUAL_ENCRYPTION_OVERHEAD}; use crate::file::{footer, metadata::*, reader::*, statistics}; use crate::record::reader::RowIter; use crate::record::Row; @@ -37,6 +39,8 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr}; // re-use the logic in their own ParquetFileWriter wrappers pub use crate::util::{cursor::SliceableCursor, io::FileSource}; +use super::encryption::ParquetEncryptionConfig; + // ---------------------------------------------------------------------- // Implementations of traits facilitating the creation of a new reader @@ -125,16 +129,18 @@ impl IntoIterator for SerializedFileReader { pub struct SerializedFileReader { chunk_reader: Arc, metadata: ParquetMetaData, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)> } impl SerializedFileReader { /// Creates file reader from a Parquet file. /// Returns error if Parquet file does not exist or is corrupt. pub fn new(chunk_reader: R) -> Result { - let metadata = footer::parse_metadata(&chunk_reader)?; + let (metadata, _) = footer::parse_metadata(&chunk_reader, &None)?; Ok(Self { chunk_reader: Arc::new(chunk_reader), metadata, + encryption_info: None, }) } @@ -143,9 +149,33 @@ impl SerializedFileReader { Self { chunk_reader: Arc::new(chunk_reader), metadata, + encryption_info: None, } } + pub fn new_maybe_encrypted(chunk_reader: R, encryption_config: &Option) -> Result { + let (metadata, encryption_key) = footer::parse_metadata(&chunk_reader, encryption_config)?; + Self::new_with_metadata_maybe_encrypted(chunk_reader, metadata, &encryption_key) + } + + /// Creates file reader from a Parquet file, using pre-read metadata. + pub fn new_with_metadata_maybe_encrypted(chunk_reader: R, metadata: ParquetMetaData, encryption_key: &Option) -> Result { + let encryption_info: Option<_>; + if let Some(encryption_key) = encryption_key { + let random_file_identifier = metadata.file_metadata().random_file_identifier().ok_or_else( + || general_err!("Unsupported Parquet file: When encryption is used, FileMetaData must have a random_file_identifier") + )?; + encryption_info = Some((*encryption_key, random_file_identifier)); + } else { + encryption_info = None; + } + Ok(Self { + chunk_reader: Arc::new(chunk_reader), + metadata, + encryption_info, + }) + } + /// Filters row group metadata to only those row groups, /// for which the predicate function returns true pub fn filter_row_groups( @@ -179,9 +209,14 @@ impl FileReader for SerializedFileReader { let row_group_metadata = self.metadata.row_group(i); // Row groups should be processed sequentially. let f = Arc::clone(&self.chunk_reader); + // TODO: It seems lame that we have this limit in unencrypted mode. And maybe we could error earlier. 
+ let row_group_ordinal: i16 = i16::try_from(i) + .map_err(|_| general_err!("number of row groups cannot exceed {}", 1 << 15))?; Ok(Box::new(SerializedRowGroupReader::new( f, row_group_metadata, + self.encryption_info, + row_group_ordinal, ))) } @@ -194,14 +229,18 @@ impl FileReader for SerializedFileReader { pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, } impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { /// Creates new row group reader from a file and row group metadata. - fn new(chunk_reader: Arc, metadata: &'a RowGroupMetaData) -> Self { + fn new(chunk_reader: Arc, metadata: &'a RowGroupMetaData, encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, row_group_ordinal: i16) -> Self { Self { chunk_reader, metadata, + encryption_info, + row_group_ordinal, } } } @@ -220,10 +259,17 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' let col = self.metadata.column(i); let (col_start, col_length) = col.byte_range(); let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?; + // TODO: It seems lame that we have this limit in unencrypted mode. And maybe we could error earlier. + let column_ordinal = u16::try_from(i) + .map_err(|_| general_err!("number of columns cannot exceed {}", u16::MAX as u32 + 1))?; let page_reader = SerializedPageReader::new( file_chunk, + self.encryption_info, + self.row_group_ordinal, + column_ordinal, col.num_values(), col.compression(), + col.has_dictionary_page(), col.column_descr().physical_type(), )?; Ok(Box::new(page_reader)) @@ -240,6 +286,14 @@ pub struct SerializedPageReader { // to be read by this page reader. buf: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, + + // Mutable: the page_ordinal of the next page read. Initialized with None in the case we expect + // to start with a dictionary page, then gets incremented to Some(0) before the first data page. + page_ordinal: Option, + // The compression codec for this column chunk. Only set for non-PLAIN codec. decompressor: Option>, @@ -257,13 +311,22 @@ impl SerializedPageReader { /// Creates a new serialized page reader from file source. pub fn new( buf: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, total_num_values: i64, compression: Compression, + dictionary_enabled: bool, physical_type: Type, ) -> Result { let decompressor = create_codec(compression)?; + let page_ordinal = if dictionary_enabled { None } else { Some(0) }; let result = Self { buf, + encryption_info, + row_group_ordinal, + column_ordinal, + page_ordinal, total_num_values, seen_num_values: 0, decompressor, @@ -273,10 +336,21 @@ impl SerializedPageReader { } /// Reads Page header from Thrift. 
- fn read_page_header(&mut self) -> Result { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) + fn read_page_header(&mut self, aad_header_module_type: u8, aad_page_ordinal: Option) -> Result { + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_header_module_type, + self.row_group_ordinal, self.column_ordinal, aad_page_ordinal); + + let plaintext_cursor = decrypt_module("PageHeader", &mut self.buf, encryption_key, &aad_suffix)?; + + let mut prot = TCompactInputProtocol::new(plaintext_cursor); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) + } else { + let mut prot = TCompactInputProtocol::new(&mut self.buf); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) + } } } @@ -284,14 +358,47 @@ impl Iterator for SerializedPageReader { type Item = Result; fn next(&mut self) -> Option { - self.get_next_page().transpose() + let ret = self.get_next_page().transpose(); + ret } } +pub const DATA_PAGE_MODULE_TYPE: u8 = 2; +pub const DICTIONARY_PAGE_MODULE_TYPE: u8 = 3; +pub const DATA_PAGE_HEADER_MODULE_TYPE: u8 = 4; +pub const DICTIONARY_PAGE_HEADER_MODULE_TYPE: u8 = 5; +// TODO: We (the existing Rust lib) write a ColumnChunk after the data pages. Is this described in +// the docs? It never gets read -- in fact the ChunkReader, using total_compressed_size, emits +// Read objects over intervals that don't contain it. What do the other Parquet libs do? +pub const COLUMNCHUNK_MODULE_TYPE: u8 = 255; + impl PageReader for SerializedPageReader { fn get_next_page(&mut self) -> Result> { while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; + let aad_page_ordinal = self.page_ordinal; + let aad_module_type: u8; + let aad_header_module_type: u8; + if aad_page_ordinal.is_some() { + // Data pages + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; + } else { + // The dictionary page + aad_module_type = DICTIONARY_PAGE_MODULE_TYPE; + aad_header_module_type = DICTIONARY_PAGE_HEADER_MODULE_TYPE; + } + let page_header = self.read_page_header(aad_header_module_type, aad_page_ordinal)?; + + let mut cursor: Cursor>; + let reader: &mut dyn Read; + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad = parquet_aad_suffix(random_file_identifier, aad_module_type, self.row_group_ordinal, + self.column_ordinal, aad_page_ordinal); + cursor = decrypt_module("Page data", &mut self.buf, encryption_key, &aad)?; + reader = &mut cursor; + } else { + reader = &mut self.buf; + } // When processing data page v2, depending on enabled compression for the // page, we should account for uncompressed data ('offset') of @@ -310,11 +417,11 @@ impl PageReader for SerializedPageReader { can_decompress = header_v2.is_compressed.unwrap_or(true); } - let compressed_len = page_header.compressed_page_size as usize - offset; + let compressed_unencrypted_len = (page_header.compressed_page_size as usize) - (if self.encryption_info.is_some() { USUAL_ENCRYPTION_OVERHEAD } else { 0 }) - offset; let uncompressed_len = page_header.uncompressed_page_size as usize - offset; // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; - self.buf.read_exact(&mut buffer)?; + let mut buffer = vec![0; offset + 
compressed_unencrypted_len]; + reader.read_exact(&mut buffer)?; // TODO: page header could be huge because of statistics. We should set a // maximum page header size and abort if that is exceeded. @@ -398,6 +505,12 @@ impl PageReader for SerializedPageReader { continue; } }; + self.page_ordinal = if let Some(n) = aad_page_ordinal { + let n_plus_1 = n.checked_add(1).ok_or_else(|| general_err!("Number of pages in row group exceeded {}", u16::MAX))?; + Some(n_plus_1) + } else { + Some(0) + }; return Ok(Some(result)); } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index e37fca3fbef3..d1303e7926f4 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -19,6 +19,7 @@ //! using row group writers and column writers respectively. use std::{ + convert::TryFrom, io::{Seek, SeekFrom, Write}, sync::Arc, }; @@ -27,15 +28,15 @@ use byteorder::{ByteOrder, LittleEndian}; use parquet_format as parquet; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; -use crate::basic::PageType; +use crate::{basic::PageType}; use crate::column::{ page::{CompressedPage, Page, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; use crate::errors::{ParquetError, Result}; use crate::file::{ - metadata::*, properties::WriterPropertiesPtr, - statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC, + metadata::*, properties::{WriterPropertiesPtr}, + statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, }; use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::util::io::{FileSink, Position}; @@ -46,6 +47,8 @@ pub use crate::util::io::TryClone; // Exposed publically for convenience of writing Parquet to a buffer of bytes pub use crate::util::cursor::InMemoryWriteableCursor; +use crate::file::{encryption::{encrypt_module, parquet_aad_suffix, parquet_magic, ParquetEncryptionKey, PrepaddedPlaintext, RandomFileIdentifier, USUAL_ENCRYPTION_OVERHEAD}, serialized_reader::{COLUMNCHUNK_MODULE_TYPE, DATA_PAGE_HEADER_MODULE_TYPE, DATA_PAGE_MODULE_TYPE, DICTIONARY_PAGE_HEADER_MODULE_TYPE, DICTIONARY_PAGE_MODULE_TYPE}}; + // ---------------------------------------------------------------------- // APIs for file & row group writers @@ -146,7 +149,7 @@ impl SerializedFileWriter { schema: TypePtr, properties: WriterPropertiesPtr, ) -> Result { - Self::start_file(&mut buf)?; + Self::start_file(&mut buf, properties.encryption_info.is_some())?; Ok(Self { buf, schema: schema.clone(), @@ -159,9 +162,10 @@ impl SerializedFileWriter { }) } - /// Writes magic bytes at the beginning of the file. - fn start_file(buf: &mut W) -> Result<()> { - buf.write_all(&PARQUET_MAGIC)?; + /// Writes magic bytes at the beginning of the file, depending on whether the file is encrypted + /// (in encrypted footer mode). + fn start_file(buf: &mut W, is_footer_encrypted: bool) -> Result<()> { + buf.write_all(&parquet_magic(is_footer_encrypted))?; Ok(()) } @@ -191,11 +195,45 @@ impl SerializedFileWriter { key_value_metadata: self.props.key_value_metadata().to_owned(), created_by: Some(self.props.created_by().to_owned()), column_orders: None, + // encryption_algorithm and footer_signing_key_metadata are used in plaintext footer + // mode, which we don't use. 
+ encryption_algorithm: None, + footer_signing_key_metadata: None, }; - // Write file metadata + // Write file metadata (FileCryptoMetaData (if applicable) and FileMetaData) let start_pos = self.buf.seek(SeekFrom::Current(0))?; - { + + if let Some((key_info, random_file_identifier)) = &self.props.encryption_info { + // FileCryptoMetaData and FileMetadata + + let file_crypto_metadata = parquet::FileCryptoMetaData { + encryption_algorithm: parquet::EncryptionAlgorithm::AESGCMV1(parquet_format::AesGcmV1{ + aad_prefix: None, aad_file_unique: Some(random_file_identifier.to_vec()), supply_aad_prefix: None, + }), + // TODO: Maybe the user of this parquet lib will want to make their own decision + // about this. Right now this library supports passing multiple read keys, and uses + // the Sha3-256 of the key as a key id to select the key. + key_metadata: Some(key_info.key.compute_key_hash().to_vec()), + }; + + { + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + file_crypto_metadata.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + file_metadata.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + let no_aad = &[]; + encrypt_module("FileMetaData", &mut self.buf, &key_info.key, plaintext, no_aad)?; + } else { + // just FileMetaData let mut protocol = TCompactOutputProtocol::new(&mut self.buf); file_metadata.write_to_out_protocol(&mut protocol)?; protocol.flush()?; @@ -206,7 +244,7 @@ impl SerializedFileWriter { let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE]; let metadata_len = (end_pos - start_pos) as i32; LittleEndian::write_i32(&mut footer_buffer, metadata_len); - (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?; + (&mut footer_buffer[4..]).write_all(&parquet_magic(self.props.encryption_info.is_some()))?; self.buf.write_all(&footer_buffer)?; Ok(file_metadata) } @@ -235,9 +273,12 @@ impl FileWriter for SerializedFileWriter { fn next_row_group(&mut self) -> Result> { self.assert_closed()?; self.assert_previous_writer_closed()?; + let row_group_ordinal: i16 = i16::try_from(self.row_groups.len()) + .map_err(|_| general_err!("Number of row groups cannot exceed {}", i16::MAX as i32 + 1))?; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), self.props.clone(), + row_group_ordinal, &self.buf, ); self.previous_writer_closed = false; @@ -277,6 +318,7 @@ pub struct SerializedRowGroupWriter { total_bytes_written: u64, column_index: usize, previous_writer_closed: bool, + row_group_ordinal: i16, row_group_metadata: Option, column_chunks: Vec, } @@ -285,6 +327,7 @@ impl SerializedRowGroupWriter { pub fn new( schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, + row_group_ordinal: i16, buf: &W, ) -> Self { let num_columns = schema_descr.num_columns(); @@ -296,6 +339,7 @@ impl SerializedRowGroupWriter { total_bytes_written: 0, column_index: 0, previous_writer_closed: true, + row_group_ordinal, row_group_metadata: None, column_chunks: Vec::with_capacity(num_columns), } @@ -360,8 +404,10 @@ impl RowGroupWriter for SerializedRowGroupWriter if self.column_index >= self.descr.num_columns() { return Ok(None); } + let column_ordinal: u16 = u16::try_from(self.column_index) + .map_err(|_| general_err!("Number of columns cannot exceed {}", u16::MAX as u32 + 1))?; let sink = FileSink::new(&self.buf); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = 
Box::new(SerializedPageWriter::new(sink, self.props.encryption_info.as_ref().map(|(key_info, rfi)| (key_info.key, *rfi)), self.row_group_ordinal, column_ordinal)); let column_writer = get_column_writer( self.descr.column(self.column_index), self.props.clone(), @@ -386,7 +432,7 @@ impl RowGroupWriter for SerializedRowGroupWriter self.assert_previous_writer_closed()?; let column_chunks = std::mem::take(&mut self.column_chunks); - let row_group_metadata = RowGroupMetaData::builder(self.descr.clone()) + let row_group_metadata = RowGroupMetaData::builder(self.descr.clone(), self.row_group_ordinal) .set_column_metadata(column_chunks) .set_total_byte_size(self.total_bytes_written as i64) .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) @@ -406,20 +452,36 @@ impl RowGroupWriter for SerializedRowGroupWriter /// `SerializedPageWriter` should not be used after calling `close()`. pub struct SerializedPageWriter { sink: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, } impl SerializedPageWriter { /// Creates new page writer. - pub fn new(sink: T) -> Self { - Self { sink } + pub fn new(sink: T, encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, row_group_ordinal: i16, column_ordinal: u16) -> Self { + Self { sink, encryption_info, row_group_ordinal, column_ordinal } } /// Serializes page header into Thrift. + /// aad_header_module_type needs to be the correct value that corresponds to header.page_type(). /// Returns number of bytes that have been written into the sink. #[inline] - fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result { + fn serialize_page_header(&mut self, header: parquet::PageHeader, aad_header_module_type: u8, page_ordinal: Option) -> Result { let start_pos = self.sink.pos(); - { + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_header_module_type, + self.row_group_ordinal, self.column_ordinal, page_ordinal); + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + header.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + encrypt_module("PageHeader", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { let mut protocol = TCompactOutputProtocol::new(&mut self.sink); header.write_to_out_protocol(&mut protocol)?; protocol.flush()?; @@ -431,17 +493,35 @@ impl SerializedPageWriter { /// Returns Ok() if there are not errors serializing and writing data into the sink. #[inline] fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> { - let mut protocol = TCompactOutputProtocol::new(&mut self.sink); - chunk.write_to_out_protocol(&mut protocol)?; - protocol.flush()?; + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + // TODO: Verify that we behave the same way as other arrow implementations here, in the + // sense that we should verify that others write out this ColumnChunk _here_ at all. 
+ let aad_module_type = COLUMNCHUNK_MODULE_TYPE; + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_module_type, + self.row_group_ordinal, self.column_ordinal, None); + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + chunk.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + encrypt_module("ColumnChunk", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { + let mut protocol = TCompactOutputProtocol::new(&mut self.sink); + chunk.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } Ok(()) } } impl PageWriter for SerializedPageWriter { - fn write_page(&mut self, page: CompressedPage) -> Result { + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result { let uncompressed_size = page.uncompressed_size(); - let compressed_size = page.compressed_size(); + let compressed_unencrypted_size = page.compressed_unencrypted_size(); + let compressed_size = (if self.encryption_info.is_some() { USUAL_ENCRYPTION_OVERHEAD } else { 0 }) + compressed_unencrypted_size; let num_values = page.num_values(); let encoding = page.encoding(); let page_type = page.page_type(); @@ -458,6 +538,8 @@ impl PageWriter for SerializedPageWriter { data_page_header_v2: None, }; + let aad_module_type: u8; + let aad_header_module_type: u8; match *page.compressed_page() { Page::DataPage { def_level_encoding, @@ -473,6 +555,8 @@ impl PageWriter for SerializedPageWriter { statistics: statistics_to_thrift(statistics.as_ref()), }; page_header.data_page_header = Some(data_page_header); + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; } Page::DataPageV2 { num_nulls, @@ -494,6 +578,8 @@ impl PageWriter for SerializedPageWriter { statistics: statistics_to_thrift(statistics.as_ref()), }; page_header.data_page_header_v2 = Some(data_page_header_v2); + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; } Page::DictionaryPage { is_sorted, .. } => { let dictionary_page_header = parquet::DictionaryPageHeader { @@ -502,13 +588,26 @@ impl PageWriter for SerializedPageWriter { is_sorted: Some(is_sorted), }; page_header.dictionary_page_header = Some(dictionary_page_header); + aad_module_type = DICTIONARY_PAGE_MODULE_TYPE; + aad_header_module_type = DICTIONARY_PAGE_HEADER_MODULE_TYPE; } } let start_pos = self.sink.pos(); - let header_size = self.serialize_page_header(page_header)?; - self.sink.write_all(page.data())?; + // TODO: header_size is after encryption -- is that what we want? What about for uncompressed_size? 
+ let header_size = self.serialize_page_header(page_header, aad_header_module_type, aad_page_ordinal)?; + + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_module_type, + self.row_group_ordinal, self.column_ordinal, aad_page_ordinal); + + let mut plaintext = PrepaddedPlaintext::new(); + plaintext.buf_mut().extend_from_slice(page.data()); + encrypt_module("Page data", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { + self.sink.write_all(page.data())?; + } let mut spec = PageWriteSpec::new(); spec.page_type = page_type; @@ -538,11 +637,15 @@ impl PageWriter for SerializedPageWriter { mod tests { use super::*; + use std::os::unix::fs::FileExt; use std::{fs::File, io::Cursor}; use crate::basic::{Compression, Encoding, IntType, LogicalType, Repetition, Type}; use crate::column::page::PageReader; use crate::compression::{create_codec, Codec}; + use crate::file::encryption::{generate_random_file_identifier, ParquetEncryptionConfig, ParquetEncryptionKeyInfo}; + use crate::file::reader::Length; + use crate::file::{PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE}; use crate::file::{ properties::{WriterProperties, WriterVersion}, reader::{FileReader, SerializedFileReader, SerializedPageReader}, @@ -879,14 +982,31 @@ mod tests { test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32); } + const TEST_ROW_GROUP_ORDINAL: i16 = 2325; + const TEST_COLUMN_ORDINAL: u16 = 135; + /// Tests writing and reading pages. /// Physical type is for statistics only, should match any defined statistics type in /// pages. fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) { + test_page_roundtrip_helper(pages, codec, physical_type, &None); + test_page_roundtrip_helper(pages, codec, physical_type, + &Some((ParquetEncryptionKey::generate_key(), generate_random_file_identifier()))); + } + + fn test_page_roundtrip_helper(pages: &[Page], codec: Compression, physical_type: Type, + encryption_info: &Option<(ParquetEncryptionKey, RandomFileIdentifier)>) { + let mut compressed_pages = vec![]; let mut total_num_values = 0i64; let mut compressor = create_codec(codec).unwrap(); + // Kind of silly because we don't enforce in this test helper function that pages are in the + // correct order (dictionary first), but we don't have encryption in this test (yet?) anyway + // (as that's where pages need to be in the proper order, as we need to know the aad suffix + // in advance) so it doesn't really matter. 
+ let mut has_dictionary_page = false; + for page in pages { let uncompressed_len = page.buffer().len(); @@ -955,6 +1075,8 @@ mod tests { } => { let output_buf = compress_helper(compressor.as_mut(), buf.data()); + has_dictionary_page = true; + Page::DictionaryPage { buf: ByteBufferPtr::new(output_buf), num_values, @@ -972,18 +1094,24 @@ mod tests { let mut result_pages: Vec = vec![]; { let cursor = Cursor::new(&mut buffer); - let mut page_writer = SerializedPageWriter::new(cursor); + let mut page_writer = SerializedPageWriter::new(cursor, *encryption_info, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL); + let mut page_ordinal = if has_dictionary_page { None:: } else { Some(0) }; for page in compressed_pages { - page_writer.write_page(page).unwrap(); + page_writer.write_page(page, page_ordinal).unwrap(); + page_ordinal = Some(page_ordinal.map_or(0, |x| x + 1)); } page_writer.close().unwrap(); } { let mut page_reader = SerializedPageReader::new( Cursor::new(&buffer), + *encryption_info, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, total_num_values, codec, + has_dictionary_page, physical_type, ) .unwrap(); @@ -1019,9 +1147,30 @@ mod tests { assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics())); } + fn assert_magic(file: &mut File, expected: [u8; 4]) { + let length = file.len(); + // Of course the file has to be larger than just 8, but we're just sanity-checking when checking the magic. + assert!(length >= 8); + + let mut buf = [0xCDu8, 0xCD, 0xCD, 0xCD]; + file.read_exact_at(&mut buf[..], 0).unwrap(); + assert_eq!(buf, expected); + file.read_exact_at(&mut buf[..], length - 4).unwrap(); + assert_eq!(buf, expected); + } + /// File write-read roundtrip. /// `data` consists of arrays of values for each row group. - fn test_file_roundtrip(file: File, data: Vec>) { + fn test_file_roundtrip(mut file: File, data: Vec>) { + test_file_roundtrip_with_encryption_key(file.try_clone().unwrap(), &data, &None); + assert_magic(&mut file, PARQUET_MAGIC); + file.set_len(0).unwrap(); + file.seek(SeekFrom::Start(0)).unwrap(); + test_file_roundtrip_with_encryption_key(file.try_clone().unwrap(), &data, &Some(ParquetEncryptionKey::generate_key())); + assert_magic(&mut file, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE); + } + + fn test_file_roundtrip_with_encryption_key(file: File, data: &Vec>, encryption_key: &Option) { let schema = Arc::new( types::Type::group_type_builder("schema") .with_fields(&mut vec![Arc::new( @@ -1033,13 +1182,14 @@ mod tests { .build() .unwrap(), ); - let props = Arc::new(WriterProperties::builder().build()); + let encryption_info = encryption_key.map(|key| (ParquetEncryptionKeyInfo{key_id: "a key id".to_string(), key}, generate_random_file_identifier())); + let props = Arc::new(WriterProperties::builder().set_encryption_info(encryption_info.clone()).build()); let mut file_writer = assert_send( SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(), ); let mut rows: i64 = 0; - for subset in &data { + for subset in data { let mut row_group_writer = file_writer.next_row_group().unwrap(); let col_writer = row_group_writer.next_column().unwrap(); if let Some(mut writer) = col_writer { @@ -1059,7 +1209,8 @@ mod tests { file_writer.close().unwrap(); - let reader = assert_send(SerializedFileReader::new(file).unwrap()); + let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap()); + let reader = assert_send(SerializedFileReader::new_maybe_encrypted(file, &encryption_config).unwrap()); 
assert_eq!(reader.num_row_groups(), data.len()); assert_eq!( reader.metadata().file_metadata().num_rows(), @@ -1101,6 +1252,11 @@ mod tests { } fn test_bytes_roundtrip(data: Vec>) { + test_bytes_roundtrip_helper(&data, &None); + test_bytes_roundtrip_helper(&data, &Some(ParquetEncryptionKey::generate_key())); + } + + fn test_bytes_roundtrip_helper(data: &Vec>, encryption_key: &Option) { let cursor = InMemoryWriteableCursor::default(); let schema = Arc::new( @@ -1115,13 +1271,15 @@ mod tests { .unwrap(), ); + let encryption_info = encryption_key.map(|key| (ParquetEncryptionKeyInfo{key_id: "a key id".to_string(), key}, generate_random_file_identifier())); + let mut rows: i64 = 0; { - let props = Arc::new(WriterProperties::builder().build()); + let props = Arc::new(WriterProperties::builder().set_encryption_info(encryption_info.clone()).build()); let mut writer = SerializedFileWriter::new(cursor.clone(), schema, props).unwrap(); - for subset in &data { + for subset in data { let mut row_group_writer = writer.next_row_group().unwrap(); let col_writer = row_group_writer.next_column().unwrap(); if let Some(mut writer) = col_writer { @@ -1145,7 +1303,8 @@ mod tests { let buffer = cursor.into_inner().unwrap(); let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); - let reader = SerializedFileReader::new(reading_cursor).unwrap(); + let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap()); + let reader = SerializedFileReader::new_maybe_encrypted(reading_cursor, &encryption_config).unwrap(); assert_eq!(reader.num_row_groups(), data.len()); assert_eq!(
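// --- Editor-added supplementary sketch, not part of the patch above ---
// The BYTE_STREAM_SPLIT variant added to the Encoding enum in basic.rs scatters the K bytes of
// each value into K separate byte streams and concatenates the streams. Below is a minimal
// std-only sketch for f32 (K = 4); these function names are hypothetical and do not exist in
// the parquet crate.
fn byte_stream_split_f32(values: &[f32]) -> Vec<u8> {
    let n = values.len();
    let mut out = vec![0u8; n * 4];
    for (i, v) in values.iter().enumerate() {
        for (k, b) in v.to_le_bytes().iter().enumerate() {
            // Byte k of value i lands at position k * n + i, i.e. in the k-th stream.
            out[k * n + i] = *b;
        }
    }
    out
}

fn byte_stream_split_decode_f32(encoded: &[u8]) -> Vec<f32> {
    let n = encoded.len() / 4;
    (0..n)
        .map(|i| {
            f32::from_le_bytes([
                encoded[i],
                encoded[n + i],
                encoded[2 * n + i],
                encoded[3 * n + i],
            ])
        })
        .collect()
}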
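// --- Editor-added supplementary sketch, not part of the patch above ---
// Layout of the AAD suffix built by parquet_aad_suffix in file/encryption.rs, rebuilt here with
// std only (to_le_bytes instead of the byteorder crate). This mirrors the layout used by the
// patch but is not the crate's API.
fn aad_suffix_sketch(
    file_identifier: &[u8; 20], // RandomFileIdentifier / aad_file_unique
    module_type: u8,            // e.g. 2 = data page, 4 = data page header (see serialized_reader.rs)
    row_group_ordinal: i16,
    column_ordinal: u16,
    page_ordinal: Option<u16>,  // None for the dictionary page and for column chunk metadata
) -> Vec<u8> {
    let mut aad = Vec::new();
    aad.extend_from_slice(file_identifier);
    aad.push(module_type);
    aad.extend_from_slice(&row_group_ordinal.to_le_bytes());
    aad.extend_from_slice(&column_ordinal.to_le_bytes());
    if let Some(p) = page_ordinal {
        aad.extend_from_slice(&p.to_le_bytes());
    }
    aad
}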
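// --- Editor-added supplementary sketch, not part of the patch above ---
// How a reader can slice the framing that encrypt_module writes: "length (4 bytes, little-endian)
// | nonce (12) | ciphertext | tag (16)", where the length field counts everything after itself,
// giving USUAL_ENCRYPTION_OVERHEAD = 4 + 12 + 16 = 32 bytes around the ciphertext. Std-only
// sketch; the real decrypt_module also performs the AES-GCM decryption, which is omitted here.
fn split_encrypted_module(frame: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    const NONCE_SIZE: usize = 12;
    const TAG_SIZE: usize = 16;
    if frame.len() < 4 {
        return None;
    }
    let len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    if len < NONCE_SIZE + TAG_SIZE {
        return None;
    }
    let body = frame.get(4..)?.get(..len)?;
    let (nonce, rest) = body.split_at(NONCE_SIZE);
    let (ciphertext, tag) = rest.split_at(rest.len() - TAG_SIZE);
    Some((nonce, ciphertext, tag))
}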
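// --- Editor-added supplementary sketch, not part of the patch above ---
// Key selection as done by select_key in file/footer.rs: the writer stores the SHA3-224 hash of
// the write key in FileCryptoMetaData.key_metadata, and the reader hashes each configured read
// key and picks the one whose hash matches. Simplified sketch over raw 32-byte keys (the real
// code works with ParquetEncryptionKeyInfo and returns ParquetError); assumes the sha3 crate
// added in the Cargo.toml change.
use sha3::{Digest, Sha3_224};

const PARQUET_KEY_HASH_LENGTH: usize = 28;

fn pick_read_key<'a>(key_metadata: &[u8], read_keys: &'a [[u8; 32]]) -> Option<&'a [u8; 32]> {
    if key_metadata.len() != PARQUET_KEY_HASH_LENGTH {
        return None;
    }
    read_keys.iter().find(|key| {
        let hash: [u8; PARQUET_KEY_HASH_LENGTH] = Sha3_224::digest(&key[..]).into();
        &hash[..] == key_metadata
    })
}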
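// --- Editor-added supplementary sketch, not part of the patch above ---
// The file tail that parse_metadata inspects. In both plaintext and encrypted-footer files the
// last FOOTER_SIZE = 8 bytes are a little-endian footer length followed by a 4-byte magic
// ("PAR1", or "PARC" for this patch's encrypted-footer mode). Std-only sketch over an in-memory
// buffer, with error handling reduced to Option.
fn read_footer_tail(file: &[u8]) -> Option<(usize, [u8; 4])> {
    if file.len() < 8 {
        return None;
    }
    let tail = &file[file.len() - 8..];
    let metadata_len = u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]) as usize;
    let magic = [tail[4], tail[5], tail[6], tail[7]];
    // For "PARC" files, the metadata_len bytes preceding this tail hold a plaintext
    // FileCryptoMetaData followed by the encrypted FileMetaData module.
    Some((metadata_len, magic))
}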