From 9d6173caa1756600981f245c43197bb4d52dcac7 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Wed, 24 Jul 2024 17:49:30 -0700 Subject: [PATCH] feat: Implement Parquet footer-mode encryption Uses PARC magic. Haven't tested if we perfectly follow the Parquet spec. Includes some design choices around key metadata (such as using Sha3 key metadata to select keys) which are not sufficiently abstracted for a general purpose library. (That is, there would be some changes to make if this were to be upstreamed.) --- parquet/Cargo.toml | 8 +- parquet/src/basic.rs | 11 ++ parquet/src/column/page.rs | 15 +- parquet/src/column/writer.rs | 67 ++++++-- parquet/src/file/encryption.rs | 217 +++++++++++++++++++++++++ parquet/src/file/footer.rs | 114 +++++++++++-- parquet/src/file/metadata.rs | 52 +++++- parquet/src/file/mod.rs | 5 + parquet/src/file/properties.rs | 11 ++ parquet/src/file/serialized_reader.rs | 135 ++++++++++++++-- parquet/src/file/writer.rs | 223 ++++++++++++++++++++++---- 11 files changed, 774 insertions(+), 84 deletions(-) create mode 100644 parquet/src/file/encryption.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 96db44882cfa..e27e035dac95 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -30,8 +30,10 @@ edition = "2018" [dependencies] # update note: pin `parquet-format` to specific version until it does not break at minor -# version, see ARROW-11187. -parquet-format = "~2.6.1" +# version, see ARROW-11187. update: since this comment, it is now pinned at ~4.0.0 and +# upstream arrow-rs parquet vendors it +parquet-format = "~4.0.0" +aes-gcm = "0.10.3" byteorder = "1" thrift = "0.13" snap = { version = "1.0", optional = true } @@ -45,7 +47,9 @@ arrow = { path = "../arrow", version = "5.0.0", optional = true } base64 = { version = "0.13", optional = true } clap = { version = "2.33.3", optional = true } serde_json = { version = "1.0", features = ["preserve_order"], optional = true } +serde = { version = "1.0.115", features = ["derive"] } rand = "0.8" +sha3 = "0.10.8" [dev-dependencies] criterion = "0.3" diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 631257e0ed1d..198b6a382173 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -250,6 +250,15 @@ pub enum Encoding { /// /// The ids are encoded using the RLE encoding. RLE_DICTIONARY, + + /// Encoding for floating-point data. + /// + /// K byte-streams are created where K is the size in bytes of the data type. + /// The individual bytes of an FP value are scattered to the corresponding stream and + /// the streams are concatenated. + /// This itself does not reduce the size of the data but can lead to better compression + /// afterwards. + BYTE_STREAM_SPLIT, } // ---------------------------------------------------------------------- @@ -701,6 +710,7 @@ impl convert::From for Encoding { parquet::Encoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY, parquet::Encoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY, parquet::Encoding::RleDictionary => Encoding::RLE_DICTIONARY, + parquet::Encoding::ByteStreamSplit => Encoding::BYTE_STREAM_SPLIT, } } } @@ -716,6 +726,7 @@ impl convert::From for parquet::Encoding { Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DeltaLengthByteArray, Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DeltaByteArray, Encoding::RLE_DICTIONARY => parquet::Encoding::RleDictionary, + Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::ByteStreamSplit, } } } diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index b75d3b5028bb..c046651486d0 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -139,11 +139,14 @@ impl CompressedPage { self.uncompressed_size } - /// Returns compressed size in bytes. + /// Returns compressed size (but not encrypted size) in bytes. /// - /// Note that it is assumed that buffer is compressed, but it may not be. In this - /// case compressed size will be equal to uncompressed size. - pub fn compressed_size(&self) -> usize { + /// Note that it is assumed that buffer is compressed, but it may not be. In this case + /// compressed size will be equal to uncompressed size. + /// + /// Other so-called "(total_)?compressed_size" fields include encryption overhead, when + /// applicable, which this does not. + pub fn compressed_unencrypted_size(&self) -> usize { self.compressed_page.buffer().len() } @@ -206,7 +209,7 @@ pub trait PageWriter { /// /// This method is called for every compressed page we write into underlying buffer, /// either data page or dictionary page. - fn write_page(&mut self, page: CompressedPage) -> Result; + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result; /// Writes column chunk metadata into the output stream/sink. /// @@ -299,7 +302,7 @@ mod tests { assert_eq!(cpage.page_type(), PageType::DATA_PAGE); assert_eq!(cpage.uncompressed_size(), 5); - assert_eq!(cpage.compressed_size(), 3); + assert_eq!(cpage.compressed_unencrypted_size(), 3); assert_eq!(cpage.num_values(), 10); assert_eq!(cpage.encoding(), Encoding::PLAIN); assert_eq!(cpage.data(), &[0, 1, 2]); diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 910a9ed5dcaf..d20fa9f7d774 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -168,6 +168,7 @@ pub struct ColumnWriterImpl { descr: ColumnDescPtr, props: WriterPropertiesPtr, page_writer: Box, + page_ordinal: usize, has_dictionary: bool, dict_encoder: Option>, encoder: Box>, @@ -185,6 +186,8 @@ pub struct ColumnWriterImpl { total_bytes_written: u64, total_rows_written: u64, total_uncompressed_size: u64, + // Includes encryption overhead -- the thrift definition field includes encryption overhead, and + // we keep its name here. total_compressed_size: u64, total_num_values: u64, dictionary_page_offset: Option, @@ -231,10 +234,14 @@ impl ColumnWriterImpl { ) .unwrap(); + // We start counting pages from zero. + let page_ordinal: usize = 0; + Self { descr, props, page_writer, + page_ordinal, has_dictionary, dict_encoder, encoder: fallback_encoder, @@ -824,7 +831,10 @@ impl ColumnWriterImpl { /// Writes compressed data page into underlying sink and updates global metrics. #[inline] fn write_data_page(&mut self, page: CompressedPage) -> Result<()> { - let page_spec = self.page_writer.write_page(page)?; + let page_ordinal = self.page_ordinal; + let aad_page_ordinal: Option = Some(page_ordinal as u16); + self.page_ordinal += 1; + let page_spec = self.page_writer.write_page(page, aad_page_ordinal)?; self.update_metrics_for_page(page_spec); Ok(()) } @@ -858,7 +868,7 @@ impl ColumnWriterImpl { CompressedPage::new(dict_page, uncompressed_size) }; - let page_spec = self.page_writer.write_page(compressed_page)?; + let page_spec = self.page_writer.write_page(compressed_page, None)?; self.update_metrics_for_page(page_spec); Ok(()) } @@ -1026,10 +1036,10 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool { mod tests { use rand::distributions::uniform::SampleUniform; - use crate::column::{ + use crate::{column::{ page::PageReader, reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl}, - }; + }, file::encryption::USUAL_ENCRYPTION_OVERHEAD}; use crate::file::{ properties::WriterProperties, reader::SerializedPageReader, writer::SerializedPageWriter, @@ -1642,13 +1652,16 @@ mod tests { ); } + const TEST_ROW_GROUP_ORDINAL: i16 = 1234; + const TEST_COLUMN_ORDINAL: u16 = 135; + #[test] fn test_column_writer_add_data_pages_with_dict() { // ARROW-5129: Test verifies that we add data page in case of dictionary encoding // and no fallback occurred so far. let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]); let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL)); let props = Arc::new( WriterProperties::builder() .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes @@ -1656,7 +1669,7 @@ mod tests { .build(), ); let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props.clone()); writer.write_batch(data, None, None).unwrap(); let (bytes_written, _, _) = writer.close().unwrap(); @@ -1665,8 +1678,12 @@ mod tests { let mut page_reader = Box::new( SerializedPageReader::new( source, + None, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, data.len() as i64, Compression::UNCOMPRESSED, + props.dictionary_enabled(&ColumnPath::from("col")), Int32Type::get_physical_type(), ) .unwrap(), @@ -1803,7 +1820,7 @@ mod tests { ) { let file = get_temp_file(file_name, &[]); let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL)); let max_def_level = match def_levels { Some(buf) => *buf.iter().max().unwrap_or(&0i16), @@ -1823,11 +1840,12 @@ mod tests { max_batch_size = cmp::max(max_batch_size, levels.len()); } + let props = Arc::new(props); let mut writer = get_test_column_writer::( page_writer, max_def_level, max_rep_level, - Arc::new(props), + props.clone(), ); let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap(); @@ -1838,8 +1856,12 @@ mod tests { let page_reader = Box::new( SerializedPageReader::new( source, + None, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, column_metadata.num_values(), column_metadata.compression(), + props.dictionary_enabled(&ColumnPath::from("col")), T::get_physical_type(), ) .unwrap(), @@ -1977,20 +1999,39 @@ mod tests { /// Returns page writer that collects pages without serializing them. fn get_test_page_writer() -> Box { - Box::new(TestPageWriter {}) + Box::new(TestPageWriter {simulate_encrypted: false, last_page_ordinal: None}) } - struct TestPageWriter {} + struct TestPageWriter { + /// Always false, currently -- enabling would just affect return values that get fed into + /// test assertions. + simulate_encrypted: bool, + last_page_ordinal: Option, + } impl PageWriter for TestPageWriter { - fn write_page(&mut self, page: CompressedPage) -> Result { + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result { + // We're a bit loose in this assertion -- the caller could write or not write a dictionary page. + match aad_page_ordinal { + Some(n) if n != 0 => { + assert_eq!(self.last_page_ordinal, Some(n - 1)); + } + _ => { + assert_eq!(None, self.last_page_ordinal); + } + } + self.last_page_ordinal = aad_page_ordinal; + + // Note, the normal PageWriteSpec result would include PageMetaData overhead, and these + // values are thus not perfectly faked, but the only thing that looks at them are test + // assertions. let mut res = PageWriteSpec::new(); res.page_type = page.page_type(); res.uncompressed_size = page.uncompressed_size(); - res.compressed_size = page.compressed_size(); + res.compressed_size = self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.compressed_unencrypted_size(); res.num_values = page.num_values(); res.offset = 0; - res.bytes_written = page.data().len() as u64; + res.bytes_written = (self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.data().len()) as u64; Ok(res) } diff --git a/parquet/src/file/encryption.rs b/parquet/src/file/encryption.rs new file mode 100644 index 000000000000..3680db28ba3f --- /dev/null +++ b/parquet/src/file/encryption.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::{Cursor, Read, Write}; +use std::convert::TryFrom; + +use aes_gcm::aead::AeadMutInPlace; +use aes_gcm::{KeySizeUser, Tag}; +use aes_gcm::{AeadCore as _, Aes256Gcm, KeyInit as _, Nonce}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use rand::{rngs::OsRng, RngCore as _}; +use serde::{Deserialize, Serialize}; +use sha3::{Digest, Sha3_224}; + +use crate::errors::{ParquetError, Result}; + +use crate::file::{PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE}; + +pub type ParquetEncryptionKeyId = String; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ParquetEncryptionKeyInfo { + pub key_id: ParquetEncryptionKeyId, + pub key: ParquetEncryptionKey, +} + +/// Describes general parquet encryption configuration -- new files are encrypted with the +/// write_key(), but old files can be decrypted with any of the valid read keys. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ParquetEncryptionConfig { + // The last key is the write key, and all the keys are valid read keys. + keys: Vec, +} + +impl ParquetEncryptionConfig { + pub fn new(keys: Vec) -> Option { + if keys.is_empty() { + None + } else { + Some(ParquetEncryptionConfig{ keys }) + } + } + + pub fn write_key(&self) -> &ParquetEncryptionKeyInfo { + self.keys.last().unwrap() + } + + pub fn read_keys(&self) -> &[ParquetEncryptionKeyInfo] { + self.keys.as_slice() + } +} + +// Since keys are 32 bytes (being 256 bits), we use 28-byte hashes to avoid mistaking a key for a +// key hash. +pub const PARQUET_KEY_HASH_LENGTH: usize = 28; +pub const PARQUET_KEY_SIZE: usize = 32; // Aes256Gcm, hence 32 bytes + +/// Describes how we encrypt or encrypted the Parquet files. Right now (in this implementation) +/// files can only be encrypted in "encrypted footer mode" with the footer and columns all encrypted +/// with the same key. + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct ParquetEncryptionKey { + /// The key we use for all parts and components of the Parquet files. + pub key: [u8; PARQUET_KEY_SIZE] +} + +impl ParquetEncryptionKey { + pub fn default() -> ParquetEncryptionKey { + ParquetEncryptionKey{key: Default::default()} + } + + pub fn key_size() -> usize { + Aes256Gcm::key_size() + } + + pub fn generate_key() -> ParquetEncryptionKey { + let key = Aes256Gcm::generate_key(OsRng); + let mut result = ParquetEncryptionKey::default(); + result.key.copy_from_slice(&key); + result + } + + pub fn to_aes256_gcm_key(&self) -> aes_gcm::Key { + let mut result = aes_gcm::Key::::default(); + let r: &mut [u8] = &mut result; + r.copy_from_slice(&self.key); + result + } + + pub fn compute_key_hash(&self) -> [u8; PARQUET_KEY_HASH_LENGTH] { + let mut hasher = Sha3_224::new(); + hasher.update(&self.key); + let result = hasher.finalize(); + result.into() + } +} + +pub const AAD_FILE_UNIQUE_SIZE: usize = 20; +pub type RandomFileIdentifier = [u8; AAD_FILE_UNIQUE_SIZE]; + +const NONCE_SIZE: usize = 12; +const TAG_SIZE: usize = 16; +/// The value is 32. A 4-byte length field, 12-byte nonce, 16-byte tag. +pub const USUAL_ENCRYPTION_OVERHEAD: usize = 4 + NONCE_SIZE + TAG_SIZE; + +pub fn generate_random_file_identifier() -> RandomFileIdentifier { + let mut v = [0u8; AAD_FILE_UNIQUE_SIZE]; + OsRng.fill_bytes(&mut v); + v +} + +/// Returns the magic to use at the beginning and end of the file (depending on whether we use footer encryption) +pub fn parquet_magic(is_footer_encrypted: bool) -> [u8; 4] { + // For now ParquetEncryptionKey only allows footer encryption mode. And we use a custom "PARC" + // magic until we have checked that we're exactly following the format spec defined with "PARE". + if !is_footer_encrypted { PARQUET_MAGIC } else { PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE } +} + +// TODO: Could return fixed length array or some flat array,size pair instead of allocating. +pub fn parquet_aad_suffix(file_identifier: &RandomFileIdentifier, aad_module_type: u8, row_group_ordinal: i16, column_ordinal: u16, page_ordinal: Option) -> Vec { + let mut aad = Vec::::new(); + aad.extend_from_slice(file_identifier); + aad.push(aad_module_type); + let _ = aad.write_i16::(row_group_ordinal); + let _ = aad.write_u16::(column_ordinal); + if let Some(page_ordinal) = page_ordinal { + let _ = aad.write_u16::(page_ordinal); + } + aad +} + +/// PrepaddedPlaintext simply carries a buf with 16 empty bytes at the front. Then you can append +/// plaintext to it and pass it to encrypt_module, and it can then encrypt in-place and pass to the +/// Write with a single call. +pub struct PrepaddedPlaintext { + buf: Vec, +} + +impl PrepaddedPlaintext { + /// Constructs a buf for appending with plaintext and passing to encrypt_module. It is + /// recommended that you use the result of self.buf_mut() as a `Write` to append the plaintext. + pub fn new() -> PrepaddedPlaintext { + PrepaddedPlaintext{buf: vec![0u8; 16]} + } + pub fn buf_mut(&mut self) -> &mut Vec { + &mut self.buf + } +} + +/// Writes "length (4 bytes) nonce (12 bytes) ciphertext (length - 28 bytes) tag (16 bytes)" +pub fn encrypt_module(what: &str, w: &mut W, encryption_key: &ParquetEncryptionKey, mut prepadded: PrepaddedPlaintext, aad: &[u8]) -> Result<()> { + let mut cipher = Aes256Gcm::new(&encryption_key.to_aes256_gcm_key()); + let nonce = Aes256Gcm::generate_nonce(&mut OsRng); + + let buf = prepadded.buf_mut(); + let buflen = buf.len(); + let tag: Tag<_>; + { + let (front, plaintext) = buf.split_at_mut(4 + NONCE_SIZE); + + let written_len = u32::try_from(buflen - 4 + TAG_SIZE) + .map_err(|_| general_err!("Error encrypting {}. Module is too large", what))?; + front[..4].copy_from_slice(&u32::to_le_bytes(written_len)); + front[4..].copy_from_slice(&nonce); + + tag = cipher.encrypt_in_place_detached(&nonce, aad, plaintext) + .map_err(|_| general_err!("Error encrypting {}", what))?; + } + + buf.extend_from_slice(&tag); + + w.write_all(buf)?; + Ok(()) +} + +pub fn decrypt_module(what: &str, mut r: R, encryption_key: &ParquetEncryptionKey, aad: &[u8]) -> Result>> { + let mut cipher = Aes256Gcm::new(&encryption_key.to_aes256_gcm_key()); + + let buflen = r.read_u32::()?; + let buflen = buflen as usize; + if buflen < NONCE_SIZE + TAG_SIZE { + return Err(general_err!("Invalid Parquet file. Encrypted buffer length too short")); + } + let mut buf = vec![0u8; buflen]; + r.read_exact(&mut buf)?; + + let nonce = *Nonce::from_slice(&buf[..NONCE_SIZE]); + let tag = *Tag::from_slice(&buf[buflen - TAG_SIZE..]); + + cipher.decrypt_in_place_detached(&nonce, aad, &mut buf[NONCE_SIZE..buflen - TAG_SIZE], &tag) + .map_err(|_| general_err!("Error decrypting {}", what))?; + + // Now trim the buf of its trailing tag, and return a Cursor that skips past the nonce. + // And just to prevent any weirdness, zero out the nonce. + buf.truncate(buflen - TAG_SIZE); + buf[..NONCE_SIZE].fill(0); + + let mut cursor = Cursor::new(buf); + cursor.set_position(NONCE_SIZE as u64); + + Ok(cursor) +} diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 1e2d95fc1c9f..90a167615608 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -22,7 +22,7 @@ use std::{ }; use byteorder::{ByteOrder, LittleEndian}; -use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; +use parquet_format::{ColumnOrder as TColumnOrder, FileCryptoMetaData as TFileCryptoMetaData, FileMetaData as TFileMetaData}; use thrift::protocol::TCompactInputProtocol; use crate::basic::ColumnOrder; @@ -35,6 +35,27 @@ use crate::file::{ use crate::schema::types::{self, SchemaDescriptor}; +use crate::file::{encryption::{decrypt_module, parquet_magic, ParquetEncryptionConfig, PARQUET_KEY_HASH_LENGTH, ParquetEncryptionKey, ParquetEncryptionKeyInfo, RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE}, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE, PARQUET_MAGIC_UNSUPPORTED_PARE}; + +fn select_key(encryption_config: &ParquetEncryptionConfig, key_metadata: &Option>) -> Result { + if let Some(key_id) = key_metadata { + if key_id.len() != PARQUET_KEY_HASH_LENGTH { + return Err(general_err!("Unsupported Parquet file. key_metadata field length is not supported")); + } + let mut key_id_arr = [0u8; PARQUET_KEY_HASH_LENGTH]; + key_id_arr.copy_from_slice(&key_id); + let read_keys: &[ParquetEncryptionKeyInfo] = encryption_config.read_keys(); + for key_info in read_keys { + if key_info.key.compute_key_hash() == key_id_arr { + return Ok(key_info.key) + } + } + return Err(general_err!("Parquet file is encrypted with an unknown or out-of-rotation key")); + } else { + return Err(general_err!("Unsupported Parquet file. Expecting key_metadata field to be used")); + } +} + /// Layout of Parquet file /// +---------------------------+-----+---+ /// | Rest of file | B | A | @@ -43,7 +64,7 @@ use crate::schema::types::{self, SchemaDescriptor}; /// /// The reader first reads DEFAULT_FOOTER_SIZE bytes from the end of the file. /// If it is not enough according to the length indicated in the footer, it reads more bytes. -pub fn parse_metadata(chunk_reader: &R) -> Result { +pub fn parse_metadata(chunk_reader: &R, encryption_config: &Option) -> Result<(ParquetMetaData, Option)> { // check file is large enough to hold footer let file_size = chunk_reader.len(); if file_size < (FOOTER_SIZE as u64) { @@ -60,8 +81,19 @@ pub fn parse_metadata(chunk_reader: &R) -> Result(chunk_reader: &R) -> Result; + + let mut metadata_read: Box; if footer_metadata_len > file_size as usize { return Err(general_err!( "Invalid Parquet file. Metadata start is less than zero ({})", @@ -86,21 +121,71 @@ pub fn parse_metadata(chunk_reader: &R) -> Result; + + let random_file_identifier: Option; + if let Some(encryption_config) = encryption_config { + let file_crypto_metadata = { + let mut prot = TCompactInputProtocol::new(&mut metadata_read); + TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {}", e)))? + }; + + let encryption_key = select_key(encryption_config, &file_crypto_metadata.key_metadata)?; + + let mut aad_file_unique: RandomFileIdentifier; + // TODO: What's to stop somebody from switching out aad_file_unique with their own value and then swapping components between files? + match file_crypto_metadata.encryption_algorithm { + parquet_format::EncryptionAlgorithm::AESGCMV1(gcmv1) => { + if gcmv1.aad_prefix.is_some() || gcmv1.supply_aad_prefix.is_some() { + return Err(general_err!("Unsupported Parquet file. Use of aad_prefix is not expected")); + } + if let Some(afu) = gcmv1.aad_file_unique { + if afu.len() != AAD_FILE_UNIQUE_SIZE { + return Err(general_err!("Unsupported Parquet file. aad_file_unique is not of the expected size")); + } + aad_file_unique = [0u8; AAD_FILE_UNIQUE_SIZE]; + aad_file_unique.copy_from_slice(&afu); + } else { + return Err(general_err!("Unsupported Parquet file. aad_file_unique must be set")); + } + }, + parquet_format::EncryptionAlgorithm::AESGCMCTRV1(_) => { + return Err(general_err!("Unsupported Parquet file. AES_GCM_CTR_V1 mode is not expected")); + } + } + + let no_aad = &[]; + let plaintext_cursor = decrypt_module("footer", metadata_read, &encryption_key, no_aad)?; + + metadata_read = Box::new(plaintext_cursor); + + returned_encryption_key = Some(encryption_key); + random_file_identifier = Some(aad_file_unique); + } else { + returned_encryption_key = None; + random_file_identifier = None; + } + // TODO: row group filtering let mut prot = TCompactInputProtocol::new(metadata_read); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?; + if t_file_metadata.encryption_algorithm.is_some() || t_file_metadata.footer_signing_key_metadata.is_some() { + return Err(general_err!("Unsupported Parquet file. Plaintext footer mode is not supported")); + } + let schema = types::from_thrift(&t_file_metadata.schema)?; let schema_descr = Arc::new(SchemaDescriptor::new(schema)); let mut row_groups = Vec::new(); @@ -116,12 +201,13 @@ pub fn parse_metadata(chunk_reader: &R) -> Result>, schema_descr: SchemaDescPtr, column_orders: Option>, + random_file_identifier: Option, } impl FileMetaData { @@ -124,6 +127,7 @@ impl FileMetaData { key_value_metadata: Option>, schema_descr: SchemaDescPtr, column_orders: Option>, + random_file_identifier: Option, ) -> Self { FileMetaData { version, @@ -132,6 +136,7 @@ impl FileMetaData { key_value_metadata, schema_descr, column_orders, + random_file_identifier, } } @@ -196,6 +201,10 @@ impl FileMetaData { .map(|data| data[i]) .unwrap_or(ColumnOrder::UNDEFINED) } + + pub fn random_file_identifier(&self) -> &Option { + &self.random_file_identifier + } } /// Reference counted pointer for [`RowGroupMetaData`]. @@ -208,12 +217,14 @@ pub struct RowGroupMetaData { num_rows: i64, total_byte_size: i64, schema_descr: SchemaDescPtr, + /// Ordinal position of this row group in file + ordinal: Option, } impl RowGroupMetaData { /// Returns builer for row group metadata. - pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder { - RowGroupMetaDataBuilder::new(schema_descr) + pub fn builder(schema_descr: SchemaDescPtr, ordinal: i16) -> RowGroupMetaDataBuilder { + RowGroupMetaDataBuilder::new(schema_descr, ordinal) } /// Number of columns in this row group. @@ -269,21 +280,33 @@ impl RowGroupMetaData { let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?; columns.push(cc); } + // Notably, the function to_thrift, below, doesn't write these fields, and RowGroupMetadata doesn't have them. + if rg.file_offset.is_some() { + return Err(ParquetError::NYI("Parsing RowGroup file_offset fields is not yet implemented".to_string())); + } + if rg.total_compressed_size.is_some() { + return Err(ParquetError::NYI("Parsing RowGroup total_compressed_size fields is not yet implemented".to_string())); + } Ok(RowGroupMetaData { columns, num_rows, total_byte_size, schema_descr, + ordinal: rg.ordinal, }) } /// Method to convert to Thrift. pub fn to_thrift(&self) -> RowGroup { + // TODO: Understand file_offset and total_compressed_size fields. RowGroup { columns: self.columns().iter().map(|v| v.to_thrift()).collect(), total_byte_size: self.total_byte_size, num_rows: self.num_rows, sorting_columns: None, + file_offset: None, + total_compressed_size: None, + ordinal: self.ordinal, } } } @@ -294,16 +317,18 @@ pub struct RowGroupMetaDataBuilder { schema_descr: SchemaDescPtr, num_rows: i64, total_byte_size: i64, + ordinal: Option, } impl RowGroupMetaDataBuilder { /// Creates new builder from schema descriptor. - fn new(schema_descr: SchemaDescPtr) -> Self { + fn new(schema_descr: SchemaDescPtr, ordinal: i16) -> Self { Self { columns: Vec::with_capacity(schema_descr.num_columns()), schema_descr, num_rows: 0, total_byte_size: 0, + ordinal: Some(ordinal), } } @@ -325,6 +350,12 @@ impl RowGroupMetaDataBuilder { self } + /// Sets ordinal for this row group. + pub fn set_ordinal(mut self, value: i16) -> Self { + self.ordinal = Some(value); + self + } + /// Builds row group metadata. pub fn build(self) -> Result { if self.schema_descr.num_columns() != self.columns.len() { @@ -340,6 +371,7 @@ impl RowGroupMetaDataBuilder { num_rows: self.num_rows, total_byte_size: self.total_byte_size, schema_descr: self.schema_descr, + ordinal: self.ordinal, }) } } @@ -497,6 +529,9 @@ impl ColumnChunkMetaData { let index_page_offset = col_metadata.index_page_offset; let dictionary_page_offset = col_metadata.dictionary_page_offset; let statistics = statistics::from_thrift(column_type, col_metadata.statistics); + if col_metadata.bloom_filter_offset.is_some() { + return Err(ParquetError::NYI("Parsing ColumnMetaData bloom_filter_offset fields is not yet implemented".to_string())) + } let result = ColumnChunkMetaData { column_type, column_path, @@ -532,6 +567,7 @@ impl ColumnChunkMetaData { dictionary_page_offset: self.dictionary_page_offset, statistics: statistics::to_thrift(self.statistics.as_ref()), encoding_stats: None, + bloom_filter_offset: None, }; ColumnChunk { @@ -542,6 +578,8 @@ impl ColumnChunkMetaData { offset_index_length: None, column_index_offset: None, column_index_length: None, + crypto_metadata: None, + encrypted_column_metadata: None, } } } @@ -672,6 +710,8 @@ impl ColumnChunkMetaDataBuilder { mod tests { use super::*; + const TEST_ROW_GROUP_ORDINAL: i16 = 124; + #[test] fn test_row_group_metadata_thrift_conversion() { let schema_descr = get_test_schema_descr(); @@ -681,7 +721,7 @@ mod tests { let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap(); columns.push(column); } - let row_group_meta = RowGroupMetaData::builder(schema_descr.clone()) + let row_group_meta = RowGroupMetaData::builder(schema_descr.clone(), TEST_ROW_GROUP_ORDINAL) .set_num_rows(1000) .set_total_byte_size(2000) .set_column_metadata(columns) @@ -701,7 +741,7 @@ mod tests { fn test_row_group_metadata_thrift_conversion_empty() { let schema_descr = get_test_schema_descr(); - let row_group_meta = RowGroupMetaData::builder(schema_descr).build(); + let row_group_meta = RowGroupMetaData::builder(schema_descr, TEST_ROW_GROUP_ORDINAL).build(); assert!(row_group_meta.is_err()); if let Err(e) = row_group_meta { @@ -769,7 +809,7 @@ mod tests { .unwrap(); columns.push(column); } - let row_group_meta = RowGroupMetaData::builder(schema_descr) + let row_group_meta = RowGroupMetaData::builder(schema_descr, TEST_ROW_GROUP_ORDINAL) .set_num_rows(1000) .set_column_metadata(columns) .build() diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index f85de98ccab6..b3913016b69e 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -102,9 +102,14 @@ pub mod reader; pub mod serialized_reader; pub mod statistics; pub mod writer; +pub mod encryption; const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +/// Parquet uses PARE for encrypted footer mode, not PARC -- once we take care to check that we obey +/// the Parquet encryption spec in exact detail, this can be PARE. +const PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE: [u8; 4] = [b'P', b'A', b'R', b'C']; +const PARQUET_MAGIC_UNSUPPORTED_PARE: [u8; 4] = [b'P', b'A', b'R', b'E']; /// The number of bytes read at the end of the parquet file on first read const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024; diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index c48e4e7a07b0..90dd95a36bc4 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -51,6 +51,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::basic::{Compression, Encoding}; +use crate::file::encryption::{ParquetEncryptionKeyInfo, RandomFileIdentifier}; use crate::file::metadata::KeyValue; use crate::schema::types::ColumnPath; @@ -102,6 +103,7 @@ pub struct WriterProperties { pub(crate) key_value_metadata: Option>, default_column_properties: ColumnProperties, column_properties: HashMap, + pub(crate) encryption_info: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>, } impl WriterProperties { @@ -228,6 +230,7 @@ pub struct WriterPropertiesBuilder { key_value_metadata: Option>, default_column_properties: ColumnProperties, column_properties: HashMap, + encryption_info: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>, } impl WriterPropertiesBuilder { @@ -243,6 +246,7 @@ impl WriterPropertiesBuilder { key_value_metadata: None, default_column_properties: ColumnProperties::new(), column_properties: HashMap::new(), + encryption_info: None, } } @@ -258,6 +262,7 @@ impl WriterPropertiesBuilder { key_value_metadata: self.key_value_metadata, default_column_properties: self.default_column_properties, column_properties: self.column_properties, + encryption_info: self.encryption_info, } } @@ -307,6 +312,12 @@ impl WriterPropertiesBuilder { self } + /// Sets "encryption key" property. + pub fn set_encryption_info(mut self, value: Option<(ParquetEncryptionKeyInfo, RandomFileIdentifier)>) -> Self { + self.encryption_info = value; + self + } + // ---------------------------------------------------------------------- // Setters for any column (global) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index d0158852d938..d95bb76c1b09 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -18,6 +18,7 @@ //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM) +use std::io::Cursor; use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc}; use parquet_format::{PageHeader, PageType}; @@ -27,6 +28,7 @@ use crate::basic::{Compression, Encoding, Type}; use crate::column::page::{Page, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; +use crate::file::encryption::{decrypt_module, parquet_aad_suffix, ParquetEncryptionKey, RandomFileIdentifier, USUAL_ENCRYPTION_OVERHEAD}; use crate::file::{footer, metadata::*, reader::*, statistics}; use crate::record::reader::RowIter; use crate::record::Row; @@ -37,6 +39,8 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr}; // re-use the logic in their own ParquetFileWriter wrappers pub use crate::util::{cursor::SliceableCursor, io::FileSource}; +use super::encryption::ParquetEncryptionConfig; + // ---------------------------------------------------------------------- // Implementations of traits facilitating the creation of a new reader @@ -125,16 +129,18 @@ impl IntoIterator for SerializedFileReader { pub struct SerializedFileReader { chunk_reader: Arc, metadata: ParquetMetaData, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)> } impl SerializedFileReader { /// Creates file reader from a Parquet file. /// Returns error if Parquet file does not exist or is corrupt. pub fn new(chunk_reader: R) -> Result { - let metadata = footer::parse_metadata(&chunk_reader)?; + let (metadata, _) = footer::parse_metadata(&chunk_reader, &None)?; Ok(Self { chunk_reader: Arc::new(chunk_reader), metadata, + encryption_info: None, }) } @@ -143,9 +149,33 @@ impl SerializedFileReader { Self { chunk_reader: Arc::new(chunk_reader), metadata, + encryption_info: None, } } + pub fn new_maybe_encrypted(chunk_reader: R, encryption_config: &Option) -> Result { + let (metadata, encryption_key) = footer::parse_metadata(&chunk_reader, encryption_config)?; + Self::new_with_metadata_maybe_encrypted(chunk_reader, metadata, &encryption_key) + } + + /// Creates file reader from a Parquet file, using pre-read metadata. + pub fn new_with_metadata_maybe_encrypted(chunk_reader: R, metadata: ParquetMetaData, encryption_key: &Option) -> Result { + let encryption_info: Option<_>; + if let Some(encryption_key) = encryption_key { + let random_file_identifier = metadata.file_metadata().random_file_identifier().ok_or_else( + || general_err!("Unsupported Parquet file: When encryption is used, FileMetaData must have a random_file_identifier") + )?; + encryption_info = Some((*encryption_key, random_file_identifier)); + } else { + encryption_info = None; + } + Ok(Self { + chunk_reader: Arc::new(chunk_reader), + metadata, + encryption_info, + }) + } + /// Filters row group metadata to only those row groups, /// for which the predicate function returns true pub fn filter_row_groups( @@ -179,9 +209,14 @@ impl FileReader for SerializedFileReader { let row_group_metadata = self.metadata.row_group(i); // Row groups should be processed sequentially. let f = Arc::clone(&self.chunk_reader); + // TODO: It seems lame that we have this limit in unencrypted mode. And maybe we could error earlier. + let row_group_ordinal: i16 = i16::try_from(i) + .map_err(|_| general_err!("number of row groups cannot exceed {}", 1 << 15))?; Ok(Box::new(SerializedRowGroupReader::new( f, row_group_metadata, + self.encryption_info, + row_group_ordinal, ))) } @@ -194,14 +229,18 @@ impl FileReader for SerializedFileReader { pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, } impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { /// Creates new row group reader from a file and row group metadata. - fn new(chunk_reader: Arc, metadata: &'a RowGroupMetaData) -> Self { + fn new(chunk_reader: Arc, metadata: &'a RowGroupMetaData, encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, row_group_ordinal: i16) -> Self { Self { chunk_reader, metadata, + encryption_info, + row_group_ordinal, } } } @@ -220,10 +259,17 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' let col = self.metadata.column(i); let (col_start, col_length) = col.byte_range(); let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?; + // TODO: It seems lame that we have this limit in unencrypted mode. And maybe we could error earlier. + let column_ordinal = u16::try_from(i) + .map_err(|_| general_err!("number of columns cannot exceed {}", u16::MAX as u32 + 1))?; let page_reader = SerializedPageReader::new( file_chunk, + self.encryption_info, + self.row_group_ordinal, + column_ordinal, col.num_values(), col.compression(), + col.has_dictionary_page(), col.column_descr().physical_type(), )?; Ok(Box::new(page_reader)) @@ -240,6 +286,14 @@ pub struct SerializedPageReader { // to be read by this page reader. buf: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, + + // Mutable: the page_ordinal of the next page read. Initialized with None in the case we expect + // to start with a dictionary page, then gets incremented to Some(0) before the first data page. + page_ordinal: Option, + // The compression codec for this column chunk. Only set for non-PLAIN codec. decompressor: Option>, @@ -257,13 +311,22 @@ impl SerializedPageReader { /// Creates a new serialized page reader from file source. pub fn new( buf: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, total_num_values: i64, compression: Compression, + dictionary_enabled: bool, physical_type: Type, ) -> Result { let decompressor = create_codec(compression)?; + let page_ordinal = if dictionary_enabled { None } else { Some(0) }; let result = Self { buf, + encryption_info, + row_group_ordinal, + column_ordinal, + page_ordinal, total_num_values, seen_num_values: 0, decompressor, @@ -273,10 +336,21 @@ impl SerializedPageReader { } /// Reads Page header from Thrift. - fn read_page_header(&mut self) -> Result { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) + fn read_page_header(&mut self, aad_header_module_type: u8, aad_page_ordinal: Option) -> Result { + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_header_module_type, + self.row_group_ordinal, self.column_ordinal, aad_page_ordinal); + + let plaintext_cursor = decrypt_module("PageHeader", &mut self.buf, encryption_key, &aad_suffix)?; + + let mut prot = TCompactInputProtocol::new(plaintext_cursor); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) + } else { + let mut prot = TCompactInputProtocol::new(&mut self.buf); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) + } } } @@ -284,14 +358,47 @@ impl Iterator for SerializedPageReader { type Item = Result; fn next(&mut self) -> Option { - self.get_next_page().transpose() + let ret = self.get_next_page().transpose(); + ret } } +pub const DATA_PAGE_MODULE_TYPE: u8 = 2; +pub const DICTIONARY_PAGE_MODULE_TYPE: u8 = 3; +pub const DATA_PAGE_HEADER_MODULE_TYPE: u8 = 4; +pub const DICTIONARY_PAGE_HEADER_MODULE_TYPE: u8 = 5; +// TODO: We (the existing Rust lib) write a ColumnChunk after the data pages. Is this described in +// the docs? It never gets read -- in fact the ChunkReader, using total_compressed_size, emits +// Read objects over intervals that don't contain it. What do the other Parquet libs do? +pub const COLUMNCHUNK_MODULE_TYPE: u8 = 255; + impl PageReader for SerializedPageReader { fn get_next_page(&mut self) -> Result> { while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; + let aad_page_ordinal = self.page_ordinal; + let aad_module_type: u8; + let aad_header_module_type: u8; + if aad_page_ordinal.is_some() { + // Data pages + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; + } else { + // The dictionary page + aad_module_type = DICTIONARY_PAGE_MODULE_TYPE; + aad_header_module_type = DICTIONARY_PAGE_HEADER_MODULE_TYPE; + } + let page_header = self.read_page_header(aad_header_module_type, aad_page_ordinal)?; + + let mut cursor: Cursor>; + let reader: &mut dyn Read; + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad = parquet_aad_suffix(random_file_identifier, aad_module_type, self.row_group_ordinal, + self.column_ordinal, aad_page_ordinal); + cursor = decrypt_module("Page data", &mut self.buf, encryption_key, &aad)?; + reader = &mut cursor; + } else { + reader = &mut self.buf; + } // When processing data page v2, depending on enabled compression for the // page, we should account for uncompressed data ('offset') of @@ -310,11 +417,11 @@ impl PageReader for SerializedPageReader { can_decompress = header_v2.is_compressed.unwrap_or(true); } - let compressed_len = page_header.compressed_page_size as usize - offset; + let compressed_unencrypted_len = (page_header.compressed_page_size as usize) - (if self.encryption_info.is_some() { USUAL_ENCRYPTION_OVERHEAD } else { 0 }) - offset; let uncompressed_len = page_header.uncompressed_page_size as usize - offset; // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; - self.buf.read_exact(&mut buffer)?; + let mut buffer = vec![0; offset + compressed_unencrypted_len]; + reader.read_exact(&mut buffer)?; // TODO: page header could be huge because of statistics. We should set a // maximum page header size and abort if that is exceeded. @@ -398,6 +505,12 @@ impl PageReader for SerializedPageReader { continue; } }; + self.page_ordinal = if let Some(n) = aad_page_ordinal { + let n_plus_1 = n.checked_add(1).ok_or_else(|| general_err!("Number of pages in row group exceeded {}", u16::MAX))?; + Some(n_plus_1) + } else { + Some(0) + }; return Ok(Some(result)); } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index e37fca3fbef3..d1303e7926f4 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -19,6 +19,7 @@ //! using row group writers and column writers respectively. use std::{ + convert::TryFrom, io::{Seek, SeekFrom, Write}, sync::Arc, }; @@ -27,15 +28,15 @@ use byteorder::{ByteOrder, LittleEndian}; use parquet_format as parquet; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; -use crate::basic::PageType; +use crate::{basic::PageType}; use crate::column::{ page::{CompressedPage, Page, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; use crate::errors::{ParquetError, Result}; use crate::file::{ - metadata::*, properties::WriterPropertiesPtr, - statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC, + metadata::*, properties::{WriterPropertiesPtr}, + statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, }; use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::util::io::{FileSink, Position}; @@ -46,6 +47,8 @@ pub use crate::util::io::TryClone; // Exposed publically for convenience of writing Parquet to a buffer of bytes pub use crate::util::cursor::InMemoryWriteableCursor; +use crate::file::{encryption::{encrypt_module, parquet_aad_suffix, parquet_magic, ParquetEncryptionKey, PrepaddedPlaintext, RandomFileIdentifier, USUAL_ENCRYPTION_OVERHEAD}, serialized_reader::{COLUMNCHUNK_MODULE_TYPE, DATA_PAGE_HEADER_MODULE_TYPE, DATA_PAGE_MODULE_TYPE, DICTIONARY_PAGE_HEADER_MODULE_TYPE, DICTIONARY_PAGE_MODULE_TYPE}}; + // ---------------------------------------------------------------------- // APIs for file & row group writers @@ -146,7 +149,7 @@ impl SerializedFileWriter { schema: TypePtr, properties: WriterPropertiesPtr, ) -> Result { - Self::start_file(&mut buf)?; + Self::start_file(&mut buf, properties.encryption_info.is_some())?; Ok(Self { buf, schema: schema.clone(), @@ -159,9 +162,10 @@ impl SerializedFileWriter { }) } - /// Writes magic bytes at the beginning of the file. - fn start_file(buf: &mut W) -> Result<()> { - buf.write_all(&PARQUET_MAGIC)?; + /// Writes magic bytes at the beginning of the file, depending on whether the file is encrypted + /// (in encrypted footer mode). + fn start_file(buf: &mut W, is_footer_encrypted: bool) -> Result<()> { + buf.write_all(&parquet_magic(is_footer_encrypted))?; Ok(()) } @@ -191,11 +195,45 @@ impl SerializedFileWriter { key_value_metadata: self.props.key_value_metadata().to_owned(), created_by: Some(self.props.created_by().to_owned()), column_orders: None, + // encryption_algorithm and footer_signing_key_metadata are used in plaintext footer + // mode, which we don't use. + encryption_algorithm: None, + footer_signing_key_metadata: None, }; - // Write file metadata + // Write file metadata (FileCryptoMetaData (if applicable) and FileMetaData) let start_pos = self.buf.seek(SeekFrom::Current(0))?; - { + + if let Some((key_info, random_file_identifier)) = &self.props.encryption_info { + // FileCryptoMetaData and FileMetadata + + let file_crypto_metadata = parquet::FileCryptoMetaData { + encryption_algorithm: parquet::EncryptionAlgorithm::AESGCMV1(parquet_format::AesGcmV1{ + aad_prefix: None, aad_file_unique: Some(random_file_identifier.to_vec()), supply_aad_prefix: None, + }), + // TODO: Maybe the user of this parquet lib will want to make their own decision + // about this. Right now this library supports passing multiple read keys, and uses + // the Sha3-256 of the key as a key id to select the key. + key_metadata: Some(key_info.key.compute_key_hash().to_vec()), + }; + + { + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + file_crypto_metadata.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + file_metadata.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + let no_aad = &[]; + encrypt_module("FileMetaData", &mut self.buf, &key_info.key, plaintext, no_aad)?; + } else { + // just FileMetaData let mut protocol = TCompactOutputProtocol::new(&mut self.buf); file_metadata.write_to_out_protocol(&mut protocol)?; protocol.flush()?; @@ -206,7 +244,7 @@ impl SerializedFileWriter { let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE]; let metadata_len = (end_pos - start_pos) as i32; LittleEndian::write_i32(&mut footer_buffer, metadata_len); - (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?; + (&mut footer_buffer[4..]).write_all(&parquet_magic(self.props.encryption_info.is_some()))?; self.buf.write_all(&footer_buffer)?; Ok(file_metadata) } @@ -235,9 +273,12 @@ impl FileWriter for SerializedFileWriter { fn next_row_group(&mut self) -> Result> { self.assert_closed()?; self.assert_previous_writer_closed()?; + let row_group_ordinal: i16 = i16::try_from(self.row_groups.len()) + .map_err(|_| general_err!("Number of row groups cannot exceed {}", i16::MAX as i32 + 1))?; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), self.props.clone(), + row_group_ordinal, &self.buf, ); self.previous_writer_closed = false; @@ -277,6 +318,7 @@ pub struct SerializedRowGroupWriter { total_bytes_written: u64, column_index: usize, previous_writer_closed: bool, + row_group_ordinal: i16, row_group_metadata: Option, column_chunks: Vec, } @@ -285,6 +327,7 @@ impl SerializedRowGroupWriter { pub fn new( schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, + row_group_ordinal: i16, buf: &W, ) -> Self { let num_columns = schema_descr.num_columns(); @@ -296,6 +339,7 @@ impl SerializedRowGroupWriter { total_bytes_written: 0, column_index: 0, previous_writer_closed: true, + row_group_ordinal, row_group_metadata: None, column_chunks: Vec::with_capacity(num_columns), } @@ -360,8 +404,10 @@ impl RowGroupWriter for SerializedRowGroupWriter if self.column_index >= self.descr.num_columns() { return Ok(None); } + let column_ordinal: u16 = u16::try_from(self.column_index) + .map_err(|_| general_err!("Number of columns cannot exceed {}", u16::MAX as u32 + 1))?; let sink = FileSink::new(&self.buf); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(sink, self.props.encryption_info.as_ref().map(|(key_info, rfi)| (key_info.key, *rfi)), self.row_group_ordinal, column_ordinal)); let column_writer = get_column_writer( self.descr.column(self.column_index), self.props.clone(), @@ -386,7 +432,7 @@ impl RowGroupWriter for SerializedRowGroupWriter self.assert_previous_writer_closed()?; let column_chunks = std::mem::take(&mut self.column_chunks); - let row_group_metadata = RowGroupMetaData::builder(self.descr.clone()) + let row_group_metadata = RowGroupMetaData::builder(self.descr.clone(), self.row_group_ordinal) .set_column_metadata(column_chunks) .set_total_byte_size(self.total_bytes_written as i64) .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) @@ -406,20 +452,36 @@ impl RowGroupWriter for SerializedRowGroupWriter /// `SerializedPageWriter` should not be used after calling `close()`. pub struct SerializedPageWriter { sink: T, + encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, + row_group_ordinal: i16, + column_ordinal: u16, } impl SerializedPageWriter { /// Creates new page writer. - pub fn new(sink: T) -> Self { - Self { sink } + pub fn new(sink: T, encryption_info: Option<(ParquetEncryptionKey, RandomFileIdentifier)>, row_group_ordinal: i16, column_ordinal: u16) -> Self { + Self { sink, encryption_info, row_group_ordinal, column_ordinal } } /// Serializes page header into Thrift. + /// aad_header_module_type needs to be the correct value that corresponds to header.page_type(). /// Returns number of bytes that have been written into the sink. #[inline] - fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result { + fn serialize_page_header(&mut self, header: parquet::PageHeader, aad_header_module_type: u8, page_ordinal: Option) -> Result { let start_pos = self.sink.pos(); - { + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_header_module_type, + self.row_group_ordinal, self.column_ordinal, page_ordinal); + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + header.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + encrypt_module("PageHeader", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { let mut protocol = TCompactOutputProtocol::new(&mut self.sink); header.write_to_out_protocol(&mut protocol)?; protocol.flush()?; @@ -431,17 +493,35 @@ impl SerializedPageWriter { /// Returns Ok() if there are not errors serializing and writing data into the sink. #[inline] fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> { - let mut protocol = TCompactOutputProtocol::new(&mut self.sink); - chunk.write_to_out_protocol(&mut protocol)?; - protocol.flush()?; + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + // TODO: Verify that we behave the same way as other arrow implementations here, in the + // sense that we should verify that others write out this ColumnChunk _here_ at all. + let aad_module_type = COLUMNCHUNK_MODULE_TYPE; + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_module_type, + self.row_group_ordinal, self.column_ordinal, None); + + let mut plaintext = PrepaddedPlaintext::new(); + { + let mut protocol = TCompactOutputProtocol::new(plaintext.buf_mut()); + chunk.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } + + encrypt_module("ColumnChunk", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { + let mut protocol = TCompactOutputProtocol::new(&mut self.sink); + chunk.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + } Ok(()) } } impl PageWriter for SerializedPageWriter { - fn write_page(&mut self, page: CompressedPage) -> Result { + fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option) -> Result { let uncompressed_size = page.uncompressed_size(); - let compressed_size = page.compressed_size(); + let compressed_unencrypted_size = page.compressed_unencrypted_size(); + let compressed_size = (if self.encryption_info.is_some() { USUAL_ENCRYPTION_OVERHEAD } else { 0 }) + compressed_unencrypted_size; let num_values = page.num_values(); let encoding = page.encoding(); let page_type = page.page_type(); @@ -458,6 +538,8 @@ impl PageWriter for SerializedPageWriter { data_page_header_v2: None, }; + let aad_module_type: u8; + let aad_header_module_type: u8; match *page.compressed_page() { Page::DataPage { def_level_encoding, @@ -473,6 +555,8 @@ impl PageWriter for SerializedPageWriter { statistics: statistics_to_thrift(statistics.as_ref()), }; page_header.data_page_header = Some(data_page_header); + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; } Page::DataPageV2 { num_nulls, @@ -494,6 +578,8 @@ impl PageWriter for SerializedPageWriter { statistics: statistics_to_thrift(statistics.as_ref()), }; page_header.data_page_header_v2 = Some(data_page_header_v2); + aad_module_type = DATA_PAGE_MODULE_TYPE; + aad_header_module_type = DATA_PAGE_HEADER_MODULE_TYPE; } Page::DictionaryPage { is_sorted, .. } => { let dictionary_page_header = parquet::DictionaryPageHeader { @@ -502,13 +588,26 @@ impl PageWriter for SerializedPageWriter { is_sorted: Some(is_sorted), }; page_header.dictionary_page_header = Some(dictionary_page_header); + aad_module_type = DICTIONARY_PAGE_MODULE_TYPE; + aad_header_module_type = DICTIONARY_PAGE_HEADER_MODULE_TYPE; } } let start_pos = self.sink.pos(); - let header_size = self.serialize_page_header(page_header)?; - self.sink.write_all(page.data())?; + // TODO: header_size is after encryption -- is that what we want? What about for uncompressed_size? + let header_size = self.serialize_page_header(page_header, aad_header_module_type, aad_page_ordinal)?; + + if let Some((encryption_key, random_file_identifier)) = &self.encryption_info { + let aad_suffix = parquet_aad_suffix(random_file_identifier, aad_module_type, + self.row_group_ordinal, self.column_ordinal, aad_page_ordinal); + + let mut plaintext = PrepaddedPlaintext::new(); + plaintext.buf_mut().extend_from_slice(page.data()); + encrypt_module("Page data", &mut self.sink, encryption_key, plaintext, &aad_suffix)?; + } else { + self.sink.write_all(page.data())?; + } let mut spec = PageWriteSpec::new(); spec.page_type = page_type; @@ -538,11 +637,15 @@ impl PageWriter for SerializedPageWriter { mod tests { use super::*; + use std::os::unix::fs::FileExt; use std::{fs::File, io::Cursor}; use crate::basic::{Compression, Encoding, IntType, LogicalType, Repetition, Type}; use crate::column::page::PageReader; use crate::compression::{create_codec, Codec}; + use crate::file::encryption::{generate_random_file_identifier, ParquetEncryptionConfig, ParquetEncryptionKeyInfo}; + use crate::file::reader::Length; + use crate::file::{PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE}; use crate::file::{ properties::{WriterProperties, WriterVersion}, reader::{FileReader, SerializedFileReader, SerializedPageReader}, @@ -879,14 +982,31 @@ mod tests { test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32); } + const TEST_ROW_GROUP_ORDINAL: i16 = 2325; + const TEST_COLUMN_ORDINAL: u16 = 135; + /// Tests writing and reading pages. /// Physical type is for statistics only, should match any defined statistics type in /// pages. fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) { + test_page_roundtrip_helper(pages, codec, physical_type, &None); + test_page_roundtrip_helper(pages, codec, physical_type, + &Some((ParquetEncryptionKey::generate_key(), generate_random_file_identifier()))); + } + + fn test_page_roundtrip_helper(pages: &[Page], codec: Compression, physical_type: Type, + encryption_info: &Option<(ParquetEncryptionKey, RandomFileIdentifier)>) { + let mut compressed_pages = vec![]; let mut total_num_values = 0i64; let mut compressor = create_codec(codec).unwrap(); + // Kind of silly because we don't enforce in this test helper function that pages are in the + // correct order (dictionary first), but we don't have encryption in this test (yet?) anyway + // (as that's where pages need to be in the proper order, as we need to know the aad suffix + // in advance) so it doesn't really matter. + let mut has_dictionary_page = false; + for page in pages { let uncompressed_len = page.buffer().len(); @@ -955,6 +1075,8 @@ mod tests { } => { let output_buf = compress_helper(compressor.as_mut(), buf.data()); + has_dictionary_page = true; + Page::DictionaryPage { buf: ByteBufferPtr::new(output_buf), num_values, @@ -972,18 +1094,24 @@ mod tests { let mut result_pages: Vec = vec![]; { let cursor = Cursor::new(&mut buffer); - let mut page_writer = SerializedPageWriter::new(cursor); + let mut page_writer = SerializedPageWriter::new(cursor, *encryption_info, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL); + let mut page_ordinal = if has_dictionary_page { None:: } else { Some(0) }; for page in compressed_pages { - page_writer.write_page(page).unwrap(); + page_writer.write_page(page, page_ordinal).unwrap(); + page_ordinal = Some(page_ordinal.map_or(0, |x| x + 1)); } page_writer.close().unwrap(); } { let mut page_reader = SerializedPageReader::new( Cursor::new(&buffer), + *encryption_info, + TEST_ROW_GROUP_ORDINAL, + TEST_COLUMN_ORDINAL, total_num_values, codec, + has_dictionary_page, physical_type, ) .unwrap(); @@ -1019,9 +1147,30 @@ mod tests { assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics())); } + fn assert_magic(file: &mut File, expected: [u8; 4]) { + let length = file.len(); + // Of course the file has to be larger than just 8, but we're just sanity-checking when checking the magic. + assert!(length >= 8); + + let mut buf = [0xCDu8, 0xCD, 0xCD, 0xCD]; + file.read_exact_at(&mut buf[..], 0).unwrap(); + assert_eq!(buf, expected); + file.read_exact_at(&mut buf[..], length - 4).unwrap(); + assert_eq!(buf, expected); + } + /// File write-read roundtrip. /// `data` consists of arrays of values for each row group. - fn test_file_roundtrip(file: File, data: Vec>) { + fn test_file_roundtrip(mut file: File, data: Vec>) { + test_file_roundtrip_with_encryption_key(file.try_clone().unwrap(), &data, &None); + assert_magic(&mut file, PARQUET_MAGIC); + file.set_len(0).unwrap(); + file.seek(SeekFrom::Start(0)).unwrap(); + test_file_roundtrip_with_encryption_key(file.try_clone().unwrap(), &data, &Some(ParquetEncryptionKey::generate_key())); + assert_magic(&mut file, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE); + } + + fn test_file_roundtrip_with_encryption_key(file: File, data: &Vec>, encryption_key: &Option) { let schema = Arc::new( types::Type::group_type_builder("schema") .with_fields(&mut vec![Arc::new( @@ -1033,13 +1182,14 @@ mod tests { .build() .unwrap(), ); - let props = Arc::new(WriterProperties::builder().build()); + let encryption_info = encryption_key.map(|key| (ParquetEncryptionKeyInfo{key_id: "a key id".to_string(), key}, generate_random_file_identifier())); + let props = Arc::new(WriterProperties::builder().set_encryption_info(encryption_info.clone()).build()); let mut file_writer = assert_send( SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(), ); let mut rows: i64 = 0; - for subset in &data { + for subset in data { let mut row_group_writer = file_writer.next_row_group().unwrap(); let col_writer = row_group_writer.next_column().unwrap(); if let Some(mut writer) = col_writer { @@ -1059,7 +1209,8 @@ mod tests { file_writer.close().unwrap(); - let reader = assert_send(SerializedFileReader::new(file).unwrap()); + let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap()); + let reader = assert_send(SerializedFileReader::new_maybe_encrypted(file, &encryption_config).unwrap()); assert_eq!(reader.num_row_groups(), data.len()); assert_eq!( reader.metadata().file_metadata().num_rows(), @@ -1101,6 +1252,11 @@ mod tests { } fn test_bytes_roundtrip(data: Vec>) { + test_bytes_roundtrip_helper(&data, &None); + test_bytes_roundtrip_helper(&data, &Some(ParquetEncryptionKey::generate_key())); + } + + fn test_bytes_roundtrip_helper(data: &Vec>, encryption_key: &Option) { let cursor = InMemoryWriteableCursor::default(); let schema = Arc::new( @@ -1115,13 +1271,15 @@ mod tests { .unwrap(), ); + let encryption_info = encryption_key.map(|key| (ParquetEncryptionKeyInfo{key_id: "a key id".to_string(), key}, generate_random_file_identifier())); + let mut rows: i64 = 0; { - let props = Arc::new(WriterProperties::builder().build()); + let props = Arc::new(WriterProperties::builder().set_encryption_info(encryption_info.clone()).build()); let mut writer = SerializedFileWriter::new(cursor.clone(), schema, props).unwrap(); - for subset in &data { + for subset in data { let mut row_group_writer = writer.next_row_group().unwrap(); let col_writer = row_group_writer.next_column().unwrap(); if let Some(mut writer) = col_writer { @@ -1145,7 +1303,8 @@ mod tests { let buffer = cursor.into_inner().unwrap(); let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); - let reader = SerializedFileReader::new(reading_cursor).unwrap(); + let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap()); + let reader = SerializedFileReader::new_maybe_encrypted(reading_cursor, &encryption_config).unwrap(); assert_eq!(reader.num_row_groups(), data.len()); assert_eq!(