From 4605ae0a679262576bba1e1bdf9e670862ca0c9e Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 9 Apr 2024 13:05:23 +0800 Subject: [PATCH] fix --- Cargo.lock | 2 +- src/connector/src/sink/big_query.rs | 2 +- src/connector/src/sink/encoder/proto.rs | 21 +-------------------- src/connector/src/sink/formatter/mod.rs | 3 +-- 4 files changed, 4 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8fb0405fa50a..bcd2f0971da2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8516,7 +8516,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 30f392b8268e..f3933ff86029 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -366,7 +366,7 @@ impl BigQuerySinkWriter { &config.common.table )) })?; - let row_encoder = ProtoEncoder::new_with_bigquery( + let row_encoder = ProtoEncoder::new( schema.clone(), None, message_descriptor.clone(), diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index bca69ed91fd8..3f50b3d97ff2 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -74,24 +74,6 @@ impl ProtoEncoder { header, }) } - - pub fn new_with_default( - schema: Schema, - col_indices: Option>, - descriptor: MessageDescriptor, - header: ProtoHeader, - ) -> SinkResult { - Self::new(schema, col_indices, descriptor, header) - } - - pub fn new_with_bigquery( - schema: Schema, - col_indices: Option>, - descriptor: MessageDescriptor, - header: ProtoHeader, - ) -> SinkResult { - Self::new(schema, col_indices, descriptor, header) - } } pub struct ProtoEncoded { @@ -499,8 +481,7 @@ mod tests { ]); let encoder = - ProtoEncoder::new_with_default(schema, None, descriptor.clone(), ProtoHeader::None) - .unwrap(); + ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap(); let m = encoder.encode(row).unwrap(); let encoded: Vec = m.ser_to().unwrap(); assert_eq!( diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index d6be02c70aea..d923d337a3ff 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -134,8 +134,7 @@ impl SinkFormatterImpl { None => ProtoHeader::None, Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), }; - let val_encoder = - ProtoEncoder::new_with_default(schema, None, descriptor, header)?; + let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?; let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) }