Skip to content

Commit

Permalink
Cure HTTP Authentication Singleton's Password Overwrite (#19)
Browse files Browse the repository at this point in the history
* Enable Service Provider Relocation (#17) (#4)

Schema Registry Client's basic HTTP Authentication support is
implemented through a ServiceProvider.  Without handling the fact that
relocating the schema client also relocates the service implementations,
no implementations are found when the client attempts to find a strategy
that matches an authentication source type specified through
basic.auth.credentials.source...

* Improve test cases that distinguish between source and destination credentials

-- Split credentials fixture value pair into two distinct value pairs,
one for source registry clients, another for destinations.
-- Add test case verifying source authentication properties reach source
registry client
-- Add test case verifying destination authentication properties reach
destination registry client
-- Add test cases verifying expected exception when incorrect
credentials are passed to source and/or destination registry client.
-- Add test case using distinct credentials for both source and
destination in same execution and using same authentication source
strategy (currently does not pass!)

* Compensate for Basic HTTP Authentication's Singleton Implementation

The fact that Basic HTTP Authentication as implemented in the Kafka
Connect Client uses a singleton to hold configured credentials means
that if both the source and destination schema registries require basic
HTTP authentication and want to provide credentials via
`basic.auth.credentials.source`, the second set of credentials will
overwrite and replace the first`

Connect's three singletons for Basic HTTP Authentication are selected by
the same three key values used in `basic.auth.credentials.source` to
designate which represented algorithm to use.

This commit begins compensating for these singletons first by creating
and registering two additional copies of the Basic Auth singletons in
the SMT's code base.  One is intended for use by the source broker's
schema registry client, the other is for destinaation broker's registry.
The only intentional difference between what is built here and the
production Kafka Connect namespac is the addition of a short prefix to
distinguish `SRC_` from `DEST_`.

Now, when removing a prefix it uses to broker input to one adapter or
the orther, in addition to selecting which configuration hash
destrination to use, it also adds that prefix to the value it provides
for `basic.auth.credentials.source`.  As a result, the source and
destination schema registry clients will each now use a diffent
singleton to hold onto their credentials with two distinct singletons.

This work relies on addition of ServicesRouterTransformer to he maven
shade plugin that was recently reviewd and released.
  • Loading branch information
jheinnic authored and OneCricketeer committed Sep 23, 2019
1 parent 8b76edb commit cf5d375
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ public void configure(Map<String, ?> props) {
List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL);
final Map<String, String> sourceProps = new HashMap<>();
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
"SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.SRC_USER_INFO)
.value());

List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
final Map<String, String> destProps = new HashMap<>();
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
"DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.DEST_USER_INFO)
.value());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;

public class DestSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider {
@Override
public String alias() {
return "DEST_SASL_INHERIT";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider;

public class DestUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider {
@Override
public String alias() {
return "DEST_URL";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;

public class DestUserInfoCredentialProvider extends UserInfoCredentialProvider
{
@Override
public String alias() {
return "DEST_USER_INFO";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;

public class SrcSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider {
@Override
public String alias() {
return "SRC_SASL_INHERIT";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider;

public class SrcUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider {
@Override
public String alias() {
return "SRC_URL";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/* Licensed under Apache-2.0 */
package cricket.jmoore.security.basicauth;

import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;

public class SrcUserInfoCredentialProvider extends UserInfoCredentialProvider
{
@Override
public String alias() {
return "SRC_USER_INFO";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
cricket.jmoore.security.basicauth.DestSaslBasicAuthCredentialProvider
cricket.jmoore.security.basicauth.DestUrlBasicAuthCredentialProvider
cricket.jmoore.security.basicauth.DestUserInfoCredentialProvider
cricket.jmoore.security.basicauth.SrcSaslBasicAuthCredentialProvider
cricket.jmoore.security.basicauth.SrcUrlBasicAuthCredentialProvider
cricket.jmoore.security.basicauth.SrcUserInfoCredentialProvider
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ public interface Constants {

public static final String URL_SOURCE = "URL";

public static final String HTTP_AUTH_CREDENTIALS_FIXTURE = "username:password";
public static final String HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE = "sourceuser:sourcepass";

public static final String HTTP_AUTH_DEST_CREDENTIALS_FIXTURE = "destuser:destpass";
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public enum Role {
this.getConfigHandler));
private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
private final String basicAuthTag;
private final String basicAuthCredentials;
private Function<MappingBuilder, StubMapping> stubFor;

private static final Logger log = LoggerFactory.getLogger(SchemaRegistryMock.class);
Expand All @@ -123,6 +124,8 @@ public SchemaRegistryMock(Role role) {
}

this.basicAuthTag = (role == Role.SOURCE) ? Constants.USE_BASIC_AUTH_SOURCE_TAG : Constants.USE_BASIC_AUTH_DEST_TAG;
this.basicAuthCredentials =
(role == Role.SOURCE)? Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE : Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE;
}

@Override
Expand All @@ -133,7 +136,7 @@ public void afterEach(final ExtensionContext context) {
@Override
public void beforeEach(final ExtensionContext context) {
if (context.getTags().contains(this.basicAuthTag)) {
String[] userPass = Constants.HTTP_AUTH_CREDENTIALS_FIXTURE.split(":");
final String[] userPass = this.basicAuthCredentials.split(":");
this.stubFor = (MappingBuilder mappingBuilder) -> this.mockSchemaRegistry.stubFor(
mappingBuilder.withBasicAuth(userPass[0], userPass[1]));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,70 +270,99 @@ public void testValueSchemaLookupFailure() {
assertThrows(ConnectException.class, () -> smt.apply(record));
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testBothBasicHttpAuthUserInfo() throws IOException {
configure(
Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE,
Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE,
ExplicitAuthType.USER_INFO);

this.passSimpleMessage();
}


@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthUserInfo() throws IOException {
configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.USER_INFO);
configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.USER_INFO);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testDestinationBasicHttpAuthUserInfo() throws IOException {
configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.USER_INFO);
configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.USER_INFO);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthUrl() throws IOException {
configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.URL);
configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.URL);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testDestinationBasicHttpAuthUrl() throws IOException {
configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.URL);
configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.URL);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthNull() throws IOException {
configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, ExplicitAuthType.NULL);
configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, ExplicitAuthType.NULL);

assertThrows(ConnectException.class, () -> this.passSimpleMessage());
}

@Test
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testDestinationBasicHttpAuthNull() throws IOException {
configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, ExplicitAuthType.NULL);
configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, ExplicitAuthType.NULL);

assertThrows(ConnectException.class, () -> this.passSimpleMessage());
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthImplicitDefault() throws IOException {
configure(Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null, null);
configure(Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null, null);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testDestinationBasicHttpAuthImplicitDefault() throws IOException {
configure(null, Constants.HTTP_AUTH_CREDENTIALS_FIXTURE, null);
configure(null, Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, null);

this.passSimpleMessage();
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthWrong() throws IOException {
configure(Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE, null, null);

assertThrows(ConnectException.class, () -> this.passSimpleMessage());
}

@Test
@Tag(Constants.USE_BASIC_AUTH_DEST_TAG)
public void testDestinationBasicHttpAuthWrong() throws IOException {
configure(null, Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE, null);

assertThrows(ConnectException.class, () -> this.passSimpleMessage());
}

@Test
@Tag(Constants.USE_BASIC_AUTH_SOURCE_TAG)
public void testSourceBasicHttpAuthOmit() throws IOException {
Expand Down

0 comments on commit cf5d375

Please sign in to comment.