-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35283] PoC for supporting unique Kafka producer client ids #101
base: main
Are you sure you want to change the base?
[FLINK-35283] PoC for supporting unique Kafka producer client ids #101
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
I think this is very necessary. We can start multiple parallelism in the same JVM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your contribution. Changes mostly LGTM.
I'm wondering if there is a way to test the change. I think a unit test is trivial and doesn't contribute to anything. In the end, we would like to assert that the exception is never thrown but I don't think we can come up with a non-hacky solution. So in this case, I'd add a manual verification in your stead (=I checked the logs of the run and the exception doesn't occur anymore).
* @param subtaskId id of the Kafka producer subtask | ||
* @return clientId | ||
*/ | ||
public static String buildClientId( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to include checkpoint id? At any given time we have at least 2 producers open.
private static void overwriteConfig(Properties properties, String clientIdPrefix, int subtaskId) { | ||
if (clientIdPrefix == null) { | ||
return; | ||
} | ||
String updatedClientId = ClientIdFactory.buildClientId(clientIdPrefix, subtaskId); | ||
overrideProperty(properties, ProducerConfig.CLIENT_ID_CONFIG, updatedClientId); | ||
} | ||
|
||
private static void overrideProperty(Properties properties, String key, String value) { | ||
String userValue = properties.getProperty(key); | ||
if (userValue != null) { | ||
LOG.warn( | ||
String.format( | ||
"Property %s is provided but will be overridden from %s to %s", | ||
key, userValue, value)); | ||
} | ||
properties.setProperty(key, value); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed? looks dead to me.
QQ: do we need to sanatize/validate the prefix? I don't think we currently do it if you manually set the prefix, so it's probably not needed. |
This PR came out of debuging a warning we’re seeing in our Flink logs. We’re running Flink 1.18 and have an application that uses Kafka topics as a source and a sink. We’re running with several tasks. The warning we’re seeing in the logs is:
I’ve spent a bit of time debugging, and it looks like the root cause of this warning is the Flink
KafkaSink
creating multipleKafkaWriter
s that, in turn, create multipleKafkaProducer
s with the same Kafka producerclient.id
. Since the value forclient.id
is used when registering theAppInfo
MBean — when multipleKafkaProducer
s with the sameclient.id
are registered we get the aboveInstanceAlreadyExistsException
. Since we’re running with several tasks and we get a Kafka producer per task this duplicate registration exception makes sense to me.This PR proposes a fix that would update the
KafkaSink.builder
by adding asetClientIdPrefix
method, similar to what we have already on theKafkaSource.builder
.