diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java index f8bac9d8134b0..523f995cc5dc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import java.io.Closeable; +import java.io.IOException; import org.apache.bookkeeper.mledger.ManagedLedger; /** @@ -28,4 +29,8 @@ public interface TopicFactory extends Closeable { T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class topicClazz); + + default void close() throws IOException { + // default implementation + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a96a7e75506eb..275d1ae5818b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -250,70 +250,10 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { } if (dispatcher == null || !dispatcher.isConsumerConnected()) { - Dispatcher previousDispatcher = null; - switch (consumer.subType()) { - case Exclusive: - if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer( - cursor, SubType.Exclusive, 0, topic, this); - } - break; - case Shared: - if (dispatcher == null || dispatcher.getType() != SubType.Shared) { - previousDispatcher = dispatcher; - if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { - dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); - } - } - break; - case Failover: - int partitionIndex = TopicName.getPartitionIndex(topicName); - if (partitionIndex < 0) { - // For non partition topics, use a negative index so - // dispatcher won't sort consumers before picking - // an active consumer for the topic. - partitionIndex = -1; - } - - if (dispatcher == null || dispatcher.getType() != SubType.Failover) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, - partitionIndex, topic, this); - } - break; - case Key_Shared: - KeySharedMeta ksm = consumer.getKeySharedMeta(); - if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared - || !((StickyKeyDispatcher) dispatcher) - .hasSameKeySharedPolicy(ksm)) { - previousDispatcher = dispatcher; - if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { - dispatcher = - new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, - this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } else { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } - } - break; - default: - return FutureUtil.failedFuture( - new ServerMetadataException("Unsupported subscription type")); - } - - if (previousDispatcher != null) { - previousDispatcher.close().thenRun(() -> { - log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); - }).exceptionally(ex -> { - log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); - return null; - }); + if (consumer.subType() == null) { + return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type")); } + dispatcher = reuseOrCreateDispatcher(dispatcher, consumer); } else { Optional> compatibilityError = checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer); @@ -327,6 +267,79 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { }); } + /** + * Create a new dispatcher or reuse the existing one when it's compatible with the new consumer. + * This protected method can be overridded for testing purpose for injecting test dispatcher instances with + * special behaviors. + * @param dispatcher the existing dispatcher + * @param consumer the new consumer + * @return the dispatcher to use, either the existing one or a new one + */ + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) { + Dispatcher previousDispatcher = null; + switch (consumer.subType()) { + case Exclusive: + if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer( + cursor, SubType.Exclusive, 0, topic, this); + } + break; + case Shared: + if (dispatcher == null || dispatcher.getType() != SubType.Shared) { + previousDispatcher = dispatcher; + if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { + dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); + } + } + break; + case Failover: + int partitionIndex = TopicName.getPartitionIndex(topicName); + if (partitionIndex < 0) { + // For non partition topics, use a negative index so + // dispatcher won't sort consumers before picking + // an active consumer for the topic. + partitionIndex = -1; + } + + if (dispatcher == null || dispatcher.getType() != SubType.Failover) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, + partitionIndex, topic, this); + } + break; + case Key_Shared: + KeySharedMeta ksm = consumer.getKeySharedMeta(); + if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared + || !((StickyKeyDispatcher) dispatcher) + .hasSameKeySharedPolicy(ksm)) { + previousDispatcher = dispatcher; + if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { + dispatcher = + new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, + this, config, ksm); + } else { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, + config, ksm); + } + } + break; + } + + if (previousDispatcher != null) { + previousDispatcher.close().thenRun(() -> { + log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); + }).exceptionally(ex -> { + log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); + return null; + }); + } + + return dispatcher; + } + @Override public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException { cursor.updateLastActive(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2325c8286a1be..e920c483bb3ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -586,7 +586,17 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { }); } - private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + + /** + * Create a new subscription instance for the topic. + * This protected method can be overridden in tests to return a special test implementation instance. + * @param subscriptionName the name of the subscription + * @param cursor the cursor to use for the subscription + * @param replicated the subscription replication flag + * @param subscriptionProperties the subscription properties + * @return the subscription instance + */ + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, Boolean replicated, Map subscriptionProperties) { requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java new file mode 100644 index 0000000000000..ad0b2216587b6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.testinterceptor; + +import java.util.Map; +import java.util.function.Function; +import lombok.Getter; +import lombok.Setter; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicFactory; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; + +/** + * A test interceptor for broker tests that allows to decorate persistent topics, subscriptions and dispatchers. + */ +public class BrokerTestInterceptor { + public static final BrokerTestInterceptor INSTANCE = new BrokerTestInterceptor(); + + // Suppress default constructor for noninstantiability + private BrokerTestInterceptor() { + + } + + public static class TestTopicFactory implements TopicFactory { + @Override + public T create(String topic, ManagedLedger ledger, BrokerService brokerService, + Class topicClazz) { + if (!topicClazz.isAssignableFrom(PersistentTopic.class)) { + throw new UnsupportedOperationException("Unsupported topic class"); + } + return topicClazz.cast( + INSTANCE.getPersistentTopicDecorator().apply(new TestTopic(topic, ledger, brokerService))); + } + } + + static class TestTopic extends PersistentTopic { + + public TestTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + super(topic, ledger, brokerService); + } + + @Override + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map subscriptionProperties) { + return INSTANCE.getPersistentSubscriptionDecorator() + .apply(new TestSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties)); + } + } + + static class TestSubscription extends PersistentSubscription { + public TestSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map subscriptionProperties) { + super(topic, subscriptionName, cursor, replicated, subscriptionProperties); + } + + @Override + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, + Consumer consumer) { + Dispatcher previousInstance = dispatcher; + dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer); + if (dispatcher != previousInstance) { + dispatcher = INSTANCE.getDispatcherDecorator().apply(dispatcher); + } + return dispatcher; + } + } + + @Getter + @Setter + private Function persistentTopicDecorator = Function.identity(); + + @Getter + @Setter + private Function persistentSubscriptionDecorator = Function.identity(); + + @Getter + @Setter + private Function dispatcherDecorator = Function.identity(); + + public void reset() { + persistentTopicDecorator = Function.identity(); + persistentSubscriptionDecorator = Function.identity(); + dispatcherDecorator = Function.identity(); + } + + public void configure(ServiceConfiguration conf) { + conf.setTopicFactoryClassName(TestTopicFactory.class.getName()); + } + + public void applyDispatcherSpyDecorator(Class dispatcherClass, + java.util.function.Consumer spyCustomizer) { + setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass, spyCustomizer)); + } + + public static Function createDispatcherSpyDecorator( + Class dispatcherClass, java.util.function.Consumer spyCustomizer) { + return dispatcher -> { + Dispatcher spy = BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher); + spyCustomizer.accept(dispatcherClass.cast(spy)); + return spy; + }; + } +}