Skip to content

Commit

Permalink
[INLONG-10522][SDK] SortSDK support assgin subscription (apache#10523)
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Jun 27, 2024
1 parent 396ae72 commit f1679e3
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SortClientConfig implements Serializable {
private static final long serialVersionUID = -7531960714809683830L;

private final String sortTaskId;
private final String subscription;
private final String sortClusterName;
private InLongTopicChangeListener assignmentsListener;
private ReadCallback callback;
Expand Down Expand Up @@ -83,11 +84,22 @@ public SortClientConfig(
InLongTopicChangeListener assignmentsListener,
ConsumeStrategy consumeStrategy,
String localIp) {
this(sortTaskId, sortClusterName, assignmentsListener, consumeStrategy, localIp, sortTaskId);
}

public SortClientConfig(
String sortTaskId,
String sortClusterName,
InLongTopicChangeListener assignmentsListener,
ConsumeStrategy consumeStrategy,
String localIp,
String subscription) {
this.sortTaskId = sortTaskId;
this.sortClusterName = sortClusterName;
this.assignmentsListener = assignmentsListener;
this.consumeStrategy = consumeStrategy;
this.localIp = localIp;
this.subscription = subscription;
}

public boolean isStopConsume() {
Expand All @@ -102,6 +114,10 @@ public String getSortTaskId() {
return sortTaskId;
}

public String getSubscription() {
return subscription;
}

public String getSortClusterName() {
return sortClusterName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public boolean init() {
private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSubscription());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public List<InLongTopic> getTopics() {
private void createKafkaConsumer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSubscription());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private Consumer<byte[]> createConsumer(Collection<InLongTopic> newTopics) {
.collect(Collectors.toList());
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topics(topicNames)
.subscriptionName(context.getConfig().getSortTaskId())
.subscriptionName(context.getConfig().getSubscription())
.subscriptionType(SubscriptionType.Shared)
.startMessageIdInclusive()
.subscriptionInitialPosition(position)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private boolean createConsumer(PulsarClient client) {

consumer = client.newConsumer(Schema.BYTES)
.topic(topic.getTopic())
.subscriptionName(context.getConfig().getSortTaskId())
.subscriptionName(context.getConfig().getSubscription())
.subscriptionType(SubscriptionType.Shared)
.startMessageIdInclusive()
.subscriptionInitialPosition(position)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public boolean init() {
TubeClientConfig tubeClientConfig = tubeConsumerCreator.getTubeClientConfig();
try {
ConsumerConfig consumerConfig = new ConsumerConfig(tubeClientConfig.getMasterInfo(),
context.getConfig().getSortTaskId());
context.getConfig().getSubscription());

messageConsumer = tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig);
if (messageConsumer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void setUp() throws Exception {

when(clientContext.getConfig()).thenReturn(sortClientConfig);
when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
when(sortClientConfig.getSubscription()).thenReturn("sortTaskId");

}

Expand Down

0 comments on commit f1679e3

Please sign in to comment.