Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Apr 9, 2024
1 parent 693d8b1 commit 4605ae0
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl BigQuerySinkWriter {
&config.common.table
))
})?;
let row_encoder = ProtoEncoder::new_with_bigquery(
let row_encoder = ProtoEncoder::new(
schema.clone(),
None,
message_descriptor.clone(),
Expand Down
21 changes: 1 addition & 20 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,6 @@ impl ProtoEncoder {
header,
})
}

pub fn new_with_default(
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
header: ProtoHeader,
) -> SinkResult<Self> {
Self::new(schema, col_indices, descriptor, header)
}

pub fn new_with_bigquery(
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
header: ProtoHeader,
) -> SinkResult<Self> {
Self::new(schema, col_indices, descriptor, header)
}
}

pub struct ProtoEncoded {
Expand Down Expand Up @@ -499,8 +481,7 @@ mod tests {
]);

let encoder =
ProtoEncoder::new_with_default(schema, None, descriptor.clone(), ProtoHeader::None)
.unwrap();
ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap();
let m = encoder.encode(row).unwrap();
let encoded: Vec<u8> = m.ser_to().unwrap();
assert_eq!(
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ impl SinkFormatterImpl {
None => ProtoHeader::None,
Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
};
let val_encoder =
ProtoEncoder::new_with_default(schema, None, descriptor, header)?;
let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?;
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
Expand Down

0 comments on commit 4605ae0

Please sign in to comment.