Skip to content

Commit

Permalink
[FSTORE-1273] Get Kafka brokers using admin client (#1499)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks authored Feb 27, 2024
1 parent d4043be commit 0786ca7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import io.hops.hopsworks.servicediscovery.tags.KafkaTags;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslConfigs;

import javax.ejb.ConcurrencyManagement;
Expand All @@ -41,9 +45,16 @@
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -205,5 +216,24 @@ public DescribeTopicsResult describeTopics(FeatureStoreKafkaConnectorDTO connect
return adminClient.describeTopics(topics);
}
}

public Set<String> getBrokerEndpoints() {
Set<String> kafkaBrokers = new HashSet<>();
try (AdminClient adminClient = AdminClient.create(getHopsworksKafkaProperties())) {
Collection<Node> clusterDetails = adminClient.describeCluster().nodes().get(5, TimeUnit.SECONDS);
for (Node node : clusterDetails) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigsResult =
adminClient.describeConfigs(Collections.singleton(configResource));
Map<ConfigResource, Config> configMap = describeConfigsResult.all().get();
Config config = configMap.get(configResource);
String advertisedListeners = config.get("advertised.listeners").value();
kafkaBrokers.addAll(Arrays.asList(advertisedListeners.split(",")));
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", e);
}
return kafkaBrokers;
}
//endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,27 @@

package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.ZooKeeperTags;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import io.hops.hopsworks.common.dao.kafka.HopsKafkaAdminClient;

import org.apache.commons.lang3.StringUtils;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Singleton
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class KafkaBrokers {
private final static Logger LOGGER = Logger.getLogger(KafkaBrokers.class.getName());

@EJB
private ServiceDiscoveryController serviceDiscoveryController;
protected HopsKafkaAdminClient hopsKafkaAdminClient;

public enum BrokerProtocol {
INTERNAL,
Expand All @@ -63,12 +48,8 @@ public enum BrokerProtocol {
@PostConstruct
@Lock(LockType.WRITE)
public void setBrokerEndpoints() {
try {
this.kafkaBrokers.clear();
this.kafkaBrokers.addAll(getBrokerEndpoints());
} catch (IOException | KeeperException | InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
this.kafkaBrokers.clear();
this.kafkaBrokers.addAll(hopsKafkaAdminClient.getBrokerEndpoints());
}

@Lock(LockType.READ)
Expand All @@ -87,59 +68,4 @@ public String getBrokerEndpointsString(BrokerProtocol protocol) {
}
return null;
}

  /**
   * Fetches broker endpoint strings by reading broker registrations from ZooKeeper.
   * <p>
   * Connects to ZooKeeper (address resolved via service discovery), lists the
   * children of {@code /brokers/ids}, reads each broker's registration payload,
   * splits it into tokens and keeps only tokens that look like valid broker
   * endpoints (see {@link #isValidBrokerInfo(String)}).
   *
   * @return set of valid broker endpoint tokens
   * @throws IOException if service discovery fails (wrapped ServiceDiscoveryException)
   * @throws KeeperException if a ZooKeeper read fails
   * @throws InterruptedException if the ZooKeeper read is interrupted
   */
  private Set<String> getBrokerEndpoints()
      throws IOException, KeeperException, InterruptedException {
    try {
      String zkConnectionString = getZookeeperConnectionString();
      // No-op watcher: we only do one-shot reads, no change notifications needed.
      try (ZooKeeper zk = new ZooKeeper(zkConnectionString, Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, watchedEvent -> {
      })) {
        return zk.getChildren("/brokers/ids", false).stream()
            .map(bi -> getBrokerInfo(zk, bi))
            .filter(StringUtils::isNoneEmpty)
            .map(bi -> bi.split(KafkaConst.DLIMITER))
            .flatMap(Arrays::stream)
            .filter(this::isValidBrokerInfo)
            .collect(Collectors.toSet());
      }
    } catch (ServiceDiscoveryException ex) {
      throw new IOException(ex);
    } catch (RuntimeException ex) {
      // getBrokerInfo() wraps checked exceptions in RuntimeException so it can be
      // used inside the stream pipeline; unwrap them here to honor this method's
      // declared checked exceptions.
      if (ex.getCause() instanceof KeeperException) {
        throw (KeeperException) ex.getCause();
      }
      if (ex.getCause() instanceof InterruptedException) {
        throw (InterruptedException) ex.getCause();
      }
      throw ex;
    }
  }

private String getZookeeperConnectionString() throws ServiceDiscoveryException {
return serviceDiscoveryController.getService(
Type.DNS, ServiceQuery.of(
serviceDiscoveryController.constructServiceFQDN(
HopsworksService.ZOOKEEPER.getNameWithTag(ZooKeeperTags.client)),
Collections.emptySet()))
.map(zkServer -> zkServer.getAddress() + ":" + zkServer.getPort())
.collect(Collectors.joining(","));
}

private String getBrokerInfo(ZooKeeper zk, String brokerId) {
try {
return new String(zk.getData("/brokers/ids/" + brokerId, false, null));
} catch (KeeperException | InterruptedException ex) {
LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", ex);
throw new RuntimeException(ex);
}
}

private boolean isValidBrokerInfo(String brokerInfo) {
String[] brokerProtocolNames = Arrays
.stream(BrokerProtocol.values())
.map(Enum::name)
.toArray(String[]::new);
return StringUtils.startsWithAny(brokerInfo, brokerProtocolNames)
&& brokerInfo.contains(KafkaConst.SLASH_SEPARATOR);
}
}

0 comments on commit 0786ca7

Please sign in to comment.