diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 19abb22b01706..76e1df0d93d7f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -134,7 +134,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener USER_CREATED_PRODUCER_COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount"); - private volatile int userCreatedProducerCount = 0; + protected volatile int userCreatedProducerCount = 0; protected volatile Optional topicEpoch = Optional.empty(); private volatile boolean hasExclusiveProducer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3579f63503a0a..14074fc280bd6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2868,7 +2868,7 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem .mapToInt(subscription -> subscription.getConsumers().size()) .sum(); if (hasSchema - || (!producers.isEmpty()) + || (userCreatedProducerCount > 0) || (numActiveConsumers != 0) || (ledger.getTotalSize() != 0)) { return checkSchemaCompatibleForConsumer(schema); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 83f4040aa40e7..73a8aca13a9a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -19,13 +19,19 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.TopicStats; import org.junit.Assert; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -75,4 +81,29 @@ public void testReplicatorProducerStatInTopic() throws Exception { admin2.topics().delete(topicName); }); } + + @Test + public void testCreateRemoteConsumerFirst() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).create(); + // Wait for replicator started. + Awaitility.await().untilAsserted(() -> { + Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional2.isPresent()); + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertFalse(persistentTopic2.getProducers().isEmpty()); + }); + // The topic in cluster2 has a replicator created producer(schema Auto_Produce), but does not have any schema。 + // Verify: the consumer of this cluster2 can create successfully. + Consumer consumer2 = client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1") + .subscribe();; + + // cleanup. + producer1.close(); + consumer2.close(); + cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 7133684721c9b..dcd6671780a6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -1179,6 +1179,33 @@ private void testIncompatibleSchema() throws Exception { } } + /** + * This test just ensure that schema check still keeps the original logic: if there has any producer, but no schema + * was registered, the new consumer could not register new schema. + * TODO: I think this design should be improved: if a producer used "AUTO_PRODUCE_BYTES" schema, we should allow + * the new consumer to register new schema. But before we can solve this problem, we need to modify + * "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES". + */ + @Test + public void testAutoProduceAndSpecifiedConsumer() throws Exception { + final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16); + admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); + final String topicName = "persistent://" + namespace + "/tp_" + randomName(16); + admin.topics().createNonPartitionedTopic(topicName); + + Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + try { + pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe(); + fail("Should throw ex: Topic does not have schema to check"); + } catch (Exception ex){ + assertTrue(ex.getMessage().contains("Topic does not have schema to check")); + } + + // Cleanup. + producer.close(); + admin.topics().delete(topicName); + } + @Test public void testCreateSchemaInParallel() throws Exception { final String namespace = "test-namespace-" + randomName(16);