diff --git a/Cargo.toml b/Cargo.toml index 71809fdb7..9a76ff416 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,3 +98,4 @@ uuid = { version = "1.6.1", features = ["v7"] } volo-thrift = "0.10" hive_metastore = "0.1" tera = "1" +zstd = "0.13.2"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 1307cc6f3..f2e6694bc 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -81,6 +81,7 @@ tokio = { workspace = true, optional = true } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } +zstd = { workspace = true } [dev-dependencies] ctor = { workspace = true }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 72cf18d4b..d2ec20348 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -84,3 +84,5 @@ mod runtime; pub mod arrow; mod utils; pub mod writer; + +pub mod puffin;
diff --git a/crates/iceberg/src/puffin/blob.rs b/crates/iceberg/src/puffin/blob.rs new file mode 100644 index 000000000..d714987eb --- /dev/null +++ b/crates/iceberg/src/puffin/blob.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library. +pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1"; + +/// A blob stored in a Puffin file, along with its associated metadata. +#[derive(Debug, PartialEq, Clone)] +pub struct Blob { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + pub input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub sequence_number: i64, + /// The actual blob data + pub data: Vec<u8>, + /// Arbitrary meta-information about the blob + pub properties: HashMap<String, String>, +}
diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs new file mode 100644 index 000000000..652e8974e --- /dev/null +++ b/crates/iceberg/src/puffin/compression.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; + +use crate::{Error, ErrorKind, Result}; + +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +/// Data compression formats +pub enum CompressionCodec { + #[default] + /// No compression + None, + /// LZ4 single compression frame with content size present + Lz4, + /// Zstandard single compression frame with content size present + Zstd, +} + +impl CompressionCodec { + pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { + match self { + CompressionCodec::None => Ok(bytes), + CompressionCodec::Lz4 => Err(Error::new( + ErrorKind::FeatureUnsupported, + "LZ4 decompression is not supported currently", + )), + CompressionCodec::Zstd => { + let decompressed = zstd::stream::decode_all(&bytes[..])?; + Ok(decompressed) + } + } + } + + pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { + match self { + CompressionCodec::None => Ok(bytes), + CompressionCodec::Lz4 => Err(Error::new( + ErrorKind::FeatureUnsupported, + "LZ4 compression is not supported currently", + )), + CompressionCodec::Zstd => { + let writer = Vec::<u8>::new(); + let mut encoder = zstd::stream::Encoder::new(writer, 3)?; + encoder.include_checksum(true)?; + encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?; + std::io::copy(&mut &bytes[..], &mut encoder)?; + let compressed = encoder.finish()?; + Ok(compressed) + } + } + } + + pub(crate) fn is_none(&self) -> bool { + matches!(self, CompressionCodec::None) + } +} + +#[cfg(test)] +mod tests { + use crate::puffin::compression::CompressionCodec; + + #[tokio::test] + async fn test_compression_codec_none() { + let compression_codec = CompressionCodec::None; + let bytes_vec = [0_u8; 100].to_vec(); + + let compressed = compression_codec.compress(bytes_vec.clone()).unwrap(); + assert_eq!(bytes_vec, compressed); + + let decompressed = compression_codec.decompress(compressed.clone()).unwrap(); + assert_eq!(compressed, decompressed) + } + + #[tokio::test] + async fn test_compression_codec_lz4() { + let compression_codec = CompressionCodec::Lz4; + let bytes_vec = [0_u8; 100].to_vec(); + + assert_eq!( + compression_codec + .compress(bytes_vec.clone()) + .unwrap_err() + .to_string(), + "FeatureUnsupported => LZ4 compression is not supported currently", + ); + + assert_eq!( + compression_codec + .decompress(bytes_vec.clone()) + .unwrap_err() + .to_string(), + "FeatureUnsupported => LZ4 decompression is not supported currently", + ) + } + + #[tokio::test] + async fn test_compression_codec_zstd() { + let compression_codec = CompressionCodec::Zstd; + let bytes_vec = [0_u8; 100].to_vec(); + + let compressed = compression_codec.compress(bytes_vec.clone()).unwrap(); + assert!(compressed.len() < bytes_vec.len()); + + let decompressed = compression_codec.decompress(compressed.clone()).unwrap(); + assert_eq!(decompressed, bytes_vec) + } +}
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs new file mode 100644 index 000000000..f4025d526 --- /dev/null +++ b/crates/iceberg/src/puffin/metadata.rs @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
+ #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a Puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub struct FileMetadata { + /// Metadata about blobs in the file + pub blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below.
+ // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds() -> Result { + Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { + None => FileMetadata::err_out_of_bounds(), + 
Some(data) => Ok(data), + }?; + let decompressed_footer_payload_bytes = + footer_compression_codec.decompress(footer_payload_bytes.into())?; + + match String::from_utf8(decompressed_footer_payload_bytes) { + Err(src) => Err(Error::new( + ErrorKind::DataInvalid, + "Footer is not a valid UTF-8 string", + ) + .with_source(src)), + Ok(str) => Ok(str), + } + } + + fn from_json_str(string: &str) -> Result { + match serde_json::from_str::(string) { + Ok(file_metadata) => Ok(file_metadata), + Err(src) => Err( + Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON") + .with_source(src), + ), + } + } + + #[rustfmt::skip] + /// Returns the file metadata about a Puffin file + pub(crate) async fn read(input_file: &InputFile) -> Result { + let file_read = input_file.reader().await?; + + let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?; + FileMetadata::check_magic(&first_four_bytes)?; + + let input_file_length = input_file.metadata().await?.size; + let footer_payload_length = FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?; + let footer_bytes = FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length).await?; + + let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); + FileMetadata::check_magic(&footer_bytes[..magic_length])?; // first four bytes of footer + FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; // last four bytes of footer + + let footer_payload_str = FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; + FileMetadata::from_json_str(&footer_payload_str) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use bytes::Bytes; + use tempfile::TempDir; + + use crate::io::{FileIOBuilder, InputFile}; + use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata}; + use crate::puffin::test_utils::{ + empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes, + rust_empty_uncompressed_input_file, rust_uncompressed_metric_input_file, + rust_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + + const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0]; + + async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let path_buf = temp_dir.path().join("abc.puffin"); + let temp_path = path_buf.to_str().unwrap(); + let output_file = file_io.new_output(temp_path).unwrap(); + + output_file + .write(Bytes::copy_from_slice(slice)) + .await + .unwrap(); + + output_file.to_input_file() + } + + async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile { + let payload_bytes = payload_str.as_bytes(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(payload_bytes); + bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32)); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + input_file_with_bytes(temp_dir, &bytes).await + } + + #[tokio::test] + async fn test_file_starting_with_invalid_magic_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(INVALID_MAGIC_VALUE.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 
0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(INVALID_MAGIC_VALUE.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_file_ending_with_invalid_magic_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(INVALID_MAGIC_VALUE); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(u32::to_le_bytes( + empty_footer_payload_bytes().len() as u32 + 1, + )); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(u32::to_le_bytes( + empty_footer_payload_bytes().len() as u32 - 1, + )); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]", + ) + } + + #[tokio::test] + async fn test_lz4_compressed_footer_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0b00000001, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let 
input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "FeatureUnsupported => LZ4 decompression is not supported currently", + ) + } + + #[tokio::test] + async fn test_unknown_byte_bit_combination_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0b00000010, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file) + .await + .unwrap_err() + .to_string(), + "DataInvalid => Unknown flag byte 0 and bit 1 combination", + ) + } + + #[tokio::test] + async fn test_non_utf8_string_payload_returns_error() { + let temp_dir = TempDir::new().unwrap(); + + let payload_bytes: [u8; 4] = [0, 159, 146, 150]; + let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(payload_bytes); + bytes.extend(payload_bytes_length_bytes); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC.to_vec()); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1", + ) + } + + #[tokio::test] + async fn test_minimal_valid_file_returns_file_metadata() { + let temp_dir = TempDir::new().unwrap(); + + let mut bytes = vec![]; + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(FileMetadata::MAGIC.to_vec()); + bytes.extend(empty_footer_payload_bytes()); + bytes.extend(empty_footer_payload_bytes_length_bytes()); + bytes.extend(vec![0, 0, 0, 0]); + bytes.extend(FileMetadata::MAGIC); + + let input_file = input_file_with_bytes(&temp_dir, &bytes).await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_file_metadata_property() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ ], + "properties" : { + "a property" : "a property value" + } + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: { + let mut map = HashMap::new(); + map.insert("a property".to_string(), "a property value".to_string()); + map + }, + } + ) + } + + #[tokio::test] + async fn test_returns_file_metadata_properties() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ ], + "properties" : { + "a property" : "a property value", + "another one": "also with value" + } + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![], + properties: { + let mut map = HashMap::new(); + map.insert("a property".to_string(), "a property value".to_string()); + map.insert("another one".to_string(), "also with value".to_string()); + map + }, + } + ) + } + + #[tokio::test] + async fn 
test_returns_error_if_blobs_field_is_missing() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "properties" : {} + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!( + "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13" + ), + ) + } + + #[tokio::test] + async fn test_returns_error_if_blobs_field_is_bad() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : {} + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!("DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"), + ) + } + + #[tokio::test] + async fn test_returns_blobs_metadatas() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ + { + "type" : "type-a", + "fields" : [ 1 ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16 + }, + { + "type" : "type-bbb", + "fields" : [ 2, 3, 4 ], + "snapshot-id" : 77, + "sequence-number" : 4, + "offset" : 21474836470000, + "length" : 79834 + } + ] + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![ + BlobMetadata { + r#type: "type-a".to_string(), + input_fields: vec![1], + snapshot_id: 14, + sequence_number: 3, + offset: 4, + length: 16, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + }, + BlobMetadata { + r#type: "type-bbb".to_string(), + input_fields: vec![2, 3, 4], + snapshot_id: 77, + sequence_number: 4, + offset: 21474836470000, + length: 79834, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + }, + ], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_properties_in_blob_metadata() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload( + &temp_dir, + r#"{ + "blobs" : [ + { + "type" : "type-a", + "fields" : [ 1 ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16, + "properties" : { + "some key" : "some value" + } + } + ] + }"#, + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + FileMetadata { + blobs: vec![BlobMetadata { + r#type: "type-a".to_string(), + input_fields: vec![1], + snapshot_id: 14, + sequence_number: 3, + offset: 4, + length: 16, + compression_codec: CompressionCodec::None, + properties: { + let mut map = HashMap::new(); + map.insert("some key".to_string(), "some value".to_string()); + map + }, + }], + properties: HashMap::new(), + } + ) + } + + #[tokio::test] + async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() { + let temp_dir = TempDir::new().unwrap(); + + let out_of_i32_range_number: i64 = i32::MAX as i64 + 1; + + let input_file = input_file_with_payload( + &temp_dir, + &format!( + r#"{{ + "blobs" : [ + {{ + "type" : "type-a", + "fields" : [ {} ], + "snapshot-id" : 14, + "sequence-number" : 3, + "offset" : 4, + "length" : 16 + }} + ] + }}"#, + out_of_i32_range_number + ), + ) + .await; + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + format!( + "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{}`, expected i32 at line 5 column 51", + out_of_i32_range_number + ), + ) + } + + #[tokio::test] + 
async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await; + assert_eq!( + FileMetadata::read(&input_file).await.unwrap_err().to_string(), + "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7", + ) + } + + #[tokio::test] + async fn test_read_file_metadata_of_uncompressed_empty_file() { + let input_file = rust_empty_uncompressed_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, empty_footer_payload()) + } + + #[tokio::test] + async fn test_read_file_metadata_of_uncompressed_metric_data() { + let input_file = rust_uncompressed_metric_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, uncompressed_metric_file_metadata()) + } + + #[tokio::test] + async fn test_read_file_metadata_of_zstd_compressed_metric_data() { + let input_file = rust_zstd_compressed_metric_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()) + } +} diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs new file mode 100644 index 000000000..33d6853dc --- /dev/null +++ b/crates/iceberg/src/puffin/mod.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg Puffin file format implementation + +#![deny(missing_docs)] + +mod blob; +pub use blob::{Blob, APACHE_DATASKETCHES_THETA_V1}; + +mod compression; +pub use compression::CompressionCodec; + +mod metadata; +pub use metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; + +mod reader; +pub use reader::PuffinReader; + +#[cfg(test)] +mod test_utils; + +mod writer; +pub use writer::PuffinWriter; diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs new file mode 100644 index 000000000..bcef11045 --- /dev/null +++ b/crates/iceberg/src/puffin/reader.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::{FileRead, InputFile}; +use crate::puffin::blob::Blob; +use crate::puffin::metadata::{BlobMetadata, FileMetadata}; +use crate::Result; + +/// Puffin reader +pub struct PuffinReader { + input_file: InputFile, + file_metadata: Option, +} + +impl PuffinReader { + /// Returns a new Puffin reader + pub fn new(input_file: InputFile) -> Self { + Self { + input_file, + file_metadata: None, + } + } + + /// Returns file metadata + pub async fn file_metadata(&mut self) -> Result<&FileMetadata> { + if let Some(ref file_metadata) = self.file_metadata { + Ok(file_metadata) + } else { + let file_metadata = FileMetadata::read(&self.input_file).await?; + Ok(self.file_metadata.insert(file_metadata)) + } + } + + /// Returns blob + pub async fn blob(&self, blob_metadata: BlobMetadata) -> Result { + let file_read = self.input_file.reader().await?; + let start = blob_metadata.offset; + let end = start + u64::try_from(blob_metadata.length)?; + let bytes = file_read.read(start..end).await?.to_vec(); + let data = blob_metadata.compression_codec.decompress(bytes)?; + + Ok(Blob { + r#type: blob_metadata.r#type, + input_fields: blob_metadata.input_fields, + snapshot_id: blob_metadata.snapshot_id, + sequence_number: blob_metadata.sequence_number, + data, + properties: blob_metadata.properties, + }) + } +} + +#[cfg(test)] +mod tests { + + use crate::puffin::test_utils::{ + blob_0, blob_1, rust_uncompressed_metric_input_file, + rust_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + use crate::puffin::PuffinReader; + + #[tokio::test] + async fn test_puffin_reader_uncompressed_metric_data() { + let input_file = rust_uncompressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, uncompressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap().clone()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap().clone()) + .await + .unwrap(), + blob_1(), + ) + } + + #[tokio::test] + async fn test_puffin_reader_zstd_compressed_metric_data() { + let input_file = rust_zstd_compressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap().clone()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap().clone()) + .await + .unwrap(), + blob_1(), + ) + } +} diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs new file mode 100644 index 000000000..4ae1911df --- /dev/null +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use crate::io::{FileIOBuilder, InputFile}; +use crate::puffin::blob::Blob; +use crate::puffin::compression::CompressionCodec; +use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; + +const RUST_TESTDATA: &str = "testdata/puffin/rust-generated"; +const JAVA_TESTDATA: &str = "testdata/puffin/java-generated"; +const EMPTY_UNCOMPRESSED: &str = "empty-puffin-uncompressed.bin"; +const METRIC_UNCOMPRESSED: &str = "sample-metric-data-uncompressed.bin"; +const METRIC_ZSTD_COMPRESSED: &str = "sample-metric-data-compressed-zstd.bin"; + +fn input_file_for_test_data(path: &str) -> InputFile { + FileIOBuilder::new_fs_io() + .build() + .unwrap() + .new_input(env!("CARGO_MANIFEST_DIR").to_owned() + "/" + path) + .unwrap() +} + +pub(crate) fn java_empty_uncompressed_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, EMPTY_UNCOMPRESSED].join("/")) +} + +pub(crate) fn rust_empty_uncompressed_input_file() -> InputFile { + input_file_for_test_data(&[RUST_TESTDATA, EMPTY_UNCOMPRESSED].join("/")) +} + +pub(crate) fn java_uncompressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, METRIC_UNCOMPRESSED].join("/")) +} + +pub(crate) fn rust_uncompressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[RUST_TESTDATA, METRIC_UNCOMPRESSED].join("/")) +} + +pub(crate) fn java_zstd_compressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[JAVA_TESTDATA, METRIC_ZSTD_COMPRESSED].join("/")) +} + +pub(crate) fn rust_zstd_compressed_metric_input_file() -> InputFile { + input_file_for_test_data(&[RUST_TESTDATA, METRIC_ZSTD_COMPRESSED].join("/")) +} + +pub(crate) fn empty_footer_payload() -> FileMetadata { + FileMetadata { + blobs: Vec::new(), + properties: HashMap::new(), + } +} + +pub(crate) fn empty_footer_payload_bytes() -> Vec { + return serde_json::to_string::(&empty_footer_payload()) + .unwrap() + .as_bytes() + .to_vec(); +} + +pub(crate) fn empty_footer_payload_bytes_length_bytes() -> [u8; 4] { + u32::to_le_bytes(empty_footer_payload_bytes().len() as u32) +} + +pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob"; +pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1]; +pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2; +pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi"; + +pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_0_TYPE.to_string(), + input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + offset: 4, + length: 22, + compression_codec: CompressionCodec::Zstd, + properties: HashMap::new(), + } +} + +pub(crate) fn uncompressed_metric_blob_0_metadata() -> 
BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_0_TYPE.to_string(), + input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + offset: 4, + length: 9, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + } +} + +pub(crate) fn blob_0() -> Blob { + Blob { + r#type: METRIC_BLOB_0_TYPE.to_string(), + input_fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + data: METRIC_BLOB_0_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + +pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob"; +pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2]; +pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2; +pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_1_DATA: &str = + "some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?"; + +pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_1_TYPE.to_string(), + input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + offset: 13, + length: 83, + compression_codec: CompressionCodec::None, + properties: HashMap::new(), + } +} + +pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { + BlobMetadata { + r#type: METRIC_BLOB_1_TYPE.to_string(), + input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + offset: 26, + length: 77, + compression_codec: CompressionCodec::Zstd, + properties: HashMap::new(), + } +} + +pub(crate) fn blob_1() -> Blob { + Blob { + r#type: METRIC_BLOB_1_TYPE.to_string(), + input_fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + data: METRIC_BLOB_1_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + +pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234"; + +pub(crate) fn file_properties() -> HashMap { + let mut properties = HashMap::new(); + properties.insert( + CREATED_BY_PROPERTY.to_string(), + CREATED_BY_PROPERTY_VALUE.to_string(), + ); + properties +} + +pub(crate) fn uncompressed_metric_file_metadata() -> FileMetadata { + FileMetadata { + blobs: vec![ + uncompressed_metric_blob_0_metadata(), + uncompressed_metric_blob_1_metadata(), + ], + properties: file_properties(), + } +} + +pub(crate) fn zstd_compressed_metric_file_metadata() -> FileMetadata { + FileMetadata { + blobs: vec![ + zstd_compressed_metric_blob_0_metadata(), + zstd_compressed_metric_blob_1_metadata(), + ], + properties: file_properties(), + } +} diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs new file mode 100644 index 000000000..b0afb86d0 --- /dev/null +++ b/crates/iceberg/src/puffin/writer.rs @@ -0,0 +1,418 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use bytes::Bytes; + +use crate::io::{FileWrite, OutputFile}; +use crate::puffin::blob::Blob; +use crate::puffin::compression::CompressionCodec; +use crate::puffin::metadata::{BlobMetadata, ByteNumber, FileMetadata, Flag}; +use crate::writer::file_writer::track_writer::TrackWriter; +use crate::{Error, ErrorKind, Result}; + +/// Puffin writer +pub struct PuffinWriter { + writer: TrackWriter, + written_blobs_metadata: Vec, + properties: HashMap, + footer_compression_codec: CompressionCodec, + flags: HashSet, + header_written: bool, + is_closed: bool, +} + +impl PuffinWriter { + /// Returns a new Puffin writer + pub async fn new( + output_file: &OutputFile, + properties: HashMap, + compress_footer: bool, + ) -> Result { + let mut flags = HashSet::::new(); + let footer_compression_codec = if compress_footer { + flags.insert(Flag::FooterPayloadCompressed); + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let written_size = Arc::new(AtomicU64::new(0)); + let track_writer = TrackWriter::new(output_file.writer().await?, written_size); + + Ok(Self { + writer: track_writer, + written_blobs_metadata: Vec::new(), + properties, + footer_compression_codec, + flags, + header_written: false, + is_closed: false, + }) + } + + fn already_closed_err() -> Result { + Err(Error::new( + ErrorKind::Unexpected, + "PuffinWriter is already closed", + )) + } + + async fn maybe_write_header(&mut self) -> Result<()> { + if !self.header_written { + self.writer + .write(Bytes::copy_from_slice(&FileMetadata::MAGIC)) + .await?; + self.header_written = true; + } + Ok(()) + } + + /// Adds blob to Puffin file + pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> { + if self.is_closed { + PuffinWriter::already_closed_err() + } else { + self.maybe_write_header().await?; + + let offset = self.writer.bytes_written(); + + let compressed_data = compression_codec.compress(blob.data)?; + + self.writer + .write(Bytes::copy_from_slice(&compressed_data)) + .await?; + + self.written_blobs_metadata.push(BlobMetadata { + r#type: blob.r#type, + input_fields: blob.input_fields, + snapshot_id: blob.snapshot_id, + sequence_number: blob.sequence_number, + offset, + length: compressed_data.len(), + compression_codec, + properties: blob.properties, + }); + + Ok(()) + } + } + + fn footer_payload_bytes(&self) -> Result> { + let file_metadata = FileMetadata { + blobs: self.written_blobs_metadata.clone(), + properties: self.properties.clone(), + }; + let json = serde_json::to_string::(&file_metadata)?; + let bytes = json.as_bytes(); + self.footer_compression_codec.compress(bytes.to_vec()) + } + + fn flags_bytes(&mut self) -> Vec { + let mut flags_by_byte_number: HashMap> = HashMap::new(); + for flag in &self.flags { + let byte_number = flag.byte_number(); + match flags_by_byte_number.get_mut(&byte_number) { + Some(vec) => vec.push(flag), + None => { + let _ = flags_by_byte_number.insert(byte_number, vec![flag]); + } + }; + } + + let mut flags_bytes = Vec::new(); + for 
byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let mut byte_flag: u8 = 0; + for flag in flags_by_byte_number + .get(&ByteNumber(byte_number)) + .unwrap_or(&vec![]) + { + byte_flag |= 0x1 << flag.bit_number().0; + } + + flags_bytes.push(byte_flag); + } + flags_bytes + } + + async fn write_footer(&mut self) -> Result<()> { + let mut footer_payload_bytes = self.footer_payload_bytes()?; + let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?); + + let mut footer_bytes = Vec::new(); + footer_bytes.extend(&FileMetadata::MAGIC); + footer_bytes.append(&mut footer_payload_bytes); + footer_bytes.extend(footer_payload_bytes_length); + footer_bytes.append(&mut self.flags_bytes()); + footer_bytes.extend(&FileMetadata::MAGIC); + + self.writer.write(footer_bytes.into()).await?; + + Ok(()) + } + + /// Finalizes the Puffin file + pub async fn close(&mut self) -> Result<()> { + if self.is_closed { + PuffinWriter::already_closed_err() + } else { + self.maybe_write_header().await?; + self.write_footer().await?; + self.writer.close().await?; + self.is_closed = true; + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use tempfile::TempDir; + + use crate::io::{FileIOBuilder, InputFile, OutputFile}; + use crate::puffin::blob::Blob; + use crate::puffin::compression::CompressionCodec; + use crate::puffin::metadata::FileMetadata; + use crate::puffin::test_utils::{ + blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties, + java_empty_uncompressed_input_file, java_uncompressed_metric_input_file, + java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + use crate::puffin::writer::PuffinWriter; + use crate::puffin::PuffinReader; + use crate::Result; + + #[tokio::test] + async fn test_throws_error_if_attempt_to_add_blob_after_closing() { + let temp_dir = TempDir::new().unwrap(); + + let file_name = "temp_puffin.bin"; + let full_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), file_name); + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = file_io.new_output(full_path).unwrap(); + let mut writer = PuffinWriter::new(&output_file, HashMap::new(), false) + .await + .unwrap(); + writer.close().await.unwrap(); + + assert_eq!( + writer + .add(blob_0(), CompressionCodec::None) + .await + .unwrap_err() + .to_string(), + "Unexpected => PuffinWriter is already closed", + ) + } + + #[tokio::test] + async fn test_throws_error_if_attempt_to_close_multiple_times() { + let temp_dir = TempDir::new().unwrap(); + + let file_name = "temp_puffin.bin"; + let full_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), file_name); + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = file_io.new_output(full_path).unwrap(); + let mut writer = PuffinWriter::new(&output_file, HashMap::new(), false) + .await + .unwrap(); + writer.close().await.unwrap(); + + assert_eq!( + writer.close().await.unwrap_err().to_string(), + "Unexpected => PuffinWriter is already closed", + ) + } + + async fn write_puffin_file( + temp_dir: &TempDir, + blobs: Vec<(Blob, CompressionCodec)>, + properties: HashMap, + ) -> Result { + let file_io = FileIOBuilder::new_fs_io().build()?; + + let path_buf = temp_dir.path().join("temp_puffin.bin"); + let temp_path = path_buf.to_str().unwrap(); + let output_file = file_io.new_output(temp_path)?; + + let mut writer = PuffinWriter::new(&output_file, properties, false).await?; + for 
(blob, compression_codec) in blobs { + writer.add(blob, compression_codec).await?; + } + writer.close().await?; + + Ok(output_file) + } + + async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec { + let mut puffin_reader = PuffinReader::new(input_file); + let mut blobs = Vec::new(); + let blobs_metadata = puffin_reader.file_metadata().await.unwrap().clone().blobs; + for blob_metadata in blobs_metadata { + blobs.push(puffin_reader.blob(blob_metadata).await.unwrap()); + } + blobs + } + + #[tokio::test] + async fn test_write_uncompressed_empty_file() { + let temp_dir = TempDir::new().unwrap(); + + let input_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new()) + .await + .unwrap() + .to_input_file(); + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + empty_footer_payload() + ); + + assert_eq!( + input_file.read().await.unwrap().len(), + FileMetadata::MAGIC_LENGTH as usize + // no blobs since puffin file is empty + + FileMetadata::MAGIC_LENGTH as usize + + empty_footer_payload_bytes().len() + + FileMetadata::FOOTER_STRUCT_LENGTH as usize + ) + } + + fn blobs_with_compression( + blobs: Vec, + compression_codec: CompressionCodec, + ) -> Vec<(Blob, CompressionCodec)> { + blobs + .into_iter() + .map(|blob| (blob, compression_codec)) + .collect() + } + + #[tokio::test] + async fn test_write_uncompressed_metric_data() { + let temp_dir = TempDir::new().unwrap(); + let blobs = vec![blob_0(), blob_1()]; + let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::None); + + let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap() + .to_input_file(); + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + uncompressed_metric_file_metadata() + ); + + assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs) + } + + #[tokio::test] + async fn test_write_zstd_compressed_metric_data() { + let temp_dir = TempDir::new().unwrap(); + let blobs = vec![blob_0(), blob_1()]; + let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd); + + let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap() + .to_input_file(); + + assert_eq!( + FileMetadata::read(&input_file).await.unwrap(), + zstd_compressed_metric_file_metadata() + ); + + assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs) + } + + #[tokio::test] + async fn test_write_lz4_compressed_metric_data() { + let temp_dir = TempDir::new().unwrap(); + let blobs = vec![blob_0(), blob_1()]; + let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4); + + assert_eq!( + write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap_err() + .to_string(), + "FeatureUnsupported => LZ4 compression is not supported currently" + ); + } + + async fn get_file_as_byte_vec(input_file: InputFile) -> Vec { + input_file.read().await.unwrap().to_vec() + } + + async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) { + let actual_bytes = get_file_as_byte_vec(actual.to_input_file()).await; + let expected_bytes = get_file_as_byte_vec(expected).await; + assert_eq!(actual_bytes, expected_bytes); + } + + #[tokio::test] + async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() { + let temp_dir = TempDir::new().unwrap(); + + assert_files_are_bit_identical( + write_puffin_file(&temp_dir, Vec::new(), HashMap::new()) + .await + .unwrap(), + 
java_empty_uncompressed_input_file(), + ) + .await + } + + #[tokio::test] + async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() { + let temp_dir = TempDir::new().unwrap(); + let blobs = vec![blob_0(), blob_1()]; + let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::None); + + assert_files_are_bit_identical( + write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap(), + java_uncompressed_metric_input_file(), + ) + .await + } + + #[tokio::test] + async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() { + let temp_dir = TempDir::new().unwrap(); + let blobs = vec![blob_0(), blob_1()]; + let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd); + + assert_files_are_bit_identical( + write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) + .await + .unwrap(), + java_zstd_compressed_metric_input_file(), + ) + .await + } +} diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 4a0fffcc1..854ef1d39 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -26,7 +26,8 @@ use crate::Result; mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; -mod track_writer; + +pub(crate) mod track_writer; pub mod location_generator; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index f4cec63ed..ab1cf96d0 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -18,7 +18,7 @@ //! The module contains the file writer for parquet file format. use std::collections::HashMap; -use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -81,7 +81,7 @@ impl FileWriterBuilder for ParquetWr async fn build(self) -> crate::Result { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let written_size = Arc::new(AtomicI64::new(0)); + let written_size = Arc::new(AtomicU64::new(0)); let out_file = self.file_io.new_output( self.location_generator .generate_location(&self.file_name_generator.generate_file_name()), @@ -214,7 +214,7 @@ pub struct ParquetWriter { schema: SchemaRef, out_file: OutputFile, writer: AsyncArrowWriter>, - written_size: Arc, + written_size: Arc, current_row_num: usize, } diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs index 6c60a1aa7..71bce7ad1 100644 --- a/crates/iceberg/src/writer/file_writer/track_writer.rs +++ b/crates/iceberg/src/writer/file_writer/track_writer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use bytes::Bytes; @@ -26,16 +26,22 @@ use crate::Result; /// `TrackWriter` is used to track the written size. 
pub(crate) struct TrackWriter { inner: Box<dyn FileWrite>, - written_size: Arc<AtomicI64>, + written_size: Arc<AtomicU64>, } impl TrackWriter { - pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self { + /// Create new writer + pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self { Self { inner: writer, written_size, } } + + /// Number of bytes written so far + pub fn bytes_written(&self) -> u64 { + self.written_size.load(std::sync::atomic::Ordering::SeqCst) + } } #[async_trait::async_trait] @@ -44,7 +50,7 @@ impl FileWrite for TrackWriter { let size = bs.len(); self.inner.write(bs).await.map(|v| { self.written_size - .fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed); + .fetch_add(size as u64, std::sync::atomic::Ordering::Relaxed); v }) }
diff --git a/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin new file mode 100644 index 000000000..142b45bd4 Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/empty-puffin-uncompressed.bin differ
diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin new file mode 100644 index 000000000..ac8b69c76 Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-compressed-zstd.bin differ
diff --git a/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin new file mode 100644 index 000000000..ab8da1382 Binary files /dev/null and b/crates/iceberg/testdata/puffin/java-generated/sample-metric-data-uncompressed.bin differ
diff --git a/crates/iceberg/testdata/puffin/rust-generated/empty-puffin-uncompressed.bin b/crates/iceberg/testdata/puffin/rust-generated/empty-puffin-uncompressed.bin new file mode 100644 index 000000000..142b45bd4 Binary files /dev/null and b/crates/iceberg/testdata/puffin/rust-generated/empty-puffin-uncompressed.bin differ
diff --git a/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-compressed-zstd.bin b/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-compressed-zstd.bin new file mode 100644 index 000000000..ac8b69c76 Binary files /dev/null and b/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-compressed-zstd.bin differ
diff --git a/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-uncompressed.bin b/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-uncompressed.bin new file mode 100644 index 000000000..ab8da1382 Binary files /dev/null and b/crates/iceberg/testdata/puffin/rust-generated/sample-metric-data-uncompressed.bin differ
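For reviewers, a minimal usage sketch of how the public API introduced in this diff fits together: write one zstd-compressed blob plus a file-level property, then read the footer metadata back and decompress the blob. This snippet is not part of the change itself; it assumes a tokio runtime, and the local path, blob type, field IDs, and property values are illustrative only.

use std::collections::HashMap;

use iceberg::io::FileIOBuilder;
use iceberg::puffin::{Blob, CompressionCodec, PuffinReader, PuffinWriter};
use iceberg::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Local filesystem FileIO; the path is an example only.
    let file_io = FileIOBuilder::new_fs_io().build()?;
    let output_file = file_io.new_output("/tmp/example.puffin")?;

    // Write a single zstd-compressed blob; the footer itself stays uncompressed.
    let mut properties = HashMap::new();
    properties.insert("created-by".to_string(), "example writer".to_string());
    let mut writer = PuffinWriter::new(&output_file, properties, false).await?;
    writer
        .add(
            Blob {
                r#type: "some-blob".to_string(),
                input_fields: vec![1],
                snapshot_id: 2,
                sequence_number: 1,
                data: b"abcdefghi".to_vec(),
                properties: HashMap::new(),
            },
            CompressionCodec::Zstd,
        )
        .await?;
    writer.close().await?;

    // Read the footer metadata back and decompress each blob.
    let mut reader = PuffinReader::new(output_file.to_input_file());
    let file_metadata = reader.file_metadata().await?.clone();
    for blob_metadata in file_metadata.blobs {
        let blob = reader.blob(blob_metadata).await?;
        println!("{} holds {} bytes", blob.r#type, blob.data.len());
    }
    Ok(())
}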