Skip to content

Commit

Permalink
Merge pull request #298 from pinterest/memq_241015
Browse files Browse the repository at this point in the history
Fix orion error caused by brokers removed outside orion
  • Loading branch information
yisheng-zhou authored Oct 15, 2024
2 parents 3a37e31 + 2fd308d commit a57b112
Showing 1 changed file with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
Expand All @@ -29,6 +31,7 @@
import com.pinterest.orion.core.PluginConfigurationException;
import com.pinterest.orion.core.memq.MemqCluster;
import com.pinterest.orion.utils.NetworkUtils;
import org.apache.zookeeper.KeeperException;

import static com.pinterest.orion.core.memq.MemqCluster.CLUSTER_CONTEXT;

Expand Down Expand Up @@ -71,8 +74,22 @@ public void sense(MemqCluster cluster) throws Exception {
Map<String, Broker> rawBrokerMap = new HashMap<>();

Gson gson = new Gson();

Set<String> brokersInZookeeper = new HashSet<>();
for (String brokerName : brokerNames) {
byte[] brokerData = zkClient.getData().forPath(BROKERS + "/" + brokerName);
byte[] brokerData = null;
try {
brokerData = zkClient.getData().forPath(BROKERS + "/" + brokerName);
} catch (KeeperException.NoNodeException e) {
cluster.getNodeMap().remove(brokerName);
logger.info(
"Broker data of " + brokerName + " is not available in zookeeper. The broker might be removed.");
continue;
} catch (Exception e) {
logger.severe(
"Faced an unknown exception when getting broker data for " + brokerName +" from zookeeper:" + e);
continue;
}
Broker broker = gson.fromJson(new String(brokerData), Broker.class);
NodeInfo info = new NodeInfo();
info.setClusterId(cluster.getClusterId());
Expand All @@ -96,6 +113,18 @@ public void sense(MemqCluster cluster) throws Exception {
}
hostnames.add(hostname);
}
brokersInZookeeper.add(broker.getBrokerIP());
}

if (brokersInZookeeper.isEmpty()) {
logger.warning("No broker found in zookeeper for cluster " + cluster.getClusterId());
} else {
// Remove brokers that are not in zookeeper from the cluster node map
for (String nodeId : cluster.getNodeMap().keySet()) {
if (!brokersInZookeeper.contains(nodeId)) {
cluster.getNodeMap().remove(nodeId);
}
}
}

Map<String, TopicConfig> topicConfigMap = new HashMap<>();
Expand Down

0 comments on commit a57b112

Please sign in to comment.