Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encode UUID as FixedLenByteArray in parquet_derive #5773

Merged
merged 4 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 90 additions & 35 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,19 @@ impl Field {
Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
Type::Reference(_, ref second_type)
| Type::Vec(ref second_type)
| Type::Array(ref second_type)
| Type::Array(ref second_type, _)
| Type::Slice(ref second_type) => match **second_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
_ => unimplemented!("Unsupported nesting encountered"),
},
},
Type::Reference(_, ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type) => match **first_type {
Type::TypePath(_) => None,
Type::Vec(ref second_type)
| Type::Array(ref second_type)
| Type::Array(ref second_type, _)
| Type::Slice(ref second_type) => match **second_type {
Type::TypePath(_) => None,
Type::Reference(_, ref third_type) => match **third_type {
Expand All @@ -161,7 +161,7 @@ impl Field {
match **second_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
Type::Vec(ref third_type)
| Type::Array(ref third_type)
| Type::Array(ref third_type, _)
| Type::Slice(ref third_type) => match **third_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
Type::Reference(_, ref fourth_type) => match **fourth_type {
Expand Down Expand Up @@ -316,25 +316,23 @@ impl Field {
let logical_type = self.ty.logical_type();
let repetition = self.ty.repetition();
let converted_type = self.ty.converted_type();
let length = self.ty.length();

let mut builder = quote! {
ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
};

if let Some(converted_type) = converted_type {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.with_converted_type(#converted_type)
.build().unwrap().into()
)
}
} else {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.build().unwrap().into()
)
}
builder = quote! { #builder.with_converted_type(#converted_type) };
}

if let Some(length) = length {
builder = quote! { #builder.with_length(#length) };
}

quote! { fields.push(#builder.build().unwrap().into()) }
}

fn option_into_vals(&self) -> proc_macro2::TokenStream {
Expand Down Expand Up @@ -394,7 +392,7 @@ impl Field {
quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
}
Some(ThirdPartyType::Uuid) => {
quote! { (&rec.#field_name.to_string()[..]).into() }
quote! { rec.#field_name.as_bytes().to_vec().into() }
}
_ => {
if self.is_a_byte_buf {
Expand Down Expand Up @@ -430,7 +428,7 @@ impl Field {
}
}
Some(ThirdPartyType::Uuid) => {
quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() }
quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
}
_ => match &self.ty {
Type::TypePath(_) => match self.ty.last_part().as_str() {
Expand Down Expand Up @@ -469,7 +467,7 @@ impl Field {
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
enum Type {
Array(Box<Type>),
Array(Box<Type>, syn::Expr),
Option(Box<Type>),
Slice(Box<Type>),
Vec(Box<Type>),
Expand Down Expand Up @@ -542,7 +540,7 @@ impl Type {
Type::TypePath(_) => parent_ty.unwrap_or(ty),
Type::Option(ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type)
| Type::Reference(_, ref first_type) => {
Type::leaf_type_recursive_helper(first_type, Some(ty))
Expand All @@ -560,7 +558,7 @@ impl Type {
Type::TypePath(ref type_) => type_,
Type::Option(ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type)
| Type::Reference(_, ref first_type) => match **first_type {
Type::TypePath(ref type_) => type_,
Expand Down Expand Up @@ -607,7 +605,7 @@ impl Type {
let leaf_type = self.leaf_type_recursive();

match leaf_type {
Type::Array(ref first_type) => {
Type::Array(ref first_type, _length) => {
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return BasicType::FIXED_LEN_BYTE_ARRAY;
Expand Down Expand Up @@ -638,17 +636,38 @@ impl Type {
}
"f32" => BasicType::FLOAT,
"f64" => BasicType::DOUBLE,
"String" | "str" | "Uuid" => BasicType::BYTE_ARRAY,
"String" | "str" => BasicType::BYTE_ARRAY,
"Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
f => unimplemented!("{} currently is not supported", f),
}
}

fn length(&self) -> Option<syn::Expr> {
let last_part = self.last_part();
let leaf_type = self.leaf_type_recursive();

// `[u8; N]` => Some(N)
if let Type::Array(ref first_type, length) = leaf_type {
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return Some(length.clone());
}
}
}

match last_part.trim() {
// Uuid => [u8; 16] => Some(16)
"Uuid" => Some(syn::parse_quote!(16)),
_ => None,
}
}

fn logical_type(&self) -> proc_macro2::TokenStream {
let last_part = self.last_part();
let leaf_type = self.leaf_type_recursive();

match leaf_type {
Type::Array(ref first_type) => {
Type::Array(ref first_type, _length) => {
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return quote! { None };
Expand Down Expand Up @@ -789,7 +808,7 @@ impl Type {

fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
let inner_type = Type::from_type(f, ta.elem.as_ref());
Type::Array(Box::new(inner_type))
Type::Array(Box::new(inner_type), ta.len.clone())
}

fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
Expand Down Expand Up @@ -1091,6 +1110,7 @@ mod test {
a_fix_byte_buf: [u8; 10],
a_complex_option: ::std::option::Option<&Vec<u8>>,
a_complex_vec: &::std::vec::Vec<&Option<u8>>,
a_uuid: ::uuid::Uuid,
}
};

Expand All @@ -1110,7 +1130,42 @@ mod test {
BasicType::BYTE_ARRAY,
BasicType::FIXED_LEN_BYTE_ARRAY,
BasicType::BYTE_ARRAY,
BasicType::INT32
BasicType::INT32,
BasicType::FIXED_LEN_BYTE_ARRAY,
]
)
}

#[test]
fn test_type_length() {
let snippet: proc_macro2::TokenStream = quote! {
struct LotsOfInnerTypes {
a_buf: ::std::vec::Vec<u8>,
a_number: i32,
a_verbose_option: ::std::option::Option<bool>,
a_silly_string: String,
a_fix_byte_buf: [u8; 10],
a_complex_option: ::std::option::Option<&Vec<u8>>,
a_complex_vec: &::std::vec::Vec<&Option<u8>>,
a_uuid: ::uuid::Uuid,
}
};

let fields = extract_fields(snippet);
let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();

assert_eq!(
lengths,
vec![
None,
None,
None,
None,
Some(syn::parse_quote!(10)),
None,
None,
Some(syn::parse_quote!(16)),
]
)
}
Expand Down Expand Up @@ -1328,8 +1383,8 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
Expand All @@ -1349,7 +1404,7 @@ mod test {
}
}).collect();

if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
Expand All @@ -1371,13 +1426,13 @@ mod test {
assert_eq!(when.reader_snippet().to_string(),(quote!{
{
let mut vals = Vec::new();
if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader {
if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.unique_id = ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap();
r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
}
}
}).to_string());
Expand Down
1 change: 1 addition & 0 deletions parquet_derive_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ rust-version = { workspace = true }
parquet = { workspace = true }
parquet_derive = { path = "../parquet_derive", default-features = false }
chrono = { workspace = true }
uuid = { version = "1", features = ["v4"] }
5 changes: 5 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct ACompleteRecord<'a> {
pub borrowed_maybe_a_string: &'a Option<String>,
pub borrowed_maybe_a_str: &'a Option<&'a str>,
pub now: chrono::NaiveDateTime,
pub uuid: uuid::Uuid,
pub byte_vec: Vec<u8>,
pub maybe_byte_vec: Option<Vec<u8>>,
pub borrowed_byte_vec: &'a [u8],
Expand All @@ -61,6 +62,7 @@ struct APartiallyCompleteRecord {
pub double: f64,
pub now: chrono::NaiveDateTime,
pub date: chrono::NaiveDate,
pub uuid: uuid::Uuid,
pub byte_vec: Vec<u8>,
}

Expand Down Expand Up @@ -105,6 +107,7 @@ mod tests {
OPTIONAL BINARY borrowed_maybe_a_string (STRING);
OPTIONAL BINARY borrowed_maybe_a_str (STRING);
REQUIRED INT64 now (TIMESTAMP_MILLIS);
REQUIRED FIXED_LEN_BYTE_ARRAY (16) uuid (UUID);
REQUIRED BINARY byte_vec;
OPTIONAL BINARY maybe_byte_vec;
REQUIRED BINARY borrowed_byte_vec;
Expand Down Expand Up @@ -144,6 +147,7 @@ mod tests {
borrowed_maybe_a_string: &maybe_a_string,
borrowed_maybe_a_str: &maybe_a_str,
now: chrono::Utc::now().naive_local(),
uuid: uuid::Uuid::new_v4(),
byte_vec: vec![0x65, 0x66, 0x67],
maybe_byte_vec: Some(vec![0x88, 0x89, 0x90]),
borrowed_byte_vec: &borrowed_byte_vec,
Expand Down Expand Up @@ -179,6 +183,7 @@ mod tests {
double: std::f64::NAN,
now: chrono::Utc::now().naive_local(),
date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
uuid: uuid::Uuid::new_v4(),
byte_vec: vec![0x65, 0x66, 0x67],
}];

Expand Down
Loading