Skip to content

Commit

Permalink
[fix] [broker] Fix break change: could not subscribe partitioned top…
Browse files Browse the repository at this point in the history
…ic with a suffix-matched regexp due to a mistake of PIP-145 (apache#21885)
  • Loading branch information
poorbarcode authored Jan 15, 2024
1 parent cfd2c47 commit 4ebbd2f
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -55,7 +56,9 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -717,5 +720,16 @@ protected void sleepSeconds(int seconds){
}
}

public static void reconnectAllConnections(PulsarClientImpl c) throws Exception {
ConnectionPool pool = c.getCnxPool();
Method closeAllConnections = ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{});
closeAllConnections.setAccessible(true);
closeAllConnections.invoke(pool, new Object[]{});
}

public void reconnectAllConnections() throws Exception {
reconnectAllConnections((PulsarClientImpl) pulsarClient);
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.common.collect.Lists;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,6 +36,7 @@
import java.util.stream.IntStream;

import io.netty.util.Timeout;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
Expand Down Expand Up @@ -679,6 +681,111 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
return new Object[][]{
{true},
{false}
};
}

@Test(timeOut = testTimeout, dataProvider = "partitioned")
public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$", topicName));

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();

// 1. create topic.
if (partitioned) {
admin.topics().createPartitionedTopic(topicName, 1);
} else {
admin.topics().createNonPartitionedTopic(topicName);
}

// 2. verify consumer can subscribe the topic.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
Awaitility.await().untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
if (partitioned) {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
} else {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
}
});

// cleanup.
consumer.close();
if (partitioned) {
admin.topics().deletePartitionedTopic(topicName);
} else {
admin.topics().delete(topicName);
}
}

@Test(timeOut = 240 * 1000, dataProvider = "partitioned")
public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$", topicName));

// Close all ServerCnx by close client-side sockets to make the config changes effect.
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false);
reconnectAllConnections();

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
// Disable brokerSideSubscriptionPatternEvaluation will leading disable topic list watcher.
// So set patternAutoDiscoveryPeriod to a little value.
.patternAutoDiscoveryPeriod(1)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();

// 1. create topic.
if (partitioned) {
admin.topics().createPartitionedTopic(topicName, 1);
} else {
admin.topics().createNonPartitionedTopic(topicName);
}

// 2. verify consumer can subscribe the topic.
// Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we set the test timeout to a triple value.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
if (partitioned) {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
} else {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
}
});

// cleanup.
consumer.close();
if (partitioned) {
admin.topics().deletePartitionedTopic(topicName);
} else {
admin.topics().delete(topicName);
}
// Close all ServerCnx by close client-side sockets to make the config changes effect.
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true);
reconnectAllConnections();
}

private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ public static List<String> filterTopics(List<String> original, String regex) {
}
public static List<String> filterTopics(List<String> original, Pattern topicsPattern) {

final Pattern shortenedTopicsPattern = topicsPattern.toString().contains(SCHEME_SEPARATOR)
? Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : topicsPattern;
final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString()));

return original.stream()
.map(TopicName::get)
.filter(topicName -> {
String partitionedTopicName = topicName.getPartitionedTopicName();
String removedScheme = SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
return shortenedTopicsPattern.matcher(removedScheme).matches();
})
.map(TopicName::toString)
.filter(topic -> shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches())
.collect(Collectors.toList());
}

Expand All @@ -78,4 +81,16 @@ public static Set<String> minus(Collection<String> list1, Collection<String> lis
s1.removeAll(list2);
return s1;
}

private static String removeTopicDomainScheme(String originalRegexp) {
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
return originalRegexp;
}
String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
if (originalRegexp.contains("^")) {
return String.format("^%s", removedTopicDomain);
} else {
return removedTopicDomain;
}
}
}

0 comments on commit 4ebbd2f

Please sign in to comment.