diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
index 60cdad8e77200..459a31ee72022 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
@@ -22,6 +22,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
public class MessageIdUtils {
+ @Deprecated
public static final long getOffset(MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
long ledgerId = msgId.getLedgerId();
@@ -34,6 +35,7 @@ public static final long getOffset(MessageId messageId) {
return offset;
}
+ @Deprecated
public static final MessageId getMessageId(long offset) {
// Demultiplex ledgerId and entryId from offset
long ledgerId = offset >>> 28;
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index dcb040840c7a2..0be2a68dc1bcb 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -46,6 +46,12 @@
compile
+
+ org.apache.pulsar
+ pulsar-functions-utils
+ ${project.version}
+
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 7a908b553a89a..760799e0daa29 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -40,7 +40,7 @@
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
@Slf4j
@@ -150,7 +150,7 @@ private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
try {
ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
topicPartition.partition(),
- MessageIdUtils.getMessageId(offset));
+ FunctionCommon.getMessageId(offset));
} catch (PulsarClientException e) {
log.error("Failed to seek topic {} partition {} offset {}",
topicPartition.topic(), topicPartition.partition(), offset, e);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1100b13b425b4..1bcd244200199 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -64,12 +64,12 @@
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
-import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
@@ -303,8 +303,8 @@ public void seekPauseResumeTest() throws Exception {
assertEquals(status.get(), 1);
final TopicPartition tp = new TopicPartition("fake-topic", 0);
- assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
- assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
+ assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));
sink.taskContext.offset(tp, 0);
verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
@@ -347,12 +347,12 @@ public void seekPauseResumeWithSanitizeTest() throws Exception {
assertEquals(status.get(), 1);
final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
- assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
- assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
+ assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
+ assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));
sink.taskContext.offset(tp, 0);
verify(context, times(1)).seek(pulsarTopicName,
- tp.partition(), MessageIdUtils.getMessageId(0));
+ tp.partition(), FunctionCommon.getMessageId(0));
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);
sink.taskContext.pause(tp);