Skip to content

Commit

Permalink
Handle when a file is encrypted but encryption is disabled or no decr…
Browse files Browse the repository at this point in the history
…yption properties are provided
  • Loading branch information
adamreeve committed Dec 20, 2024
1 parent 9030f3a commit 27e77ad
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 30 deletions.
22 changes: 19 additions & 3 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let mut footer = [0; FOOTER_SIZE];
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let length = ParquetMetaDataReader::decode_footer(&footer)?;
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let length = footer.metadata_length();

if file_size < length + FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
Expand All @@ -127,13 +128,28 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
let metadata_start = file_size - length - FOOTER_SIZE;
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
(ParquetMetaDataReader::decode_metadata(&meta, None)?, None)
(
ParquetMetaDataReader::decode_metadata(
&meta,
#[cfg(feature = "encryption")]
None,
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)?,
None,
)
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
(
ParquetMetaDataReader::decode_metadata(slice, None)?,
ParquetMetaDataReader::decode_metadata(
slice,
#[cfg(feature = "encryption")]
None,
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)?,
Some((footer_start, suffix.slice(..metadata_start))),
)
};
Expand Down
9 changes: 7 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

Expand All @@ -207,7 +208,11 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {

// todo: use file_decryption_properties
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(
&buf, None,
&buf,
#[cfg(feature = "encryption")]
None,
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)?))
}
.boxed()
Expand Down
16 changes: 9 additions & 7 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,13 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_metadata")]
pub fn decode_metadata(
buf: &[u8],
#[cfg(feature = "encryption")] file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<ParquetMetaData> {
pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
ParquetMetaDataReader::decode_metadata(
buf,
#[cfg(feature = "encryption")]
file_decryption_properties,
None,
#[cfg(feature = "encryption")]
false,
)
}

Expand All @@ -81,7 +80,10 @@ pub fn decode_metadata(
/// | len | 'PAR1' |
/// +-----+--------+
/// ```
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_footer")]
#[deprecated(
since = "53.1.0",
note = "Use ParquetMetaDataReader::decode_footer_tail"
)]
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
ParquetMetaDataReader::decode_footer(slice)
ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length())
}
98 changes: 80 additions & 18 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ pub struct ParquetMetaDataReader {
file_decryption_properties: Option<FileDecryptionProperties>,
}

/// Describes how the footer metadata is stored
///
/// This is parsed from the last 8 bytes of the Parquet file
pub struct FooterTail {
metadata_length: usize,
encrypted_footer: bool,
}

impl FooterTail {
/// The length of the footer metadata in bytes
pub fn metadata_length(&self) -> usize {
self.metadata_length
}

/// Whether the footer metadata is encrypted
pub fn encrypted_footer(&self) -> bool {
self.encrypted_footer
}
}

impl ParquetMetaDataReader {
/// Create a new [`ParquetMetaDataReader`]
pub fn new() -> Self {
Expand Down Expand Up @@ -366,6 +386,7 @@ impl ParquetMetaDataReader {
&mut fetch,
file_size,
self.get_prefetch_size(),
#[cfg(feature = "encryption")]
self.file_decryption_properties.as_ref(),
)
.await?;
Expand Down Expand Up @@ -520,7 +541,8 @@ impl ParquetMetaDataReader {
.get_read(file_size - 8)?
.read_exact(&mut footer)?;

let metadata_len = Self::decode_footer(&footer)?;
let footer = Self::decode_footer_tail(&footer)?;
let metadata_len = footer.metadata_length();
let footer_metadata_len = FOOTER_SIZE + metadata_len;
self.metadata_size = Some(footer_metadata_len);

Expand All @@ -536,6 +558,8 @@ impl ParquetMetaDataReader {
chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
#[cfg(feature = "encryption")]
self.file_decryption_properties.as_ref(),
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)
}

Expand All @@ -557,7 +581,9 @@ impl ParquetMetaDataReader {
fetch: &mut F,
file_size: usize,
prefetch: usize,
file_decryption_properties: Option<&FileDecryptionProperties>,
#[cfg(feature = "encryption")] file_decryption_properties: Option<
&FileDecryptionProperties,
>,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
if file_size < FOOTER_SIZE {
return Err(eof_err!("file size of {} is less than footer", file_size));
Expand All @@ -582,7 +608,8 @@ impl ParquetMetaDataReader {
let mut footer = [0; FOOTER_SIZE];
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let length = Self::decode_footer(&footer)?;
let footer = Self::decode_footer_tail(&footer)?;
let length = footer.metadata_length();

if file_size < length + FOOTER_SIZE {
return Err(eof_err!(
Expand All @@ -597,22 +624,34 @@ impl ParquetMetaDataReader {
let metadata_start = file_size - length - FOOTER_SIZE;
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
Ok((
Self::decode_metadata(&meta, file_decryption_properties)?,
Self::decode_metadata(
&meta,
#[cfg(feature = "encryption")]
file_decryption_properties,
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)?,
None,
))
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
Ok((
Self::decode_metadata(slice, file_decryption_properties)?,
Self::decode_metadata(
slice,
#[cfg(feature = "encryption")]
file_decryption_properties,
#[cfg(feature = "encryption")]
footer.encrypted_footer(),
)?,
Some((footer_start, suffix.slice(..metadata_start))),
))
}
}

/// Decodes the Parquet footer returning the metadata length in bytes
/// Decodes the end of the Parquet footer
///
/// A parquet footer is 8 bytes long and has the following layout:
/// There are 8 bytes at the end of the Parquet footer with the following layout:
/// * 4 bytes for the metadata length
/// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
///
Expand All @@ -621,16 +660,28 @@ impl ParquetMetaDataReader {
/// | len | 'PAR1' or 'PARE' |
/// +-----+------------------+
/// ```
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
// check this is indeed a parquet file
if slice[4..] != PARQUET_MAGIC && slice[4..] != PARQUET_MAGIC_ENCR_FOOTER {
pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
let magic = &slice[4..];
let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
true
} else if magic == PARQUET_MAGIC {
false
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

};
// get the metadata length from the footer
let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
// u32 won't be larger than usize in most cases
Ok(metadata_len as usize)
Ok(FooterTail {
// u32 won't be larger than usize in most cases
metadata_length: metadata_len as usize,
encrypted_footer,
})
}

/// Decodes the Parquet footer, returning the metadata length in bytes
#[deprecated(note = "use decode_footer_tail instead")]
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
Self::decode_footer_tail(slice).map(|f| f.metadata_length)
}

/// Decodes [`ParquetMetaData`] from the provided bytes.
Expand All @@ -645,18 +696,29 @@ impl ParquetMetaDataReader {
#[cfg(feature = "encryption")] file_decryption_properties: Option<
&FileDecryptionProperties,
>,
#[cfg(feature = "encryption")] encrypted_footer: bool,
) -> Result<ParquetMetaData> {
let mut prot = TCompactSliceInputProtocol::new(buf);

#[cfg(not(feature = "encryption"))]
if encrypted_footer() {
return Err(general_err!(
"Parquet file has an encrypted footer but the encryption feature is disabled"
));
}

#[cfg(feature = "encryption")]
let mut file_decryptor = None;
#[cfg(feature = "encryption")]
let decrypted_fmd_buf;

#[cfg(feature = "encryption")]
if file_decryption_properties.is_some()
&& file_decryption_properties.unwrap().has_footer_key()
{
if encrypted_footer {
if file_decryption_properties.is_none() {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
};
let file_decryption_properties = file_decryption_properties.unwrap();

let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
Expand All @@ -678,7 +740,7 @@ impl ParquetMetaDataReader {
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

file_decryptor = Some(FileDecryptor::new(
file_decryption_properties.unwrap(),
file_decryption_properties,
aad_file_unique.clone(),
aad_prefix.clone(),
));
Expand Down

0 comments on commit 27e77ad

Please sign in to comment.