Federated Clients: implement various federated consumer methods #119
base: master
Conversation
Force-pushed from 60fafdf to d9f853a
...e-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java (outdated, resolved)
   }

   @Override
   public void commitAsync(OffsetCommitCallback callback) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    for (ClusterConsumerPair<K, V> entry : getImmutableConsumerList()) {
+      entry.getConsumer().commitAsync(callback);
This would call a user-provided callback N times for N consumers. We need to aggregate the per-consumer completions first and call the user callback only once.
Thanks for catching this. Added a wrapper callback, which will call the user callback once.
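The fan-in described above can be sketched as follows. This is a minimal illustration, not the PR's actual wrapper: `CommitCallback` and its `Map<String, Long>` offsets are hypothetical simplified stand-ins for Kafka's `OffsetCommitCallback` and its `Map<TopicPartition, OffsetAndMetadata>`, and `AggregatingCallback` is an assumed name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Kafka's OffsetCommitCallback: invoked with the
// committed offsets (or an exception) when a commit completes.
interface CommitCallback {
  void onComplete(Map<String, Long> offsets, Exception exception);
}

// Fans in N per-cluster completions and invokes the user-provided callback
// exactly once, after the last per-cluster commit reports back.
class AggregatingCallback implements CommitCallback {
  private final CommitCallback userCallback;
  private final AtomicInteger remaining;
  private final Map<String, Long> aggregatedOffsets = new ConcurrentHashMap<>();
  private final AtomicReference<Exception> firstError = new AtomicReference<>();

  AggregatingCallback(CommitCallback userCallback, int numClusters) {
    this.userCallback = userCallback;
    this.remaining = new AtomicInteger(numClusters);
  }

  @Override
  public void onComplete(Map<String, Long> offsets, Exception exception) {
    if (offsets != null) {
      aggregatedOffsets.putAll(offsets);
    }
    if (exception != null) {
      firstError.compareAndSet(null, exception); // keep the first failure
    }
    // Only the final per-cluster completion triggers the user callback.
    if (remaining.decrementAndGet() == 0) {
      userCallback.onComplete(aggregatedOffsets, firstError.get());
    }
  }
}
```

The same wrapper instance would be handed to every per-cluster `commitAsync` call, so the user observes a single aggregate completion regardless of how many clusters the federated consumer spans.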
+    List<ClusterConsumerPair<K, V>> consumers = getImmutableConsumerList();
+    for (Map.Entry<ClusterDescriptor, Map<TopicPartition, OffsetAndMetadata>> entry :
+        getPartitionValueMapByCluster(offsets).entrySet()) {
+      getOrCreatePerClusterConsumer(consumers, entry.getKey()).commitAsync(entry.getValue(), callback);
similar to above - user-provided callbacks should be called once
Done
-    throw new UnsupportedOperationException("Not implemented yet");
+    List<ClusterConsumerPair<K, V>> consumers = getImmutableConsumerList();
+    for (Map.Entry<ClusterDescriptor, Collection<TopicPartition>> entry : getPartitionsByCluster(partitions).entrySet()) {
+      getOrCreatePerClusterConsumer(consumers, entry.getKey()).seekToBeginning(entry.getValue());
Not sure if seek() should result in the creation of a consumer - I think only subscribe/assign should.
The current implementation of config reload closes clients but does not recreate them. They will be recreated at the subsequent operation. If we keep this "lazy" reload behavior, seek() may need to create a consumer.
I don't know what the lazy approach gets us - we expect these clients to be active.
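The per-cluster fan-out in the diff above depends on grouping the requested partitions by the cluster that owns them. A minimal sketch of that grouping, using hypothetical simplified types ("topic-partition" strings and an explicit routing table; the PR's real `getPartitionsByCluster` presumably resolves clusters through federated metadata instead):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical router: maps each requested partition to its owning cluster so
// that each per-cluster consumer is handed only the partitions it hosts.
class PartitionRouter {
  static Map<String, List<String>> partitionsByCluster(
      Collection<String> partitions, Map<String, String> topicToCluster) {
    Map<String, List<String>> byCluster = new HashMap<>();
    for (String tp : partitions) {
      // Topic name is everything before the trailing "-<partition>" suffix.
      String topic = tp.substring(0, tp.lastIndexOf('-'));
      byCluster.computeIfAbsent(topicToCluster.get(topic), k -> new ArrayList<>()).add(tp);
    }
    return byCluster;
  }
}
```

Once grouped this way, an operation like `seekToBeginning` becomes one call per cluster rather than one per partition, which is what the loop over `getPartitionsByCluster(partitions).entrySet()` achieves.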
@@ -534,27 +694,17 @@ public void uncaughtException(Thread t, Throwable e) {
     try {
       if (!countDownLatch.await(deadlineTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
         LiKafkaClientsUtils.dumpStacksForAllLiveThreads(threads);
-        throw new KafkaException("Fail to close all consumers for cluster group " + _clusterGroup + " in " +
-            timeout + " " + timeUnit);
+        throw new KafkaException("fail to perform " + methodName + " for cluster group " + _clusterGroup + " in " +
For bonus points, you could iterate over the threads that are still alive and print their stacks.
I think that's what the line above (LiKafkaClientsUtils.dumpStacksForAllLiveThreads(threads)) is already doing? Did you mean something else?
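The pattern in the hunk above, waiting on a latch until an absolute deadline and then reporting the threads that missed it, can be sketched as below. `CloseHelper` and `awaitOrCollectStragglers` are assumed names for illustration; in the PR itself the stack dumping is done by `LiKafkaClientsUtils.dumpStacksForAllLiveThreads`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CloseHelper {
  // Waits for the latch until an absolute wall-clock deadline. On timeout,
  // returns the worker threads that are still alive so the caller can dump
  // their stacks before throwing, which is the role
  // LiKafkaClientsUtils.dumpStacksForAllLiveThreads plays in the PR.
  static List<Thread> awaitOrCollectStragglers(CountDownLatch latch, long deadlineTimeMs,
      List<Thread> threads) throws InterruptedException {
    // A non-positive remaining timeout makes await() return immediately.
    if (latch.await(deadlineTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
      return Collections.emptyList(); // every worker finished in time
    }
    List<Thread> stillAlive = new ArrayList<>();
    for (Thread t : threads) {
      if (t.isAlive()) {
        stillAlive.add(t); // t.getStackTrace() can be logged here per thread
      }
    }
    return stillAlive;
  }
}
```

Computing the timeout as `deadlineTimeMs - System.currentTimeMillis()` (as the PR does) keeps the overall deadline fixed even if the method is called after some time has already been spent elsewhere.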
Force-pushed from d749b92 to c823088
...-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java (5 outdated review threads, resolved)
Force-pushed from 862daa2 to 3c96cb7
Force-pushed from 3c96cb7 to 6a3427b
No description provided.