Skip to content

Commit

Permalink
[fix][broker] Partitioned shadow topic not work properly (#22797)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Jun 26, 2024
1 parent 20eb3c6 commit 2c6fcc7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2198,8 +2198,13 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
.thenAccept(replicationClient -> {
Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
(PulsarClientImpl) replicationClient);
TopicName sourceTopicName = TopicName.get(getName());
String shadowPartitionTopic = shadowTopic;
if (sourceTopicName.isPartitioned()) {
shadowPartitionTopic += "-partition-" + sourceTopicName.getPartitionIndex();
}
return new ShadowReplicator(shadowPartitionTopic, PersistentTopic.this, cursor,
brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
Expand Down Expand Up @@ -113,6 +115,35 @@ public void testPartitionedShadowTopicSetup() throws Exception {
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition);
}

@Test
public void testPartitionedShadowTopicProduceAndConsume() throws Exception {
String sourceTopic = newShadowSourceTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createPartitionedTopic(sourceTopic, 3);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);

admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("test")
.subscribe();

for (int i = 0; i < 10; i++) {
producer.send("msg-" + i);
}

Set<String> set = new HashSet<>();
for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
set.add(msg.getValue());
}
for (int i = 0; i < 10; i++) {
Assert.assertTrue(set.contains("msg-" + i));
}
}

@Test
public void testShadowTopicNotWritable() throws Exception {
String sourceTopic = newShadowSourceTopicName();
Expand Down

0 comments on commit 2c6fcc7

Please sign in to comment.