Commit 4c491f6
This commit adds the capability to deserialize Avro single object encoding messages.
tutnixzursache committed Nov 22, 2024
1 parent b7638ed commit 4c491f6
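For context, the Avro single object encoding frames each message as a two-byte marker (0xC3, 0x01), followed by the writer schema's 8-byte Rabin fingerprint in little-endian byte order, followed by the Avro-encoded body. A minimal sketch of splitting that header (illustrative code, not part of this commit):

use std::convert::TryInto;

/// Split a single-object-encoded message into (fingerprint, payload).
fn split_soe_header(msg: &[u8]) -> Option<(i64, &[u8])> {
    // Bytes 0..2 must be the single object encoding marker.
    if msg.len() < 10 || msg[0] != 0xC3 || msg[1] != 0x01 {
        return None;
    }
    // Bytes 2..10 hold the schema fingerprint (little-endian).
    let fingerprint = i64::from_le_bytes(msg[2..10].try_into().ok()?);
    // Everything from byte 10 onward is the Avro binary payload.
    Some((fingerprint, &msg[10..]))
}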
Showing 8 changed files with 6,476 additions and 3 deletions.
6,189 changes: 6,189 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -8,7 +8,7 @@ edition = "2018"
flate2 = "1.0"
anyhow = "1"
async-trait = "0.1"
apache-avro = "^0.14"
apache-avro = "^0.17"
base64 = "0.13"
bytes = "1"
chrono = "0.4.31"
@@ -32,6 +32,7 @@ tokio-stream = { version = "0", features = ["fs"] }
tokio-util = "0.6.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"
dashmap = "6.0.1"

# datafusion feature is required for writer version 2
deltalake-core = { version = "0.21.0", features = ["json", "datafusion"]}
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -218,6 +218,9 @@ pub enum MessageFormat {

/// Parses avro messages using provided schema, schema registry or schema within file
Avro(SchemaSource),

/// Parses Avro messages in the single object encoding format. The PathBuf can point either to a single Avro schema file or to a directory containing (only) Avro schema files
SoeAvro(PathBuf),
}
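For illustration, both accepted shapes construct the same variant (the paths here are hypothetical):

let single = MessageFormat::SoeAvro(PathBuf::from("schemas/order.avsc"));
let many = MessageFormat::SoeAvro(PathBuf::from("schemas/"));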

/// Source for schema
32 changes: 31 additions & 1 deletion src/main.rs
@@ -221,6 +221,24 @@ fn to_schema_source(
}
}

fn to_schema_path(input: Option<&String>) -> Result<PathBuf, SchemaSourceError> {
match input {
None => Err(SchemaSourceError::NoFileSpecified),
Some(value) => {
if value.is_empty() {
return Err(SchemaSourceError::NoFileSpecified);
}
let p = PathBuf::from_str(value)?;
if !p.exists() {
return Err(SchemaSourceError::FileNotFound {
file_name: (*value).clone(),
});
}
Ok(p)
}
}
}

fn init_logger(app_id: String) {
let app_id: &'static str = Box::leak(app_id.into_boxed_str());
let log_level = std::env::var("RUST_LOG")
@@ -272,6 +290,9 @@ enum SchemaSourceError {
},
#[error("File not found error: {file_name}")]
FileNotFound { file_name: String },

#[error("No file specified error")]
NoFileSpecified,
}

fn parse_kafka_property(val: &str) -> Result<(String, String), KafkaPropertySyntaxError> {
@@ -444,8 +465,12 @@ This can be used to provide TLS configuration as in:
.env("AVRO_REGISTRY")
.required(false)
.help("Schema registry endpoint, local path, or empty string"))
.arg(Arg::new("soe-avro")
.long("soe-avro")
.required(false)
.help("Local path to either a single Avro schema file or a directory containing (only) Avro schematas"))
.group(ArgGroup::new("format")
.args(["json", "avro"])
.args(["json", "avro","soe-avro"])
.required(false))
.arg(Arg::new("end")
.short('e')
@@ -476,6 +501,11 @@ fn convert_matches_to_message_format(
.map(MessageFormat::Avro);
}

if ingest_matches.contains_id("soe-avro") {
return to_schema_path(ingest_matches.get_one::<String>("soe-avro"))
.map(MessageFormat::SoeAvro);
}

return to_schema_source(ingest_matches.get_one::<String>("json"), true)
.map(MessageFormat::Json);
}
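A few hypothetical assertions (not part of the commit) illustrate the boundary behaviour of to_schema_path:

assert!(matches!(
    to_schema_path(None),
    Err(SchemaSourceError::NoFileSpecified)
));
assert!(matches!(
    to_schema_path(Some(&String::new())),
    Err(SchemaSourceError::NoFileSpecified)
));
// An existing path is returned as-is:
// to_schema_path(Some(&"schemas/".to_string())) == Ok(PathBuf::from("schemas/"))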
145 changes: 144 additions & 1 deletion src/serialization.rs
@@ -1,11 +1,22 @@
use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
use async_trait::async_trait;
use dashmap::DashMap;
use flate2::read::GzDecoder;
use schema_registry_converter::async_impl::{
easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings,
};
use serde_json::Value;
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, io::Read, path::PathBuf};

// use crate::avro_canonical_schema_workaround::parse_into_canonical_form;
use apache_avro::{rabin::Rabin, GenericSingleObjectReader, Schema};
use std::{
borrow::BorrowMut,
convert::{TryFrom, TryInto},
io::{Cursor, Read},
path::PathBuf,
};

use log::debug;

#[async_trait]
pub(crate) trait MessageDeserializer {
@@ -48,6 +59,10 @@ impl MessageDeserializerFactory {
}
}
},
MessageFormat::SoeAvro(path) => match SoeAvroDeserializer::try_from_path(path) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
},
_ => Ok(Box::new(DefaultDeserializer::new(decompress_gzip))),
}
}
@@ -128,6 +143,10 @@ struct AvroDeserializer {
decoder: EasyAvroDecoder,
}

/// Deserializer for Avro single object encoding messages.
struct SoeAvroDeserializer {
decoders: DashMap<i64, GenericSingleObjectReader>,
}

#[derive(Default)]
struct AvroSchemaDeserializer {
schema: Option<apache_avro::Schema>,
@@ -137,6 +156,58 @@ struct JsonDeserializer {
decoder: EasyJsonDecoder,
}

#[async_trait]
impl MessageDeserializer for SoeAvroDeserializer {
async fn deserialize(
&mut self,
message_bytes: &[u8],
) -> Result<Value, MessageDeserializationError> {
let key = Self::extract_message_fingerprint(message_bytes).map_err(|e| {
MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(message_bytes, e.to_string()),
}
})?;

let decoder = self
.decoders
.get(&key)
.ok_or_else(|| MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
format!(
"Unknown schema with fingerprint {}",
&message_bytes[2..10]
.iter()
.map(|byte| format!("{:02x}", byte))
.collect::<Vec<String>>()
.join("")
),
),
})?;
let mut reader = Cursor::new(message_bytes);

match decoder.read_value(&mut reader) {
Ok(drs) => match Value::try_from(drs) {
Ok(v) => Ok(v),
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
e.to_string(),
),
}),
},
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(
message_bytes,
e.to_string(),
),
}),
}
}
}

#[async_trait]
impl MessageDeserializer for AvroDeserializer {
async fn deserialize(
@@ -293,5 +364,77 @@ impl AvroDeserializer {
}
}

impl SoeAvroDeserializer {
pub(crate) fn try_from_path(path: &PathBuf) -> Result<Self, anyhow::Error> {
if path.is_file() {
let (key, seo_reader) = Self::read_single_schema_file(path)?;
debug!("Loaded schema {:?} with key (i64 rep of fingerprint) {:?}", path, key);
let map: DashMap<i64, GenericSingleObjectReader> = DashMap::with_capacity(1);
map.insert(key, seo_reader);
Ok(SoeAvroDeserializer { decoders: map })
} else if path.is_dir() {
let decoders = path
.read_dir()?
.map(|file| {
let file_path = file?.path();
Self::read_single_schema_file(&file_path)
})
.collect::<anyhow::Result<DashMap<_, _>>>()?;

Ok(SoeAvroDeserializer { decoders })
} else {
Err(anyhow::format_err!("Path '{:?}' does not exists", path))
}
}

fn read_single_schema_file(
path: &PathBuf,
) -> Result<(i64, GenericSingleObjectReader), anyhow::Error> {
let content = std::fs::read_to_string(path)
.map_err(|e| anyhow::format_err!("Schema file '{:?}'; Error: {}", path, e))?;
let schema = Schema::parse_str(&content)
.map_err(|e| anyhow::format_err!("Schema file '{:?}'; Error: {}", path, e))?;
let fingerprint: [u8; 8] = schema
.fingerprint::<Rabin>()
.bytes
.try_into()
.expect("Rabin fingerprints are 8 bytes");
let key = Self::fingerprint_to_i64(fingerprint);
let decoder = GenericSingleObjectReader::new(schema)
.map_err(|e| anyhow::format_err!("Schema file '{:?}'; Error: {}", path, e))?;
Ok((key, decoder))
}

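/// Extracts the schema fingerprint from bytes 2..10 of the message, i.e. the
/// eight bytes that follow the two-byte single object encoding marker (0xC3, 0x01).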
fn extract_message_fingerprint(msg: &[u8]) -> Result<i64, anyhow::Error> {
msg.get(2..10)
.ok_or(anyhow::anyhow!(
"Message does not contain a valid fingerprint"
))
.map(|x| Self::fingerprint_to_i64(x.try_into().expect("Slice must be 8 bytes long")))
}

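/// The single object encoding stores the Rabin fingerprint in little-endian
/// byte order, hence from_le_bytes.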
fn fingerprint_to_i64(msg: [u8; 8]) -> i64 {
i64::from_le_bytes(msg)
}
}

#[cfg(test)]
mod tests {}
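The test module ships empty; as an illustrative sketch (not part of the commit), the fingerprint helpers could be covered inside mod tests like this:

use super::*;

#[test]
fn fingerprint_is_read_little_endian() {
    assert_eq!(
        SoeAvroDeserializer::fingerprint_to_i64([1, 0, 0, 0, 0, 0, 0, 0]),
        1
    );
}

#[test]
fn fingerprint_is_taken_from_bytes_2_to_10() {
    // Two marker bytes (0xC3, 0x01), then the little-endian fingerprint 1.
    let msg = [0xC3, 0x01, 1, 0, 0, 0, 0, 0, 0, 0];
    assert_eq!(
        SoeAvroDeserializer::extract_message_fingerprint(&msg).unwrap(),
        1
    );
}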
56 changes: 56 additions & 0 deletions src/transforms.rs
@@ -84,6 +84,10 @@ lazy_static! {
"epoch_micros_to_iso8601",
Box::new(create_epoch_micros_to_iso8601_fn()),
);
runtime.register_function(
"epoch_millis_to_micro",
Box::new(create_epoch_millis_to_micro_fn()),
);
runtime
};
}
@@ -202,6 +206,13 @@ fn create_epoch_micros_to_iso8601_fn() -> CustomFunction {
)
}

fn create_epoch_millis_to_micro_fn() -> CustomFunction {
CustomFunction::new(
Signature::new(vec![ArgumentType::Number], None),
Box::new(jmespath_epoch_millis_to_micro),
)
}

fn substr(args: &[Rcvar], context: &mut Context) -> Result<Rcvar, JmespathError> {
let s = args[0].as_string().ok_or_else(|| {
InvalidTypeError::new(context, "string", args[0].get_type().to_string(), 0)
@@ -268,6 +279,14 @@ fn jmespath_epoch_micros_to_iso8601(
let variable = Variable::try_from(value)?;
Ok(Arc::new(variable))
}
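
/// Converts an epoch timestamp from milliseconds to microseconds,
/// e.g. 1732279537028 ms -> 1732279537028000 us.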
fn jmespath_epoch_millis_to_micro(
args: &[Rcvar],
context: &mut Context,
) -> Result<Rcvar, JmespathError> {
let millis = i64_from_args(args, context, 0)?;
let variable = Variable::Number((millis * 1000).into());
Ok(Arc::new(variable))
}

fn i64_from_args(
args: &[Rcvar],
@@ -586,6 +605,43 @@
assert_eq!(expected_iso, dt);
}
#[test]
fn test_epoch_millis_to_micro() {
let mut test_value = json!({
"name": "A",
"modified": 1732279537028u64,
});

let test_message = OwnedMessage::new(
Some(test_value.to_string().into_bytes()),
None,
"test".to_string(),
rdkafka::Timestamp::NotAvailable,
0,
0,
None,
);

let mut transforms = HashMap::new();

transforms.insert(
"modified_micros".to_string(),
"epoch_millis_to_micro(modified)".to_string(),
);

let transformer = Transformer::from_transforms(&transforms).unwrap();

transformer
.transform(&mut test_value, Some(&test_message))
.unwrap();

let modified_date = test_value.get("modified_micros").unwrap().as_u64().unwrap();

assert_eq!(1732279537028000u64, modified_date);
}
#[test]
fn test_transforms_with_epoch_seconds_to_iso8601() {
let expected_iso = "2021-07-20T23:18:18Z";
1 change: 1 addition & 0 deletions src/writer.rs
@@ -584,6 +584,7 @@ impl DataWriter {
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
let actions = adds.drain(..).map(Action::Add).collect();
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
.with_max_retries(100) // Increased from the default of 15 because (at least for Azure) commits may fail when writes are too frequent (which happens if many messages arrive in the dead letter queue)
.with_actions(actions)
.build(
table.state.as_ref().map(|s| s as &dyn TableReference),
Expand Down