Skip to content

Commit

Permalink
Merge pull request #54 from b3rnh8rd/develop
Browse files Browse the repository at this point in the history
enabled Token Auth for SR
  • Loading branch information
patschuh authored Oct 25, 2023
2 parents d80d9f8 + 199119f commit 7f4f7c9
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 36 deletions.
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,24 @@ This can also be combined with given trust and keystore configuration

###### Example with Schema Registry with HTTPS and Basic Auth

The http**s** and 'sslEnabled' is important if you want to use truststore and/or keystore otherwise those attributes are ignored and now sslContext is provided to Schema Registry client

you can use only Basic Auth if you SR is only protected with basic auth, you can use only keystore+truststore if your SR is protected with mTLS or you can use both settings in parallel.
The http**s** and 'sslEnabled' is important if you want to use truststore and/or keystore otherwise those attributes are ignored and now sslContext is provided to Schema Registry client.

You can use only Basic Auth if youy SR is only protected with basic auth, you can use Token Auth if your SR is protected with an OAUTH Token, you can use only keystore+truststore if your SR is protected with mTLS or you can use both settings in parallel.
schemaRegistryBasicAuthUserInfo is deprecated since token auth is supported in addition to basic auth.
There is a schemaRegistryAuthMode property with possible values NONE, BASIC or TOKEN and schemaRegistryAuthConfig property with either basic auth credentials or OAuthToken.
```
{
....
"schemaRegistry": "https://myschemaregistry:8081",
"schemaRegistryBasicAuthUserInfo": "<BasicAuthUser>:<BasicAuthPW>",
...
"sslEnabled": true,
"keyStoreLocation": "mykeystore.jks",
"keyStorePassword": "mykeystorepw",
"trustStoreLocation": "mytruststore.jks",
"trustStorePassword": "mykeystorepw"
....
"schemaRegistry": "https://myschemaregistry:8081",
deprecated-> "schemaRegistryBasicAuthUserInfo": "<BasicAuthUser>:<BasicAuthPW>",
"schemaRegistryAuthMode": "NONE|BASIC|TOKEN",
"schemaRegistryAuthConfig": "<BasicAuthUser>:<BasicAuthPW>|<OAuthToken>:",
...
"sslEnabled": true,
"keyStoreLocation": "mykeystore.jks",
"keyStorePassword": "mykeystorepw",
"trustStoreLocation": "mytruststore.jks",
"trustStorePassword": "mykeystorepw"
}
```

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

group = 'at.esque.kafka'
version = '2.7.3'
version = '2.8.0'

repositories {
mavenCentral()
Expand Down Expand Up @@ -54,7 +54,7 @@ dependencies {
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
implementation 'com.flipkart.zjsonpatch:zjsonpatch:0.4.12'
implementation 'tech.allegro.schema.json2avro:converter:0.2.15'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.5'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.9'

testImplementation 'junit:junit:4.13.2'
testImplementation 'org.springframework.kafka:spring-kafka-test:2.4.13.RELEASE'
Expand Down
62 changes: 57 additions & 5 deletions src/main/java/at/esque/kafka/cluster/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import javafx.beans.property.*;
import javafx.collections.FXCollections;

import java.util.Arrays;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterConfig {
private StringProperty identifier = new SimpleStringProperty();
private StringProperty bootStrapServers = new SimpleStringProperty();
private StringProperty schemaRegistry = new SimpleStringProperty();
private StringProperty schemaRegistryBasicAuthUserInfo = new SimpleStringProperty();
private StringProperty schemaRegistryAuthConfig = new SimpleStringProperty();
private ListProperty<SchemaRegistryAuthMode> schemaRegistryAuthModes = new SimpleListProperty<>(FXCollections.observableArrayList(Arrays.asList(SchemaRegistryAuthMode.NONE, SchemaRegistryAuthMode.BASIC, SchemaRegistryAuthMode.TOKEN)));
private ObjectProperty<SchemaRegistryAuthMode> schemaRegistryAuthMode = new SimpleObjectProperty<>(SchemaRegistryAuthMode.NONE);
private BooleanProperty schemaRegistryUseSsl = new SimpleBooleanProperty();
private BooleanProperty sslEnabled = new SimpleBooleanProperty();
private BooleanProperty certPathValidationSuppressed = new SimpleBooleanProperty();
Expand All @@ -34,16 +37,24 @@ public class ClusterConfig {
public ClusterConfig() {
}

public enum SchemaRegistryAuthMode {
NONE,
BASIC,
TOKEN
}

public ClusterConfig(ClusterConfig existingConfig) {
update(existingConfig);
}

public void update(ClusterConfig existingConfig) {
if(existingConfig != null) {
if (existingConfig != null) {
this.setIdentifier(existingConfig.getIdentifier());
this.setBootStrapServers(existingConfig.getBootStrapServers());
this.setSchemaRegistry(existingConfig.getSchemaRegistry());
this.setSchemaRegistryBasicAuthUserInfo(existingConfig.getSchemaRegistryBasicAuthUserInfo());
this.setSchemaRegistryAuthConfig(existingConfig.getSchemaRegistryAuthConfig());
this.setSchemaRegistryAuthMode(existingConfig.getSchemaRegistryAuthMode());
this.setSchemaRegistryUseSsl(existingConfig.isSchemaRegistryUseSsl());
this.setSchemaRegistrySuppressCertPathValidation(existingConfig.isSchemaRegistrySuppressCertPathValidation());
this.setSslEnabled(existingConfig.isSslEnabled());
Expand Down Expand Up @@ -119,6 +130,7 @@ public void setSslEnabled(boolean sslEnabled) {
public boolean isSchemaRegistrySuppressCertPathValidation() {
return certPathValidationSuppressed.get();
}

public BooleanProperty suppressCertPathValidation() {
return certPathValidationSuppressed;
}
Expand Down Expand Up @@ -232,19 +244,59 @@ public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig.set(saslJaasConfig);
}

/**
* Deprecated use schemaRegistryAuthConfig instead for basic and token auth
*/
@Deprecated
@JsonProperty("schemaRegistryBasicAuthUserInfo")
public String getSchemaRegistryBasicAuthUserInfo() {
return schemaRegistryBasicAuthUserInfo.get();
}

/**
* Deprecated use schemaRegistryAuthConfig instead for basic and token auth
*/
public StringProperty schemaRegistryBasicAuthUserInfoProperty() {
return schemaRegistryBasicAuthUserInfo;
}

/**
* Deprecated use schemaRegistryAuthConfig instead for basic and token auth
*/
public void setSchemaRegistryBasicAuthUserInfo(String schemaRegistryBasicAuthUserInfo) {
this.schemaRegistryBasicAuthUserInfo.set(schemaRegistryBasicAuthUserInfo);
}

@JsonProperty("schemaRegistryAuthMode")
public SchemaRegistryAuthMode getSchemaRegistryAuthMode() {
return schemaRegistryAuthMode.get();
}

public ObjectProperty<SchemaRegistryAuthMode> schemaRegistryAuthModeProperty() {
return schemaRegistryAuthMode;
}

public void setSchemaRegistryAuthMode(SchemaRegistryAuthMode schemaRegistryAuthMode) {
this.schemaRegistryAuthMode.set(schemaRegistryAuthMode);
}

@JsonProperty("schemaRegistryAuthConfig")
public String getSchemaRegistryAuthConfig() {
return schemaRegistryAuthConfig.get();
}

public StringProperty schemaRegistryAuthConfigProperty() {
return schemaRegistryAuthConfig;
}

public void setSchemaRegistryAuthConfig(String schemaRegistryAuthConfig) {
this.schemaRegistryAuthConfig.set(schemaRegistryAuthConfig);
}

public ListProperty<SchemaRegistryAuthMode> schemaRegistryAuthModesProperty() {
return schemaRegistryAuthModes;
}

@JsonProperty("schemaRegistryUseSsl")
public boolean isSchemaRegistryUseSsl() {
return schemaRegistryUseSsl.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public static SSLSocketFactory buildSSlSocketFactory(ClusterConfig clusterConfig
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(new FileInputStream(sslProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)), sslProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).toCharArray());
kmf.init(ks, sslProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).toCharArray());
}else{
kmf.init(null, null);
}

if (clusterConfig.isSchemaRegistrySuppressCertPathValidation()) {
sc.init(kmf.getKeyManagers(), UNQUESTIONING_TRUST_MANAGER, null);
} else {
Expand Down
37 changes: 25 additions & 12 deletions src/main/java/at/esque/kafka/dialogs/ClusterConfigDialog.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.dlsc.formsfx.model.structure.Group;
import com.dlsc.formsfx.model.util.BindingMode;
import com.dlsc.formsfx.view.renderer.FormRenderer;
import com.dlsc.formsfx.view.util.ColSpan;
import javafx.scene.Node;
import javafx.scene.control.ButtonBar;
import javafx.scene.control.ButtonType;
Expand All @@ -20,7 +21,8 @@ public class ClusterConfigDialog {
public static final String LABEL_IDENTIFIER = "Identifier";
public static final String LABEL_BOOTSTRAP_SERVERS = "Bootstrap-Servers";
public static final String LABEL_SCHEMA_REGISTRY_URL = "Schema Registry URL";
public static final String LABEL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO = "Schema Registry Basic Auth User Info";
public static final String LABEL_SCHEMA_REGISTRY_AUTH_USER_INFO = "Schema Registry Auth Info";
public static final String LABEL_SCHEMA_REGISTRY_AUTH_MODE = "Schema Registry Auth Mode";
public static final String LABEL_ENABLE_SSL = "Enable SSL";
public static final String LABEL_KEY_STORE_LOCATION = "Key Store Location";
public static final String LABEL_KEY_STORE_PASSWORD = "Key Store Password";
Expand Down Expand Up @@ -73,20 +75,28 @@ public static Optional<ClusterConfig> show(ClusterConfig existingConfig) {
.placeholder(LABEL_SCHEMA_REGISTRY_URL)
.format(new NullFormatStringConverter())
.bind(copy.schemaRegistryProperty()),
Field.ofStringType(copy.getSchemaRegistryBasicAuthUserInfo() == null ? "" : copy.getSchemaRegistryBasicAuthUserInfo())
.label(LABEL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO)
.tooltip(LABEL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO)
.placeholder(LABEL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO)
Field.ofSingleSelectionType(copy.schemaRegistryAuthModesProperty())
.label(LABEL_SCHEMA_REGISTRY_AUTH_MODE)
.tooltip(LABEL_SCHEMA_REGISTRY_AUTH_MODE)
.bind(copy.schemaRegistryAuthModesProperty(),copy.schemaRegistryAuthModeProperty())
.span(ColSpan.HALF),
Field.ofStringType(copy.getSchemaRegistryAuthConfig() == null ? "" : copy.getSchemaRegistryAuthConfig())
.label(LABEL_SCHEMA_REGISTRY_AUTH_USER_INFO)
.tooltip(LABEL_SCHEMA_REGISTRY_AUTH_USER_INFO)
.placeholder(LABEL_SCHEMA_REGISTRY_AUTH_USER_INFO)
.format(new NullFormatStringConverter())
.bind(copy.schemaRegistryBasicAuthUserInfoProperty()),
.bind(copy.schemaRegistryAuthConfigProperty())
.span(ColSpan.HALF),
Field.ofBooleanType(copy.isSchemaRegistryUseSsl())
.label(LABEL_USE_SSL_CONFIGURATION)
.tooltip(LABEL_USE_SSL_CONFIGURATION)
.bind(copy.schemaRegistryUseSslProperty()),
.bind(copy.schemaRegistryUseSslProperty())
.span(ColSpan.HALF),
Field.ofBooleanType(copy.isSchemaRegistrySuppressCertPathValidation())
.label(LABEL_SUPPRESS_CERT_PATH_VALIDATION)
.tooltip(LABEL_SUPPRESS_CERT_PATH_VALIDATION)
.bind(copy.suppressCertPathValidation())
.span(ColSpan.HALF)
),
Group.of(
Field.ofStringType(copy.getkafkaConnectUrl()==null?"":copy.getkafkaConnectUrl())
Expand Down Expand Up @@ -116,11 +126,13 @@ public static Optional<ClusterConfig> show(ClusterConfig existingConfig) {
Field.ofBooleanType(copy.isSslEnabled())
.label(LABEL_ENABLE_SSL)
.tooltip(LABEL_ENABLE_SSL)
.bind(copy.sslEnabledProperty()),
.bind(copy.sslEnabledProperty())
.span(ColSpan.HALF),
Field.ofBooleanType(copy.issuppressSslEndPointIdentification())
.label(LABEL_SUPPRESS_SSL_ENDPOINT_IDENTIFICATION)
.tooltip(LABEL_SUPPRESS_SSL_ENDPOINT_IDENTIFICATION)
.bind(copy.suppressSslEndPointIdentificationProperty()),
.bind(copy.suppressSslEndPointIdentificationProperty())
.span(ColSpan.HALF),
Field.ofStringType(copy.getKeyStoreLocation()==null?"":copy.getKeyStoreLocation())
.label(LABEL_KEY_STORE_LOCATION)
.tooltip(LABEL_KEY_STORE_LOCATION)
Expand Down Expand Up @@ -152,13 +164,15 @@ public static Optional<ClusterConfig> show(ClusterConfig existingConfig) {
.tooltip(LABEL_SASL_SECURITY_PROTOCOL)
.placeholder(LABEL_SASL_SECURITY_PROTOCOL)
.format(new NullFormatStringConverter())
.bind(copy.saslSecurityProtocolProperty()),
.bind(copy.saslSecurityProtocolProperty())
.span(ColSpan.HALF),
Field.ofStringType(copy.getSaslMechanism()==null?"":copy.getSaslMechanism())
.label(LABEL_SASL_MECHANISM)
.tooltip(LABEL_SASL_MECHANISM)
.placeholder(LABEL_SASL_MECHANISM)
.format(new NullFormatStringConverter())
.bind(copy.saslMechanismProperty()),
.bind(copy.saslMechanismProperty())
.span(ColSpan.HALF),
Field.ofStringType(copy.getSaslJaasConfig()==null?"":copy.getSaslJaasConfig())
.label(LABEL_SASL_JAAS_CONFIG)
.tooltip(LABEL_SASL_JAAS_CONFIG)
Expand All @@ -169,7 +183,6 @@ public static Optional<ClusterConfig> show(ClusterConfig existingConfig) {
.label(LABEL_SASL_CLIENT_CALLBACK_HANDLER_CLASS)
.tooltip(LABEL_SASL_CLIENT_CALLBACK_HANDLER_CLASS)
.placeholder(LABEL_SASL_CLIENT_CALLBACK_HANDLER_CLASS)
.valueDescription(String.format("Is used f.e. %s=AWS_MSK_IAM, %s=software.amazon.msk.auth.iam.IAMClientCallbackHandler", LABEL_SASL_MECHANISM,LABEL_SASL_CLIENT_CALLBACK_HANDLER_CLASS))
.format(new NullFormatStringConverter())
.bind(copy.saslClientCallbackHandlerClassProperty())
)
Expand Down
34 changes: 31 additions & 3 deletions src/main/java/at/esque/kafka/handlers/ConfigHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -214,6 +215,7 @@ public ClusterConfigs loadOrCreateConfigs() {
} else if (clusterConfig.exists()) {
try {
clusterConfigs = objectMapper.readValue(clusterConfig, ClusterConfigs.class);
maybeMigrateDeprecatedConfig(clusterConfigs);
return clusterConfigs;
} catch (IOException e) {
ErrorAlert.show(e);
Expand All @@ -232,11 +234,34 @@ public ClusterConfigs loadOrCreateConfigs() {
return clusterConfigs;
}

public void saveConfigs() {
public void maybeMigrateDeprecatedConfig(ClusterConfigs clusterConfigs) {
AtomicBoolean updated = new AtomicBoolean(false);
clusterConfigs.getClusterConfigs().forEach(config -> {
var schemaRegistryBasicAuthUserInfo = config.getSchemaRegistryBasicAuthUserInfo();
if (StringUtils.isNotBlank(schemaRegistryBasicAuthUserInfo)) {
config.setSchemaRegistryAuthMode(ClusterConfig.SchemaRegistryAuthMode.BASIC);
config.setSchemaRegistryAuthConfig(schemaRegistryBasicAuthUserInfo);
config.setSchemaRegistryBasicAuthUserInfo(null);
updated.set(true);
}
});
if (updated.get()) {
if (saveConfigs()) {
LOGGER.info("deprecated property migration sucessful!");
} else {
LOGGER.warn("deprecated property migration failed!");
}
}
}


public boolean saveConfigs() {
try {
objectMapper.writeValue(clusterConfig, clusterConfigs);
return true;
} catch (IOException e) {
ErrorAlert.show(e);
return false;
}
}

Expand Down Expand Up @@ -318,9 +343,12 @@ public Map<String, String> getSaslProperties(ClusterConfig config) {
public Map<String, ?> getSchemaRegistryAuthProperties(ClusterConfig config) {
Map<String, String> props = new HashMap<>();

if (StringUtils.isNoneEmpty(config.getSchemaRegistryBasicAuthUserInfo())) {
if (ClusterConfig.SchemaRegistryAuthMode.BASIC.equals(config.getSchemaRegistryAuthMode())) {
props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getSchemaRegistryBasicAuthUserInfo());
props.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getSchemaRegistryAuthConfig());
} else if (ClusterConfig.SchemaRegistryAuthMode.TOKEN.equals(config.getSchemaRegistryAuthMode())) {
props.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "STATIC_TOKEN");
props.put(SchemaRegistryClientConfig.BEARER_AUTH_TOKEN_CONFIG, config.getSchemaRegistryAuthConfig());
}

return props;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/at/esque/kafka/handlers/VersionInfoHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private Map<String, Object> checkLatestVersion() {


} catch (Exception e) {
Platform.runLater(() -> ErrorAlert.show(e));
Platform.runLater(() -> ErrorAlert.show("Update Check failed", "Failed to check for availabe Updates", e.getMessage(), e, null, false));
}
} else {
return (Map<String, Object>) versionCheckContent.get("release");
Expand All @@ -133,7 +133,7 @@ public void showDialogIfUpdateIsAvailable(HostServices hostServices) {
if (openInBrowser) {
try {
hostServices.showDocument(updateInfo.getReleasePage());
}catch (Exception e){
} catch (Exception e) {
ErrorAlert.show(e);
}
}
Expand Down

0 comments on commit 7f4f7c9

Please sign in to comment.