From 198704202d7b3d3901497d187060f1a07d712716 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 13:29:03 -0800 Subject: [PATCH] Remove avro from the default features and feature gate its code This change is a little wrapped up in the introduction of DeserializedMessage but the trade-off for development targeting S3 is that I am linking 382 crates every cycle as opposed to 451. Fixes #163 --- Cargo.lock | 14 ++ Cargo.toml | 2 +- src/lib.rs | 17 +- .../avro.rs} | 179 +---------------- src/serialization/mod.rs | 184 ++++++++++++++++++ tests/delta_partitions_tests.rs | 2 +- tests/deserialization_tests.rs | 15 +- 7 files changed, 231 insertions(+), 182 deletions(-) rename src/{serialization.rs => serialization/avro.rs} (52%) create mode 100644 src/serialization/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 2acb2aa..47823b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,7 @@ dependencies = [ "rdkafka", "rusoto_core", "rusoto_credential", + "rusoto_s3", "schema_registry_converter", "sentry", "serde", @@ -2881,6 +2882,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rusoto_s3" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027" +dependencies = [ + "async-trait", + "bytes", + "futures", + "rusoto_core", + "xml-rs", +] + [[package]] name = "rusoto_signature" version = "0.47.0" diff --git a/Cargo.toml b/Cargo.toml index 4413ea4..a84be34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,6 @@ azure_storage_blobs = { version = "0.18.0", optional = true } [features] default = [ - "avro", ] avro = [ "apache-avro", @@ -78,6 +77,7 @@ utime = "0.3" serial_test = "*" tempfile = "3" time = "0.3.20" +rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]} [profile.release] lto = true diff --git a/src/lib.rs b/src/lib.rs index b60b68a..3271c4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -718,6 +718,7 @@ 
enum MessageDeserializationError { EmptyPayload, #[error("Kafka message deserialization failed")] JsonDeserialization { dead_letter: DeadLetter }, + #[cfg(feature = "avro")] #[error("Kafka message deserialization failed")] AvroDeserialization { dead_letter: DeadLetter }, } @@ -845,10 +846,18 @@ impl IngestProcessor { partition, offset ); } - Err( - MessageDeserializationError::JsonDeserialization { dead_letter } - | MessageDeserializationError::AvroDeserialization { dead_letter }, - ) => { + Err(MessageDeserializationError::JsonDeserialization { dead_letter }) => { + warn!( + "Deserialization failed - partition {}, offset {}, dead_letter {}", + partition, + offset, + dead_letter.error.as_ref().unwrap_or(&String::from("_")), + ); + self.ingest_metrics.message_deserialization_failed(); + self.dlq.write_dead_letter(dead_letter).await?; + } + #[cfg(feature = "avro")] + Err(MessageDeserializationError::AvroDeserialization { dead_letter }) => { warn!( "Deserialization failed - partition {}, offset {}, dead_letter {}", partition, diff --git a/src/serialization.rs b/src/serialization/avro.rs similarity index 52% rename from src/serialization.rs rename to src/serialization/avro.rs index 55c2fff..bd3e4ef 100644 --- a/src/serialization.rs +++ b/src/serialization/avro.rs @@ -1,192 +1,23 @@ use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, path::PathBuf}; +use super::{DeserializedMessage, MessageDeserializationError, MessageDeserializer}; +use crate::dead_letters::DeadLetter; use async_trait::async_trait; use schema_registry_converter::async_impl::{ easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings, }; use serde_json::Value; -use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; - -use deltalake_core::arrow::datatypes::Schema as ArrowSchema; - -/// Structure which contains the [serde_json::Value] and the inferred schema of the message -/// -/// The [ArrowSchema] helps with schema evolution -#[derive(Clone, 
Debug, Default, PartialEq)] -pub struct DeserializedMessage { - message: Value, - schema: Option, -} - -impl DeserializedMessage { - pub fn schema(&self) -> &Option { - &self.schema - } - pub fn message(self) -> Value { - self.message - } - pub fn get(&self, key: &str) -> Option<&Value> { - self.message.get(key) - } - pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map> { - self.message.as_object_mut() - } -} - -/// Allow for `.into()` on [Value] for ease of use -impl From for DeserializedMessage { - fn from(message: Value) -> Self { - // XXX: This seems wasteful, this function should go away, and the deserializers should - // infer straight from the buffer stream - let iter = vec![message.clone()].into_iter().map(|v| Ok(v)); - let schema = - match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { - Ok(schema) => Some(schema), - _ => None, - }; - Self { message, schema } - } -} - -#[async_trait] -pub(crate) trait MessageDeserializer { - async fn deserialize( - &mut self, - message_bytes: &[u8], - ) -> Result; -} - -pub(crate) struct MessageDeserializerFactory {} -impl MessageDeserializerFactory { - pub fn try_build( - input_format: &MessageFormat, - ) -> Result, anyhow::Error> { - match input_format { - MessageFormat::Json(data) => match data { - crate::SchemaSource::None => Ok(Self::json_default()), - crate::SchemaSource::SchemaRegistry(sr) => { - match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { - Ok(s) => Ok(Box::new(s)), - Err(e) => Err(e), - } - } - crate::SchemaSource::File(_) => Ok(Self::json_default()), - }, - MessageFormat::Avro(data) => match data { - crate::SchemaSource::None => Ok(Box::::default()), - crate::SchemaSource::SchemaRegistry(sr) => { - match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) { - Ok(s) => Ok(Box::new(s)), - Err(e) => Err(e), - } - } - crate::SchemaSource::File(f) => { - match AvroSchemaDeserializer::try_from_schema_file(f) { - Ok(s) => 
Ok(Box::new(s)), - Err(e) => Err(e), - } - } - }, - _ => Ok(Box::new(DefaultDeserializer {})), - } - } - - fn json_default() -> Box { - Box::new(DefaultDeserializer {}) - } - - fn build_sr_settings(registry_url: &url::Url) -> Result { - let mut url_string = registry_url.as_str(); - if url_string.ends_with('/') { - url_string = &url_string[0..url_string.len() - 1]; - } - - let mut builder = SrSettings::new_builder(url_string.to_owned()); - if let Ok(username) = std::env::var("SCHEMA_REGISTRY_USERNAME") { - builder.set_basic_authorization( - username.as_str(), - std::option_env!("SCHEMA_REGISTRY_PASSWORD"), - ); - } - - if let Ok(proxy_url) = std::env::var("SCHEMA_REGISTRY_PROXY") { - builder.set_proxy(proxy_url.as_str()); - } - - match builder.build() { - Ok(s) => Ok(s), - Err(e) => Err(anyhow::Error::new(e)), - } - } -} - -#[derive(Clone, Debug, Default)] -struct DefaultDeserializer {} - -#[async_trait] -impl MessageDeserializer for DefaultDeserializer { - async fn deserialize( - &mut self, - payload: &[u8], - ) -> Result { - let value: Value = match serde_json::from_slice(payload) { - Ok(v) => v, - Err(e) => { - return Err(MessageDeserializationError::JsonDeserialization { - dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), - }); - } - }; - - Ok(value.into()) - } -} - -#[cfg(test)] -mod default_tests { - use super::*; - - #[tokio::test] - async fn deserialize_with_schema() { - let mut deser = DefaultDeserializer::default(); - let message = deser - .deserialize(r#"{"hello" : "world"}"#.as_bytes()) - .await - .expect("Failed to deserialize trivial JSON"); - assert!( - message.schema().is_some(), - "The DeserializedMessage doesn't have a schema!" 
- ); - } - - #[tokio::test] - async fn deserialize_simple_json() { - #[derive(serde::Deserialize)] - struct HW { - hello: String, - } - - let mut deser = DefaultDeserializer::default(); - let message = deser - .deserialize(r#"{"hello" : "world"}"#.as_bytes()) - .await - .expect("Failed to deserialize trivial JSON"); - let value: HW = serde_json::from_value(message.message).expect("Failed to coerce"); - assert_eq!("world", value.hello); - } -} - -struct AvroDeserializer { +pub(crate) struct AvroDeserializer { decoder: EasyAvroDecoder, } #[derive(Default)] -struct AvroSchemaDeserializer { +pub(crate) struct AvroSchemaDeserializer { schema: Option, } -struct JsonDeserializer { +pub(crate) struct JsonDeserializer { decoder: EasyJsonDecoder, } diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs new file mode 100644 index 0000000..4f45ee4 --- /dev/null +++ b/src/serialization/mod.rs @@ -0,0 +1,184 @@ +use async_trait::async_trait; +use serde_json::Value; + +use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; + +use deltalake_core::arrow::datatypes::Schema as ArrowSchema; + +#[cfg(feature = "avro")] +use schema_registry_converter::async_impl::schema_registry::SrSettings; + +#[cfg(feature = "avro")] +pub mod avro; +#[cfg(feature = "avro")] +use crate::serialization::avro::*; + +/// Structure which contains the [serde_json::Value] and the inferred schema of the message +/// +/// The [ArrowSchema] helps with schema evolution +#[derive(Clone, Debug, Default, PartialEq)] +pub struct DeserializedMessage { + message: Value, + schema: Option, +} + +impl DeserializedMessage { + pub fn schema(&self) -> &Option { + &self.schema + } + pub fn message(self) -> Value { + self.message + } + pub fn get(&self, key: &str) -> Option<&Value> { + self.message.get(key) + } + pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map> { + self.message.as_object_mut() + } +} + +/// Allow for `.into()` on [Value] for ease of use +impl From for 
DeserializedMessage { + fn from(message: Value) -> Self { + // XXX: This seems wasteful, this function should go away, and the deserializers should + // infer straight from the buffer stream + let iter = vec![message.clone()].into_iter().map(Ok); + let schema = + match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { + Ok(schema) => Some(schema), + _ => None, + }; + Self { message, schema } + } +} + +#[async_trait] +pub(crate) trait MessageDeserializer { + async fn deserialize( + &mut self, + message_bytes: &[u8], + ) -> Result; +} + +pub(crate) struct MessageDeserializerFactory {} +impl MessageDeserializerFactory { + pub fn try_build( + input_format: &MessageFormat, + ) -> Result, anyhow::Error> { + match input_format { + #[cfg(feature = "avro")] + MessageFormat::Json(data) => match data { + crate::SchemaSource::None => Ok(Self::json_default()), + crate::SchemaSource::SchemaRegistry(sr) => { + match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + crate::SchemaSource::File(_) => Ok(Self::json_default()), + }, + #[cfg(feature = "avro")] + MessageFormat::Avro(data) => match data { + crate::SchemaSource::None => Ok(Box::::default()), + crate::SchemaSource::SchemaRegistry(sr) => { + match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + crate::SchemaSource::File(f) => { + match AvroSchemaDeserializer::try_from_schema_file(f) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + }, + _ => Ok(Self::json_default()), + } + } + + fn json_default() -> Box { + Box::new(DefaultDeserializer {}) + } + + #[cfg(feature = "avro")] + fn build_sr_settings(registry_url: &url::Url) -> Result { + let mut url_string = registry_url.as_str(); + if url_string.ends_with('/') { + url_string = &url_string[0..url_string.len() - 1]; + } + + let mut builder = 
SrSettings::new_builder(url_string.to_owned()); + if let Ok(username) = std::env::var("SCHEMA_REGISTRY_USERNAME") { + builder.set_basic_authorization( + username.as_str(), + std::option_env!("SCHEMA_REGISTRY_PASSWORD"), + ); + } + + if let Ok(proxy_url) = std::env::var("SCHEMA_REGISTRY_PROXY") { + builder.set_proxy(proxy_url.as_str()); + } + + match builder.build() { + Ok(s) => Ok(s), + Err(e) => Err(anyhow::Error::new(e)), + } + } +} + +#[derive(Clone, Debug, Default)] +struct DefaultDeserializer {} + +#[async_trait] +impl MessageDeserializer for DefaultDeserializer { + async fn deserialize( + &mut self, + payload: &[u8], + ) -> Result { + let value: Value = match serde_json::from_slice(payload) { + Ok(v) => v, + Err(e) => { + return Err(MessageDeserializationError::JsonDeserialization { + dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), + }); + } + }; + + Ok(value.into()) + } +} + +#[cfg(test)] +mod default_tests { + use super::*; + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + assert!( + message.schema().is_some(), + "The DeserializedMessage doesn't have a schema!" 
+ ); + } + + #[tokio::test] + async fn deserialize_simple_json() { + #[derive(serde::Deserialize)] + struct HW { + hello: String, + } + + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + let value: HW = serde_json::from_value(message.message).expect("Failed to coerce"); + assert_eq!("world", value.hello); + } +} diff --git a/tests/delta_partitions_tests.rs b/tests/delta_partitions_tests.rs index b96ef16..4efd1fe 100644 --- a/tests/delta_partitions_tests.rs +++ b/tests/delta_partitions_tests.rs @@ -39,7 +39,7 @@ async fn test_delta_partitions() { "test_delta_partitions", ); - let mut table = deltalake_core::open_table(&table_path).await.unwrap(); + let table = deltalake_core::open_table(&table_path).await.unwrap(); let mut delta_writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); let batch1 = vec![ diff --git a/tests/deserialization_tests.rs b/tests/deserialization_tests.rs index b1cead2..9862aa8 100644 --- a/tests/deserialization_tests.rs +++ b/tests/deserialization_tests.rs @@ -3,6 +3,7 @@ mod helpers; use kafka_delta_ingest::{IngestOptions, MessageFormat, SchemaSource}; use log::info; +#[cfg(feature = "avro")] use schema_registry_converter::{ async_impl::{ easy_avro::EasyAvroEncoder, @@ -15,8 +16,9 @@ use schema_registry_converter::{ use serde::{Deserialize, Serialize}; use serde_json::json; use serial_test::serial; -use std::path::PathBuf; -use std::str::FromStr; +#[cfg(feature = "avro")] +use std::{path::PathBuf, str::FromStr}; +#[cfg(feature = "avro")] use url::Url; const DEFAULT_AVRO_SCHEMA: &str = r#"{ @@ -115,6 +117,7 @@ async fn test_json_with_args() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_json_with_registry() { @@ -159,6 +162,7 @@ async fn test_json_with_registry() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn 
test_avro_default() { @@ -202,6 +206,7 @@ async fn test_avro_default() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_avro_with_file() { @@ -247,6 +252,7 @@ async fn test_avro_with_file() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_avro_with_registry() { @@ -298,22 +304,26 @@ struct TestMsg { name: String, } +#[cfg(feature = "avro")] fn default_settings() -> SrSettings { SrSettings::new(String::from(SCHEMA_REGISTRY_ADDRESS)) } +#[cfg(feature = "avro")] async fn avro_encode(item: impl Serialize, topic: String) -> Result, SRCError> { EasyAvroEncoder::new(default_settings()) .encode_struct(item, &SubjectNameStrategy::RecordNameStrategy(topic)) .await } +#[cfg(feature = "avro")] async fn json_encode(value: &serde_json::Value, topic: String) -> Result, SRCError> { EasyJsonEncoder::new(default_settings()) .encode(value, SubjectNameStrategy::RecordNameStrategy(topic)) .await } +#[cfg(feature = "avro")] async fn prepare_json_schema(topic: String) -> Result { let settings = default_settings(); let schema = SuppliedSchema { @@ -327,6 +337,7 @@ async fn prepare_json_schema(topic: String) -> Result Result { let settings = default_settings(); let schema = SuppliedSchema {