Skip to content

Commit

Permalink
[improve][broker] Prevent range conflicts with Key Shared sticky cons…
Browse files Browse the repository at this point in the history
…umers when TCP/IP connections get orphaned (#174)

upstream PR apache#20026
  • Loading branch information
lhotari authored Apr 13, 2023
1 parent ecbcb81 commit 167ff8b
Show file tree
Hide file tree
Showing 21 changed files with 444 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How often to check pulsar connection is still alive"
)
private int keepAliveIntervalSeconds = 30;
@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout for connection liveness check used to check liveness of possible consumer or producer "
+ "duplicates. Helps prevent ProducerFencedException with exclusive producer, "
+ "ConsumerAssignException with range conflict for Key Shared with sticky hash ranges or "
+ "ConsumerBusyException in the case of an exclusive consumer. Set to 0 to disable connection "
+ "liveness check."
)
private long connectionLivenessCheckTimeoutMillis = 5000L;
@Deprecated
@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,20 +137,20 @@ protected boolean pickAndScheduleActiveConsumer() {
}
}

public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
consumer.disconnect();
}

if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected"));
}

if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) {
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit",
this.topicName);
throw new ConsumerBusyException("Subscription reached max consumers limit");
return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit"));
}

if (subscriptionType == SubType.Exclusive
Expand Down Expand Up @@ -181,6 +182,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
}
}

return CompletableFuture.completedFuture(null);
}

public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

Expand All @@ -53,7 +53,7 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
}

@Override
public void addConsumer(Consumer consumer) throws ConsumerAssignException {
public CompletableFuture<Void> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
// Insert multiple points on the hash ring for every consumer
Expand All @@ -73,6 +73,7 @@ public void addConsumer(Consumer consumer) throws ConsumerAssignException {
}
});
}
return CompletableFuture.completedFuture(null);
} finally {
rwLock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;

public interface Dispatcher {
void addConsumer(Consumer consumer) throws BrokerServiceException;
CompletableFuture<Void> addConsumer(Consumer consumer);

void removeConsumer(Consumer consumer) throws BrokerServiceException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.FutureUtil;

/**
* This is a consumer selector based fixed hash range.
Expand Down Expand Up @@ -77,13 +79,18 @@ public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
}

@Override
public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignException {
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
if (rangeMap.isEmpty()) {
rangeMap.put(rangeSize, consumer);
consumerRange.put(consumer, rangeSize);
} else {
splitRange(findBiggestRange(), consumer);
try {
splitRange(findBiggestRange(), consumer);
} catch (ConsumerAssignException e) {
return FutureUtil.failedFuture(e);
}
}
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.util.FutureUtil;

/**
* This is a sticky-key consumer selector based user provided range.
Expand All @@ -52,8 +54,23 @@ public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
}

@Override
public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
validateKeySharedMeta(consumer);
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
// Validation is asynchronous: validateKeySharedMeta returns a future, and the consumer is only
// registered by the callback below once that future completes normally. If validation fails,
// the callback is skipped and the failure propagates through the returned future.
return validateKeySharedMeta(consumer).thenRun(() -> {
try {
// The actual range registration; it can still reject the consumer with a
// ConsumerAssignException even though validation completed.
internalAddConsumer(consumer);
} catch (BrokerServiceException.ConsumerAssignException e) {
// A checked exception cannot escape the Runnable passed to thenRun, so it is rethrown
// through FutureUtil's wrapper to fail the returned future instead.
// NOTE(review): presumably wraps the exception as the cause of a CompletionException - confirm.
throw FutureUtil.wrapToCompletionException(e);
}
});
}

private synchronized void internalAddConsumer(Consumer consumer)
throws BrokerServiceException.ConsumerAssignException {
Consumer conflictingConsumer = findConflictingConsumer(consumer.getKeySharedMeta().getHashRangesList());
if (conflictingConsumer != null) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ conflictingConsumer);
}
for (IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) {
rangeMap.put(intRange.getStart(), consumer);
rangeMap.put(intRange.getEnd(), consumer);
Expand Down Expand Up @@ -101,31 +118,41 @@ public Consumer select(int hash) {
}
}

private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
private synchronized CompletableFuture<Void> validateKeySharedMeta(Consumer consumer) {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
return FutureUtil.failedFuture(
new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer."));
}
List<IntRange> ranges = consumer.getKeySharedMeta().getHashRangesList();
if (ranges.isEmpty()) {
throw new BrokerServiceException.ConsumerAssignException("Ranges for KeyShared policy must not be empty.");
return FutureUtil.failedFuture(new BrokerServiceException.ConsumerAssignException(
"Ranges for KeyShared policy must not be empty."));
}
for (IntRange intRange : ranges) {

if (intRange.getStart() > intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end");
return FutureUtil.failedFuture(
new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end"));
}
}
Consumer conflictingConsumer = findConflictingConsumer(ranges);
if (conflictingConsumer != null) {
return conflictingConsumer.cnx().checkConnectionLiveness().thenRun(() -> {});
} else {
return CompletableFuture.completedFuture(null);
}
}

private synchronized Consumer findConflictingConsumer(List<IntRange> ranges) {
for (IntRange intRange : ranges) {
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(intRange.getStart());
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(intRange.getEnd());

if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ floorEntry.getValue());
return floorEntry.getValue();
}

if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ ceilingEntry.getValue());
return ceilingEntry.getValue();
}

if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
Expand All @@ -134,12 +161,12 @@ private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceExcept
int start = Math.max(intRange.getStart(), range.getStart());
int end = Math.min(intRange.getEnd(), range.getEnd());
if (end >= start) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ ceilingEntry.getValue());
return ceilingEntry.getValue();
}
}
}
}
return null;
}

Map<Integer, Consumer> getRangeConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
Expand Down Expand Up @@ -214,6 +215,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final long maxPendingBytesPerThread;
private final long resumeThresholdPendingBytesPerThread;

private final long connectionLivenessCheckTimeoutMillis;

// Number of bytes pending to be published from a single specific IO thread.
private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
@Override
Expand All @@ -230,7 +233,6 @@ protected Set<ServerCnx> initialValue() throws Exception {
}
};


enum State {
Start, Connected, Failed, Connecting
}
Expand All @@ -250,6 +252,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.state = State.Start;
ServiceConfiguration conf = pulsar.getConfiguration();

this.connectionLivenessCheckTimeoutMillis = conf.getConnectionLivenessCheckTimeoutMillis();

// This maps are not heavily contended since most accesses are within the cnx thread
this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
.expectedItems(8)
Expand Down Expand Up @@ -341,6 +345,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
});
this.service.getPulsarStats().recordConnectionClose();

// complete possible pending connection check future
if (connectionCheckInProgress != null && !connectionCheckInProgress.isDone()) {
connectionCheckInProgress.complete(false);
}
}

@Override
Expand Down Expand Up @@ -3087,6 +3096,43 @@ public String clientSourceAddress() {
}
}

CompletableFuture<Boolean> connectionCheckInProgress;

@Override
public CompletableFuture<Boolean> checkConnectionLiveness() {
    // A non-positive timeout means the liveness check is disabled; the result is then null.
    if (connectionLivenessCheckTimeoutMillis <= 0) {
        return CompletableFuture.completedFuture((Boolean) null);
    }

    // The check state is read and written inside a task submitted to the channel's executor;
    // the task yields a future, which is flattened with thenCompose below.
    return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
        // Reuse a check that has already been started instead of sending another ping.
        if (connectionCheckInProgress != null) {
            return connectionCheckInProgress;
        }

        final CompletableFuture<Boolean> pendingCheck = new CompletableFuture<>();
        connectionCheckInProgress = pendingCheck;

        // Close the connection if this very check is still unanswered when the timeout fires.
        ctx.executor().schedule(() -> {
            boolean stillCurrent = pendingCheck == connectionCheckInProgress;
            if (stillCurrent && !pendingCheck.isDone()) {
                log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress);
                ctx.close();
            }
        }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);

        sendPing();
        return pendingCheck;
    })).thenCompose(java.util.function.Function.identity());
}

@Override
protected void messageReceived() {
    super.messageReceived();

    // Nothing to do unless a liveness check is pending and still unresolved.
    if (connectionCheckInProgress == null || connectionCheckInProgress.isDone()) {
        return;
    }

    // A received message resolves the pending check as alive; clearing the field
    // lets the next liveness check start from scratch.
    connectionCheckInProgress.complete(true);
    connectionCheckInProgress = null;
}

private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import java.util.Map;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

Expand All @@ -33,7 +33,7 @@ public interface StickyKeyConsumerSelector {
*
* @param consumer new consumer
*/
void addConsumer(Consumer consumer) throws ConsumerAssignException;
CompletableFuture<Void> addConsumer(Consumer consumer);

/**
* Remove the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

public interface TransportCnx {
Expand Down Expand Up @@ -78,4 +79,12 @@ public interface TransportCnx {

String clientSourceAddress();

/**
 * Checks whether the connection is still alive by actively sending a Ping message to the client.
 *
 * @return a completable future whose result is {@code true} if the connection is alive and
 *         {@code false} otherwise; the result is {@code null} if the connection liveness
 *         check is disabled
 */
CompletableFuture<Boolean> checkConnectionLiveness();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public interface NonPersistentDispatcher extends Dispatcher {

void addConsumer(Consumer consumer) throws BrokerServiceException;
CompletableFuture<Void> addConsumer(Consumer consumer);

void removeConsumer(Consumer consumer) throws BrokerServiceException;

Expand Down
Loading

0 comments on commit 167ff8b

Please sign in to comment.