Skip to content

Commit

Permalink
Add solution for intercepting topic, subscription and dispatcher inst…
Browse files Browse the repository at this point in the history
…ances in tests
  • Loading branch information
lhotari committed Jan 24, 2025
1 parent 798a014 commit 4a80067
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.io.Closeable;
import java.io.IOException;
import org.apache.bookkeeper.mledger.ManagedLedger;

/**
Expand All @@ -28,4 +29,8 @@
public interface TopicFactory extends Closeable {

<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

default void close() throws IOException {
// default implementation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,70 +250,10 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
}

if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
}
break;
default:
return FutureUtil.failedFuture(
new ServerMetadataException("Unsupported subscription type"));
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
if (consumer.subType() == null) {
return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type"));
}
dispatcher = reuseOrCreateDispatcher(dispatcher, consumer);
} else {
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
Expand All @@ -327,6 +267,79 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
});
}

/**
* Create a new dispatcher or reuse the existing one when it's compatible with the new consumer.
* This protected method can be overridded for testing purpose for injecting test dispatcher instances with
* special behaviors.
* @param dispatcher the existing dispatcher
* @param consumer the new consumer
* @return the dispatcher to use, either the existing one or a new one
*/
protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this, config, ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
config, ksm);
}
}
break;
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}

return dispatcher;
}

@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,17 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
});
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,

/**
* Create a new subscription instance for the topic.
* This protected method can be overridden in tests to return a special test implementation instance.
* @param subscriptionName the name of the subscription
* @param cursor the cursor to use for the subscription
* @param replicated the subscription replication flag
* @param subscriptionProperties the subscription properties
* @return the subscription instance
*/
protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.testinterceptor;

import java.util.Map;
import java.util.function.Function;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicFactory;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

/**
* A test interceptor for broker tests that allows to decorate persistent topics, subscriptions and dispatchers.
*/
public class BrokerTestInterceptor {
public static final BrokerTestInterceptor INSTANCE = new BrokerTestInterceptor();

// Suppress default constructor for noninstantiability
private BrokerTestInterceptor() {

}

public static class TestTopicFactory implements TopicFactory {
@Override
public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) {
if (!topicClazz.isAssignableFrom(PersistentTopic.class)) {
throw new UnsupportedOperationException("Unsupported topic class");
}
return topicClazz.cast(
INSTANCE.getPersistentTopicDecorator().apply(new TestTopic(topic, ledger, brokerService)));
}
}

static class TestTopic extends PersistentTopic {

public TestTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
super(topic, ledger, brokerService);
}

@Override
protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
Boolean replicated,
Map<String, String> subscriptionProperties) {
return INSTANCE.getPersistentSubscriptionDecorator()
.apply(new TestSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties));
}
}

static class TestSubscription extends PersistentSubscription {
public TestSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
Boolean replicated,
Map<String, String> subscriptionProperties) {
super(topic, subscriptionName, cursor, replicated, subscriptionProperties);
}

@Override
protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher,
Consumer consumer) {
Dispatcher previousInstance = dispatcher;
dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer);
if (dispatcher != previousInstance) {
dispatcher = INSTANCE.getDispatcherDecorator().apply(dispatcher);
}
return dispatcher;
}
}

@Getter
@Setter
private Function<PersistentTopic, PersistentTopic> persistentTopicDecorator = Function.identity();

@Getter
@Setter
private Function<PersistentSubscription, PersistentSubscription> persistentSubscriptionDecorator = Function.identity();

@Getter
@Setter
private Function<Dispatcher, Dispatcher> dispatcherDecorator = Function.identity();

public void reset() {
persistentTopicDecorator = Function.identity();
persistentSubscriptionDecorator = Function.identity();
dispatcherDecorator = Function.identity();
}

public void configure(ServiceConfiguration conf) {
conf.setTopicFactoryClassName(TestTopicFactory.class.getName());
}

public <T extends Dispatcher> void applyDispatcherSpyDecorator(Class<T> dispatcherClass,
java.util.function.Consumer<T> spyCustomizer) {
setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass, spyCustomizer));
}

public static <T extends Dispatcher> Function<Dispatcher, Dispatcher> createDispatcherSpyDecorator(
Class<T> dispatcherClass, java.util.function.Consumer<T> spyCustomizer) {
return dispatcher -> {
Dispatcher spy = BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher);
spyCustomizer.accept(dispatcherClass.cast(spy));
return spy;
};
}
}

0 comments on commit 4a80067

Please sign in to comment.