[Feature][Kafka] Support native format read/write kafka record #8724

Open
wants to merge 18 commits into dev

Conversation

CosmosNi
Contributor

close #8651

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@@ -80,6 +83,7 @@ public KafkaSinkWriter(
&& !CollectionUtils.isEmpty(pluginConfig.get(ASSIGN_PARTITIONS))) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.get(ASSIGN_PARTITIONS));
}
isNative = pluginConfig.get(IS_NATIVE) != null && pluginConfig.get(IS_NATIVE);
Contributor
@litiliu Feb 14, 2025

Would it be better to use Boolean.TRUE.equals(pluginConfig.get(IS_NATIVE))?
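A minimal sketch of the difference, assuming IS_NATIVE resolves to a nullable Boolean as in the line above:

```java
Boolean raw = pluginConfig.get(IS_NATIVE); // may be null when the option is unset

// Current pattern: explicit null check, then auto-unboxing.
boolean isNative = raw != null && raw;

// Suggested pattern: null-safe in a single expression, no unboxing NPE possible.
boolean isNativeSuggested = Boolean.TRUE.equals(raw);
```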

Comment on lines 195 to 202
ConsumerRecord<String, String> oldRecord = data.get(i);
ConsumerRecord<String, String> newRecord = data.get(i);
Assertions.assertEquals(oldRecord.key(), newRecord.key());
Assertions.assertEquals(oldRecord.headers(), newRecord.headers());
Assertions.assertEquals(oldRecord.partition(), newRecord.partition());
Assertions.assertEquals(oldRecord.timestamp(), newRecord.timestamp());
Assertions.assertEquals(oldRecord.timestamp(), newRecord.timestamp());
Assertions.assertEquals(oldRecord.value(), newRecord.value());
Contributor

It seems oldRecord and newRecord refer to the same object; what is the point of these assertions?
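Since both variables come from data.get(i), each assertion compares an object with itself and passes trivially. A hypothetical sketch of a meaningful round-trip check, assuming the test keeps the originally produced records in one list (written) and the records read back after the native-format round trip in another (reRead); both names are placeholders:

```java
ConsumerRecord<String, String> expected = written.get(i); // record originally produced
ConsumerRecord<String, String> actual = reRead.get(i);    // record read back after the round trip
Assertions.assertEquals(expected.key(), actual.key());
Assertions.assertEquals(expected.headers(), actual.headers());
Assertions.assertEquals(expected.partition(), actual.partition());
Assertions.assertEquals(expected.timestamp(), actual.timestamp());
Assertions.assertEquals(expected.value(), actual.value());
```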

* @param out
*/
public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out) {
tryInitConverter();
Contributor

Could we try to move the init logic to the constructor?
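A rough sketch of the suggestion, assuming the converter only needs state that is available at construction time (class and field names here are hypothetical, not the ones in this PR):

```java
public class NativeDeserializationSchema {
    private final Converter converter; // hypothetical converter type

    public NativeDeserializationSchema(Converter converter) {
        // built once here, instead of tryInitConverter() running on every deserialize() call
        this.converter = converter;
    }

    public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out) {
        out.collect(converter.convert(msg));
    }
}
```

If the converter is not serializable, keeping it as a lazily initialized transient field may be the reason for the current approach; in that case the lazy init would need to stay.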


@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
throw new UnsupportedOperationException(
Contributor

Why is this method not supposed to be invoked?

Comment on lines 381 to 390
is_native = true
topic = "test_topic"
bootstrap.servers = "kafkaCluster:9092"
kafka.config = {
client.id = client_1
max.poll.records = 500
auto.offset.reset = "earliest"
enable.auto.commit = "false"
}
format = "COMPATIBLE_KAFKA_CONNECT_JSON"
Member

Suggested change
- is_native = true
- topic = "test_topic"
- bootstrap.servers = "kafkaCluster:9092"
- kafka.config = {
-   client.id = client_1
-   max.poll.records = 500
-   auto.offset.reset = "earliest"
-   enable.auto.commit = "false"
- }
- format = "COMPATIBLE_KAFKA_CONNECT_JSON"
+ topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ kafka.config = {
+   client.id = client_1
+   max.poll.records = 500
+   auto.offset.reset = "earliest"
+   enable.auto.commit = "false"
+ }
+ format = "NATIVE"


@@ -234,6 +275,7 @@ private static SerializationSchema createSerializationSchema(
ReadonlyConfig pluginConfig) {
switch (format) {
case JSON:
case NATIVE:
Contributor

If the format is NATIVE, we shouldn't expect the content to be JSON.

Contributor Author

Yes, the content is not JSON. This branch is used to process the ProducerRecord.

Contributor Author

Just like: SeaTunnelRowType<topic, partition, offset, timestamp, headers<Map<String, String>>, key, value>
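A sketch of that layout in SeaTunnel's type API; the field types below are assumptions based on this description (the PR may use different types, e.g. STRING instead of BYTES for key/value):

```java
SeaTunnelRowType nativeRowType =
        new SeaTunnelRowType(
                new String[] {"topic", "partition", "offset", "timestamp", "headers", "key", "value"},
                new SeaTunnelDataType<?>[] {
                    BasicType.STRING_TYPE,                                        // topic
                    BasicType.INT_TYPE,                                           // partition
                    BasicType.LONG_TYPE,                                          // offset
                    BasicType.LONG_TYPE,                                          // timestamp
                    new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),  // headers
                    PrimitiveByteArrayType.INSTANCE,                              // key
                    PrimitiveByteArrayType.INSTANCE                               // value
                });
```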


if (format == MessageFormat.NATIVE) {
tableSchema =
TableSchema.builder()
Member

Move this to a static final field.
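A minimal sketch of the suggestion, assuming the native schema is fixed and does not depend on per-instance configuration (the helper name is a placeholder):

```java
// Built once and shared, instead of rebuilding the schema inside the NATIVE branch.
private static final TableSchema NATIVE_TABLE_SCHEMA = buildNativeTableSchema();

private static TableSchema buildNativeTableSchema() {
    return TableSchema.builder()
            // columns: topic, partition, offset, timestamp, headers, key, value
            .build();
}

// call site:
if (format == MessageFormat.NATIVE) {
    tableSchema = NATIVE_TABLE_SCHEMA;
}
```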

seaTunnelRowSerializer.serializeRow(element);
ProducerRecord<byte[], byte[]> producerRecord;
if (isNative) {
producerRecord = seaTunnelRowSerializer.serializeNativeRow(element, seaTunnelRowType);
Member

Why not override topicExtractor, partitionExtractor, etc., and remove this if-else?
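A sketch of what that could look like, assuming the serializer is composed from per-field extractor functions as the names in the comment suggest; the field indices follow the native row layout described earlier, and toKafkaHeaders is a hypothetical helper:

```java
// With extractors like these wired into the NATIVE serializer, the sink writer can
// always call serializeRow(element) and the if (isNative) branch can be removed.
Function<SeaTunnelRow, String> topicExtractor = row -> (String) row.getField(0);
Function<SeaTunnelRow, Integer> partitionExtractor = row -> (Integer) row.getField(1);
Function<SeaTunnelRow, Iterable<Header>> headersExtractor = row -> toKafkaHeaders(row.getField(4));
Function<SeaTunnelRow, byte[]> keyExtractor = row -> (byte[]) row.getField(5);
Function<SeaTunnelRow, byte[]> valueExtractor = row -> (byte[]) row.getField(6);
```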

@CosmosNi requested a review from hailin0 February 21, 2025 07:22
@@ -170,18 +170,22 @@ private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
MessageFormat messageFormat = pluginConfig.get(FORMAT);
String topic = pluginConfig.get(TOPIC);
if (MessageFormat.NATIVE.equals(messageFormat)) {
return DefaultSeaTunnelRowSerializer.create(topic, messageFormat, seaTunnelRowType);
Member

Check the input fields and types when the sink is initialized.
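A hypothetical sketch of such a check at sink initialization, assuming the native row layout discussed above (field names and the exception type are placeholders):

```java
private static final String[] EXPECTED_NATIVE_FIELDS = {
    "topic", "partition", "offset", "timestamp", "headers", "key", "value"
};

private static void validateNativeRowType(SeaTunnelRowType rowType) {
    if (!Arrays.equals(EXPECTED_NATIVE_FIELDS, rowType.getFieldNames())) {
        throw new IllegalArgumentException(
                "NATIVE format expects input fields "
                        + Arrays.toString(EXPECTED_NATIVE_FIELDS)
                        + " but got "
                        + Arrays.toString(rowType.getFieldNames()));
    }
    // per-field type checks (e.g. headers is MAP<STRING, STRING>, key/value are BYTES) would follow
}
```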

Successfully merging this pull request may close these issues.

[Feature][Kafka] Support native format read/write kafka record