From 81d4f82b64d9658a4af86a8f30cb43db864467bc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 19:02:15 +0000 Subject: [PATCH 1/6] Map AvroSchema to Arrow (#4886) --- arrow-avro/src/codec.rs | 292 ++++++++++++++++++++++++++++++++ arrow-avro/src/lib.rs | 2 + arrow-avro/src/reader/header.rs | 31 +++- arrow-avro/src/schema.rs | 30 +++- 4 files changed, 349 insertions(+), 6 deletions(-) create mode 100644 arrow-avro/src/codec.rs diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs new file mode 100644 index 000000000000..cc66021ba8d9 --- /dev/null +++ b/arrow-avro/src/codec.rs @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::schema::{ComplexType, PrimitiveType, Record, Schema, TypeName}; +use arrow_schema::{ + ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit, +}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::Arc; + +#[derive(Debug, Copy, Clone)] +enum Nulls { + NullFirst, + NullSecond, +} + +/// An Avro field mapped to the arrow data model +#[derive(Debug, Clone)] +pub struct AvroField { + nulls: Option, + meta: Arc, +} + +#[derive(Debug, Clone)] +struct AvroFieldMeta { + name: String, + metadata: HashMap, + codec: Codec, +} + +impl AvroField { + /// Returns the arrow [`Field`] + pub fn field(&self) -> Field { + let d = self.meta.codec.data_type(); + Field::new(&self.meta.name, d, self.nulls.is_some()) + .with_metadata(self.meta.metadata.clone()) + } + + /// Returns the [`Codec`] + pub fn codec(&self) -> &Codec { + &self.meta.codec + } +} + +impl<'a> TryFrom<&Schema<'a>> for AvroField { + type Error = ArrowError; + + fn try_from(schema: &Schema<'a>) -> Result { + let mut resolver = Resolver::default(); + make_field(schema, "item", None, &mut resolver) + } +} + +#[derive(Debug, Clone)] +pub enum Codec { + Null, + Boolean, + Int32, + Int64, + Float32, + Float64, + Binary, + Utf8, + Date32, + TimeMillis, + TimeMicros, + /// Timestamp (is_utc) + TimestampMillis(bool), + TimestampMicros(bool), + Fixed(i32), + List(Arc), + Struct(Arc<[AvroField]>), + Duration, +} + +impl Codec { + fn data_type(&self) -> DataType { + match self { + Self::Null => DataType::Null, + Self::Boolean => DataType::Boolean, + Self::Int32 => DataType::Int32, + Self::Int64 => DataType::Int64, + Self::Float32 => DataType::Float32, + Self::Float64 => DataType::Float64, + Self::Binary => DataType::Binary, + Self::Utf8 => DataType::Utf8, + Self::Date32 => DataType::Date32, + Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond), + Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond), + Self::TimestampMillis(is_utc) => { + DataType::Timestamp(TimeUnit::Millisecond, is_utc.then(|| "+00:00".into())) + } + Self::TimestampMicros(is_utc) => { + DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into())) + } + Self::Duration => DataType::Interval(IntervalUnit::MonthDayNano), + Self::Fixed(size) => DataType::FixedSizeBinary(*size), + Self::List(f) => DataType::List(Arc::new(f.field())), + Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()), + } + } +} + +impl From for Codec { + fn from(value: PrimitiveType) -> Self { + match value { + PrimitiveType::Null => Self::Null, + PrimitiveType::Boolean => Self::Boolean, + PrimitiveType::Int => Self::Int32, + PrimitiveType::Long => Self::Int64, + PrimitiveType::Float => Self::Float32, + PrimitiveType::Double => Self::Float64, + PrimitiveType::Bytes => Self::Binary, + PrimitiveType::String => Self::Utf8, + } + } +} + +#[derive(Debug, Default)] +struct Resolver<'a> { + map: HashMap<(&'a str, &'a str), AvroField>, +} + +impl<'a> Resolver<'a> { + fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroField) { + self.map.insert((name, namespace.unwrap_or("")), schema); + } + + fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result { + let (namespace, name) = name + .rsplit_once('.') + .unwrap_or_else(|| (namespace.unwrap_or(""), name)); + + self.map + .get(&(namespace, name)) + .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}"))) + .cloned() + } +} + +fn make_field<'a>( + schema: &Schema<'a>, + name: &'a str, + namespace: Option<&'a str>, + resolver: &mut Resolver<'a>, +) -> Result { + match schema { + Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroField { + nulls: None, + meta: Arc::new(AvroFieldMeta { + name: name.to_string(), + metadata: Default::default(), + codec: (*p).into(), + }), + }), + Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace), + Schema::Union(f) => { + let null = f + .iter() + .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))); + match (f.len() == 2, null) { + (true, Some(0)) => { + let mut field = make_field(&f[1], name, namespace, resolver)?; + field.nulls = Some(Nulls::NullFirst); + Ok(field) + } + (true, Some(1)) => { + let mut field = make_field(&f[0], name, namespace, resolver)?; + field.nulls = Some(Nulls::NullSecond); + Ok(field) + } + _ => Err(ArrowError::NotYetImplemented(format!( + "Union of {f:?} not currently supported" + ))), + } + } + Schema::Complex(c) => match c { + ComplexType::Record(r) => { + let namespace = r.namespace.or(namespace); + let fields = r + .fields + .iter() + .map(|field| make_field(&field.r#type, field.name, namespace, resolver)) + .collect::>()?; + + let field = AvroField { + nulls: None, + meta: Arc::new(AvroFieldMeta { + name: r.name.to_string(), + codec: Codec::Struct(fields), + metadata: extract_metadata(&r.attributes.additional), + }), + }; + resolver.register(name, namespace, field.clone()); + Ok(field) + } + ComplexType::Array(a) => { + let mut field = make_field(a.items.as_ref(), "item", namespace, resolver)?; + Ok(AvroField { + nulls: None, + meta: Arc::new(AvroFieldMeta { + name: name.to_string(), + metadata: extract_metadata(&a.attributes.additional), + codec: Codec::List(Arc::new(field)), + }), + }) + } + ComplexType::Fixed(f) => { + let size = f.size.try_into().map_err(|e| { + ArrowError::ParseError(format!("Overflow converting size to i32: {e}")) + })?; + + let field = AvroField { + nulls: None, + meta: Arc::new(AvroFieldMeta { + name: f.name.to_string(), + metadata: extract_metadata(&f.attributes.additional), + codec: Codec::Fixed(size), + }), + }; + resolver.register(f.name, namespace, field.clone()); + Ok(field) + } + ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!( + "Enum of {e:?} not currently supported" + ))), + ComplexType::Map(m) => Err(ArrowError::NotYetImplemented(format!( + "Map of {m:?} not currently supported" + ))), + }, + Schema::Type(t) => { + let mut field = make_field( + &Schema::TypeName(t.r#type.clone()), + name, + namespace, + resolver, + )?; + let meta = Arc::make_mut(&mut field.meta); + + // https://avro.apache.org/docs/1.11.1/specification/#logical-types + match (t.attributes.logical_type, &mut meta.codec) { + (Some("decimal"), c @ Codec::Fixed(_)) => { + return Err(ArrowError::NotYetImplemented(format!( + "Decimals are not currently supported" + ))) + } + (Some("date"), c @ Codec::Int32) => *c = Codec::Date32, + (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis, + (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros, + (Some("timestamp-millis"), c @ Codec::Int64) => *c = Codec::TimestampMillis(true), + (Some("timestamp-micros"), c @ Codec::Int64) => *c = Codec::TimestampMicros(true), + (Some("local-timestamp-millis"), c @ Codec::Int64) => { + *c = Codec::TimestampMillis(false) + } + (Some("local-timestamp-micros"), c @ Codec::Int64) => { + *c = Codec::TimestampMicros(false) + } + (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration, + _ => {} + } + + if !t.attributes.additional.is_empty() { + for (k, v) in &t.attributes.additional { + meta.metadata.insert(k.to_string(), v.to_string()); + } + } + Ok(field) + } + } +} + +fn extract_metadata(metadata: &HashMap<&str, serde_json::Value>) -> HashMap { + metadata + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() +} diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs index c76ecb399a45..f74edd7e9260 100644 --- a/arrow-avro/src/lib.rs +++ b/arrow-avro/src/lib.rs @@ -27,6 +27,8 @@ mod schema; mod compression; +mod codec; + #[cfg(test)] mod test_util { pub fn arrow_test_data(path: &str) -> String { diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs index 00e85b39be73..c2171cccf301 100644 --- a/arrow-avro/src/reader/header.rs +++ b/arrow-avro/src/reader/header.rs @@ -236,9 +236,11 @@ impl HeaderDecoder { #[cfg(test)] mod test { use super::*; + use crate::codec::AvroField; use crate::reader::read_header; use crate::schema::SCHEMA_METADATA_KEY; use crate::test_util::arrow_test_data; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -269,7 +271,34 @@ mod test { let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap(); let expected = br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":["int","null"]},{"name":"bool_col","type":["boolean","null"]},{"name":"tinyint_col","type":["int","null"]},{"name":"smallint_col","type":["int","null"]},{"name":"int_col","type":["int","null"]},{"name":"bigint_col","type":["long","null"]},{"name":"float_col","type":["float","null"]},{"name":"double_col","type":["double","null"]},{"name":"date_string_col","type":["bytes","null"]},{"name":"string_col","type":["bytes","null"]},{"name":"timestamp_col","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}"#; assert_eq!(schema_json, expected); - let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap(); + let schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap(); + let field = AvroField::try_from(&schema).unwrap(); + + assert_eq!( + field.field(), + Field::new( + "topLevelRecord", + DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, true), + Field::new("bool_col", DataType::Boolean, true), + Field::new("tinyint_col", DataType::Int32, true), + Field::new("smallint_col", DataType::Int32, true), + Field::new("int_col", DataType::Int32, true), + Field::new("bigint_col", DataType::Int64, true), + Field::new("float_col", DataType::Float32, true), + Field::new("double_col", DataType::Float64, true), + Field::new("date_string_col", DataType::Binary, true), + Field::new("string_col", DataType::Binary, true), + Field::new( + "timestamp_col", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true + ), + ])), + false + ) + ); + assert_eq!( u128::from_le_bytes(header.sync()), 226966037233754408753420635932530907102 diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 17b82cf861b7..80148b6f494f 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -34,7 +34,7 @@ pub enum TypeName<'a> { /// A primitive type /// /// -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum PrimitiveType { Null, @@ -94,8 +94,6 @@ pub enum Schema<'a> { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub enum ComplexType<'a> { - #[serde(borrow)] - Union(Vec>), #[serde(borrow)] Record(Record<'a>), #[serde(borrow)] @@ -202,7 +200,10 @@ pub struct Fixed<'a> { #[cfg(test)] mod tests { use super::*; + use crate::codec::AvroField; + use arrow_schema::{DataType, Fields, TimeUnit}; use serde_json::json; + #[test] fn test_deserialize() { let t: Schema = serde_json::from_str("\"string\"").unwrap(); @@ -352,6 +353,10 @@ mod tests { })) ); + // Recursive schema are not supported + let err = AvroField::try_from(&schema).unwrap_err().to_string(); + assert_eq!(err, "Parser error: Failed to resolve .LongList"); + let schema: Schema = serde_json::from_str( r#"{ "type":"record", @@ -409,14 +414,29 @@ mod tests { attributes: Default::default(), })) ); + let codec = AvroField::try_from(&schema).unwrap(); + assert_eq!( + codec.field(), + arrow_schema::Field::new( + "topLevelRecord", + DataType::Struct(Fields::from(vec![ + arrow_schema::Field::new("id", DataType::Int32, true), + arrow_schema::Field::new( + "timestamp_col", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true + ), + ])), + false + ) + ); let schema: Schema = serde_json::from_str( r#"{ "type": "record", "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc", "fields": [ - {"name": "clientHash", - "type": {"type": "fixed", "name": "MD5", "size": 16}}, + {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}}, {"name": "clientProtocol", "type": ["null", "string"]}, {"name": "serverHash", "type": "MD5"}, {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]} From cae6dc59f2cbe9fee128170244b18ec3d90c9b6d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 19:07:38 +0000 Subject: [PATCH 2/6] Docs --- arrow-avro/src/codec.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index cc66021ba8d9..5c79b25468b6 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -23,9 +23,16 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; +/// Avro types are not nullable, with nullability instead encoded as a union +/// where one of the variants is the null type. +/// +/// To accommodate this we special case two-variant unions where one of the +/// variants is the null type, and use this to derive arrow's notion of nullability #[derive(Debug, Copy, Clone)] enum Nulls { + /// The nulls are encoded as the first union variant NullFirst, + /// The nulls are encoded as the second union variant NullSecond, } From 0a4eb777be4868de95a41968e549c99a284409b9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 19:22:31 +0000 Subject: [PATCH 3/6] Clippy --- arrow-avro/src/codec.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 5c79b25468b6..eff13f2bf44e 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -262,9 +262,9 @@ fn make_field<'a>( // https://avro.apache.org/docs/1.11.1/specification/#logical-types match (t.attributes.logical_type, &mut meta.codec) { (Some("decimal"), c @ Codec::Fixed(_)) => { - return Err(ArrowError::NotYetImplemented(format!( - "Decimals are not currently supported" - ))) + return Err(ArrowError::NotYetImplemented( + "Decimals are not currently supported".to_string(), + )) } (Some("date"), c @ Codec::Int32) => *c = Codec::Date32, (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis, From af9fa6cbf0ff583eaef7c0a5ed9c821f7e2a7e0b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 19:24:11 +0000 Subject: [PATCH 4/6] Add unrecognized logicalType to metadata --- arrow-avro/src/codec.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index eff13f2bf44e..b429a722992a 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -278,7 +278,11 @@ fn make_field<'a>( *c = Codec::TimestampMicros(false) } (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration, - _ => {} + (Some(logical), _) => { + // Insert unrecognized logical type into metadata map + meta.metadata.insert("logicalType".into(), logical.into()); + } + (None, _) => {} } if !t.attributes.additional.is_empty() { From f839970ce82cb633e7018fcb57569e95e37d6935 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 11 Jan 2024 14:21:13 +0000 Subject: [PATCH 5/6] More docs --- arrow-avro/src/codec.rs | 31 +++++++++++++++++++------------ arrow-avro/src/schema.rs | 10 ++++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index b429a722992a..db552c373fcb 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::schema::{ComplexType, PrimitiveType, Record, Schema, TypeName}; +use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName}; use arrow_schema::{ ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit, }; @@ -73,6 +73,9 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { } } +/// An Avro encoding +/// +/// #[derive(Debug, Clone)] pub enum Codec { Null, @@ -86,8 +89,9 @@ pub enum Codec { Date32, TimeMillis, TimeMicros, - /// Timestamp (is_utc) + /// TimestampMillis(is_utc) TimestampMillis(bool), + /// TimestampMicros(is_utc) TimestampMicros(bool), Fixed(i32), List(Arc), @@ -138,6 +142,9 @@ impl From for Codec { } } +/// Resolves Avro type names to [`AvroField`] +/// +/// See #[derive(Debug, Default)] struct Resolver<'a> { map: HashMap<(&'a str, &'a str), AvroField>, @@ -160,6 +167,12 @@ impl<'a> Resolver<'a> { } } +/// Parses a [`AvroField`] from the provided [`Schema`] and the given `name` and `namespace` +/// +/// `name`: is name used to refer to `schema` in its parent +/// `namespace`: an optional qualifier used as part of a type hierarchy +/// +/// See [`Resolver`] for more information fn make_field<'a>( schema: &Schema<'a>, name: &'a str, @@ -177,6 +190,7 @@ fn make_field<'a>( }), Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace), Schema::Union(f) => { + // Special case the common case of nullable primitives let null = f .iter() .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))); @@ -210,7 +224,7 @@ fn make_field<'a>( meta: Arc::new(AvroFieldMeta { name: r.name.to_string(), codec: Codec::Struct(fields), - metadata: extract_metadata(&r.attributes.additional), + metadata: r.attributes.field_metadata(), }), }; resolver.register(name, namespace, field.clone()); @@ -222,7 +236,7 @@ fn make_field<'a>( nulls: None, meta: Arc::new(AvroFieldMeta { name: name.to_string(), - metadata: extract_metadata(&a.attributes.additional), + metadata: a.attributes.field_metadata(), codec: Codec::List(Arc::new(field)), }), }) @@ -236,7 +250,7 @@ fn make_field<'a>( nulls: None, meta: Arc::new(AvroFieldMeta { name: f.name.to_string(), - metadata: extract_metadata(&f.attributes.additional), + metadata: f.attributes.field_metadata(), codec: Codec::Fixed(size), }), }; @@ -294,10 +308,3 @@ fn make_field<'a>( } } } - -fn extract_metadata(metadata: &HashMap<&str, serde_json::Value>) -> HashMap { - metadata - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() -} diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 80148b6f494f..6f1f3d6bd012 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -64,6 +64,16 @@ pub struct Attributes<'a> { pub additional: HashMap<&'a str, serde_json::Value>, } +impl<'a> Attributes<'a> { + /// Returns the field metadata for this [`Attributes`] + pub(crate) fn field_metadata(&self) -> HashMap { + self.additional + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } +} + /// A type definition that is not a variant of [`ComplexType`] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] From b024862f23cc482237631364658ffaa0b321c9af Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 11 Jan 2024 16:53:29 +0000 Subject: [PATCH 6/6] Split out AvroDataType --- arrow-avro/src/codec.rs | 129 +++++++++++++++++--------------- arrow-avro/src/reader/header.rs | 2 +- arrow-avro/src/schema.rs | 2 +- 3 files changed, 69 insertions(+), 64 deletions(-) diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index db552c373fcb..1e2acd99d828 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -36,31 +36,38 @@ enum Nulls { NullSecond, } -/// An Avro field mapped to the arrow data model +/// An Avro datatype mapped to the arrow data model #[derive(Debug, Clone)] -pub struct AvroField { +pub struct AvroDataType { nulls: Option, - meta: Arc, + metadata: HashMap, + codec: Codec, +} + +impl AvroDataType { + /// Returns an arrow [`Field`] with the given name + pub fn field_with_name(&self, name: &str) -> Field { + let d = self.codec.data_type(); + Field::new(name, d, self.nulls.is_some()).with_metadata(self.metadata.clone()) + } } +/// A named [`AvroDataType`] #[derive(Debug, Clone)] -struct AvroFieldMeta { +pub struct AvroField { name: String, - metadata: HashMap, - codec: Codec, + data_type: AvroDataType, } impl AvroField { /// Returns the arrow [`Field`] pub fn field(&self) -> Field { - let d = self.meta.codec.data_type(); - Field::new(&self.meta.name, d, self.nulls.is_some()) - .with_metadata(self.meta.metadata.clone()) + self.data_type.field_with_name(&self.name) } /// Returns the [`Codec`] pub fn codec(&self) -> &Codec { - &self.meta.codec + &self.data_type.codec } } @@ -68,8 +75,19 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { type Error = ArrowError; fn try_from(schema: &Schema<'a>) -> Result { - let mut resolver = Resolver::default(); - make_field(schema, "item", None, &mut resolver) + match schema { + Schema::Complex(ComplexType::Record(r)) => { + let mut resolver = Resolver::default(); + let data_type = make_data_type(schema, None, &mut resolver)?; + Ok(AvroField { + data_type, + name: r.name.to_string(), + }) + } + _ => Err(ArrowError::ParseError(format!( + "Expected record got {schema:?}" + ))), + } } } @@ -94,7 +112,7 @@ pub enum Codec { /// TimestampMicros(is_utc) TimestampMicros(bool), Fixed(i32), - List(Arc), + List(Arc), Struct(Arc<[AvroField]>), Duration, } @@ -121,7 +139,7 @@ impl Codec { } Self::Duration => DataType::Interval(IntervalUnit::MonthDayNano), Self::Fixed(size) => DataType::FixedSizeBinary(*size), - Self::List(f) => DataType::List(Arc::new(f.field())), + Self::List(f) => DataType::List(Arc::new(f.field_with_name("item"))), Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()), } } @@ -142,20 +160,20 @@ impl From for Codec { } } -/// Resolves Avro type names to [`AvroField`] +/// Resolves Avro type names to [`AvroDataType`] /// /// See #[derive(Debug, Default)] struct Resolver<'a> { - map: HashMap<(&'a str, &'a str), AvroField>, + map: HashMap<(&'a str, &'a str), AvroDataType>, } impl<'a> Resolver<'a> { - fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroField) { + fn register(&mut self, name: &'a str, namespace: Option<&'a str>, schema: AvroDataType) { self.map.insert((name, namespace.unwrap_or("")), schema); } - fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result { + fn resolve(&self, name: &str, namespace: Option<&'a str>) -> Result { let (namespace, name) = name .rsplit_once('.') .unwrap_or_else(|| (namespace.unwrap_or(""), name)); @@ -167,26 +185,22 @@ impl<'a> Resolver<'a> { } } -/// Parses a [`AvroField`] from the provided [`Schema`] and the given `name` and `namespace` +/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given `name` and `namespace` /// /// `name`: is name used to refer to `schema` in its parent /// `namespace`: an optional qualifier used as part of a type hierarchy /// /// See [`Resolver`] for more information -fn make_field<'a>( +fn make_data_type<'a>( schema: &Schema<'a>, - name: &'a str, namespace: Option<&'a str>, resolver: &mut Resolver<'a>, -) -> Result { +) -> Result { match schema { - Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroField { + Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType { nulls: None, - meta: Arc::new(AvroFieldMeta { - name: name.to_string(), - metadata: Default::default(), - codec: (*p).into(), - }), + metadata: Default::default(), + codec: (*p).into(), }), Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace), Schema::Union(f) => { @@ -196,12 +210,12 @@ fn make_field<'a>( .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))); match (f.len() == 2, null) { (true, Some(0)) => { - let mut field = make_field(&f[1], name, namespace, resolver)?; + let mut field = make_data_type(&f[1], namespace, resolver)?; field.nulls = Some(Nulls::NullFirst); Ok(field) } (true, Some(1)) => { - let mut field = make_field(&f[0], name, namespace, resolver)?; + let mut field = make_data_type(&f[0], namespace, resolver)?; field.nulls = Some(Nulls::NullSecond); Ok(field) } @@ -216,29 +230,28 @@ fn make_field<'a>( let fields = r .fields .iter() - .map(|field| make_field(&field.r#type, field.name, namespace, resolver)) - .collect::>()?; + .map(|field| { + Ok(AvroField { + name: field.name.to_string(), + data_type: make_data_type(&field.r#type, namespace, resolver)?, + }) + }) + .collect::>()?; - let field = AvroField { + let field = AvroDataType { nulls: None, - meta: Arc::new(AvroFieldMeta { - name: r.name.to_string(), - codec: Codec::Struct(fields), - metadata: r.attributes.field_metadata(), - }), + codec: Codec::Struct(fields), + metadata: r.attributes.field_metadata(), }; - resolver.register(name, namespace, field.clone()); + resolver.register(r.name, namespace, field.clone()); Ok(field) } ComplexType::Array(a) => { - let mut field = make_field(a.items.as_ref(), "item", namespace, resolver)?; - Ok(AvroField { + let mut field = make_data_type(a.items.as_ref(), namespace, resolver)?; + Ok(AvroDataType { nulls: None, - meta: Arc::new(AvroFieldMeta { - name: name.to_string(), - metadata: a.attributes.field_metadata(), - codec: Codec::List(Arc::new(field)), - }), + metadata: a.attributes.field_metadata(), + codec: Codec::List(Arc::new(field)), }) } ComplexType::Fixed(f) => { @@ -246,13 +259,10 @@ fn make_field<'a>( ArrowError::ParseError(format!("Overflow converting size to i32: {e}")) })?; - let field = AvroField { + let field = AvroDataType { nulls: None, - meta: Arc::new(AvroFieldMeta { - name: f.name.to_string(), - metadata: f.attributes.field_metadata(), - codec: Codec::Fixed(size), - }), + metadata: f.attributes.field_metadata(), + codec: Codec::Fixed(size), }; resolver.register(f.name, namespace, field.clone()); Ok(field) @@ -265,16 +275,11 @@ fn make_field<'a>( ))), }, Schema::Type(t) => { - let mut field = make_field( - &Schema::TypeName(t.r#type.clone()), - name, - namespace, - resolver, - )?; - let meta = Arc::make_mut(&mut field.meta); + let mut field = + make_data_type(&Schema::TypeName(t.r#type.clone()), namespace, resolver)?; // https://avro.apache.org/docs/1.11.1/specification/#logical-types - match (t.attributes.logical_type, &mut meta.codec) { + match (t.attributes.logical_type, &mut field.codec) { (Some("decimal"), c @ Codec::Fixed(_)) => { return Err(ArrowError::NotYetImplemented( "Decimals are not currently supported".to_string(), @@ -294,14 +299,14 @@ fn make_field<'a>( (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration, (Some(logical), _) => { // Insert unrecognized logical type into metadata map - meta.metadata.insert("logicalType".into(), logical.into()); + field.metadata.insert("logicalType".into(), logical.into()); } (None, _) => {} } if !t.attributes.additional.is_empty() { for (k, v) in &t.attributes.additional { - meta.metadata.insert(k.to_string(), v.to_string()); + field.metadata.insert(k.to_string(), v.to_string()); } } Ok(field) diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs index c2171cccf301..97f5d3b8b112 100644 --- a/arrow-avro/src/reader/header.rs +++ b/arrow-avro/src/reader/header.rs @@ -236,7 +236,7 @@ impl HeaderDecoder { #[cfg(test)] mod test { use super::*; - use crate::codec::AvroField; + use crate::codec::{AvroDataType, AvroField}; use crate::reader::read_header; use crate::schema::SCHEMA_METADATA_KEY; use crate::test_util::arrow_test_data; diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 6f1f3d6bd012..6707f8137c9b 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -210,7 +210,7 @@ pub struct Fixed<'a> { #[cfg(test)] mod tests { use super::*; - use crate::codec::AvroField; + use crate::codec::{AvroDataType, AvroField}; use arrow_schema::{DataType, Fields, TimeUnit}; use serde_json::json;