Skip to content

Commit

Permalink
[improve][test] Support decorating topic, subscription, dispatcher, M…
Browse files Browse the repository at this point in the history
…anagedLedger and ManagedCursors instances in tests (#23892)
  • Loading branch information
lhotari authored Jan 25, 2025
1 parent 223eea0 commit 2a9d4ac
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,8 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenAccept(bk -> {
final ManagedLedgerImpl newledger = config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker)
: new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker);
final ManagedLedgerImpl newledger =
createManagedLedger(bk, store, name, config, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
Expand Down Expand Up @@ -472,6 +469,14 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}

protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name,
ManagedLedgerConfig config,
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
return config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) :
new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker);
}

@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public void operationComplete(List<String> consumers, Stat s) {
for (final String cursorName : consumers) {
log.info("[{}] Loading cursor {}", name, cursorName);
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName);
cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);

cursor.recover(new VoidCallback() {
@Override
Expand Down Expand Up @@ -663,7 +663,7 @@ public void operationFailed(ManagedLedgerException exception) {
log.debug("[{}] Recovering cursor {} lazily", name, cursorName);
}
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName);
cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
CompletableFuture<ManagedCursor> cursorRecoveryFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorRecoveryFuture);

Expand Down Expand Up @@ -1007,7 +1007,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
if (log.isDebugEnabled()) {
log.debug("[{}] Creating new cursor: {}", name, cursorName);
}
final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName);
final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
Position position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
Expand Down Expand Up @@ -1039,6 +1039,10 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) {
return new ManagedCursorImpl(bookKeeper, this, cursorName);
}

@Override
public synchronized void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback,
final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata

try {
this.managedLedgerFactory =
new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
openTelemetry);
createManagedLedgerFactory(metadataStore, openTelemetry, bkFactory, managedLedgerFactoryConfig,
statsLogger);
} catch (Exception e) {
statsProvider.stop();
defaultBkClient.close();
Expand Down Expand Up @@ -147,6 +147,16 @@ public BookKeeper getBookKeeperClient() {
};
}

protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore,
OpenTelemetry openTelemetry,
BookkeeperFactoryForCustomEnsemblePlacementPolicy
bkFactory,
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
StatsLogger statsLogger) throws Exception {
return new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
openTelemetry);
}

@Override
public Collection<ManagedLedgerStorageClass> getStorageClasses() {
return List.of(getDefaultStorageClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.io.Closeable;
import java.io.IOException;
import org.apache.bookkeeper.mledger.ManagedLedger;

/**
Expand All @@ -28,4 +29,8 @@
public interface TopicFactory extends Closeable {

<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

default void close() throws IOException {
// default implementation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,70 +250,10 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
}

if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
}
break;
default:
return FutureUtil.failedFuture(
new ServerMetadataException("Unsupported subscription type"));
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
if (consumer.subType() == null) {
return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type"));
}
dispatcher = reuseOrCreateDispatcher(dispatcher, consumer);
} else {
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
Expand All @@ -327,6 +267,79 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
});
}

/**
* Create a new dispatcher or reuse the existing one when it's compatible with the new consumer.
* This protected method can be overridded for testing purpose for injecting test dispatcher instances with
* special behaviors.
* @param dispatcher the existing dispatcher
* @param consumer the new consumer
* @return the dispatcher to use, either the existing one or a new one
*/
protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this, config, ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
config, ksm);
}
}
break;
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}

return dispatcher;
}

@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,17 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
});
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,

/**
* Create a new subscription instance for the topic.
* This protected method can be overridden in tests to return a special test implementation instance.
* @param subscriptionName the name of the subscription
* @param cursor the cursor to use for the subscription
* @param replicated the subscription replication flag
* @param subscriptionProperties the subscription properties
* @return the subscription instance
*/
protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
Expand Down
Loading

0 comments on commit 2a9d4ac

Please sign in to comment.