From 167ff8b84ef2aed545dd575e4ad559cfe22c4200 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 13 Apr 2023 10:42:03 +0300 Subject: [PATCH] [improve][broker] Prevent range conflicts with Key Shared sticky consumers when TCP/IP connections get orphaned (#174) upstream PR https://github.com/apache/pulsar/pull/20026 --- .../pulsar/broker/ServiceConfiguration.java | 9 + ...bstractDispatcherSingleActiveConsumer.java | 8 +- ...stentHashingStickyKeyConsumerSelector.java | 5 +- .../pulsar/broker/service/Dispatcher.java | 2 +- ...ngeAutoSplitStickyKeyConsumerSelector.java | 11 +- ...ngeExclusiveStickyKeyConsumerSelector.java | 53 +++-- .../pulsar/broker/service/ServerCnx.java | 48 ++++- .../service/StickyKeyConsumerSelector.java | 4 +- .../pulsar/broker/service/TransportCnx.java | 9 + .../NonPersistentDispatcher.java | 2 +- ...PersistentDispatcherMultipleConsumers.java | 8 +- ...tStickyKeyDispatcherMultipleConsumers.java | 24 ++- .../NonPersistentSubscription.java | 7 +- ...PersistentDispatcherMultipleConsumers.java | 9 +- ...tStickyKeyDispatcherMultipleConsumers.java | 48 +++-- .../persistent/PersistentSubscription.java | 7 +- .../service/AbstractBaseDispatcherTest.java | 4 +- ...xclusiveStickyKeyConsumerSelectorTest.java | 84 +++++--- .../pulsar/common/protocol/PulsarHandler.java | 23 ++- pulsar-proxy/pom.xml | 5 + .../server/ProxyStuckConnectionTest.java | 188 ++++++++++++++++++ 21 files changed, 444 insertions(+), 114 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b8337053d4897..6385f6118862c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2290,6 +2290,15 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "How often to check pulsar connection is still alive" ) private int keepAliveIntervalSeconds = 30; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Timeout for connection liveness check used to check liveness of possible consumer or producer " + + "duplicates. Helps prevent ProducerFencedException with exclusive producer, " + + "ConsumerAssignException with range conflict for Key Shared with sticky hash ranges or " + + "ConsumerBusyException in the case of an exclusive consumer. Set to 0 to disable connection " + + "liveness check." + ) + private long connectionLivenessCheckTimeoutMillis = 5000L; @Deprecated @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 8cab06be116af..5ee4f9853b0e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,20 +137,20 @@ protected boolean pickAndScheduleActiveConsumer() { } } - public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + public synchronized CompletableFuture addConsumer(Consumer consumer) { if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer); consumer.disconnect(); } if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { - throw new ConsumerBusyException("Exclusive consumer is already connected"); + return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected")); } if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) { log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.topicName); - throw new ConsumerBusyException("Subscription reached max consumers limit"); + return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } if (subscriptionType == SubType.Exclusive @@ -181,6 +182,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce } } + return CompletableFuture.completedFuture(null); } public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index c8530e230c07c..f3a48ac2dde6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -26,9 +26,9 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.util.Murmur3_32Hash; @@ -53,7 +53,7 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) { } @Override - public void addConsumer(Consumer consumer) throws ConsumerAssignException { + public CompletableFuture addConsumer(Consumer consumer) { rwLock.writeLock().lock(); try { // Insert multiple points on the hash ring for every consumer @@ -73,6 +73,7 @@ public void addConsumer(Consumer consumer) throws ConsumerAssignException { } }); } + return CompletableFuture.completedFuture(null); } finally { rwLock.writeLock().unlock(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 86980ee340c96..78e5a02ad4972 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -27,7 +27,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; public interface Dispatcher { - void addConsumer(Consumer consumer) throws BrokerServiceException; + CompletableFuture addConsumer(Consumer consumer); void removeConsumer(Consumer consumer) throws BrokerServiceException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index e4a2bf7085159..d3f4ca76df77e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -23,9 +23,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.common.util.FutureUtil; /** * This is a consumer selector based fixed hash range. @@ -77,13 +79,18 @@ public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) { } @Override - public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignException { + public synchronized CompletableFuture addConsumer(Consumer consumer) { if (rangeMap.isEmpty()) { rangeMap.put(rangeSize, consumer); consumerRange.put(consumer, rangeSize); } else { - splitRange(findBiggestRange(), consumer); + try { + splitRange(findBiggestRange(), consumer); + } catch (ConsumerAssignException e) { + return FutureUtil.failedFuture(e); + } } + return CompletableFuture.completedFuture(null); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index 7c496321d1d3a..a300e83c28242 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -23,10 +23,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.util.FutureUtil; /** * This is a sticky-key consumer selector based user provided range. @@ -52,8 +54,23 @@ public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) { } @Override - public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException { - validateKeySharedMeta(consumer); + public synchronized CompletableFuture addConsumer(Consumer consumer) { + return validateKeySharedMeta(consumer).thenRun(() -> { + try { + internalAddConsumer(consumer); + } catch (BrokerServiceException.ConsumerAssignException e) { + throw FutureUtil.wrapToCompletionException(e); + } + }); + } + + private synchronized void internalAddConsumer(Consumer consumer) + throws BrokerServiceException.ConsumerAssignException { + Consumer conflictingConsumer = findConflictingConsumer(consumer.getKeySharedMeta().getHashRangesList()); + if (conflictingConsumer != null) { + throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + + conflictingConsumer); + } for (IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) { rangeMap.put(intRange.getStart(), consumer); rangeMap.put(intRange.getEnd(), consumer); @@ -101,31 +118,41 @@ public Consumer select(int hash) { } } - private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException { + private synchronized CompletableFuture validateKeySharedMeta(Consumer consumer) { if (consumer.getKeySharedMeta() == null) { - throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer."); + return FutureUtil.failedFuture( + new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.")); } List ranges = consumer.getKeySharedMeta().getHashRangesList(); if (ranges.isEmpty()) { - throw new BrokerServiceException.ConsumerAssignException("Ranges for KeyShared policy must not be empty."); + return FutureUtil.failedFuture(new BrokerServiceException.ConsumerAssignException( + "Ranges for KeyShared policy must not be empty.")); } for (IntRange intRange : ranges) { - if (intRange.getStart() > intRange.getEnd()) { - throw new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end"); + return FutureUtil.failedFuture( + new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end")); } + } + Consumer conflictingConsumer = findConflictingConsumer(ranges); + if (conflictingConsumer != null) { + return conflictingConsumer.cnx().checkConnectionLiveness().thenRun(() -> {}); + } else { + return CompletableFuture.completedFuture(null); + } + } + private synchronized Consumer findConflictingConsumer(List ranges) { + for (IntRange intRange : ranges) { Map.Entry ceilingEntry = rangeMap.ceilingEntry(intRange.getStart()); Map.Entry floorEntry = rangeMap.floorEntry(intRange.getEnd()); if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) { - throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " - + floorEntry.getValue()); + return floorEntry.getValue(); } if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) { - throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " - + ceilingEntry.getValue()); + return ceilingEntry.getValue(); } if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) { @@ -134,12 +161,12 @@ private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceExcept int start = Math.max(intRange.getStart(), range.getStart()); int end = Math.min(intRange.getEnd(), range.getEnd()); if (end >= start) { - throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " - + ceilingEntry.getValue()); + return ceilingEntry.getValue(); } } } } + return null; } Map getRangeConsumer() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ee0ee4b221ff5..801dc9585825e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -152,6 +152,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.common.util.netty.NettyFutureUtil; import org.apache.pulsar.functions.utils.Exceptions; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; @@ -214,6 +215,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long maxPendingBytesPerThread; private final long resumeThresholdPendingBytesPerThread; + private final long connectionLivenessCheckTimeoutMillis; + // Number of bytes pending to be published from a single specific IO thread. private static final FastThreadLocal pendingBytesPerThread = new FastThreadLocal() { @Override @@ -230,7 +233,6 @@ protected Set initialValue() throws Exception { } }; - enum State { Start, Connected, Failed, Connecting } @@ -250,6 +252,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) { this.state = State.Start; ServiceConfiguration conf = pulsar.getConfiguration(); + this.connectionLivenessCheckTimeoutMillis = conf.getConnectionLivenessCheckTimeoutMillis(); + // This maps are not heavily contended since most accesses are within the cnx thread this.producers = ConcurrentLongHashMap.>newBuilder() .expectedItems(8) @@ -341,6 +345,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } }); this.service.getPulsarStats().recordConnectionClose(); + + // complete possible pending connection check future + if (connectionCheckInProgress != null && !connectionCheckInProgress.isDone()) { + connectionCheckInProgress.complete(false); + } } @Override @@ -3087,6 +3096,43 @@ public String clientSourceAddress() { } } + CompletableFuture connectionCheckInProgress; + + @Override + public CompletableFuture checkConnectionLiveness() { + if (connectionLivenessCheckTimeoutMillis > 0) { + return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> { + if (connectionCheckInProgress != null) { + return connectionCheckInProgress; + } else { + final CompletableFuture finalConnectionCheckInProgress = new CompletableFuture<>(); + connectionCheckInProgress = finalConnectionCheckInProgress; + ctx.executor().schedule(() -> { + if (finalConnectionCheckInProgress == connectionCheckInProgress + && !finalConnectionCheckInProgress.isDone()) { + log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress); + ctx.close(); + } + }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS); + sendPing(); + return finalConnectionCheckInProgress; + } + })).thenCompose(java.util.function.Function.identity()); + } else { + // check is disabled + return CompletableFuture.completedFuture((Boolean) null); + } + } + + @Override + protected void messageReceived() { + super.messageReceived(); + if (connectionCheckInProgress != null && !connectionCheckInProgress.isDone()) { + connectionCheckInProgress.complete(true); + connectionCheckInProgress = null; + } + } + private static void logAuthException(SocketAddress remoteAddress, String operation, String principal, Optional topic, Throwable ex) { String topicString = topic.map(t -> ", topic=" + t.toString()).orElse(""); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index b129ef533df4e..847561cfa9c2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.Map; -import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.util.Murmur3_32Hash; @@ -33,7 +33,7 @@ public interface StickyKeyConsumerSelector { * * @param consumer new consumer */ - void addConsumer(Consumer consumer) throws ConsumerAssignException; + CompletableFuture addConsumer(Consumer consumer); /** * Remove the consumer. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index cc0c559eaf22a..f28016c2de9fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -21,6 +21,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.util.concurrent.Promise; import java.net.SocketAddress; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; public interface TransportCnx { @@ -78,4 +79,12 @@ public interface TransportCnx { String clientSourceAddress(); + /*** + * Check if the connection is still alive + * by actively sending a Ping message to the client. + * + * @return a completable future where the result is true if the connection is alive, false otherwise. The result + * is null if the connection liveness check is disabled. + */ + CompletableFuture checkConnectionLiveness(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java index 1ef2db2561340..82b09995344ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java @@ -31,7 +31,7 @@ public interface NonPersistentDispatcher extends Dispatcher { - void addConsumer(Consumer consumer) throws BrokerServiceException; + CompletableFuture addConsumer(Consumer consumer); void removeConsumer(Consumer consumer) throws BrokerServiceException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 9694584025597..289434f4257e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,20 +68,21 @@ public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscr } @Override - public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + public synchronized CompletableFuture addConsumer(Consumer consumer) { if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); consumer.disconnect(); - return; + return CompletableFuture.completedFuture(null); } if (isConsumersExceededOnSubscription()) { log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); - throw new ConsumerBusyException("Subscription reached max consumers limit"); + return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } consumerList.add(consumer); consumerSet.add(consumer); + return CompletableFuture.completedFuture(null); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index e5e5349651946..d1e199733a2f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -39,6 +40,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers { @@ -83,15 +85,19 @@ public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topi } @Override - public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { - super.addConsumer(consumer); - try { - selector.addConsumer(consumer); - } catch (BrokerServiceException e) { - consumerSet.removeAll(consumer); - consumerList.remove(consumer); - throw e; - } + public synchronized CompletableFuture addConsumer(Consumer consumer) { + return super.addConsumer(consumer).thenCompose(__ -> + selector.addConsumer(consumer).handle((value, ex) -> { + if (ex != null) { + synchronized (NonPersistentStickyKeyDispatcherMultipleConsumers.this) { + consumerSet.removeAll(consumer); + consumerList.remove(consumer); + } + throw FutureUtil.wrapToCompletionException(ex); + } else { + return value; + } + })); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 34e86ca761587..38e254e5fb611 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -172,12 +172,7 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } } - try { - dispatcher.addConsumer(consumer); - return CompletableFuture.completedFuture(null); - } catch (BrokerServiceException brokerServiceException) { - return FutureUtil.failedFuture(brokerServiceException); - } + return dispatcher.addConsumer(consumer); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a82269c73504f..059e1f886903a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -65,6 +65,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,11 +143,11 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso } @Override - public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + public synchronized CompletableFuture addConsumer(Consumer consumer) { if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); consumer.disconnect(); - return; + return CompletableFuture.completedFuture(null); } if (consumerList.isEmpty()) { if (havePendingRead || havePendingReplayRead) { @@ -162,7 +163,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce if (isConsumersExceededOnSubscription()) { log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); - throw new ConsumerBusyException("Subscription reached max consumers limit"); + return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } consumerList.add(consumer); @@ -171,6 +172,8 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel)); } consumerSet.add(consumer); + + return CompletableFuture.completedFuture(null); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 5eb553106e679..55ddf54ea5395 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.Entry; @@ -51,6 +52,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,26 +103,32 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } @Override - public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { - super.addConsumer(consumer); - try { - selector.addConsumer(consumer); - } catch (BrokerServiceException e) { - consumerSet.removeAll(consumer); - consumerList.remove(consumer); - throw e; - } - - PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition(); - consumer.setReadPositionWhenJoining(readPositionWhenJoining); - // If this was the 1st consumer, or if all the messages are already acked, then we - // don't need to do anything special - if (!allowOutOfOrderDelivery - && recentlyJoinedConsumers != null - && consumerList.size() > 1 - && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { - recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); - } + public synchronized CompletableFuture addConsumer(Consumer consumer) { + return super.addConsumer(consumer).thenCompose(__ -> + selector.addConsumer(consumer).handle((result, ex) -> { + if (ex != null) { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + consumerSet.removeAll(consumer); + consumerList.remove(consumer); + } + throw FutureUtil.wrapToCompletionException(ex); + } + return result; + }) + ).thenRun(() -> { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition(); + consumer.setReadPositionWhenJoining(readPositionWhenJoining); + // If this was the 1st consumer, or if all the messages are already acked, then we + // don't need to do anything special + if (!allowOutOfOrderDelivery + && recentlyJoinedConsumers != null + && consumerList.size() > 1 + && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { + recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); + } + } + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 3f7f698b8036f..86251691496bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -291,12 +291,7 @@ public CompletableFuture addConsumer(Consumer consumer) { } } - try { - dispatcher.addConsumer(consumer); - return CompletableFuture.completedFuture(null); - } catch (BrokerServiceException brokerServiceException) { - return FutureUtil.failedFuture(brokerServiceException); - } + return dispatcher.addConsumer(consumer); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index 2277171cca585..a7ad42e7457cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -244,8 +244,8 @@ protected void reScheduleRead() { } @Override - public void addConsumer(Consumer consumer) throws BrokerServiceException { - + public CompletableFuture addConsumer(Consumer consumer) { + return CompletableFuture.completedFuture(null); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java index dde0a586f03f2..bcab91a0bad0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java @@ -16,20 +16,20 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.pulsar.broker.service; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import com.google.common.collect.Lists; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; @@ -41,7 +41,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { @Test - public void testConsumerSelect() throws BrokerServiceException.ConsumerAssignException { + public void testConsumerSelect() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer1 = mock(Consumer.class); @@ -50,8 +50,8 @@ public void testConsumerSelect() throws BrokerServiceException.ConsumerAssignExc keySharedMeta1.addHashRange().setStart(0).setEnd(2); when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); - selector.addConsumer(consumer1); - Assert.assertEquals(selector.getRangeConsumer().size(),2); + selector.addConsumer(consumer1).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); Consumer selectedConsumer; for (int i = 0; i < 3; i++) { selectedConsumer = selector.select(i); @@ -66,8 +66,8 @@ public void testConsumerSelect() throws BrokerServiceException.ConsumerAssignExc keySharedMeta2.addHashRange().setStart(3).setEnd(9); when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2); Assert.assertEquals(consumer2.getKeySharedMeta(), keySharedMeta2); - selector.addConsumer(consumer2); - Assert.assertEquals(selector.getRangeConsumer().size(),4); + selector.addConsumer(consumer2).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 4); for (int i = 3; i < 10; i++) { selectedConsumer = selector.select(i); @@ -80,32 +80,46 @@ public void testConsumerSelect() throws BrokerServiceException.ConsumerAssignExc } selector.removeConsumer(consumer1); - Assert.assertEquals(selector.getRangeConsumer().size(),2); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); selectedConsumer = selector.select(1); Assert.assertNull(selectedConsumer); selector.removeConsumer(consumer2); - Assert.assertEquals(selector.getRangeConsumer().size(),0); + Assert.assertEquals(selector.getRangeConsumer().size(), 0); selectedConsumer = selector.select(5); Assert.assertNull(selectedConsumer); } - @Test(expectedExceptions = BrokerServiceException.ConsumerAssignException.class) - public void testEmptyRanges() throws BrokerServiceException.ConsumerAssignException { + @Test + public void testEmptyRanges() { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer = mock(Consumer.class); KeySharedMeta keySharedMeta = new KeySharedMeta() .setKeySharedMode(KeySharedMode.STICKY); when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); - selector.addConsumer(consumer); + try { + selector.addConsumer(consumer).get(); + Assert.fail("Should have failed"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerAssignException); + } } - @Test(expectedExceptions = BrokerServiceException.ConsumerAssignException.class) - public void testNullKeySharedMeta() throws BrokerServiceException.ConsumerAssignException { + @Test + public void testNullKeySharedMeta() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer = mock(Consumer.class); when(consumer.getKeySharedMeta()).thenReturn(null); - selector.addConsumer(consumer); + try { + selector.addConsumer(consumer).get(); + Assert.fail("Should have failed"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerAssignException); + } } @Test(expectedExceptions = IllegalArgumentException.class) @@ -114,7 +128,7 @@ public void testInvalidRangeTotal() { } @Test - public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException { + public void testGetConsumerKeyHashRanges() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); List consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4"); List range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20}); @@ -129,7 +143,7 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); when(consumer.consumerName()).thenReturn(consumerName.get(index)); Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); - selector.addConsumer(consumer); + selector.addConsumer(consumer).get(); consumers.add(consumer); } @@ -161,7 +175,7 @@ public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); when(consumer.consumerName()).thenReturn(consumerName); Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); - selector.addConsumer(consumer); + selector.addConsumer(consumer).get(); consumers.add(consumer); } @@ -177,16 +191,19 @@ public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception } @Test - public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException { + public void testSingleRangeConflict() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer1 = mock(Consumer.class); + TransportCnx transportCnx = mock(TransportCnx.class); + when(consumer1.cnx()).thenReturn(transportCnx); + when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null)); KeySharedMeta keySharedMeta1 = new KeySharedMeta() .setKeySharedMode(KeySharedMode.STICKY); keySharedMeta1.addHashRange().setStart(2).setEnd(5); when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); - selector.addConsumer(consumer1); - Assert.assertEquals(selector.getRangeConsumer().size(),2); + selector.addConsumer(consumer1).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); final List testRanges = new ArrayList<>(); testRanges.add(new IntRange().setStart(4).setEnd(6)); @@ -207,25 +224,29 @@ public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssi when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); try { - selector.addConsumer(consumer); + selector.addConsumer(consumer).get(); Assert.fail("should be failed"); - } catch (BrokerServiceException.ConsumerAssignException ignore) { + } catch (ExecutionException | InterruptedException e) { + // ignore } - Assert.assertEquals(selector.getRangeConsumer().size(),2); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); } } @Test - public void testMultipleRangeConflict() throws BrokerServiceException.ConsumerAssignException { + public void testMultipleRangeConflict() throws ExecutionException, InterruptedException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer1 = mock(Consumer.class); + TransportCnx transportCnx = mock(TransportCnx.class); + when(consumer1.cnx()).thenReturn(transportCnx); + when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null)); KeySharedMeta keySharedMeta1 = new KeySharedMeta() .setKeySharedMode(KeySharedMode.STICKY); keySharedMeta1.addHashRange().setStart(2).setEnd(5); when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1); Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1); - selector.addConsumer(consumer1); - Assert.assertEquals(selector.getRangeConsumer().size(),2); + selector.addConsumer(consumer1).get(); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); final List> testRanges = new ArrayList<>(); testRanges.add(Lists.newArrayList( @@ -246,11 +267,12 @@ public void testMultipleRangeConflict() throws BrokerServiceException.ConsumerAs when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); try { - selector.addConsumer(consumer); + selector.addConsumer(consumer).get(); Assert.fail("should be failed"); - } catch (BrokerServiceException.ConsumerAssignException ignore) { + } catch (ExecutionException | InterruptedException e) { + // ignore } - Assert.assertEquals(selector.getRangeConsumer().size(),2); + Assert.assertEquals(selector.getRangeConsumer().size(), 2); } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index 3cd91809f5293..8cab3742bac5b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.protocol; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.ScheduledFuture; import java.net.SocketAddress; @@ -53,7 +54,7 @@ public PulsarHandler(int keepAliveInterval, TimeUnit unit) { } @Override - protected final void messageReceived() { + protected void messageReceived() { waitingForPingResponse = false; } @@ -117,14 +118,7 @@ private void handleKeepAliveTimeout() { log.debug("[{}] Sending ping message", ctx.channel()); } waitingForPingResponse = true; - ctx.writeAndFlush(Commands.newPing()) - .addListener(future -> { - if (!future.isSuccess()) { - log.warn("[{}] Forcing connection to close since cannot send a ping message.", - ctx.channel(), future.cause()); - ctx.close(); - } - }); + sendPing(); } else { if (log.isDebugEnabled()) { log.debug("[{}] Peer doesn't support keep-alive", ctx.channel()); @@ -132,6 +126,17 @@ private void handleKeepAliveTimeout() { } } + protected ChannelFuture sendPing() { + return ctx.writeAndFlush(Commands.newPing()) + .addListener(future -> { + if (!future.isSuccess()) { + log.warn("[{}] Forcing connection to close since cannot send a ping message.", + ctx.channel(), future.cause()); + ctx.close(); + } + }); + } + public void cancelKeepAliveTask() { if (keepAliveTask != null) { keepAliveTask.cancel(false); diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 93fef4f7f9b46..352da828afef7 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -190,6 +190,11 @@ awaitility test + + org.testcontainers + testcontainers + test + diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java new file mode 100644 index 0000000000000..9c4e52e37caf3 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.doReturn; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.KeySharedPolicy; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.Testcontainers; +import org.testcontainers.containers.SocatContainer; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { + + private static final Logger log = LoggerFactory.getLogger(ProxyStuckConnectionTest.class); + + private ProxyService proxyService; + private ProxyConfiguration proxyConfig; + private SocatContainer socatContainer; + + private static String brokerServiceUriSocat; + private static volatile boolean useBrokerSocatProxy = true; + + @Override + @BeforeMethod + protected void setup() throws Exception { + useBrokerSocatProxy = true; + internalSetup(); + + int brokerPort = pulsar.getBrokerService().getListenPort().get(); + Testcontainers.exposeHostPorts(brokerPort); + + socatContainer = new SocatContainer(); + socatContainer.withTarget(brokerPort, "host.testcontainers.internal", brokerPort); + socatContainer.start(); + brokerServiceUriSocat = "pulsar://" + socatContainer.getHost() + ":" + socatContainer.getMappedPort(brokerPort); + + proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.ofNullable(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setLookupHandler(TestLookupProxyHandler.class.getName()); + + startProxyService(); + // use the same port for subsequent restarts + proxyConfig.setServicePort(proxyService.getListenPort()); + } + + private void startProxyService() throws Exception { + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)))); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + proxyService.start(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + if (proxyService != null) { + proxyService.close(); + } + if (socatContainer != null) { + socatContainer.close(); + } + } + + private static final class TestLookupProxyHandler extends DefaultLookupProxyHandler { + @Override + protected CompletableFuture performLookup(long clientRequestId, String topic, String brokerServiceUrl, + boolean authoritative, int numberOfRetries) { + return super.performLookup(clientRequestId, topic, brokerServiceUrl, authoritative, numberOfRetries) + .thenApply(url -> useBrokerSocatProxy ? brokerServiceUriSocat : url); + } + } + + @Test + public void testKeySharedStickyWithStuckConnection() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) + // keep alive is set to 2 seconds to detect the dead connection on the client side + // the main focus of the test is to verify that the broker and proxy doesn't get stuck forever + // when there's a hanging connection from the proxy to the broker and that it doesn't cause issues + // such as hash range conflicts + .keepAliveInterval(2, TimeUnit.SECONDS) + .build(); + String topicName = BrokerTestUtil.newUniqueName("persistent://sample/test/local/test-topic"); + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName("test-subscription") + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.stickyHashRange() + .ranges(Range.of(0, 65535))) + .receiverQueueSize(2) + .isAckReceiptEnabled(true) + .subscribe(); + + Set messages = new HashSet<>(); + + try (Producer producer = client.newProducer() + .topic(topicName) + .accessMode(ProducerAccessMode.Shared) + .enableBatching(false) + .create()) { + for (int i = 0; i < 10; i++) { + String message = "test" + i; + producer.newMessage().value(message.getBytes()) + .key("A") + .send(); + messages.add(message); + } + } + + int counter = 0; + while (true) { + counter++; + Message msg = consumer.receive(15, TimeUnit.SECONDS); + if (msg == null) { + break; + } + String msgString = new String(msg.getData()); + log.info("Received message {}", msgString); + try { + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + log.error("Failed to ack message {}", msgString, e); + } + messages.remove(msgString); + log.info("Remaining messages {}", messages.size()); + if (messages.size() == 0) { + break; + } + if (counter == 2) { + log.info( + "Pausing connection between proxy and broker and making further connections from proxy " + + "directly to broker"); + useBrokerSocatProxy = false; + socatContainer.getDockerClient().pauseContainerCmd(socatContainer.getContainerId()).exec(); + } + } + + Assert.assertEquals(messages.size(), 0); + Assert.assertEquals(consumer.receive(1, TimeUnit.MILLISECONDS), null); + } +}