From cf5d375782cc752c9b89ee422d298604b7f3aaf8 Mon Sep 17 00:00:00 2001 From: John Heinnickel <2162075+jheinnic@users.noreply.github.com> Date: Mon, 23 Sep 2019 07:53:24 -0700 Subject: [PATCH] Cure HTTP Authentication Singleton's Password Overwrite (#19) * Enable Service Provider Relocation (#17) (#4) Schema Registry Client's basic HTTP Authentication support is implemented through a ServiceProvider. Without handling the fact that relocating the schema client also relocates the service implementations, no implementations are found when the client attempts to find a strategy that matches an authentication source type specified through basic.auth.credentials.source... * Improve test cases that distinguish between source and destination credentials -- Split credentials fixture value pair into two distinct value pairs, one for source registry clients, another for destinations. -- Add test case verifying source authentication properties reach source registry client -- Add test case verifying destination authentication properties reach destination registry client -- Add test cases verifying expected exception when incorrect credentials are passed to source and/or destination registry client. -- Add test case using distinct credentials for both source and destination in same execution and using same authentication source strategy (currently does not pass!) * Compensate for Basic HTTP Authentication's Singleton Implementation The fact that Basic HTTP Authentication as implemented in the Kafka Connect Client uses a singleton to hold configured credentials means that if both the source and destination schema registries require basic HTTP authentication and want to provide credentials via `basic.auth.credentials.source`, the second set of credentials will overwrite and replace the first` Connect's three singletons for Basic HTTP Authentication are selected by the same three key values used in `basic.auth.credentials.source` to designate which represented algorithm to use. This commit begins compensating for these singletons first by creating and registering two additional copies of the Basic Auth singletons in the SMT's code base. One is intended for use by the source broker's schema registry client, the other is for destinaation broker's registry. The only intentional difference between what is built here and the production Kafka Connect namespac is the addition of a short prefix to distinguish `SRC_` from `DEST_`. Now, when removing a prefix it uses to broker input to one adapter or the orther, in addition to selecting which configuration hash destrination to use, it also adds that prefix to the value it provides for `basic.auth.credentials.source`. As a result, the source and destination schema registry clients will each now use a diffent singleton to hold onto their credentials with two distinct singletons. This work relies on addition of ServicesRouterTransformer to he maven shade plugin that was recently reviewd and released. --- .../transforms/SchemaRegistryTransfer.java | 4 +- .../DestSaslBasicAuthCredentialProvider.java | 11 +++++ .../DestUrlBasicAuthCredentialProvider.java | 11 +++++ .../DestUserInfoCredentialProvider.java | 12 +++++ .../SrcSaslBasicAuthCredentialProvider.java | 11 +++++ .../SrcUrlBasicAuthCredentialProvider.java | 11 +++++ .../SrcUserInfoCredentialProvider.java | 12 +++++ ...rity.basicauth.BasicAuthCredentialProvider | 6 +++ .../kafka/connect/transforms/Constants.java | 4 +- .../transforms/SchemaRegistryMock.java | 5 ++- .../connect/transforms/TransformTest.java | 45 +++++++++++++++---- 11 files changed, 120 insertions(+), 12 deletions(-) create mode 100644 src/main/java/cricket/jmoore/security/basicauth/DestSaslBasicAuthCredentialProvider.java create mode 100644 src/main/java/cricket/jmoore/security/basicauth/DestUrlBasicAuthCredentialProvider.java create mode 100644 src/main/java/cricket/jmoore/security/basicauth/DestUserInfoCredentialProvider.java create mode 100644 src/main/java/cricket/jmoore/security/basicauth/SrcSaslBasicAuthCredentialProvider.java create mode 100644 src/main/java/cricket/jmoore/security/basicauth/SrcUrlBasicAuthCredentialProvider.java create mode 100644 src/main/java/cricket/jmoore/security/basicauth/SrcUserInfoCredentialProvider.java create mode 100644 src/main/resources/META-INF/services/io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java index 53ae097..2cf9e11 100644 --- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java +++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java @@ -99,7 +99,7 @@ public void configure(Map props) { List sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL); final Map sourceProps = new HashMap<>(); sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, - config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE)); + "SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE)); sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.SRC_USER_INFO) .value()); @@ -107,7 +107,7 @@ public void configure(Map props) { List destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL); final Map destProps = new HashMap<>(); destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, - config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE)); + "DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE)); destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.DEST_USER_INFO) .value()); diff --git a/src/main/java/cricket/jmoore/security/basicauth/DestSaslBasicAuthCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/DestSaslBasicAuthCredentialProvider.java new file mode 100644 index 0000000..0ccb154 --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/DestSaslBasicAuthCredentialProvider.java @@ -0,0 +1,11 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider; + +public class DestSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider { + @Override + public String alias() { + return "DEST_SASL_INHERIT"; + } +} diff --git a/src/main/java/cricket/jmoore/security/basicauth/DestUrlBasicAuthCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/DestUrlBasicAuthCredentialProvider.java new file mode 100644 index 0000000..e2c0079 --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/DestUrlBasicAuthCredentialProvider.java @@ -0,0 +1,11 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider; + +public class DestUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider { + @Override + public String alias() { + return "DEST_URL"; + } +} diff --git a/src/main/java/cricket/jmoore/security/basicauth/DestUserInfoCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/DestUserInfoCredentialProvider.java new file mode 100644 index 0000000..d137273 --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/DestUserInfoCredentialProvider.java @@ -0,0 +1,12 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider; + +public class DestUserInfoCredentialProvider extends UserInfoCredentialProvider +{ + @Override + public String alias() { + return "DEST_USER_INFO"; + } +} diff --git a/src/main/java/cricket/jmoore/security/basicauth/SrcSaslBasicAuthCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/SrcSaslBasicAuthCredentialProvider.java new file mode 100644 index 0000000..b5e225a --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/SrcSaslBasicAuthCredentialProvider.java @@ -0,0 +1,11 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider; + +public class SrcSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider { + @Override + public String alias() { + return "SRC_SASL_INHERIT"; + } +} diff --git a/src/main/java/cricket/jmoore/security/basicauth/SrcUrlBasicAuthCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/SrcUrlBasicAuthCredentialProvider.java new file mode 100644 index 0000000..4afc2ab --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/SrcUrlBasicAuthCredentialProvider.java @@ -0,0 +1,11 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider; + +public class SrcUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider { + @Override + public String alias() { + return "SRC_URL"; + } +} diff --git a/src/main/java/cricket/jmoore/security/basicauth/SrcUserInfoCredentialProvider.java b/src/main/java/cricket/jmoore/security/basicauth/SrcUserInfoCredentialProvider.java new file mode 100644 index 0000000..b43ba42 --- /dev/null +++ b/src/main/java/cricket/jmoore/security/basicauth/SrcUserInfoCredentialProvider.java @@ -0,0 +1,12 @@ +/* Licensed under Apache-2.0 */ +package cricket.jmoore.security.basicauth; + +import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider; + +public class SrcUserInfoCredentialProvider extends UserInfoCredentialProvider +{ + @Override + public String alias() { + return "SRC_USER_INFO"; + } +} diff --git a/src/main/resources/META-INF/services/io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider b/src/main/resources/META-INF/services/io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider new file mode 100644 index 0000000..e6a5c02 --- /dev/null +++ b/src/main/resources/META-INF/services/io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider @@ -0,0 +1,6 @@ +cricket.jmoore.security.basicauth.DestSaslBasicAuthCredentialProvider +cricket.jmoore.security.basicauth.DestUrlBasicAuthCredentialProvider +cricket.jmoore.security.basicauth.DestUserInfoCredentialProvider +cricket.jmoore.security.basicauth.SrcSaslBasicAuthCredentialProvider +cricket.jmoore.security.basicauth.SrcUrlBasicAuthCredentialProvider +cricket.jmoore.security.basicauth.SrcUserInfoCredentialProvider \ No newline at end of file diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/Constants.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/Constants.java index 93fb875..df81549 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/Constants.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/Constants.java @@ -10,5 +10,7 @@ public interface Constants { public static final String URL_SOURCE = "URL"; - public static final String HTTP_AUTH_CREDENTIALS_FIXTURE = "username:password"; + public static final String HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE = "sourceuser:sourcepass"; + + public static final String HTTP_AUTH_DEST_CREDENTIALS_FIXTURE = "destuser:destpass"; } diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java index 5d6e45b..1ff74ec 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java @@ -113,6 +113,7 @@ public enum Role { this.getConfigHandler)); private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); private final String basicAuthTag; + private final String basicAuthCredentials; private Function stubFor; private static final Logger log = LoggerFactory.getLogger(SchemaRegistryMock.class); @@ -123,6 +124,8 @@ public SchemaRegistryMock(Role role) { } this.basicAuthTag = (role == Role.SOURCE) ? Constants.USE_BASIC_AUTH_SOURCE_TAG : Constants.USE_BASIC_AUTH_DEST_TAG; + this.basicAuthCredentials = + (role == Role.SOURCE)? Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE : Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE; } @Override @@ -133,7 +136,7 @@ public void afterEach(final ExtensionContext context) { @Override public void beforeEach(final ExtensionContext context) { if (context.getTags().contains(this.basicAuthTag)) { - String[] userPass = Constants.HTTP_AUTH_CREDENTIALS_FIXTURE.split(":"); + final String[] userPass = this.basicAuthCredentials.split(":"); this.stubFor = (MappingBuilder mappingBuilder) -> this.mockSchemaRegistry.stubFor( mappingBuilder.withBasicAuth(userPass[0], userPass[1])); } else { diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java index e5b48fc..aedea18 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java @@ -270,10 +270,23 @@ public void testValueSchemaLookupFailure() { assertThrows(ConnectException.class, () -> smt.apply(record)); } + @Test + @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) + @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) + public void testBothBasicHttpAuthUserInfo() throws IOException { + configure( + Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, + Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, + ExplicitAuthType.USER_INFO); + + this.passSimpleMessage(); + } + + @Test @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) public void testSourceBasicHttpAuthUserInfo() throws IOException { - configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.USER_INFO); + configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.USER_INFO); this.passSimpleMessage(); } @@ -281,7 +294,7 @@ public void testSourceBasicHttpAuthUserInfo() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) public void testDestinationBasicHttpAuthUserInfo() throws IOException { - configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.USER_INFO); + configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.USER_INFO); this.passSimpleMessage(); } @@ -289,7 +302,7 @@ public void testDestinationBasicHttpAuthUserInfo() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) public void testSourceBasicHttpAuthUrl() throws IOException { - configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.URL); + configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.URL); this.passSimpleMessage(); } @@ -297,7 +310,7 @@ public void testSourceBasicHttpAuthUrl() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) public void testDestinationBasicHttpAuthUrl() throws IOException { - configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.URL); + configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.URL); this.passSimpleMessage(); } @@ -305,7 +318,7 @@ public void testDestinationBasicHttpAuthUrl() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) public void testSourceBasicHttpAuthNull() throws IOException { - configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.NULL); + configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.NULL); assertThrows(ConnectException.class, () -> this.passSimpleMessage()); } @@ -313,7 +326,7 @@ public void testSourceBasicHttpAuthNull() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) public void testDestinationBasicHttpAuthNull() throws IOException { - configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.NULL); + configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.NULL); assertThrows(ConnectException.class, () -> this.passSimpleMessage()); } @@ -321,7 +334,7 @@ public void testDestinationBasicHttpAuthNull() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) public void testSourceBasicHttpAuthImplicitDefault() throws IOException { - configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, null); + configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, null); this.passSimpleMessage(); } @@ -329,11 +342,27 @@ public void testSourceBasicHttpAuthImplicitDefault() throws IOException { @Test @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) public void testDestinationBasicHttpAuthImplicitDefault() throws IOException { - configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null); + configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, null); this.passSimpleMessage(); } + @Test + @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) + public void testSourceBasicHttpAuthWrong() throws IOException { + configure(Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, null, null); + + assertThrows(ConnectException.class, () -> this.passSimpleMessage()); + } + + @Test + @Tag(Constants.USE_BASIC_AUTH_DEST_TAG) + public void testDestinationBasicHttpAuthWrong() throws IOException { + configure(null, Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null); + + assertThrows(ConnectException.class, () -> this.passSimpleMessage()); + } + @Test @Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG) public void testSourceBasicHttpAuthOmit() throws IOException {