From 878a4129a2fdaa133e935b2fe4aca8fc4f0292ad Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 21 May 2024 10:08:19 +0800 Subject: [PATCH] [improve][client] Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` (#22747) ### Motivation After discussing [here](https://github.com/apache/pulsar/pull/22698#discussion_r1597445741), the pulsar client shouldn't expose the `offset` term to users. ### Modifications - Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` - For connectors, use `FunctionCommon.getOffset` and `FunctionCommon.getMessageId` --- .../apache/pulsar/client/util/MessageIdUtils.java | 2 ++ pulsar-io/kafka-connect-adaptor/pom.xml | 6 ++++++ .../io/kafka/connect/PulsarKafkaSinkTaskContext.java | 4 ++-- .../io/kafka/connect/KafkaConnectSinkTest.java | 12 ++++++------ 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java index 60cdad8e77200..459a31ee72022 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java @@ -22,6 +22,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; public class MessageIdUtils { + @Deprecated public static final long getOffset(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) messageId; long ledgerId = msgId.getLedgerId(); @@ -34,6 +35,7 @@ public static final long getOffset(MessageId messageId) { return offset; } + @Deprecated public static final MessageId getMessageId(long offset) { // Demultiplex ledgerId and entryId from offset long ledgerId = offset >>> 28; diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index dcb040840c7a2..0be2a68dc1bcb 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -46,6 +46,12 @@ compile + + org.apache.pulsar + pulsar-functions-utils + ${project.version} + + com.fasterxml.jackson.core jackson-databind diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 7a908b553a89a..760799e0daa29 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -40,7 +40,7 @@ import org.apache.kafka.connect.sink.SinkTaskContext; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.util.MessageIdUtils; +import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.io.core.SinkContext; @Slf4j @@ -150,7 +150,7 @@ private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) { try { ctx.seek(desanitizeTopicName.apply(topicPartition.topic()), topicPartition.partition(), - MessageIdUtils.getMessageId(offset)); + FunctionCommon.getMessageId(offset)); } catch (PulsarClientException e) { log.error("Failed to seek topic {} partition {} offset {}", topicPartition.topic(), topicPartition.partition(), offset, e); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 1100b13b425b4..1bcd244200199 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -64,12 +64,12 @@ import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; -import org.apache.pulsar.client.util.MessageIdUtils; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; @@ -303,8 +303,8 @@ public void seekPauseResumeTest() throws Exception { assertEquals(status.get(), 1); final TopicPartition tp = new TopicPartition("fake-topic", 0); - assertNotEquals(MessageIdUtils.getOffset(msgId), 0); - assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId)); + assertNotEquals(FunctionCommon.getSequenceId(msgId), 0); + assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId)); sink.taskContext.offset(tp, 0); verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any()); @@ -347,12 +347,12 @@ public void seekPauseResumeWithSanitizeTest() throws Exception { assertEquals(status.get(), 1); final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0); - assertNotEquals(MessageIdUtils.getOffset(msgId), 0); - assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId)); + assertNotEquals(FunctionCommon.getSequenceId(msgId), 0); + assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId)); sink.taskContext.offset(tp, 0); verify(context, times(1)).seek(pulsarTopicName, - tp.partition(), MessageIdUtils.getMessageId(0)); + tp.partition(), FunctionCommon.getMessageId(0)); assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0); sink.taskContext.pause(tp);