Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Dec 16, 2024
1 parent 9d17990 commit f90d8b4
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 62 deletions.
22 changes: 19 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::collections::VecDeque;
use std::sync::Arc;

use num::ToPrimitive;
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
Expand All @@ -42,7 +42,7 @@ mod filter;
mod selection;
pub mod statistics;

use crate::encryption::ciphers::FileDecryptionProperties;
use crate::encryption::ciphers::{CryptoContext, FileDecryptionProperties};

/// Builder for constructing parquet readers into arrow.
///
Expand Down Expand Up @@ -695,7 +695,18 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
// let aad_file_unique = file_decryptor?.aad_file_unique();
// let aad_prefix = file_decryptor?.aad_prefix();
//
// let file_decryptor = FileDecryptor::new(file_decryptor, aad_file_unique.clone(), aad_prefix.clone());

let crypto_context = CryptoContext::new(
meta.dictionary_page_offset().is_some(), rg_idx.to_i16()?, self.column_idx.to_i16()?, file_decryptor.clone(), file_decryptor);
let crypto_context = Arc::new(crypto_context);

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
// let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
Some(ret.map(|x| Box::new(x) as _))
}
}
Expand Down Expand Up @@ -1728,6 +1739,11 @@ mod tests {
});

// todo: decrypting data
let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_footer_key(key_code.to_vec())
.build(),
);
let record_reader =
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
self.metadata.column(i),
self.row_count,
page_locations,
None,
)?);

Ok(Box::new(ColumnChunkIterator {
Expand Down
84 changes: 62 additions & 22 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Encryption implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -172,8 +173,12 @@ pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1)
}

fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32,
column_ordinal: i32, page_ordinal: i32) -> Result<Vec<u8>> {
/// Public entry point for building a module AAD: forwards every argument,
/// unchanged and in the same order, to the private `create_module_aad`.
///
/// Returns exactly what `create_module_aad` returns, including any error it
/// produces for the supplied ordinals.
pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
}

fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {

let module_buf = [module_type as u8];

Expand All @@ -187,15 +192,15 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
if row_group_ordinal < 0 {
return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal));
}
if row_group_ordinal > u16::MAX as i32 {
if row_group_ordinal > i16::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}",
u16::MAX, row_group_ordinal));
}

if column_ordinal < 0 {
return Err(general_err!("Wrong column ordinal: {}", column_ordinal));
}
if column_ordinal > u16::MAX as i32 {
if column_ordinal > i16::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}",
u16::MAX, column_ordinal));
}
Expand All @@ -205,25 +210,25 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
let mut aad = Vec::with_capacity(file_aad.len() + 5);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
return Ok(aad)
}

if page_ordinal < 0 {
return Err(general_err!("Wrong column ordinal: {}", page_ordinal));
return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
}
if page_ordinal > u16::MAX as i32 {
if page_ordinal > i32::MAX {
return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}",
u16::MAX, page_ordinal));
}

let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref());
aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
Ok(aad)
}

Expand Down Expand Up @@ -266,7 +271,9 @@ impl DecryptionPropertiesBuilder {
pub struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
// todo decr: change to BlockDecryptor
footer_decryptor: RingGcmBlockDecryptor
footer_decryptor: RingGcmBlockDecryptor,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
}

impl PartialEq for FileDecryptor {
Expand All @@ -276,30 +283,63 @@ impl PartialEq for FileDecryptor {
}

impl FileDecryptor {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties, aad_file_unique: Vec<u8>, aad_prefix: Vec<u8>) -> Self {
Self {
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
decryption_properties: decryption_properties.clone()
decryption_properties: decryption_properties.clone(),
aad_file_unique,
aad_prefix,
}
}

// todo decr: change to BlockDecryptor
/// Consumes this `FileDecryptor` and returns its footer decryptor by value.
///
/// Because it takes `self`, the caller gives up the `FileDecryptor` (or must
/// clone it first).
pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor {
self.footer_decryptor
}

/// Returns a reference to the `FileDecryptionProperties` stored in this decryptor.
pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
&self.decryption_properties
}

/// Returns a clone of the footer decryptor, leaving this `FileDecryptor` usable.
pub(crate) fn footer_decryptor(&self) -> RingGcmBlockDecryptor {
self.footer_decryptor.clone()
}

/// Returns a reference to the `aad_file_unique` bytes this decryptor was constructed with.
pub(crate) fn aad_file_unique(&self) -> &Vec<u8> {
&self.aad_file_unique
}

/// Returns a reference to the `aad_prefix` bytes this decryptor was constructed with.
pub(crate) fn aad_prefix(&self) -> &Vec<u8> {
&self.aad_prefix
}
}

#[derive(Debug, Clone)]
pub struct CryptoContext {
row_group_ordinal: i32,
column_ordinal: i32,
metadata_decryptor: FileDecryptor,
data_decryptor: FileDecryptor,
file_decryption_properties: FileDecryptionProperties,
aad: Vec<u8>,
pub(crate) start_decrypt_with_dictionary_page: bool,
pub(crate) row_group_ordinal: i16,
pub(crate) column_ordinal: i16,
pub(crate) data_decryptor: Arc<FileDecryptor>,
pub(crate) metadata_decryptor: Arc<FileDecryptor>,

}

impl CryptoContext {
pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor }
pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties }
/// Creates a `CryptoContext` by storing each argument in the field of the
/// same name; no validation or derivation is performed here.
pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16,
column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
metadata_decryptor: Arc<FileDecryptor>) -> Self {
Self {
start_decrypt_with_dictionary_page,
row_group_ordinal,
column_ordinal,
data_decryptor,
metadata_decryptor,
}
}
/// Returns a reference to the `start_decrypt_with_dictionary_page` flag.
pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page }
/// Returns a reference to the stored row group ordinal.
pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
/// Returns a reference to the stored column ordinal.
pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
/// Returns another `Arc` handle to the data decryptor (clones the `Arc`, not the decryptor).
pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
/// Returns another `Arc` handle to the metadata decryptor (clones the `Arc`, not the decryptor).
pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
}
7 changes: 7 additions & 0 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ impl ParquetMetaData {
&self.file_metadata
}

/// Returns a reference to the optional [`FileDecryptor`] held by this metadata.
///
/// The value is an `Option`, so callers must handle `None` rather than
/// assuming a decryptor is always present.
pub fn file_decryptor(&self) -> &Option<FileDecryptor> {
&self.file_decryptor
}



/// Returns number of row groups in this file.
pub fn num_row_groups(&self) -> usize {
self.row_groups.len()
Expand Down
16 changes: 11 additions & 5 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,19 @@ impl ParquetMetaDataReader {
// todo decr: get key_metadata

// remaining buffer contains encrypted FileMetaData
file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();

// todo decr: get aad_prefix
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
// file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));

decrypted_fmd_buf =
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref());
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
}

Expand Down Expand Up @@ -694,7 +700,7 @@ impl ParquetMetaDataReader {
schema_descr,
column_orders,
);
Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
}

/// Parses column orders from Thrift definition.
Expand Down
70 changes: 38 additions & 32 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::{Buf, Bytes};
use thrift::protocol::TCompactInputProtocol;
use crate::encryption::ciphers::{create_footer_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, FileDecryptor, RingGcmBlockDecryptor};
use bytes::Bytes;
use thrift::protocol::{TCompactInputProtocol, TInputProtocol};
use zstd::zstd_safe::WriteBuf;
use crate::data_type::AsBytes;
use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, ModuleType};

impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
Expand Down Expand Up @@ -339,37 +341,40 @@ impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
}

/// Reads a [`PageHeader`] from the provided [`Read`]
pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result<PageHeader> {
let buf = &mut [];
let size = input.read(buf)?;

// todo: decrypt buffer
let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;

let file_decryption_properties = crypto_context.unwrap().file_decryption_properties();
let file_decryptor = FileDecryptor::new(file_decryption_properties);

// let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
let algo = t_file_crypto_metadata.encryption_algorithm;
let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo {
a
} else {
unreachable!()
}; // todo decr: add support for GCMCTRV1
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
let buf2 = file_decryptor.get_footer_decryptor().decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());

let mut prot = TCompactInputProtocol::new(buf2.reader());
pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<PageHeader> {

// Encrypted path: taken only when a crypto context is supplied.
if let Some(crypto_context) = crypto_context {
// Fixed 16 KiB scratch buffer filled by a single `read` call.
// NOTE(review): one `read` may return fewer bytes than the full header,
// and `size` is never used afterwards — confirm this is intended.
let mut buf = [0; 16 * 1024];
let size = input.read(&mut buf)?;

// The footer decryptor and file-unique AAD are both taken from the
// context's data decryptor.
let decryptor = &crypto_context.data_decryptor();
let file_decryptor = decryptor.footer_decryptor();
let aad_file_unique = decryptor.aad_file_unique();
// let aad_prefix = decryptor.aad_prefix();

// The module type is hard-coded to `DictionaryPageHeader` and the page
// ordinal to 0, regardless of which page is actually being read.
let aad = create_page_aad(
aad_file_unique.as_slice(),
ModuleType::DictionaryPageHeader,
crypto_context.row_group_ordinal,
crypto_context.column_ordinal,
0,
)?;

// todo: This currently fails, possibly due to wrongly generated AAD
// NOTE(review): the first 4 bytes are skipped — presumably a length
// prefix; confirm. The slice runs to the end of the 16 KiB buffer rather
// than stopping at `size`.
let buf = file_decryptor.decrypt(buf[4..].as_slice(), aad.as_ref());
// Panics unconditionally, so the parsing below is currently unreachable.
todo!("Decrypted page header!");
let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
return Ok(page_header)
}

// Unencrypted path: parse the header straight from `input`.
let mut prot = TCompactInputProtocol::new(input);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Ok(page_header)
}

/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result<(usize, PageHeader)> {
fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<(usize, PageHeader)> {
/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
inner: R,
Expand Down Expand Up @@ -538,7 +543,7 @@ pub struct SerializedPageReader<R: ChunkReader> {
state: SerializedPageReaderState,

/// Crypto context
crypto_context: Option<&'static CryptoContext>,
crypto_context: Option<Arc<CryptoContext>>,
}

impl<R: ChunkReader> SerializedPageReader<R> {
Expand All @@ -548,9 +553,10 @@ impl<R: ChunkReader> SerializedPageReader<R> {
meta: &ColumnChunkMetaData,
total_rows: usize,
page_locations: Option<Vec<PageLocation>>,
crypto_context: Option<Arc<CryptoContext>>,
) -> Result<Self> {
let props = Arc::new(ReaderProperties::builder().build());
SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, None)
SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, crypto_context)
}

/// Creates a new serialized page with custom options.
Expand All @@ -560,7 +566,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
total_rows: usize,
page_locations: Option<Vec<PageLocation>>,
props: ReaderPropertiesPtr,
crypto_context: Option<&'static CryptoContext>,
crypto_context: Option<Arc<CryptoContext>>,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range();
Expand Down Expand Up @@ -648,7 +654,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
let header = if let Some(header) = next_page_header.take() {
*header
} else {
let (header_len, header) = read_page_header_len(&mut read, self.crypto_context)?;
let (header_len, header) = read_page_header_len(&mut read, self.crypto_context.clone())?;
*offset += header_len;
*remaining -= header_len;
header
Expand Down

0 comments on commit f90d8b4

Please sign in to comment.