From d099ac4fa2f217b9c5f0a5e660c83048e829c5d7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 28 Aug 2023 14:45:01 +0800 Subject: [PATCH] [improve] [broker] Improve cache handling for partitioned topic metadata when doing lookup (#21063) Motivation: If we set `allowAutoTopicCreationType` to `PARTITIONED`, the flow of the create topic progress is like the below: 1. `Client-side`: Lookup topic to get partitioned topic metadata to create a producer. 1. `Broker-side`: Create partitioned topic metadata. 1. `Broker-side`: response `{"partitions":3}`. 1. `Client-side`: Create separate connections for each partition of the topic. 1. `Broker-side`: Receive 3 connect requests and create 3 partition-topics. In the `step 2` above, the flow of the progress is like the below: 1. Check the policy of topic auto-creation( the policy is `{allowAutoTopicCreationType=PARTITIONED, defaultNumPartitions=3}` ) 1. Check the partitioned topic metadata already exists. 1. Try to create the partitioned topic metadata if it does not exist. 1. If created failed by the partitioned topic metadata already exists( maybe another broker is also creating now), read partitioned topic metadata from the metadata store and respond to the client. There is a race condition that makes the client get non-partitioned metadata of the topic: | time | `broker-1` | `broker-2` | | --- | --- | --- | | 1 | get policy: `PARTITIONED, 3` | get policy: `PARTITIONED, 3` | | 2 | check the partitioned topic metadata already exists | Check the partitioned topic metadata already exists | | 3 | Partitioned topic metadata does not exist, the metadata cache will cache an empty optional for the path | Partitioned topic metadata does not exist, the metadata cache will cache an empty optional for the path | | 4 | | succeed create the partitioned topic metadata | | 5 | Receive a ZK node changed event to invalidate the cache of the partitioned topic metadata | | 6 | Creating the metadata failed due to it already exists | | 7 | Read the partitioned topic metadata again | If `step-5` is executed later than `step-7`, `broker-1` will get an empty optional from the cache of the partitioned topic metadata and respond non-partitioned metadata to the client. **What thing would make the `step-5` is executed later than `step-7`?** Provide a scenario: Such as the issue that the PR https://github.com/apache/pulsar/pull/20303 fixed, it makes `zk operation` and `zk node changed notifications` executed in different threads: `main-thread of ZK client` and `metadata store thread`. Therefore, the mechanism of the lookup partitioned topic metadata is fragile and we need to optimize it. Modifications: Before reading the partitioned topic metadata again, refresh the cache first. --- .../pulsar/broker/service/BrokerService.java | 15 ++- .../service/BrokerServiceChaosTest.java | 103 ++++++++++++++++++ 2 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6c48e4d9ae89f..199901b090251 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3050,8 +3050,11 @@ public CompletableFuture fetchPartitionedTopicMetadata if (ex.getCause() instanceof MetadataStoreException .AlreadyExistsException) { + log.info("[{}] The partitioned topic is already" + + " created, try to refresh the cache and read" + + " again.", topicName); // The partitioned topic might be created concurrently - fetchPartitionedTopicMetadataAsync(topicName) + fetchPartitionedTopicMetadataAsync(topicName, true) .whenComplete((metadata2, ex2) -> { if (ex2 == null) { future.complete(metadata2); @@ -3060,6 +3063,9 @@ public CompletableFuture fetchPartitionedTopicMetadata } }); } else { + log.error("[{}] operation of creating partitioned" + + " topic metadata failed", + topicName, ex); future.completeExceptionally(ex); } return null; @@ -3105,9 +3111,14 @@ private CompletableFuture createDefaultPartitionedTopi } public CompletableFuture fetchPartitionedTopicMetadataAsync(TopicName topicName) { + return fetchPartitionedTopicMetadataAsync(topicName, false); + } + + public CompletableFuture fetchPartitionedTopicMetadataAsync(TopicName topicName, + boolean refreshCacheAndGet) { // gets the number of partitions from the configuration cache return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> { + .getPartitionedTopicMetadataAsync(topicName, refreshCacheAndGet).thenApply(metadata -> { // if the partitioned topic is not found in metadata, then the topic is not partitioned return metadata.orElseGet(() -> new PartitionedTopicMetadata()); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java new file mode 100644 index 0000000000000..4187364e46f65 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBaseTest { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Test + public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception { + final String configMetadataStoreConnectString = + WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString"); + final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null); + // Set policy of auto create topic to PARTITIONED. + final String ns = defaultTenant + "/ns_" + UUID.randomUUID().toString().replaceAll("-", ""); + final TopicName topicName1 = TopicName.get("persistent://" + ns + "/tp1"); + final TopicName topicName2 = TopicName.get("persistent://" + ns + "/tp2"); + admin.namespaces().createNamespace(ns); + AutoTopicCreationOverride autoTopicCreationOverride = + new AutoTopicCreationOverrideImpl.AutoTopicCreationOverrideImplBuilder().allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(3).build(); + admin.namespaces().setAutoTopicCreationAsync(ns, autoTopicCreationOverride); + // Make the cache of namespace policy is valid. + admin.namespaces().getAutoSubscriptionCreation(ns); + // Trigger the zk node "/admin/partitioned-topics/{namespace}/persistent" created. + admin.topics().createPartitionedTopic(topicName1.toString(), 2); + admin.topics().deletePartitionedTopic(topicName1.toString()); + + // Since there is no partitioned metadata created, the partitions count of metadata will be 0. + PartitionedTopicMetadata partitionedTopicMetadata1 = + pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get(); + assertEquals(partitionedTopicMetadata1.partitions, 0); + + // Create the partitioned metadata by another zk client. + // Make a error to make the cache could not update. + makeLocalMetadataStoreKeepReconnect(); + anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" + topicName2.getLocalName(), + "{\"partitions\":3}".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + stopLocalMetadataStoreAlwaysReconnect(); + + // Get the partitioned metadata from cache, there is 90% chance that partitions count of metadata is 0. + PartitionedTopicMetadata partitionedTopicMetadata2 = + pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get(); + // Note: If you want to reproduce the issue, you can perform validation on the next line. + // assertEquals(partitionedTopicMetadata2.partitions, 0); + + // Verify the new method will return a correct result. + PartitionedTopicMetadata partitionedTopicMetadata3 = + pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get(); + assertEquals(partitionedTopicMetadata3.partitions, 3); + + // cleanup. + admin.topics().deletePartitionedTopic(topicName2.toString()); + anotherZKCli.close(); + } +}