Skip to content

Commit

Permalink
Hide user info (#22)
Browse files Browse the repository at this point in the history
* Enable Service Provider Relocation (#17) (#4)

Schema Registry Client's basic HTTP Authentication support is
implemented through a ServiceProvider.  Without handling the fact that
relocating the schema client also relocates the service implementations,
no implementations are found when the client attempts to find a strategy
that matches an authentication source type specified through
basic.auth.credentials.source...

* Modify USER_INFO fields to use type Password instead of String

Kafka Connect logs connector configurations before launching
them, which is a problem if some of those configuration
properties happen to contain sensitive information that does
not belong in a log file, such as any Basic HTTP Authentication
credentials MirrorTool ahs been configured to make use of.

Kakfa Connect provides a `Password` data type that is always
masked on display.  It was relatifely simple to change both
the USER_INFO fields recently added to use PASSWORD instead
of STRING as their data types.

The URL field can sometimes also contain a password, when
the authentiation source is set to URL instead of USER_INFO.
There is no way to make these data types conditional, so
it is not possible to make URL of type PASSWORD if the
credential source is URL, while it is also of type STRING
if the credential source is not URL.  Since the credential
format when using URL is the same as it is when using
USER_INFO, and there is arguably a good reason to not mask
the rest of the URL, the URL fields continue to have type
String here.
  • Loading branch information
jheinnic authored and OneCricketeer committed Sep 3, 2019
1 parent f961af9 commit 8b76edb
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public SchemaRegistryTransfer() {
.define(ConfigName.SRC_SCHEMA_REGISTRY_URL, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, SRC_SCHEMA_REGISTRY_CONFIG_DOC)
.define(ConfigName.DEST_SCHEMA_REGISTRY_URL, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, DEST_SCHEMA_REGISTRY_CONFIG_DOC)
.define(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE, ConfigDef.Type.STRING, SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC)
.define(ConfigName.SRC_USER_INFO, ConfigDef.Type.STRING, SRC_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_USER_INFO_CONFIG_DOC)
.define(ConfigName.SRC_USER_INFO, ConfigDef.Type.PASSWORD, SRC_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_USER_INFO_CONFIG_DOC)
.define(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE, ConfigDef.Type.STRING, DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC)
.define(ConfigName.DEST_USER_INFO, ConfigDef.Type.STRING, DEST_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_USER_INFO_CONFIG_DOC)
.define(ConfigName.DEST_USER_INFO, ConfigDef.Type.PASSWORD, DEST_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_USER_INFO_CONFIG_DOC)
.define(ConfigName.SCHEMA_CAPACITY, ConfigDef.Type.INT, SCHEMA_CAPACITY_CONFIG_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_CAPACITY_CONFIG_DOC)
.define(ConfigName.TRANSFER_KEYS, ConfigDef.Type.BOOLEAN, TRANSFER_KEYS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, TRANSFER_KEYS_CONFIG_DOC)
.define(ConfigName.INCLUDE_HEADERS, ConfigDef.Type.BOOLEAN, INCLUDE_HEADERS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, INCLUDE_HEADERS_CONFIG_DOC)
Expand All @@ -101,14 +101,16 @@ public void configure(Map<String, ?> props) {
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
config.getString(ConfigName.SRC_USER_INFO));
config.getPassword(ConfigName.SRC_USER_INFO)
.value());

List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
final Map<String, String> destProps = new HashMap<>();
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
config.getString(ConfigName.DEST_USER_INFO));
config.getPassword(ConfigName.DEST_USER_INFO)
.value());

Integer schemaCapacity = config.getInt(ConfigName.SCHEMA_CAPACITY);

Expand Down

0 comments on commit 8b76edb

Please sign in to comment.