Skip to content

Commit

Permalink
[fix][broker] In replication scenario, remote consumer could not be r…
Browse files Browse the repository at this point in the history
…egistered if there has no message was sent (#20888)

Motivation: In the replication scenario, we want to produce messages on the native cluster and consume messages on the remote cluster, the producer and consumer both use a same schema, but the consumer cannot be registered if there has no messages in the topic yet.The root cause is that for the remote cluster, there is a producer who has been registered with `AUTO_PRODUCE_BYTES` schema, so there is no schema to check the compatibility.

Modifications: If there is no schema and only the replicator producer was registered, skip the compatibility check.
(cherry picked from commit 9be0b52)
  • Loading branch information
poorbarcode committed Jul 28, 2023
1 parent ebf9961 commit 54359b6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP

private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
private volatile int userCreatedProducerCount = 0;
protected volatile int userCreatedProducerCount = 0;

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2868,7 +2868,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
.mapToInt(subscription -> subscription.getConsumers().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (userCreatedProducerCount > 0)
|| (numActiveConsumers != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.Assert;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -75,4 +81,29 @@ public void testReplicatorProducerStatInTopic() throws Exception {
admin2.topics().delete(topicName);
});
}

@Test
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
// Wait for replicator started.
Awaitility.await().untilAsserted(() -> {
Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional2.isPresent());
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
assertFalse(persistentTopic2.getProducers().isEmpty());
});
// The topic in cluster2 has a replicator created producer(schema Auto_Produce), but does not have any schema。
// Verify: the consumer of this cluster2 can create successfully.
Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
.subscribe();;

// cleanup.
producer1.close();
consumer2.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,33 @@ private void testIncompatibleSchema() throws Exception {
}
}

/**
* This test just ensure that schema check still keeps the original logic: if there has any producer, but no schema
* was registered, the new consumer could not register new schema.
* TODO: I think this design should be improved: if a producer used "AUTO_PRODUCE_BYTES" schema, we should allow
* the new consumer to register new schema. But before we can solve this problem, we need to modify
* "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
*/
@Test
public void testAutoProduceAndSpecifiedConsumer() throws Exception {
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
admin.topics().createNonPartitionedTopic(topicName);

Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
try {
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
fail("Should throw ex: Topic does not have schema to check");
} catch (Exception ex){
assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
}

// Cleanup.
producer.close();
admin.topics().delete(topicName);
}

@Test
public void testCreateSchemaInParallel() throws Exception {
final String namespace = "test-namespace-" + randomName(16);
Expand Down

0 comments on commit 54359b6

Please sign in to comment.