Skip to content

Commit

Permalink
AVRO-3926: [Rust] Allow UUID to serialize to Fixed[16] (#2676)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
martin-g authored Feb 5, 2024
1 parent b2afd2e commit e6d495f
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 34 deletions.
4 changes: 2 additions & 2 deletions lang/rust/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ fn main() -> Result<(), Error> {

`apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):

1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/latest/num_bigint) crate
1. UUID using the [`uuid`](https://docs.rs/uuid/latest/uuid) crate
1. Date, Time (milli) as `i32` and Time (micro) as `i64`
1. Timestamp (milli and micro) as `i64`
1. Local timestamp (milli and micro) as `i64`
Expand Down
103 changes: 94 additions & 9 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
bigdecimal::deserialize_big_decimal,
decimal::Decimal,
duration::Duration,
encode::encode_long,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema,
Expand Down Expand Up @@ -121,15 +122,60 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
value => Err(Error::BytesValue(value.into())),
}
}
Schema::Uuid => Ok(Value::Uuid(
Uuid::from_str(
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
Value::String(ref s) => s,
value => return Err(Error::GetUuidFromStringValue(value.into())),
},
)
.map_err(Error::ConvertStrToUuid)?,
)),
Schema::Uuid => {
let len = decode_len(reader)?;
let mut bytes = vec![0u8; len];
reader.read_exact(&mut bytes).map_err(Error::ReadIntoBuf)?;

// use a Vec to be able re-read the bytes more than once if needed
let mut reader = Vec::with_capacity(len + 1);
encode_long(len as i64, &mut reader);
reader.extend_from_slice(&bytes);

let decode_from_string = |reader| match decode_internal(
&Schema::String,
names,
enclosing_namespace,
reader,
)? {
Value::String(ref s) => Uuid::from_str(s).map_err(Error::ConvertStrToUuid),
value => Err(Error::GetUuidFromStringValue(value.into())),
};

let uuid: Uuid = if len == 16 {
// most probably a Fixed schema
let fixed_result = decode_internal(
&Schema::Fixed(FixedSchema {
size: 16,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
}),
names,
enclosing_namespace,
&mut bytes.as_slice(),
);
if fixed_result.is_ok() {
match fixed_result? {
Value::Fixed(ref size, ref bytes) => {
if *size != 16 {
return Err(Error::ConvertFixedToUuid(*size));
}
Uuid::from_slice(bytes).map_err(Error::ConvertSliceToUuid)?
}
_ => decode_from_string(&mut reader.as_slice())?,
}
} else {
// try to decode as string
decode_from_string(&mut reader.as_slice())?
}
} else {
// definitely a string
decode_from_string(&mut reader.as_slice())?
};
Ok(Value::Uuid(uuid))
}
Schema::Int => decode_int(reader),
Schema::Date => zag_i32(reader).map(Value::Date),
Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
Expand Down Expand Up @@ -323,6 +369,7 @@ mod tests {
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;

#[test]
fn test_decode_array_without_size() -> TestResult {
Expand Down Expand Up @@ -821,4 +868,42 @@ mod tests {

Ok(())
}

#[test]
fn avro_3926_encode_decode_uuid_to_string() -> TestResult {
use crate::encode::encode;

let schema = Schema::String;
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
}

#[test]
fn avro_3926_encode_decode_uuid_to_fixed() -> TestResult {
use crate::encode::encode;

let schema = Schema::Fixed(FixedSchema {
size: 16,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
});
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
}
}
52 changes: 46 additions & 6 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,28 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
let slice: [u8; 12] = duration.into();
buffer.extend_from_slice(&slice);
}
Value::Uuid(uuid) => encode_bytes(
// we need the call .to_string() to properly convert ASCII to UTF-8
#[allow(clippy::unnecessary_to_owned)]
&uuid.to_string(),
buffer,
),
Value::Uuid(uuid) => match *schema {
Schema::Uuid | Schema::String => encode_bytes(
// we need the call .to_string() to properly convert ASCII to UTF-8
#[allow(clippy::unnecessary_to_owned)]
&uuid.to_string(),
buffer,
),
Schema::Fixed(FixedSchema { size, .. }) => {
if size != 16 {
return Err(Error::ConvertFixedToUuid(size));
}

let bytes = uuid.as_bytes();
encode_bytes(bytes, buffer)
}
_ => {
return Err(Error::EncodeValueAsSchemaError {
value_kind: ValueKind::Uuid,
supported_schema: vec![SchemaKind::Uuid, SchemaKind::Fixed],
});
}
},
Value::BigDecimal(bg) => {
let buf: Vec<u8> = serialize_big_decimal(bg);
buffer.extend_from_slice(buf.as_slice());
Expand Down Expand Up @@ -293,8 +309,10 @@ pub fn encode_to_vec(value: &Value, schema: &Schema) -> AvroResult<Vec<u8>> {
#[allow(clippy::expect_fun_call)]
pub(crate) mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;

pub(crate) fn success(value: &Value, schema: &Schema) -> String {
format!(
Expand Down Expand Up @@ -880,4 +898,26 @@ pub(crate) mod tests {
assert!(encoded.is_ok());
assert!(!buffer.is_empty());
}

#[test]
fn avro_3926_encode_decode_uuid_to_fixed_wrong_schema_size() -> TestResult {
let schema = Schema::Fixed(FixedSchema {
size: 15,
name: "uuid".into(),
aliases: None,
doc: None,
attributes: Default::default(),
});
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
match encode(&value, &schema, &mut buffer) {
Err(Error::ConvertFixedToUuid(actual)) => {
assert_eq!(actual, 15);
}
_ => panic!("Expected Error::ConvertFixedToUuid"),
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ pub enum Error {
#[error("Failed to convert &str to UUID")]
ConvertStrToUuid(#[source] uuid::Error),

#[error("Failed to convert Fixed bytes to UUID. It must be exactly 16 bytes, got {0}")]
ConvertFixedToUuid(usize),

#[error("Failed to convert Fixed bytes to UUID")]
ConvertSliceToUuid(#[source] uuid::Error),

#[error("Map key is not a string; key type is {0:?}")]
MapKeyType(ValueKind),

Expand Down
4 changes: 2 additions & 2 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@
//!
//! `apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
//!
//! 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
//! 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/latest/num_bigint) crate
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/latest/uuid) crate
//! 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
//! 1. Timestamp (milli and micro) as `i64`
//! 1. Local timestamp (milli and micro) as `i64`
Expand Down
65 changes: 50 additions & 15 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,8 +1419,22 @@ impl Parser {
return try_convert_to_logical_type(
"uuid",
parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::String],
|_| -> AvroResult<Schema> { Ok(Schema::Uuid) },
&[SchemaKind::String, SchemaKind::Fixed],
|schema| match schema {
Schema::String => Ok(Schema::Uuid),
Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
Schema::Fixed(FixedSchema { size, .. }) => {
warn!("Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {:?}", schema);
Ok(schema)
}
_ => {
warn!(
"Ignoring invalid uuid logical type for schema: {:?}",
schema
);
Ok(schema)
}
},
);
}
"date" => {
Expand Down Expand Up @@ -2386,8 +2400,10 @@ pub mod derive {
#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use apache_avro_test_helper::{
logger::{assert_logged, assert_not_logged},
TestResult,
};
use serde_json::json;

#[test]
Expand Down Expand Up @@ -6298,7 +6314,7 @@ mod tests {
}

#[test]
fn test_avro_3896_uuid_schema() -> TestResult {
fn avro_3896_uuid_schema_for_string() -> TestResult {
// string uuid, represents as native logical type.
let schema = json!(
{
Expand All @@ -6309,8 +6325,11 @@ mod tests {
let parse_result = Schema::parse(&schema)?;
assert_eq!(parse_result, Schema::Uuid);

// uuid logical type is not supported for SchemaKind::Fixed, so it is parsed as Schema::Fixed
// and the `logicalType` is preserved as an attribute.
Ok(())
}

#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
let schema = json!(
{
"type": "fixed",
Expand All @@ -6319,19 +6338,37 @@ mod tests {
"logicalType": "uuid"
});
let parse_result = Schema::parse(&schema)?;
assert_eq!(parse_result, Schema::Uuid);
assert_not_logged(
r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
);

Ok(())
}

#[test]
fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
let schema = json!(
{
"type": "fixed",
"name": "FixedUUID",
"size": 6,
"logicalType": "uuid"
});
let parse_result = Schema::parse(&schema)?;
assert_eq!(
parse_result,
Schema::Fixed(FixedSchema {
name: Name::new("FixedUUID")?,
doc: None,
aliases: None,
size: 16,
attributes: BTreeMap::from([(
"logicalType".to_string(),
Value::String(String::from("uuid")),
)]),
doc: None,
size: 6,
attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
})
);
assert_logged(
r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
);

Ok(())
}
Expand Down Expand Up @@ -6379,8 +6416,6 @@ mod tests {

#[test]
fn test_avro_3899_parse_decimal_type() -> TestResult {
use apache_avro_test_helper::logger::{assert_logged, assert_not_logged};

let schema = Schema::parse_str(
r#"{
"name": "InvalidDecimal",
Expand Down

0 comments on commit e6d495f

Please sign in to comment.