Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSQL-12955 | Introduce KsqlResourceExtensions for plugins configured externally. #10665

Open
wants to merge 5 commits into
base: 7.6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -728,6 +727,13 @@ public enum DeploymentType {
= "Configure how long the remote host executor will wait for in seconds "
+ "when fetching all remote hosts.";

public static final String KSQL_ENABLE_FIPS = "enable.fips";
public static final String KSQL_ENABLE_FIPS_DEFAULT = "false";
public static final String KSQL_ENABLE_FIPS_DOC
= "Enable FIPS mode on the server. If FIPS mode is enabled, "
+ "broker listener security protocols, TLS versions and cipher "
+ "suites will be validated based on FIPS compliance requirement.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -1559,11 +1565,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
)
.withClientSslSupport()
.define(
ConfluentConfigs.ENABLE_FIPS_CONFIG,
KSQL_ENABLE_FIPS,
Type.BOOLEAN,
ConfluentConfigs.ENABLE_FIPS_DEFAULT,
KSQL_ENABLE_FIPS_DEFAULT,
Importance.LOW,
ConfluentConfigs.ENABLE_FIPS_DOC
KSQL_ENABLE_FIPS_DOC
);

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down Expand Up @@ -1792,6 +1798,10 @@ public Map<String, Object> getConsumerClientConfigProps() {
return Collections.unmodifiableMap(map);
}

public boolean enableFips() {
return getBoolean(KSQL_ENABLE_FIPS);
}

public Map<String, Object> addConfluentMetricsContextConfigsKafka(
final Map<String,Object> props
) {
Expand Down Expand Up @@ -1940,7 +1950,7 @@ public static Map<String, String> getStringAsMap(

public static Map<String, String> parseStringAsMap(final String key, final String value) {
try {
return value.equals("")
return value.isEmpty()
? Collections.emptyMap()
: Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
} catch (final IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ private KsqlConstants() {
public static final String KSQL_QUERY_PLAN_TYPE_TAG = "query_plan_type";
public static final String KSQL_QUERY_ROUTING_TYPE_TAG = "query_routing_type";

public static final String FIPS_VALIDATOR
= "io.confluent.ksql.security.KsqlFipsResourceExtension";

public enum KsqlQueryType {
PERSISTENT,
PUSH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static Map<String, String> getMetricsTagsWithQueryId(
final String queryId,
final Map<String, String> tags
) {
if (queryId.equals("")) {
if (queryId.isEmpty()) {
return tags;
}
return addMetricTagToMap("query-id", queryId, tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.utils.SecurityUtils;
Expand Down Expand Up @@ -97,7 +96,7 @@ public KsqlSchemaRegistryClientFactory(
* Creates an SslContext configured to be used with the KsqlSchemaRegistryClient.
*/
public static SSLContext newSslContext(final KsqlConfig config) {
if (config.getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG)) {
if (config.enableFips()) {
SecurityUtils.addConfiguredSecurityProviders(config.originals());
}
final DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory();
Expand All @@ -115,7 +114,7 @@ static void configureSslEngineFactory(
}

public SchemaRegistryClient get() {
if (schemaRegistryUrl.equals("")) {
if (schemaRegistryUrl.isEmpty()) {
return new DefaultSchemaRegistryClient();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.extensions;

import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;

public interface KsqlResourceContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any benefit of building an interface? Avoid if there is exactly one implementation and in future also, there is no rationale for multiple implementations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind this is to encapsulate a collection of objects that are useful to the ResourceExtension in future.


KsqlConfig ksqlConfig();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since KsqlConfig and KsqlRestConfig are already available, is an additional Context class necessary? If not, it should be avoided.


KsqlRestConfig ksqlRestConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.extensions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;

public class KsqlResourceContextImpl implements KsqlResourceContext {

private final KsqlConfig ksqlConfig;
private final KsqlRestConfig ksqlRestConfig;

@SuppressFBWarnings(value = "EI_EXPOSE_REP2")
public KsqlResourceContextImpl(
final KsqlConfig ksqlConfig,
final KsqlRestConfig restConfig) {
this.ksqlConfig = ksqlConfig;
this.ksqlRestConfig = restConfig;
}

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public KsqlConfig ksqlConfig() {
return ksqlConfig;
}

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public KsqlRestConfig ksqlRestConfig() {
return ksqlRestConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.extensions;

import io.confluent.ksql.util.KsqlException;
import java.io.Closeable;

public interface KsqlResourceExtension extends Closeable {

void register(KsqlResourceContext ksqlResourceContext) throws KsqlException;
hrishabhg marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.extensions.KsqlResourceContext;
import io.confluent.ksql.rest.extensions.KsqlResourceContextImpl;
import io.confluent.ksql.rest.extensions.KsqlResourceExtension;
import io.confluent.ksql.rest.server.HeartbeatAgent.Builder;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandRunner;
Expand Down Expand Up @@ -125,6 +128,7 @@
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -211,6 +215,7 @@ public final class KsqlRestApplication implements Executable {
private KafkaTopicClient internalTopicClient;
private final Instant ksqlRestAppStartTime;
private final KsqlRestApplicationMetrics restApplicationMetrics;
private final List<KsqlResourceExtension> ksqlResourceExtensions;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -279,7 +284,8 @@ public static SourceName getCommandsStreamName() {
this.vertx = requireNonNull(vertx, "vertx");
this.denyListPropertyValidator =
requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");

this.ksqlResourceExtensions = new KsqlRestConfig(ksqlConfigNoPort.originals())
.getKsqlResourceExtensions();
this.serverInfoResource =
new ServerInfoResource(serviceContext, ksqlConfigNoPort, commandRunner);
if (heartbeatAgent.isPresent()) {
Expand Down Expand Up @@ -313,6 +319,24 @@ public static SourceName getCommandsStreamName() {
@Override
public void startAsync() {
log.debug("Starting the ksqlDB API server");

final KsqlResourceContext ksqlResourceContext
= new KsqlResourceContextImpl(ksqlConfigNoPort, restConfig);

boolean isFipsValidatorConfigured = false;
for (KsqlResourceExtension resourceExtension : ksqlResourceExtensions) {
resourceExtension.register(ksqlResourceContext);
if (KsqlConstants.FIPS_VALIDATOR
.equals(resourceExtension.getClass().getCanonicalName())) {
isFipsValidatorConfigured = true;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can get rid of FIPS Specific checks. We can move tests related to that to confluent-security-plugins itself.

}
if (ksqlConfigNoPort.enableFips() && !isFipsValidatorConfigured) {
throw new KsqlException("Error enabling the FIPS resource extension: `enable.fips` is set"
+ " to true but the `ksql.resource.extension.class` config is either not configured"
+ " or does not contain \"" + KsqlConstants.FIPS_VALIDATOR + "\"");
}

this.serverMetadataResource = ServerMetadataResource.create(serviceContext, ksqlConfigNoPort);
final StatementParser statementParser = new StatementParser(ksqlEngine);
final Optional<KsqlAuthorizationValidator> authorizationValidator =
Expand Down Expand Up @@ -509,6 +533,14 @@ public void shutdown() {
}
});

ksqlResourceExtensions.forEach(resourceExtension -> {
try {
resourceExtension.close();
} catch (IOException e) {
log.error("Exception while closing resource extension", e);
}
});

try {
ksqlEngine.close();
} catch (final Exception e) {
Expand Down Expand Up @@ -612,9 +644,9 @@ public static KsqlRestApplication buildApplication(
final FunctionRegistry functionRegistry,
final Instant ksqlRestAppStartTime
) {

final Map<String, Object> updatedRestProps = restConfig.getOriginals();
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());

final Vertx vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.extensions.KsqlResourceExtension;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
Expand Down Expand Up @@ -444,6 +445,13 @@ public class KsqlRestConfig extends AbstractConfig {
KSQL_COMMAND_TOPIC_MIGRATION_MIGRATING
);

public static final String KSQL_RESOURCE_EXTENSIONS =
"ksql.resource.extension.class";
private static final String KSQL_RESOURCE_EXTENSION_DEFAULT = "";
private static final String KSQL_RESOURCE_EXTENSION_DOC =
"A list of KsqlResourceExtension implementations "
+ "to be registered with the ksqlDB server.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -617,6 +625,12 @@ public class KsqlRestConfig extends AbstractConfig {
ConfigValidators.nullsAllowed(ConfigValidators.validUrl()),
Importance.HIGH,
INTERNAL_LISTENER_DOC
).define(
KSQL_RESOURCE_EXTENSIONS,
Type.LIST,
KSQL_RESOURCE_EXTENSION_DEFAULT,
Importance.MEDIUM,
KSQL_RESOURCE_EXTENSION_DOC
).define(
STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG,
Type.LONG,
Expand Down Expand Up @@ -1065,6 +1079,10 @@ public ClientAuth getClientAuthInternal() {
return getClientAuth(getString(KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG));
}

public List<KsqlResourceExtension> getKsqlResourceExtensions() {
return getConfiguredInstances(KSQL_RESOURCE_EXTENSIONS, KsqlResourceExtension.class);
}

/**
* Used to sanitize the first `listener` config.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ static void validateDefaultTopicFormats(final KsqlConfig config) {

@VisibleForTesting
static void validateFips(final KsqlConfig config, final KsqlRestConfig restConfig) {
if (config.getBoolean(ConfluentConfigs.ENABLE_FIPS_CONFIG)) {
if (config.enableFips()) {
final FipsValidator fipsValidator = ConfluentConfigs.buildFipsValidator();

// validate cipher suites and TLS version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public StatementExecutorResponse execute(
final KsqlSecurityContext securityContext
) {
final String commandRunnerWarningString = commandRunnerWarning.get();
if (!commandRunnerWarningString.equals("")) {
if (!commandRunnerWarningString.isEmpty()) {
throw new KsqlServerException("Failed to handle Ksql Statement."
+ System.lineSeparator()
+ commandRunnerWarningString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ private static void addCommandRunnerWarning(
final Supplier<String> commandRunnerWarning
) {
final String commandRunnerIssueString = commandRunnerWarning.get();
if (!commandRunnerIssueString.equals("")) {
if (!commandRunnerIssueString.isEmpty()) {
for (final KsqlEntity entity: entityList) {
entity.updateWarnings(Collections.singletonList(
new KsqlWarning(commandRunnerIssueString)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -76,6 +77,7 @@
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -379,6 +381,17 @@ public void shouldConfigureRocksDBConfigSetter() {
verify(rocksDBConfigSetterHandler).accept(ksqlConfig);
}

@Ignore
@Test(expected = KsqlException.class)
public void shouldFailIfFipsValidationEnabledButNotConfigured() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make a generic test.

shouldFailIfExtensionIsEnabledButNotConfigured

// When:
when(ksqlConfig.enableFips()).thenReturn(true);
app.startKsql(ksqlConfig);

// Then:
// KsqlException
}

@Test
public void shouldConfigureIQWithInterNodeListenerIfSet() {
// Given:
Expand Down
Loading