From f712e960076f0d870fa4698c1c83372794361f1e Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Mon, 27 Jan 2025 15:36:34 +0530 Subject: [PATCH 1/5] KSQL-12955 | Introduce KsqlResourceExtensions for plugins configured externally. --- .../io/confluent/ksql/util/KsqlConfig.java | 4 ++ .../io/confluent/ksql/util/KsqlConstants.java | 3 ++ .../rest/extensions/KsqlResourceContext.java | 26 +++++++++++ .../extensions/KsqlResourceContextImpl.java | 40 +++++++++++++++++ .../extensions/KsqlResourceExtension.java | 25 +++++++++++ .../ksql/rest/server/KsqlRestApplication.java | 37 +++++++++++++++- .../ksql/rest/server/KsqlRestConfig.java | 20 +++++++++ .../ksql/rest/server/KsqlServerMain.java | 2 + .../rest/server/KsqlRestApplicationTest.java | 11 +++++ .../ksql/rest/server/KsqlRestConfigTest.java | 44 +++++++++++++++++++ .../extensions/DummyResourceExtension.java | 35 +++++++++++++++ 11 files changed, 245 insertions(+), 2 deletions(-) create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContext.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/extensions/DummyResourceExtension.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 8466754d6adb..2e9acc4c5e31 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -1792,6 +1792,10 @@ public Map getConsumerClientConfigProps() { return Collections.unmodifiableMap(map); } + public boolean enableFips() { + return getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG); + } + public Map addConfluentMetricsContextConfigsKafka( final Map props ) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java index 98eafd873853..df0c5364c47a 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java @@ -52,6 +52,9 @@ private KsqlConstants() { public static final String KSQL_QUERY_PLAN_TYPE_TAG = "query_plan_type"; public static final String KSQL_QUERY_ROUTING_TYPE_TAG = "query_routing_type"; + public static final String FIPS_VALIDATOR + = "io.confluent.ksql.security.KsqlFipsResourceExtension"; + public enum KsqlQueryType { PERSISTENT, PUSH, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContext.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContext.java new file mode 100644 index 000000000000..306315d37a4c --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContext.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.extensions; + +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.util.KsqlConfig; + +public interface KsqlResourceContext { + + KsqlConfig ksqlConfig(); + + KsqlRestConfig ksqlRestConfig(); +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java new file mode 100644 index 000000000000..f06311b10f1d --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.extensions; + +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.util.KsqlConfig; + +public class KsqlResourceContextImpl implements KsqlResourceContext { + + private final KsqlConfig ksqlConfig; + private final KsqlRestConfig ksqlRestConfig; + + public KsqlResourceContextImpl(KsqlConfig ksqlConfig, KsqlRestConfig restConfig) { + this.ksqlConfig = ksqlConfig; + this.ksqlRestConfig = restConfig; + } + + @Override + public KsqlConfig ksqlConfig() { + return ksqlConfig; + } + + @Override + public KsqlRestConfig ksqlRestConfig() { + return ksqlRestConfig; + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java new file mode 100644 index 000000000000..1c96ccc18695 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java @@ -0,0 +1,25 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.extensions; + +import io.confluent.ksql.util.KsqlException; + +import java.io.Closeable; + +public interface KsqlResourceExtension extends Closeable { + + void register(KsqlResourceContext ksqlResourceContext) throws KsqlException; +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 639ceff83222..12e55e11d658 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -60,6 +60,9 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamsList; +import io.confluent.ksql.rest.extensions.KsqlResourceContext; +import io.confluent.ksql.rest.extensions.KsqlResourceContextImpl; +import io.confluent.ksql.rest.extensions.KsqlResourceExtension; import io.confluent.ksql.rest.server.HeartbeatAgent.Builder; import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandRunner; @@ -123,8 +126,10 @@ import io.vertx.core.net.SocketAddress; import io.vertx.ext.dropwizard.DropwizardMetricsOptions; import io.vertx.ext.dropwizard.Match; + import java.io.Console; import java.io.File; +import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.MalformedURLException; @@ -211,6 +216,7 @@ public final class KsqlRestApplication implements Executable { private KafkaTopicClient internalTopicClient; private final Instant ksqlRestAppStartTime; private final KsqlRestApplicationMetrics restApplicationMetrics; + private final List ksqlResourceExtensions; // The startup thread that can be interrupted if necessary during shutdown. This should only // happen if startup hangs. @@ -279,7 +285,8 @@ public static SourceName getCommandsStreamName() { this.vertx = requireNonNull(vertx, "vertx"); this.denyListPropertyValidator = requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); - + this.ksqlResourceExtensions = new KsqlRestConfig(ksqlConfigNoPort.originals()) + .getKsqlResourceExtensions(); this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort, commandRunner); if (heartbeatAgent.isPresent()) { @@ -313,6 +320,24 @@ public static SourceName getCommandsStreamName() { @Override public void startAsync() { log.debug("Starting the ksqlDB API server"); + + KsqlResourceContext ksqlResourceContext + = new KsqlResourceContextImpl(ksqlConfigNoPort, restConfig); + + boolean isFipsValidatorConfigured = false; + for (KsqlResourceExtension resourceExtension : ksqlResourceExtensions) { + resourceExtension.register(ksqlResourceContext); + if (KsqlConstants.FIPS_VALIDATOR + .equals(resourceExtension.getClass().getCanonicalName())) { + isFipsValidatorConfigured = true; + } + } + if (ksqlConfigNoPort.enableFips() && !isFipsValidatorConfigured) { + throw new KsqlException("Error enabling the FIPS resource extension: `enable.fips` is set to true but the " + + "`ksql.resource.extension.class` config is either not configured or does not contain \"" + + KsqlConstants.FIPS_VALIDATOR + "\""); + } + this.serverMetadataResource = ServerMetadataResource.create(serviceContext, ksqlConfigNoPort); final StatementParser statementParser = new StatementParser(ksqlEngine); final Optional authorizationValidator = @@ -509,6 +534,14 @@ public void shutdown() { } }); + ksqlResourceExtensions.forEach(resourceExtension -> { + try { + resourceExtension.close(); + } catch (IOException e) { + log.error("Exception while closing resource extension", e); + } + }); + try { ksqlEngine.close(); } catch (final Exception e) { @@ -612,9 +645,9 @@ public static KsqlRestApplication buildApplication( final FunctionRegistry functionRegistry, final Instant ksqlRestAppStartTime ) { - final Map updatedRestProps = restConfig.getOriginals(); final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties()); + final Vertx vertx = Vertx.vertx( new VertxOptions() .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index b4bae18b040c..e6daa4815b69 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -24,6 +24,7 @@ import io.confluent.ksql.configdef.ConfigValidators; import io.confluent.ksql.rest.DefaultErrorMessages; import io.confluent.ksql.rest.ErrorMessages; +import io.confluent.ksql.rest.extensions.KsqlResourceExtension; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; @@ -444,6 +445,12 @@ public class KsqlRestConfig extends AbstractConfig { KSQL_COMMAND_TOPIC_MIGRATION_MIGRATING ); + public static final String KSQL_RESOURCE_EXTENSION = + "ksql.resource.extension.class"; + private static final String KSQL_RESOURCE_EXTENSION_DEFAULT = ""; + private static final String KSQL_RESOURCE_EXTENSION_DOC = + "A list of KsqlResourceExtension implementations to register with ksqlDB server."; + private static final ConfigDef CONFIG_DEF; static { @@ -617,6 +624,12 @@ public class KsqlRestConfig extends AbstractConfig { ConfigValidators.nullsAllowed(ConfigValidators.validUrl()), Importance.HIGH, INTERNAL_LISTENER_DOC + ).define( + KSQL_RESOURCE_EXTENSION, + Type.LIST, + KSQL_RESOURCE_EXTENSION_DEFAULT, + Importance.MEDIUM, + KSQL_RESOURCE_EXTENSION_DOC ).define( STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG, Type.LONG, @@ -1065,6 +1078,13 @@ public ClientAuth getClientAuthInternal() { return getClientAuth(getString(KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG)); } + public List getKsqlResourceExtensions() { + if (getString(KSQL_RESOURCE_EXTENSION).isEmpty()) { + return Collections.emptyList(); + } + return getConfiguredInstances(KSQL_RESOURCE_EXTENSION, KsqlResourceExtension.class); + } + /** * Used to sanitize the first `listener` config. * diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index d573433d2eb1..19c338272c6d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -24,6 +24,8 @@ import io.confluent.ksql.logging.query.QueryLogger; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.properties.PropertiesUtil; +import io.confluent.ksql.rest.extensions.KsqlResourceContext; +import io.confluent.ksql.rest.extensions.KsqlResourceContextImpl; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.util.KsqlConfig; diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 7ea4619f7f63..5cba52461df0 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -60,6 +60,7 @@ import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.version.metrics.VersionCheckerAgent; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -379,6 +380,16 @@ public void shouldConfigureRocksDBConfigSetter() { verify(rocksDBConfigSetterHandler).accept(ksqlConfig); } + @Test(expected = KsqlException.class) + public void shouldFailIfFipsValidationEnabledButNotConfigured() { + // When: + when(ksqlConfig.enableFips()).thenReturn(true); + app.startKsql(ksqlConfig); + + // Then: + // KsqlException + } + @Test public void shouldConfigureIQWithInterNodeListenerIfSet() { // Given: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index 47ffb01eb1ab..85a3a8741fa7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -24,19 +24,24 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.rest.extensions.KsqlResourceExtension; +import io.confluent.ksql.rest.server.extensions.DummyResourceExtension; import io.confluent.ksql.util.KsqlConfig; import io.vertx.core.http.ClientAuth; import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; +import java.util.List; import java.util.Map; import java.util.function.Function; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -92,6 +97,45 @@ public void shouldGetKsqlConfigProperties() { ); } + @Test + public void shouldGetConfiguredKsqlResourceExtensions() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test") + .put(KsqlRestConfig.KSQL_RESOURCE_EXTENSION, + "io.confluent.ksql.rest.server.extensions.DummyResourceExtension") + .build() + ); + + // When: + final List ksqlResourceExtensions + = config.getKsqlResourceExtensions(); + + // Then: + assertEquals(1, ksqlResourceExtensions.size()); + assertTrue(ksqlResourceExtensions.get(0) instanceof DummyResourceExtension); + } + + @Test + public void shouldGetEmptyKsqlResourceExtensionsIfNotConfigured() { + // Given: + final KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.builder() + .putAll(MIN_VALID_CONFIGS) + .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test") + .build() + ); + + // When: + final List ksqlResourceExtensions + = config.getKsqlResourceExtensions(); + + // Then: + assertTrue(ksqlResourceExtensions.isEmpty()); + } + // Just a sanity check to make sure that, although they contain identical mappings, successive maps returned by calls // to KsqlRestConfig.getOriginals() do not actually return the same object (mutability would then be an issue) @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/extensions/DummyResourceExtension.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/extensions/DummyResourceExtension.java new file mode 100644 index 000000000000..0a2ea49f24ab --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/extensions/DummyResourceExtension.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.extensions; + +import io.confluent.ksql.rest.extensions.KsqlResourceContext; +import io.confluent.ksql.rest.extensions.KsqlResourceExtension; +import io.confluent.ksql.util.KsqlException; + +import java.io.IOException; + +public class DummyResourceExtension implements KsqlResourceExtension { + + @Override + public void register(KsqlResourceContext ksqlResourceContext) throws KsqlException { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } +} From 8ae966b077ba12d416fbb477e666110f6d679eb9 Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Mon, 27 Jan 2025 17:35:53 +0530 Subject: [PATCH 2/5] KSQL-12955 | Fix checkstyle errors. --- .../extensions/KsqlResourceContextImpl.java | 4 +++- .../extensions/KsqlResourceExtension.java | 1 - .../ksql/rest/server/KsqlRestApplication.java | 21 +++++++++---------- .../ksql/rest/server/KsqlServerMain.java | 2 -- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java index f06311b10f1d..721e9ddc9423 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java @@ -23,7 +23,9 @@ public class KsqlResourceContextImpl implements KsqlResourceContext { private final KsqlConfig ksqlConfig; private final KsqlRestConfig ksqlRestConfig; - public KsqlResourceContextImpl(KsqlConfig ksqlConfig, KsqlRestConfig restConfig) { + public KsqlResourceContextImpl( + final KsqlConfig ksqlConfig, + final KsqlRestConfig restConfig) { this.ksqlConfig = ksqlConfig; this.ksqlRestConfig = restConfig; } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java index 1c96ccc18695..e0579449f0e3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceExtension.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest.extensions; import io.confluent.ksql.util.KsqlException; - import java.io.Closeable; public interface KsqlResourceExtension extends Closeable { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 12e55e11d658..81711cfeb6e5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -126,7 +126,6 @@ import io.vertx.core.net.SocketAddress; import io.vertx.ext.dropwizard.DropwizardMetricsOptions; import io.vertx.ext.dropwizard.Match; - import java.io.Console; import java.io.File; import java.io.IOException; @@ -321,7 +320,7 @@ public static SourceName getCommandsStreamName() { public void startAsync() { log.debug("Starting the ksqlDB API server"); - KsqlResourceContext ksqlResourceContext + final KsqlResourceContext ksqlResourceContext = new KsqlResourceContextImpl(ksqlConfigNoPort, restConfig); boolean isFipsValidatorConfigured = false; @@ -329,13 +328,13 @@ public void startAsync() { resourceExtension.register(ksqlResourceContext); if (KsqlConstants.FIPS_VALIDATOR .equals(resourceExtension.getClass().getCanonicalName())) { - isFipsValidatorConfigured = true; + isFipsValidatorConfigured = true; } } if (ksqlConfigNoPort.enableFips() && !isFipsValidatorConfigured) { - throw new KsqlException("Error enabling the FIPS resource extension: `enable.fips` is set to true but the " - + "`ksql.resource.extension.class` config is either not configured or does not contain \"" - + KsqlConstants.FIPS_VALIDATOR + "\""); + throw new KsqlException("Error enabling the FIPS resource extension: `enable.fips` is set" + + " to true but the `ksql.resource.extension.class` config is either not configured" + + " or does not contain \"" + KsqlConstants.FIPS_VALIDATOR + "\""); } this.serverMetadataResource = ServerMetadataResource.create(serviceContext, ksqlConfigNoPort); @@ -535,11 +534,11 @@ public void shutdown() { }); ksqlResourceExtensions.forEach(resourceExtension -> { - try { - resourceExtension.close(); - } catch (IOException e) { - log.error("Exception while closing resource extension", e); - } + try { + resourceExtension.close(); + } catch (IOException e) { + log.error("Exception while closing resource extension", e); + } }); try { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index 19c338272c6d..d573433d2eb1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -24,8 +24,6 @@ import io.confluent.ksql.logging.query.QueryLogger; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.properties.PropertiesUtil; -import io.confluent.ksql.rest.extensions.KsqlResourceContext; -import io.confluent.ksql.rest.extensions.KsqlResourceContextImpl; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.util.KsqlConfig; From 986b773c99e2049940a23b62d7d64167fb6790d2 Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Mon, 27 Jan 2025 19:30:31 +0530 Subject: [PATCH 3/5] KSQL-12955 | Fix spotbugs errors. --- .../src/main/java/io/confluent/ksql/util/KsqlConfig.java | 2 +- .../src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java | 2 +- .../ksql/schema/registry/KsqlSchemaRegistryClientFactory.java | 2 +- .../ksql/rest/extensions/KsqlResourceContextImpl.java | 3 +++ .../ksql/rest/server/computation/DistributingExecutor.java | 2 +- .../io/confluent/ksql/rest/server/resources/KsqlResource.java | 2 +- .../confluent/ksql/tools/migrations/commands/BaseCommand.java | 2 +- .../ksql/tools/migrations/commands/NewMigrationCommand.java | 2 +- .../ksql/tools/migrations/util/MigrationVersionInfo.java | 2 +- 9 files changed, 11 insertions(+), 8 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 2e9acc4c5e31..ebffd1175d5e 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -1944,7 +1944,7 @@ public static Map getStringAsMap( public static Map parseStringAsMap(final String key, final String value) { try { - return value.equals("") + return value.isEmpty() ? Collections.emptyMap() : Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value); } catch (final IllegalArgumentException e) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java index f97a3a1212ec..387bef5fa49e 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/MetricsTagsUtil.java @@ -27,7 +27,7 @@ public static Map getMetricsTagsWithQueryId( final String queryId, final Map tags ) { - if (queryId.equals("")) { + if (queryId.isEmpty()) { return tags; } return addMetricTagToMap("query-id", queryId, tags); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java index 3e653f434ffd..02fe476163eb 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java @@ -115,7 +115,7 @@ static void configureSslEngineFactory( } public SchemaRegistryClient get() { - if (schemaRegistryUrl.equals("")) { + if (schemaRegistryUrl.isEmpty()) { return new DefaultSchemaRegistryClient(); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java index 721e9ddc9423..8cd2ae2e5a23 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.extensions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.util.KsqlConfig; @@ -31,11 +32,13 @@ public KsqlResourceContextImpl( } @Override + @SuppressFBWarnings(value = "EI_EXPOSE_REP") public KsqlConfig ksqlConfig() { return ksqlConfig; } @Override + @SuppressFBWarnings(value = "EI_EXPOSE_REP") public KsqlRestConfig ksqlRestConfig() { return ksqlRestConfig; } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index c28268795241..7e73cb6c9333 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -169,7 +169,7 @@ public StatementExecutorResponse execute( final KsqlSecurityContext securityContext ) { final String commandRunnerWarningString = commandRunnerWarning.get(); - if (!commandRunnerWarningString.equals("")) { + if (!commandRunnerWarningString.isEmpty()) { throw new KsqlServerException("Failed to handle Ksql Statement." + System.lineSeparator() + commandRunnerWarningString); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 393247e74b67..e439c822c140 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -457,7 +457,7 @@ private static void addCommandRunnerWarning( final Supplier commandRunnerWarning ) { final String commandRunnerIssueString = commandRunnerWarning.get(); - if (!commandRunnerIssueString.equals("")) { + if (!commandRunnerIssueString.isEmpty()) { for (final KsqlEntity entity: entityList) { entity.updateWarnings(Collections.singletonList( new KsqlWarning(commandRunnerIssueString))); diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/BaseCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/BaseCommand.java index c94f61fec26d..679aa318596d 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/BaseCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/BaseCommand.java @@ -106,7 +106,7 @@ protected String getConfigFile() { } protected boolean validateConfigFilePresent() { - if (getConfigFile() == null || getConfigFile().trim().equals("")) { + if (getConfigFile() == null || getConfigFile().trim().isEmpty()) { getLogger().error("Migrations config file required but not specified. " + "Specify with {} (or, equivalently, {}).", CONFIG_FILE_OPTION, CONFIG_FILE_OPTION_SHORT); diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java index 8212ac8ad673..acdb38e13547 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java @@ -58,7 +58,7 @@ public class NewMigrationCommand extends BaseCommand { @Override protected int command() { - if (getConfigFile() != null && !getConfigFile().equals("")) { + if (getConfigFile() != null && !getConfigFile().isEmpty()) { LOGGER.error("This command does not expect a config file to be passed. " + "Rather, this command will create one as part of preparing the migrations directory."); return 1; diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java index 89aa3345a94e..413e3a44ead8 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java @@ -121,7 +121,7 @@ public String getErrorReason() { } private static String formatTimestamp(final String epochTime) { - if (epochTime.equals("") || epochTime.equals(EMPTY_MIGRATION_TIMESTAMP)) { + if (epochTime.isEmpty() || epochTime.equals(EMPTY_MIGRATION_TIMESTAMP)) { return epochTime; } From c03995526bb479eca96f6a2c27d0eca724b05913 Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Fri, 31 Jan 2025 15:20:11 +0530 Subject: [PATCH 4/5] KSQL-12955 | Remove dependency from ConfluentConfigs. --- .../io/confluent/ksql/util/KsqlConfig.java | 16 +++++++++---- .../KsqlSchemaRegistryClientFactory.java | 2 +- .../ksql/rest/server/KsqlServerMain.java | 2 +- .../ksql/rest/server/KsqlServerMainTest.java | 24 +++++++++---------- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index ebffd1175d5e..12270943d307 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -55,7 +55,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.config.internals.ConfluentConfigs; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; @@ -728,6 +727,13 @@ public enum DeploymentType { = "Configure how long the remote host executor will wait for in seconds " + "when fetching all remote hosts."; + public static final String KSQL_ENABLE_FIPS = "enable.fips"; + public static final String KSQL_ENABLE_FIPS_DEFAULT = "false"; + public static final String KSQL_ENABLE_FIPS_DOC + = "Enable FIPS mode on the server. If FIPS mode is enabled, " + + "broker listener security protocols, TLS versions and cipher " + + "suites will be validated based on FIPS compliance requirement."; + private enum ConfigGeneration { LEGACY, CURRENT @@ -1559,11 +1565,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { ) .withClientSslSupport() .define( - ConfluentConfigs.ENABLE_FIPS_CONFIG, + KSQL_ENABLE_FIPS, Type.BOOLEAN, - ConfluentConfigs.ENABLE_FIPS_DEFAULT, + KSQL_ENABLE_FIPS_DEFAULT, Importance.LOW, - ConfluentConfigs.ENABLE_FIPS_DOC + KSQL_ENABLE_FIPS_DOC ); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef @@ -1793,7 +1799,7 @@ public Map getConsumerClientConfigProps() { } public boolean enableFips() { - return getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG); + return getBoolean(KSQL_ENABLE_FIPS); } public Map addConfluentMetricsContextConfigsKafka( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java index 02fe476163eb..4aa8e1063abb 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java @@ -97,7 +97,7 @@ public KsqlSchemaRegistryClientFactory( * Creates an SslContext configured to be used with the KsqlSchemaRegistryClient. */ public static SSLContext newSslContext(final KsqlConfig config) { - if (config.getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG)) { + if (config.enableFips()) { SecurityUtils.addConfiguredSecurityProviders(config.originals()); } final DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index d573433d2eb1..da2ab3efce77 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -194,7 +194,7 @@ static void validateDefaultTopicFormats(final KsqlConfig config) { @VisibleForTesting static void validateFips(final KsqlConfig config, final KsqlRestConfig restConfig) { - if (config.getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG)) { + if (config.enableFips()) { final FipsValidator fipsValidator = ConfluentConfigs.buildFipsValidator(); // validate cipher suites and TLS version diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java index 1c374b4b2def..e74509bfbb3d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java @@ -270,7 +270,7 @@ public void shouldFailOnInvalidDefaultValueFormat() { public void shouldFailOnInvalidCipherSuitesList() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true + KsqlConfig.KSQL_ENABLE_FIPS, true )); final String wrongCipherSuite = "TLS_RSA_WITH_NULL_MD5"; final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -295,7 +295,7 @@ public void shouldFailOnInvalidCipherSuitesList() { public void shouldFailOnInvalidEnabledProtocols() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true + KsqlConfig.KSQL_ENABLE_FIPS, true )); final String wrongEnabledProtocols = "TLSv1.0"; final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -322,7 +322,7 @@ public void shouldFailOnInvalidEnabledProtocols() { public void shouldFailOnNullBrokerSecurityProtocol() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true + KsqlConfig.KSQL_ENABLE_FIPS, true )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() .put(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG, @@ -347,7 +347,7 @@ public void shouldFailOnNullBrokerSecurityProtocol() { public void shouldFailOnInvalidBrokerSecurityProtocol() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -373,7 +373,7 @@ public void shouldFailOnInvalidBrokerSecurityProtocol() { public void shouldFailOnNullSSLEndpointIdentificationAlgorithm() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -399,7 +399,7 @@ public void shouldFailOnNullSSLEndpointIdentificationAlgorithm() { public void shouldFailOnInvalidSSLEndpointIdentificationAlgorithm() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -426,7 +426,7 @@ public void shouldFailOnInvalidSSLEndpointIdentificationAlgorithm() { public void shouldFailOnNullSRUrl() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -454,7 +454,7 @@ public void shouldFailOnNullSRUrl() { public void shouldFailOnInvalidSRUrl() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -483,7 +483,7 @@ public void shouldFailOnInvalidSRUrl() { public void shouldFailOnInvalidListenerProtocols() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -512,7 +512,7 @@ public void shouldFailOnInvalidListenerProtocols() { public void shouldFailOnInvalidProxyListenerProtocols() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -545,7 +545,7 @@ public void shouldFailOnInvalidProxyListenerProtocols() { public void shouldFailOnInvalidInternalListenerProtocols() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() @@ -577,7 +577,7 @@ public void shouldFailOnInvalidInternalListenerProtocols() { public void shouldFailOnInvalidAdvertisedListenerProtocols() { // Given: final KsqlConfig config = configWith(ImmutableMap.of( - ConfluentConfigs.ENABLE_FIPS_CONFIG, true, + KsqlConfig.KSQL_ENABLE_FIPS, true, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name )); final KsqlRestConfig restConfig = new KsqlRestConfig(ImmutableMap.builder() From 2189e60377473c6ecaa63db84a3b9ec2272492fc Mon Sep 17 00:00:00 2001 From: Parag Badani Date: Fri, 31 Jan 2025 15:26:45 +0530 Subject: [PATCH 5/5] KSQL-12955 | Address review comments. --- .../main/java/io/confluent/ksql/util/KsqlConfig.java | 6 +++--- .../registry/KsqlSchemaRegistryClientFactory.java | 1 - .../rest/extensions/KsqlResourceContextImpl.java | 1 + .../confluent/ksql/rest/server/KsqlRestConfig.java | 12 +++++------- .../ksql/rest/server/KsqlRestApplicationTest.java | 2 ++ .../ksql/rest/server/KsqlRestConfigTest.java | 2 +- .../ksql/rest/server/KsqlServerMainTest.java | 1 - 7 files changed, 12 insertions(+), 13 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 12270943d307..0206a79c50b7 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -730,9 +730,9 @@ public enum DeploymentType { public static final String KSQL_ENABLE_FIPS = "enable.fips"; public static final String KSQL_ENABLE_FIPS_DEFAULT = "false"; public static final String KSQL_ENABLE_FIPS_DOC - = "Enable FIPS mode on the server. If FIPS mode is enabled, " + - "broker listener security protocols, TLS versions and cipher " + - "suites will be validated based on FIPS compliance requirement."; + = "Enable FIPS mode on the server. If FIPS mode is enabled, " + + "broker listener security protocols, TLS versions and cipher " + + "suites will be validated based on FIPS compliance requirement."; private enum ConfigGeneration { LEGACY, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java index 4aa8e1063abb..49e8481320b6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.function.Supplier; import javax.net.ssl.SSLContext; -import org.apache.kafka.common.config.internals.ConfluentConfigs; import org.apache.kafka.common.security.auth.SslEngineFactory; import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.apache.kafka.common.utils.SecurityUtils; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java index 8cd2ae2e5a23..4c194ec2e7e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/extensions/KsqlResourceContextImpl.java @@ -24,6 +24,7 @@ public class KsqlResourceContextImpl implements KsqlResourceContext { private final KsqlConfig ksqlConfig; private final KsqlRestConfig ksqlRestConfig; + @SuppressFBWarnings(value = "EI_EXPOSE_REP2") public KsqlResourceContextImpl( final KsqlConfig ksqlConfig, final KsqlRestConfig restConfig) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index e6daa4815b69..6820efc73ce0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -445,11 +445,12 @@ public class KsqlRestConfig extends AbstractConfig { KSQL_COMMAND_TOPIC_MIGRATION_MIGRATING ); - public static final String KSQL_RESOURCE_EXTENSION = + public static final String KSQL_RESOURCE_EXTENSIONS = "ksql.resource.extension.class"; private static final String KSQL_RESOURCE_EXTENSION_DEFAULT = ""; private static final String KSQL_RESOURCE_EXTENSION_DOC = - "A list of KsqlResourceExtension implementations to register with ksqlDB server."; + "A list of KsqlResourceExtension implementations " + + "to be registered with the ksqlDB server."; private static final ConfigDef CONFIG_DEF; @@ -625,7 +626,7 @@ public class KsqlRestConfig extends AbstractConfig { Importance.HIGH, INTERNAL_LISTENER_DOC ).define( - KSQL_RESOURCE_EXTENSION, + KSQL_RESOURCE_EXTENSIONS, Type.LIST, KSQL_RESOURCE_EXTENSION_DEFAULT, Importance.MEDIUM, @@ -1079,10 +1080,7 @@ public ClientAuth getClientAuthInternal() { } public List getKsqlResourceExtensions() { - if (getString(KSQL_RESOURCE_EXTENSION).isEmpty()) { - return Collections.emptyList(); - } - return getConfiguredInstances(KSQL_RESOURCE_EXTENSION, KsqlResourceExtension.class); + return getConfiguredInstances(KSQL_RESOURCE_EXTENSIONS, KsqlResourceExtension.class); } /** diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 5cba52461df0..01c208e9a541 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -77,6 +77,7 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.streams.StreamsConfig; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -380,6 +381,7 @@ public void shouldConfigureRocksDBConfigSetter() { verify(rocksDBConfigSetterHandler).accept(ksqlConfig); } + @Ignore @Test(expected = KsqlException.class) public void shouldFailIfFipsValidationEnabledButNotConfigured() { // When: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java index 85a3a8741fa7..474f665345c4 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestConfigTest.java @@ -104,7 +104,7 @@ public void shouldGetConfiguredKsqlResourceExtensions() { .putAll(MIN_VALID_CONFIGS) .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test") - .put(KsqlRestConfig.KSQL_RESOURCE_EXTENSION, + .put(KsqlRestConfig.KSQL_RESOURCE_EXTENSIONS, "io.confluent.ksql.rest.server.extensions.DummyResourceExtension") .build() ); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java index e74509bfbb3d..512aead136c9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.concurrent.Executor; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.config.internals.ConfluentConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.easymock.Capture; import org.easymock.EasyMockRunner;