Hook up Avro Decoder (#6820)

* Hook up Avro Decoder

* Docs

* Improved varint decode

* Docs

* Clippy

* More clippy
tustvold authored Dec 1, 2024
1 parent 7247e6b commit 3ed0f06
Showing 7 changed files with 670 additions and 32 deletions.
5 changes: 4 additions & 1 deletion arrow-avro/Cargo.toml
@@ -39,7 +39,9 @@ deflate = ["flate2"]
snappy = ["snap", "crc"]

[dependencies]
arrow-schema = { workspace = true }
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
@@ -49,4 +51,5 @@ crc = { version = "3.0", optional = true }


[dev-dependencies]
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }

42 changes: 27 additions & 15 deletions arrow-avro/src/codec.rs
@@ -29,7 +29,7 @@ use std::sync::Arc;
/// To accommodate this we special case two-variant unions where one of the
/// variants is the null type, and use this to derive arrow's notion of nullability
#[derive(Debug, Copy, Clone)]
enum Nulls {
pub enum Nullability {
/// The nulls are encoded as the first union variant
NullFirst,
/// The nulls are encoded as the second union variant
@@ -39,7 +39,7 @@ enum Nulls {
/// An Avro datatype mapped to the arrow data model
#[derive(Debug, Clone)]
pub struct AvroDataType {
nulls: Option<Nulls>,
nullability: Option<Nullability>,
metadata: HashMap<String, String>,
codec: Codec,
}
@@ -48,7 +48,15 @@ impl AvroDataType {
/// Returns an arrow [`Field`] with the given name
pub fn field_with_name(&self, name: &str) -> Field {
let d = self.codec.data_type();
Field::new(name, d, self.nulls.is_some()).with_metadata(self.metadata.clone())
Field::new(name, d, self.nullability.is_some()).with_metadata(self.metadata.clone())
}

/// Returns the [`Codec`] describing this datatype
pub fn codec(&self) -> &Codec {
&self.codec
}

/// Returns the [`Nullability`], if any
pub fn nullability(&self) -> Option<Nullability> {
self.nullability
}
}

@@ -65,9 +73,13 @@ impl AvroField {
self.data_type.field_with_name(&self.name)
}

/// Returns the [`Codec`]
pub fn codec(&self) -> &Codec {
&self.data_type.codec
/// Returns the [`AvroDataType`]
pub fn data_type(&self) -> &AvroDataType {
&self.data_type
}

/// Returns the name of this field
pub fn name(&self) -> &str {
&self.name
}
}

@@ -114,7 +126,7 @@ pub enum Codec {
Fixed(i32),
List(Arc<AvroDataType>),
Struct(Arc<[AvroField]>),
Duration,
Interval,
}

impl Codec {
@@ -137,7 +149,7 @@ impl Codec {
Self::TimestampMicros(is_utc) => {
DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
}
Self::Duration => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
Self::List(f) => {
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
@@ -200,7 +212,7 @@ fn make_data_type<'a>(
) -> Result<AvroDataType, ArrowError> {
match schema {
Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType {
nulls: None,
nullability: None,
metadata: Default::default(),
codec: (*p).into(),
}),
@@ -213,12 +225,12 @@
match (f.len() == 2, null) {
(true, Some(0)) => {
let mut field = make_data_type(&f[1], namespace, resolver)?;
field.nulls = Some(Nulls::NullFirst);
field.nullability = Some(Nullability::NullFirst);
Ok(field)
}
(true, Some(1)) => {
let mut field = make_data_type(&f[0], namespace, resolver)?;
field.nulls = Some(Nulls::NullSecond);
field.nullability = Some(Nullability::NullSecond);
Ok(field)
}
_ => Err(ArrowError::NotYetImplemented(format!(
@@ -241,7 +253,7 @@
.collect::<Result<_, ArrowError>>()?;

let field = AvroDataType {
nulls: None,
nullability: None,
codec: Codec::Struct(fields),
metadata: r.attributes.field_metadata(),
};
@@ -251,7 +263,7 @@
ComplexType::Array(a) => {
let mut field = make_data_type(a.items.as_ref(), namespace, resolver)?;
Ok(AvroDataType {
nulls: None,
nullability: None,
metadata: a.attributes.field_metadata(),
codec: Codec::List(Arc::new(field)),
})
@@ -262,7 +274,7 @@
})?;

let field = AvroDataType {
nulls: None,
nullability: None,
metadata: f.attributes.field_metadata(),
codec: Codec::Fixed(size),
};
@@ -298,7 +310,7 @@ fn make_data_type<'a>(
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
*c = Codec::TimestampMicros(false)
}
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration,
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
(Some(logical), _) => {
// Insert unrecognized logical type into metadata map
field.metadata.insert("logicalType".into(), logical.into());
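
For context on the union special case above: a two-variant union with a null branch becomes a nullable Arrow field. A minimal sketch of the expected mapping, written against arrow-schema directly rather than the crate-private make_data_type function; the union ["null", "long"] and the field name "x" are illustrative placeholders, not part of this commit:

use arrow_schema::{DataType, Field};

fn main() {
    // An Avro union ["null", "long"] is special-cased to Codec::Int64 with
    // Nullability::NullFirst, so field_with_name("x") is expected to yield
    // the equivalent of this Arrow field (metadata omitted for brevity).
    let expected = Field::new("x", DataType::Int64, true);
    assert!(expected.is_nullable());
    assert_eq!(expected.data_type(), &DataType::Int64);
}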
121 changes: 121 additions & 0 deletions arrow-avro/src/reader/cursor.rs
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::reader::vlq::read_varint;
use arrow_schema::ArrowError;

/// A wrapper around a byte slice, providing low-level decoding for Avro
///
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
#[derive(Debug)]
pub(crate) struct AvroCursor<'a> {
buf: &'a [u8],
start_len: usize,
}

impl<'a> AvroCursor<'a> {
pub(crate) fn new(buf: &'a [u8]) -> Self {
Self {
buf,
start_len: buf.len(),
}
}

/// Returns the current cursor position
#[inline]
pub(crate) fn position(&self) -> usize {
self.start_len - self.buf.len()
}

/// Read a single `u8`
#[inline]
pub(crate) fn get_u8(&mut self) -> Result<u8, ArrowError> {
match self.buf.first().copied() {
Some(x) => {
self.buf = &self.buf[1..];
Ok(x)
}
None => Err(ArrowError::ParseError("Unexpected EOF".to_string())),
}
}

#[inline]
pub(crate) fn get_bool(&mut self) -> Result<bool, ArrowError> {
Ok(self.get_u8()? != 0)
}

pub(crate) fn read_vlq(&mut self) -> Result<u64, ArrowError> {
let (val, offset) = read_varint(self.buf)
.ok_or_else(|| ArrowError::ParseError("bad varint".to_string()))?;
self.buf = &self.buf[offset..];
Ok(val)
}

#[inline]
pub(crate) fn get_int(&mut self) -> Result<i32, ArrowError> {
let varint = self.read_vlq()?;
let val: u32 = varint
.try_into()
.map_err(|_| ArrowError::ParseError("varint overflow".to_string()))?;
Ok((val >> 1) as i32 ^ -((val & 1) as i32))
}

#[inline]
pub(crate) fn get_long(&mut self) -> Result<i64, ArrowError> {
let val = self.read_vlq()?;
Ok((val >> 1) as i64 ^ -((val & 1) as i64))
}

pub(crate) fn get_bytes(&mut self) -> Result<&'a [u8], ArrowError> {
let len: usize = self.get_long()?.try_into().map_err(|_| {
ArrowError::ParseError("offset overflow reading avro bytes".to_string())
})?;

if self.buf.len() < len {
return Err(ArrowError::ParseError(
"Unexpected EOF reading bytes".to_string(),
));
}
let ret = &self.buf[..len];
self.buf = &self.buf[len..];
Ok(ret)
}

#[inline]
pub(crate) fn get_float(&mut self) -> Result<f32, ArrowError> {
if self.buf.len() < 4 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading float".to_string(),
));
}
let ret = f32::from_le_bytes(self.buf[..4].try_into().unwrap());
self.buf = &self.buf[4..];
Ok(ret)
}

#[inline]
pub(crate) fn get_double(&mut self) -> Result<f64, ArrowError> {
if self.buf.len() < 8 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading double".to_string(),
));
}
let ret = f64::from_le_bytes(self.buf[..8].try_into().unwrap());
self.buf = &self.buf[8..];
Ok(ret)
}
}
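
The get_int and get_long methods above decode Avro's zigzag-encoded varints: read_varint yields an unsigned value, and (val >> 1) ^ -(val & 1) undoes the zigzag mapping. A standalone sketch of that transform (AvroCursor itself is crate-private, so this mirrors the logic rather than calling it):

/// Standalone sketch of Avro zigzag decoding, mirroring get_long above.
fn zigzag_decode(val: u64) -> i64 {
    (val >> 1) as i64 ^ -((val & 1) as i64)
}

fn main() {
    // Avro zigzag-encodes 0, -1, 1, -2, 2, ... as 0, 1, 2, 3, 4, ...
    assert_eq!(zigzag_decode(0), 0);
    assert_eq!(zigzag_decode(1), -1);
    assert_eq!(zigzag_decode(2), 1);
    assert_eq!(zigzag_decode(3), -2);
    assert_eq!(zigzag_decode(4), 2);
}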
13 changes: 12 additions & 1 deletion arrow-avro/src/reader/header.rs
@@ -19,7 +19,7 @@
use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
use crate::reader::vlq::VLQDecoder;
use crate::schema::Schema;
use crate::schema::{Schema, SCHEMA_METADATA_KEY};
use arrow_schema::ArrowError;

#[derive(Debug)]
@@ -89,6 +89,17 @@ impl Header {
))),
}
}

/// Returns the [`Schema`] if any
pub fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
self.get(SCHEMA_METADATA_KEY)
.map(|x| {
serde_json::from_slice(x).map_err(|e| {
ArrowError::ParseError(format!("Failed to parse Avro schema JSON: {e}"))
})
})
.transpose()
}
}

/// A decoder for [`Header`]
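
Header::schema above looks up the schema JSON stored under the file's avro.schema metadata key and parses it with serde_json. A hedged sketch of that parsing step, using serde_json::Value in place of the crate's Schema type; the record definition is a made-up example, not taken from this commit:

fn main() {
    // Hypothetical schema JSON as it might appear under the "avro.schema" key.
    let json = br#"{"type":"record","name":"example","fields":[{"name":"id","type":"long"}]}"#;
    let value: serde_json::Value =
        serde_json::from_slice(json).expect("valid Avro schema JSON");
    assert_eq!(value["type"], "record");
    assert_eq!(value["fields"][0]["name"], "id");
}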
