Skip to content

Commit

Permalink
Tidy up ordinal types
Browse files: browse the repository at this point in the history
  • Loading branch information
adamreeve committed Dec 18, 2024
1 parent 7f27a97 commit 56419be
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 42 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());

let crypto_context = CryptoContext::new(
rg_idx as i16, self.column_idx as i16, file_decryptor.clone(), file_decryptor);
rg_idx, self.column_idx, file_decryptor.clone(), file_decryptor);
let crypto_context = Arc::new(crypto_context);

let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
Expand Down
46 changes: 18 additions & 28 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ pub(crate) enum ModuleType {
}

pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
create_module_aad(file_aad, ModuleType::Footer, -1, -1, None)
create_module_aad(file_aad, ModuleType::Footer, 0, 0, None)
}

pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: Option<i16>) -> Result<Vec<u8>> {
pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: usize, column_ordinal: usize, page_ordinal: Option<usize>) -> Result<Vec<u8>> {
create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
}

pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
column_ordinal: i16, page_ordinal: Option<i16>) -> Result<Vec<u8>> {
fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: usize,
column_ordinal: usize, page_ordinal: Option<usize>) -> Result<Vec<u8>> {

let module_buf = [module_type as u8];

Expand All @@ -189,19 +189,11 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord
return Ok(aad)
}

if row_group_ordinal < 0 {
return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal));
}
// todo: this check is a noop here
if row_group_ordinal > i16::MAX {
if row_group_ordinal > i16::MAX as usize {
return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}",
i16::MAX, row_group_ordinal));
}
if column_ordinal < 0 {
return Err(general_err!("Wrong column ordinal: {}", column_ordinal));
}
// todo: this check is a noop here
if column_ordinal > i16::MAX {
if column_ordinal > i16::MAX as usize {
return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}",
i16::MAX, column_ordinal));
}
Expand All @@ -219,16 +211,17 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord
let page_ordinal = page_ordinal.ok_or_else(|| general_err!(
"Page ordinal must be set for data pages"))?;

if page_ordinal < 0 {
return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
if page_ordinal > i16::MAX as usize {
return Err(general_err!("Encrypted parquet files can't have more than {} pages per column chunk: {}",
i16::MAX, page_ordinal));
}

let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref());
Ok(aad)
}

Expand Down Expand Up @@ -317,17 +310,17 @@ impl FileDecryptor {

#[derive(Debug, Clone)]
pub struct CryptoContext {
pub(crate) row_group_ordinal: i16,
pub(crate) column_ordinal: i16,
pub(crate) page_ordinal: Option<i16>,
pub(crate) row_group_ordinal: usize,
pub(crate) column_ordinal: usize,
pub(crate) page_ordinal: Option<usize>,
pub(crate) dictionary_page: bool,
pub(crate) data_decryptor: Arc<FileDecryptor>,
pub(crate) metadata_decryptor: Arc<FileDecryptor>,
}

impl CryptoContext {
pub fn new(row_group_ordinal: i16,
column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
pub fn new(row_group_ordinal: usize,
column_ordinal: usize, data_decryptor: Arc<FileDecryptor>,
metadata_decryptor: Arc<FileDecryptor>) -> Self {
Self {
row_group_ordinal,
Expand All @@ -339,7 +332,7 @@ impl CryptoContext {
}
}

pub fn with_page_ordinal(&self, page_ordinal: i16) -> Self {
pub fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
Self {
row_group_ordinal: self.row_group_ordinal,
column_ordinal: self.column_ordinal,
Expand All @@ -361,9 +354,6 @@ impl CryptoContext {
}
}

pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
pub fn page_ordinal(&self) -> &Option<i16> { &self.page_ordinal }
pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
}
21 changes: 8 additions & 13 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ use crate::file::{
reader::*,
statistics,
};
use crate::format::{PageHeader, PageLocation, PageType, FileCryptoMetaData as TFileCryptoMetaData, EncryptionAlgorithm};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use num::ToPrimitive;
use thrift::protocol::{TCompactInputProtocol, TInputProtocol};
use zstd::zstd_safe::WriteBuf;
use crate::data_type::AsBytes;
use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, ModuleType};
use thrift::protocol::TCompactInputProtocol;
use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType};

impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
Expand Down Expand Up @@ -857,14 +854,12 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}

fn page_crypto_context(crypto_context: &Option<Arc<CryptoContext>>, page_ordinal: usize, dictionary_page: bool) -> Result<Option<Arc<CryptoContext>>> {
let page_ordinal = page_ordinal
.to_i16()
.ok_or_else(|| general_err!(
"Page ordinal {} is greater than the maximum allowed in encrypted Parquet files ({})",
page_ordinal, i16::MAX))?;

Ok(crypto_context.as_ref().map(
|c| Arc::new(if dictionary_page { c.for_dictionary_page() } else { c.with_page_ordinal(page_ordinal) })))
|c| Arc::new(if dictionary_page {
c.for_dictionary_page()
} else {
c.with_page_ordinal(page_ordinal)
})))
}

#[cfg(test)]
Expand Down

0 comments on commit 56419be

Please sign in to comment.