Skip to content

Commit

Permalink
[fix] [test] Fix flaky test ReplicatorTest (apache#22594)
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Apr 29, 2024
1 parent 93afd89 commit 6fdc0e3
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -41,6 +47,11 @@
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
* The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
* a lot of topic deletion and makes namespace policies being incorrect.
*/
@Slf4j
@Test(groups = "broker-impl")
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {

Expand Down Expand Up @@ -81,7 +92,7 @@ public void cleanup() throws Exception {
*
* @throws Exception
*/
@Test
@Test(priority = Integer.MAX_VALUE)
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");

Expand Down Expand Up @@ -115,32 +126,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
});
}

@Test
public void testForcefullyTopicDeletion() throws Exception {
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");

final String namespace = "pulsar/removeClusterTest";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));

final String topicName = "persistent://" + namespace + "/topic";

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
producer1.close();

admin1.topics().delete(topicName, true);

MockedPulsarServiceBaseTest
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);

Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
/**
* This is not a formal operation and can cause serious problems if call it in a production environment.
*/
@Test(priority = Integer.MAX_VALUE - 1)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));

results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {

@Cleanup
MessageProducer producer = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

@Cleanup
MessageConsumer consumer = new MessageConsumer(url1, dest);
log.info("--- Starting Consumer --- " + url1);

producer.produce(2);
consumer.receive(2);
return null;
}
}));
}

for (Future<Void> result : results) {
try {
result.get();
} catch (Exception e) {
log.error("exception in getting future result ", e);
fail(String.format("replication test failed with %s exception", e.getMessage()));
}
}

Thread.sleep(1000L);
// Make sure that the internal replicators map contains remote cluster info
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();

Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 2: Update the configuration back
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand All @@ -68,6 +66,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test(priority = Integer.MAX_VALUE)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));

results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {

@Cleanup
MessageProducer producer = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

@Cleanup
MessageConsumer consumer = new MessageConsumer(url1, dest);
log.info("--- Starting Consumer --- " + url1);

producer.produce(2);
consumer.receive(2);
return null;
}
}));
}

for (Future<Void> result : results) {
try {
result.get();
} catch (Exception e) {
log.error("exception in getting future result ", e);
fail(String.format("replication test failed with %s exception", e.getMessage()));
}
}

Thread.sleep(1000L);
// Make sure that the internal replicators map contains remote cluster info
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();

Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 2: Update the configuration back
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test(timeOut = 10000)
public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
Expand All @@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}

@Test
public void testForcefullyTopicDeletion() throws Exception {
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");

final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));

final String topicName = "persistent://" + namespace + "/topic";

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
producer1.close();

admin1.topics().delete(topicName, true);

MockedPulsarServiceBaseTest
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);

Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
}

@SuppressWarnings("unchecked")
@Test(timeOut = 30000)
public void testConcurrentReplicator() throws Exception {
Expand Down Expand Up @@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception {

log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");

final String namespace = "pulsar/global/repl";
final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
Expand Down Expand Up @@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception {

log.info("--- Starting ReplicatorTest::testReplication ---");

String namespace = "pulsar/global/ns2";
String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
Expand Down Expand Up @@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception {
@Test
public void testWhenUpdateReplicationCluster() throws Exception {
log.info("--- testWhenUpdateReplicationCluster ---");
String namespace = "pulsar/ns2";
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName dest = TopicName.get(
Expand Down Expand Up @@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
@Test
public void testReplicatorProducerNotExceed() throws Exception {
log.info("--- testReplicatorProducerNotExceed ---");
String namespace1 = "pulsar/ns11";
String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
admin1.namespaces().createNamespace(namespace1);
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
final TopicName dest1 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
String namespace2 = "pulsar/ns22";
String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
admin2.namespaces().createNamespace(namespace2);
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
final TopicName dest2 = TopicName.get(
Expand Down

0 comments on commit 6fdc0e3

Please sign in to comment.