Skip to content

Commit

Permalink
Add support for config push at boot up time (#130)
Browse files Browse the repository at this point in the history
* Add support for config push at boot up time for federated client
  • Loading branch information
kehuum authored Aug 1, 2019
1 parent 32eb6db commit e13442b
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 74 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ project.ext {
}
}
kafkaVersion = "2.0.1"
marioVersion = "0.0.6"
marioVersion = "0.0.8"
}

subprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.linkedin.kafka.clients.metadataservice.MetadataServiceClient;
import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.mario.common.websockets.MsgType;
import com.linkedin.mario.common.websockets.MessageType;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,6 +96,9 @@ public LiKafkaConsumer<K, V> getConsumer() {
// Consumer configs common to all clusters
private LiKafkaConsumerConfig _commonConsumerConfigs;

// Consumer configs received from Conductor at boot up time
private Map<String, String> _bootupConfigsFromConductor;

// max.poll.records for the federated consumer
private int _maxPollRecordsForFederatedConsumer;

Expand All @@ -113,6 +116,9 @@ public LiKafkaConsumer<K, V> getConsumer() {

private int _nextClusterIndexToPoll;

// Number of consumers that successfully applied configs at boot up time, used for testing only
private Set<LiKafkaConsumer<K, V>> _numConsumersWithBootupConfigs = new HashSet<>();

// The prefix of the client.id property to be used for individual consumers. Since the client id is part of the
// consumer metric keys, we need to use cluster-specific client ids to differentiate metrics from different clusters.
// Per-cluster client id is generated by appending the cluster name to this prefix.
Expand Down Expand Up @@ -203,29 +209,17 @@ private LiKafkaFederatedConsumerImpl(LiKafkaConsumerConfig configs, MetadataServ
_closed = false;
_numConfigReloads = 0;

try {
// Instantiate metadata service client if necessary.
_mdsClient = mdsClient != null ? mdsClient :
configs.getConfiguredInstance(LiKafkaConsumerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class);

// Register this federated client with the metadata service. The metadata service will assign a UUID to this
// client, which will be used for later interaction between the metadata service and the client.
//
// Registration may also return further information such as the metadata server version and any protocol settings.
// We assume that such information will be kept and used by the metadata service client itself.
//
// TODO: make sure this is not blocking indefinitely and also works when Mario is not available.
_mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs);
} catch (Exception e) {
try {
if (_mdsClient != null) {
_mdsClient.close(_mdsRequestTimeoutMs);
}
} catch (Exception e2) {
e.addSuppressed(e2);
}
throw e;
}
// Instantiate metadata service client if necessary.
_mdsClient = mdsClient != null ? mdsClient :
configs.getConfiguredInstance(LiKafkaConsumerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class);

// Register this federated client with the metadata service. The metadata service will assign a UUID to this
// client, which will be used for later interaction between the metadata service and the client.
//
// Registration may also return further information such as the metadata server version and any protocol settings.
// We assume that such information will be kept and used by the metadata service client itself.
//
_mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs);

// Create a watchdog thread that polls the creation of nonexistent topics in the current assignment/subscription
// and re-assign/subscribe if any of them have been created since the last poll.
Expand Down Expand Up @@ -665,6 +659,14 @@ public FederatedSubscriptionState getCurrentSubscription() {
return _currentSubscription;
}

// Returns how many per-cluster consumers were successfully created with the boot-up configs
// received from Conductor (the size of the tracking set _numConsumersWithBootupConfigs).
// Package-private; intended for testing only.
int getNumConsumersWithBootupConfigs() {
return _numConsumersWithBootupConfigs.size();
}

// Returns how many times this federated consumer's configs have been reloaded
// (incremented when a RELOAD_CONFIG command is processed — presumably in reloadConfig;
// the increment site is outside this view, so confirm there).
// Package-private; intended for testing only.
int getNumConfigReloads() {
return _numConfigReloads;
}

private void closeNoLock(Duration timeout) {
// Get an immutable copy of the current set of consumers.
List<ClusterConsumerPair<K, V>> consumers = _consumers;
Expand Down Expand Up @@ -731,6 +733,17 @@ public LiKafkaConsumerConfig getCommonConsumerConfigs() {
return _commonConsumerConfigs;
}

/**
 * Saves the consumer configs pushed by Conductor at boot-up/registration time and, when
 * per-cluster consumers already exist, recreates them so the pushed configs take effect
 * immediately.
 *
 * @param configs config overrides received from Conductor at registration time; always saved in
 *                {@code _bootupConfigsFromConductor} so that consumers created later pick them up
 */
public synchronized void applyBootupConfigFromConductor(Map<String, String> configs) {
_bootupConfigsFromConductor = configs;

// Only try to recreate per-cluster consumers when _consumers is initialized, i.e. after
// subscribe/assign has been called. Before that, there is no topic-to-cluster information, so
// it is impossible to create per-cluster consumers; we just keep the saved boot-up configs.
if (_consumers != null && !_consumers.isEmpty()) {
// commandId is null because this is a boot-up push, not a Conductor command that requires a
// RELOAD_CONFIG_RESPONSE tied to a command UUID.
recreateConsumers(configs, null);
}
}

public void reloadConfig(Map<String, String> newConfigs, UUID commandId) {
// Go over each consumer, close them, and update existing configs with newConfigs. Since each per-cluster consumer will be
// instantiated when the client began consuming from that cluster, we just need to clear the mappings and update the configs.
Expand Down Expand Up @@ -772,14 +785,18 @@ synchronized void recreateConsumers(Map<String, String> newConfigs, UUID command
// update existing configs with newConfigs
// originals() would return a copy of the internal consumer configs, put in new configs and update existing configs
Map<String, Object> configMap = _commonConsumerConfigs.originals();
configMap.putAll(newConfigs);
_commonConsumerConfigs = new LiKafkaConsumerConfig(configMap);

if (newConfigs != null && !newConfigs.isEmpty()) {
configMap.putAll(newConfigs);
_commonConsumerConfigs = new LiKafkaConsumerConfig(configMap);
}

// re-create per-cluster consumers with new set of configs
// if any error occurs, recreate all per-cluster consumers with previous last-known-good configs and
// TODO : send an error back to Mario
// _consumers should be filled when reload config happens
List<ClusterConsumerPair<K, V>> newConsumers = new ArrayList<>();
boolean recreateConsumerSuccessful = false;

try {
for (ClusterConsumerPair<K, V> entry : _consumers) {
Expand All @@ -790,6 +807,7 @@ synchronized void recreateConsumers(Map<String, String> newConfigs, UUID command
// replace _consumers with newly created consumers
_consumers.clear();
_consumers = newConsumers;
recreateConsumerSuccessful = true;
} catch (Exception e) {
// if any exception occurs, re-create per-cluster consumers with last-known-good configs
LOG.error("Failed to recreate per-cluster consumers with new config with exception, restore to previous consumers ", e);
Expand Down Expand Up @@ -819,8 +837,7 @@ synchronized void recreateConsumers(Map<String, String> newConfigs, UUID command
for (Map.Entry<String, Object> entry : _commonConsumerConfigs.originals().entrySet()) {
convertedConfig.put(entry.getKey(), String.valueOf(entry.getValue()));
}
// TODO: Add a flag in RELOAD_CONFIG_RESPONSE message to indicate whether re-creating per-cluster consumers is successful
_mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MsgType.RELOAD_CONFIG_RESPONSE);
_mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MessageType.RELOAD_CONFIG_RESPONSE, recreateConsumerSuccessful);

// re-register federated client with updated configs
_mdsClient.reRegisterFederatedClient(newConfigs);
Expand Down Expand Up @@ -879,9 +896,23 @@ private LiKafkaConsumer<K, V> createPerClusterConsumer(ClusterDescriptor cluster
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapUrl());
configMap.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientIdPrefix + "-" + cluster.getName());

Map<String, Object> configMapWithBootupConfig = new HashMap<>(configMap);
// Apply the configs received from Conductor at boot up/registration time
if (_bootupConfigsFromConductor != null && !_bootupConfigsFromConductor.isEmpty()) {
configMapWithBootupConfig.putAll(_bootupConfigsFromConductor);
}

int maxPollRecordsPerCluster = Math.max(_maxPollRecordsForFederatedConsumer / _numClustersToConnectTo, 1);
configMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsPerCluster);
LiKafkaConsumer<K, V> newConsumer = _consumerBuilder.setConsumerConfig(configMap).build();
LiKafkaConsumer<K, V> newConsumer;

try {
newConsumer = _consumerBuilder.setConsumerConfig(configMapWithBootupConfig).build();
_numConsumersWithBootupConfigs.add(newConsumer);
} catch (Exception e) {
LOG.error("Failed to create per-cluster consumer with config {}, try creating with config {}", configMapWithBootupConfig, configMap);
newConsumer = _consumerBuilder.setConsumerConfig(configMap).build();
}
return newConsumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.mario.common.websockets.MarioCommandCallback;
import com.linkedin.mario.common.websockets.Messages;

import com.linkedin.mario.common.websockets.RegisterResponseMessages;
import com.linkedin.mario.common.websockets.ReloadConfigRequestMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,19 +32,30 @@ class MarioCommandCallbackImpl implements MarioCommandCallback {
public void onReceivingCommand(Messages marioCommandMessage) {
// Find a federate client callback that matches the given Mario command message type and execute it with arguments
// included in the message.
LiKafkaFederatedClientType clientType = _federatedClient.getClientType();

switch (marioCommandMessage.getMsgType()) {
case RELOAD_CONFIG_REQUEST:
ReloadConfigRequestMessages reloadConfigMsg = (ReloadConfigRequestMessages) marioCommandMessage;

if (_federatedClient.getClientType() == LiKafkaFederatedClientType.FEDERATED_PRODUCER) {
if (clientType == LiKafkaFederatedClientType.FEDERATED_PRODUCER) {
// Call producer reload config method
((LiKafkaFederatedProducerImpl) _federatedClient).reloadConfig(reloadConfigMsg.getConfigs(), reloadConfigMsg.getCommandId());
} else {
// call consumer reload config method
((LiKafkaFederatedConsumerImpl) _federatedClient).reloadConfig(reloadConfigMsg.getConfigs(), reloadConfigMsg.getCommandId());
}
break;
case REGISTER_RESPONSE:
// Upon receiving register response from conductor, federated client would save the configs from the message and apply the
// configs when actually creating the per-cluster clients
RegisterResponseMessages registerResponseMessage = (RegisterResponseMessages) marioCommandMessage;
if (clientType == LiKafkaFederatedClientType.FEDERATED_PRODUCER) {
((LiKafkaFederatedProducerImpl) _federatedClient).applyBootupConfigFromConductor(registerResponseMessage.getConfigs());
} else {
((LiKafkaFederatedConsumerImpl) _federatedClient).applyBootupConfigFromConductor(registerResponseMessage.getConfigs());
}
break;
default:
// No current support at the moment
throw new UnsupportedOperationException("command " + marioCommandMessage.getMsgType() + " is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import com.linkedin.mario.common.models.v1.TopicQueryResults;
import com.linkedin.mario.common.websockets.MarioCommandCallback;

import com.linkedin.mario.common.websockets.MarioException;
import com.linkedin.mario.common.websockets.Messages;
import com.linkedin.mario.common.websockets.MsgType;
import com.linkedin.mario.common.websockets.MessageType;
import com.linkedin.mario.common.websockets.ReloadConfigResponseMessages;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -58,7 +59,19 @@ public void registerFederatedClient(LiKafkaFederatedClient federatedClient, Clus
MarioClusterGroupDescriptor marioClusterGroup = new MarioClusterGroupDescriptor(clusterGroup.getName(),
clusterGroup.getEnvironment());
MarioCommandCallback marioCommandCallback = new MarioCommandCallbackImpl(federatedClient);
_marioClient.registerFederatedClient(marioClusterGroup, (Map<String, String>) configs, timeoutMs, marioCommandCallback);

try {
_marioClient.registerFederatedClient(marioClusterGroup, (Map<String, String>) configs, timeoutMs,
marioCommandCallback);
} catch (MarioException e) {
// Based on the exception thrown, different actions might be taken. For example, if Mario returns
// a client-version-too-low/not-supported exception, we can stop the client immediately; whereas
// if Mario is only temporarily unavailable, we can continue to create clients with the original
// config and let the Mario client keep retrying the connection in the background until Mario
// becomes available again.
//
// For now, just print the error and continue with the original config.
e.printStackTrace();
}
}

@Override
Expand Down Expand Up @@ -191,11 +204,11 @@ private Map<String, Set<ClusterDescriptor>> getClusterMapForTopics(Set<String> t
}

@Override
public void reportCommandExecutionComplete(UUID commandId, Map<String, String> configs, MsgType messageType) {
public void reportCommandExecutionComplete(UUID commandId, Map<String, String> configs, MessageType messageType, boolean commandExecutionResult) {
Messages messageToSent;
switch (messageType) {
case RELOAD_CONFIG_RESPONSE:
messageToSent = new ReloadConfigResponseMessages(configs, commandId);
messageToSent = new ReloadConfigResponseMessages(configs, commandId, commandExecutionResult);
break;
default:
throw new UnsupportedOperationException("Message type " + messageType + " is not supported right now");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.linkedin.kafka.clients.common.PartitionLookupResult;
import com.linkedin.kafka.clients.common.TopicLookupResult;

import com.linkedin.mario.common.websockets.MsgType;
import com.linkedin.mario.common.websockets.MessageType;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -67,11 +67,12 @@ TopicLookupResult getClustersForTopics(Set<String> topics, ClusterGroupDescripto

/**
* Report to mario server that command execution for commandId is completed
* @param commandId UUID identifying the completed command
* @param configs config diff before and after the command
* @param messageType response message type to mario server
* @param commandId UUID identifying the completed command
* @param configs config diff before and after the command
* @param messageType response message type to mario server
* @param commandExecutionResult execution result of the given command
*/
void reportCommandExecutionComplete(UUID commandId, Map<String, String> configs, MsgType messageType);
void reportCommandExecutionComplete(UUID commandId, Map<String, String> configs, MessageType messageType, boolean commandExecutionResult);

/**
* Re-register federated client with new set of configs
Expand Down
Loading

0 comments on commit e13442b

Please sign in to comment.