diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 5d1887c2..00d1cccd 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -214,6 +214,9 @@ public FlinkPulsarSinkBase( if (this.clientConfigurationData.getServiceUrl() == null) { throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); } + + // setup auth parameters based on ClientConfigurationData and Properties + PulsarClientUtils.setupAuthIfNeed(clientConfigurationData, properties); } public FlinkPulsarSinkBase( diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 3edd5ce4..9f29088a 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -257,6 +257,9 @@ public FlinkPulsarSource( throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); } this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion); + + // setup auth parameters based on ClientConfigurationData and Properties + PulsarClientUtils.setupAuthIfNeed(clientConfigurationData, properties); } public FlinkPulsarSource( diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java index f447d7f1..7a467a11 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java @@ -53,4 +53,20 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie return clientConf; } + public static void setupAuthIfNeed(ClientConfigurationData conf, Properties properties) { + if (!StringUtils.isBlank(conf.getAuthPluginClassName()) + && (!StringUtils.isBlank(conf.getAuthParams()) || conf.getAuthParamMap() != null)) { + // User has set up auth with ClientConfigurationData, which has the highest priority. + } else { + if (properties != null + && !StringUtils.isBlank(properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY)) + && !StringUtils.isBlank(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY))) { + // User only set up auth with Properties. Copy the properties to ClientConfigurationData. + conf.setAuthParams(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY)); + conf.setAuthPluginClassName(properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY)); + } else { + // User does not enable authentication. + } + } + } }