-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
recreate producers and consumers during config reload command #123
Conversation
|
||
if (recreateSucceeded) { | ||
// replace _consumers with newly created consumers | ||
_consumers.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about moving this to the try block and move the else block into the catch block?
_commonConsumerConfigs = new LiKafkaConsumerConfig(originalConfigs); | ||
|
||
for (ClusterConsumerPair<K, V> entry : _consumers) { | ||
getOrCreatePerClusterConsumer(newConsumers, entry.getCluster()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does what we want, but it is not obvious that newConsumers is updated. Can we add a new method called createPerClusterConsumer() and use it here and also refactor getOrCreatePerClusterConsumer()?
convertedConfig.put(entry.getKey(), String.valueOf(entry.getValue())); | ||
} | ||
// TODO: add a flag in RELOAD_CONFIG_RESPONSE message to indicate result of config reload | ||
// TODO: add a flag in RELOAD_CONFIG_RESPONSE message to indicate result of config reload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the previous line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove line 411?
} | ||
|
||
if (recreateSucceeded) { | ||
// replace _producers with newly created producers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All comments for consumer config reload apply here as well.
LiKafkaConsumer<K, V> curConsumer = createPerClusterConsumer(entry.getCluster()); | ||
newConsumers.add(new ClusterConsumerPair<K, V>(entry.getCluster(), curConsumer)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we will need to re-assign/subscribe the previous assignment/subscriptions with the newly created consumers.
...-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java
Show resolved
Hide resolved
...-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java
Outdated
Show resolved
Hide resolved
_closed = false; | ||
|
||
// keep previous partition assignment after config reload | ||
// TODO: after subscribe() is supported, we need to also keep the original subscription | ||
Map<ClusterDescriptor, Set<TopicPartition>> perClusterTopicPartitionSet = getPerClusterTopicPartitionSet(curPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment as above. Why don't you use a map from cluster descriptor to topic partition set to begin with by calling per-cluster consumer's assignment()?
Re-create producers and consumers as part of config reload command instead of lazy construction to avoid delay of errors and user impact.