From b5ad739b12180f7402d3b2e7f57b6162f1f5cdfb Mon Sep 17 00:00:00 2001 From: Ralf Date: Sun, 10 Sep 2023 20:58:32 +0200 Subject: [PATCH] [FSTORE-840][APPEND] Use existing Kafka cluster (#1542) --- .../spec/helpers/storage_connector_helper.rb | 2 +- .../test/ruby/spec/storage_connector_spec.rb | 86 +++++++++++++++++-- .../FeaturestoreStorageConnectorService.java | 2 +- ...eaturestoreStorageConnectorController.java | 5 +- .../hops/hopsworks/common/util/Settings.java | 14 +++ 5 files changed, 98 insertions(+), 11 deletions(-) diff --git a/hopsworks-IT/src/test/ruby/spec/helpers/storage_connector_helper.rb b/hopsworks-IT/src/test/ruby/spec/helpers/storage_connector_helper.rb index a415fe09f4..676dc0392a 100644 --- a/hopsworks-IT/src/test/ruby/spec/helpers/storage_connector_helper.rb +++ b/hopsworks-IT/src/test/ruby/spec/helpers/storage_connector_helper.rb @@ -48,7 +48,7 @@ def get_storage_connector(project_id, featurestore_id, name) end def get_online_kafka_storage_connector(project_id, featurestore_id) - get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/kafka_connector" + get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/kafka_connector/byok" end def create_hopsfs_connector(project_id, featurestore_id, datasetName: "Resources") diff --git a/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb b/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb index 85fe4f6577..de467cd947 100644 --- a/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb @@ -937,32 +937,35 @@ describe "Operations for online kafka connector" do context 'with valid project, featurestore service enabled' do before :all do - @online_kafka_storage_connector_name = getVar('online_kafka_storage_connector_name') + @enable_bring_your_own_kafka = getVar('enable_bring_your_own_kafka') @enable_kafka_storage_connectors = getVar('enable_kafka_storage_connectors') setVar('enable_kafka_storage_connectors', "true") + @connector_name = "kafka_connector" + @cleanup = true @debugOpt = false with_valid_project end after :all do - setVar('online_kafka_storage_connector_name', @online_kafka_storage_connector_name[:value]) + setVar('enable_bring_your_own_kafka', @enable_bring_your_own_kafka[:value]) setVar('enable_kafka_storage_connectors', @enable_kafka_storage_connectors[:value]) end it "should get a online kafka storage connector with default values if it didn't exist" do project = get_project featurestore_id = get_featurestore_id(project.id) - connector_name = "kafka_connector" + setVar('enable_bring_your_own_kafka', "true") create_session(project[:username], "Pass123") + json_result = get_online_kafka_storage_connector(project.id, featurestore_id) parsed_json = JSON.parse(json_result) expect_status_details(200) expect(parsed_json.key?("id")).to be true expect(parsed_json["featurestoreId"]).to eql(featurestore_id) - expect(parsed_json["name"]).to eql(connector_name) + expect(parsed_json["name"]).to eql(@connector_name) expect(parsed_json["storageConnectorType"]).to eql("KAFKA") expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091") expect(parsed_json["securityProtocol"]).to eql("SSL") @@ -978,7 +981,9 @@ it "should get already created online kafka storage connector" do project = get_project featurestore_id = get_featurestore_id(project.id) - connector_name = "kafka_connector" + + setVar('enable_bring_your_own_kafka', "true") + create_session(project[:username], "Pass123") additional_data = { bootstrapServers: "localhost:9091", @@ -987,16 +992,15 @@ options: [{name: "option1", value: "value1"}] } - create_kafka_connector(project.id, featurestore_id, additional_data, connector_name) + create_kafka_connector(project.id, featurestore_id, additional_data, @connector_name) expect_status_details(201) - create_session(project[:username], "Pass123") json_result = get_online_kafka_storage_connector(project.id, featurestore_id) parsed_json = JSON.parse(json_result) expect_status_details(200) expect(parsed_json.key?("id")).to be true expect(parsed_json["featurestoreId"]).to eql(featurestore_id) - expect(parsed_json["name"]).to eql(connector_name) + expect(parsed_json["name"]).to eql(@connector_name) expect(parsed_json["storageConnectorType"]).to eql("KAFKA") expect(parsed_json["bootstrapServers"]).to eql("localhost:9091") expect(parsed_json["securityProtocol"]).to eql("SASL_SSL") @@ -1005,6 +1009,72 @@ expect(parsed_json["options"][0]["name"]).to eql("option1") expect(parsed_json["options"][0]["value"]).to eql("value1") expect(parsed_json["externalKafka"]).to be true + + delete "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/storageconnectors/#{@connector_name}" + expect_status_details(200) + end + + it "should get a online kafka storage connector with default values if it didn't exist and byok is not enabled" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + + setVar('enable_bring_your_own_kafka', "false") + create_session(project[:username], "Pass123") + + json_result = get_online_kafka_storage_connector(project.id, featurestore_id) + parsed_json = JSON.parse(json_result) + expect_status_details(200) + expect(parsed_json.key?("id")).to be true + expect(parsed_json["featurestoreId"]).to eql(featurestore_id) + expect(parsed_json["name"]).to eql(@connector_name) + expect(parsed_json["storageConnectorType"]).to eql("KAFKA") + expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091") + expect(parsed_json["securityProtocol"]).to eql("SSL") + expect(parsed_json.key?("sslTruststoreLocation")).to be false + expect(parsed_json.key?("sslTruststorePassword")).to be false + expect(parsed_json.key?("sslKeystoreLocation")).to be false + expect(parsed_json.key?("sslKeystorePassword")).to be false + expect(parsed_json.key?("sslKeyPassword")).to be false + expect(parsed_json["sslEndpointIdentificationAlgorithm"]).to eql("") + expect(parsed_json["externalKafka"]).to be false + end + + it "should get a online kafka storage connector with default values if it exists and byok is not enabled" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + + setVar('enable_bring_your_own_kafka', "false") + create_session(project[:username], "Pass123") + + additional_data = { + bootstrapServers: "localhost:9091", + securityProtocol: "SASL_SSL", + sslEndpointIdentificationAlgorithm: "", + options: [{name: "option1", value: "value1"}] + } + + create_kafka_connector(project.id, featurestore_id, additional_data, @connector_name) + expect_status_details(201) + + json_result = get_online_kafka_storage_connector(project.id, featurestore_id) + parsed_json = JSON.parse(json_result) + expect_status_details(200) + expect(parsed_json.key?("id")).to be true + expect(parsed_json["featurestoreId"]).to eql(featurestore_id) + expect(parsed_json["name"]).to eql(@connector_name) + expect(parsed_json["storageConnectorType"]).to eql("KAFKA") + expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091") + expect(parsed_json["securityProtocol"]).to eql("SSL") + expect(parsed_json.key?("sslTruststoreLocation")).to be false + expect(parsed_json.key?("sslTruststorePassword")).to be false + expect(parsed_json.key?("sslKeystoreLocation")).to be false + expect(parsed_json.key?("sslKeystorePassword")).to be false + expect(parsed_json.key?("sslKeyPassword")).to be false + expect(parsed_json["sslEndpointIdentificationAlgorithm"]).to eql("") + expect(parsed_json["externalKafka"]).to be false + + delete "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/storageconnectors/#{@connector_name}" + expect_status_details(200) end end end diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java index 9214f0226e..d844dd2876 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java @@ -308,7 +308,7 @@ public Response getOnlineFeaturestoreStorageConnector(@Context SecurityContext s * @throws FeaturestoreException */ @GET - @Path("/kafka_connector") + @Path("/kafka_connector/byok") @Produces(MediaType.APPLICATION_JSON) @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java index aad31ff434..7d5107d8f9 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java @@ -41,6 +41,7 @@ import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorDTO; import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils; import io.hops.hopsworks.common.kafka.KafkaBrokers; +import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.exceptions.FeaturestoreException; import io.hops.hopsworks.exceptions.ProjectException; import io.hops.hopsworks.exceptions.UserException; @@ -102,6 +103,8 @@ public class FeaturestoreStorageConnectorController { private KafkaBrokers kafkaBrokers; @EJB private FeaturestoreController featurestoreController; + @EJB + private Settings settings; private static final String KAFKA_STORAGE_CONNECTOR_NAME = "kafka_connector"; @@ -165,7 +168,7 @@ public FeatureStoreKafkaConnectorDTO getKafkaConnector(Featurestore featureStore featurestoreConnectorFacade.findByFeaturestoreName(featureStore, KAFKA_STORAGE_CONNECTOR_NAME); FeatureStoreKafkaConnectorDTO kafkaConnectorDTO; - if (featurestoreConnector.isPresent()) { + if (featurestoreConnector.isPresent() && settings.isBringYourOwnKafkaEnabled()) { // connector found FeaturestoreConnector connector = featurestoreConnector.get(); if (!connector.getConnectorType().equals(FeaturestoreConnectorType.KAFKA)) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java index 4a3f47c62b..4b66e21722 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java @@ -331,6 +331,10 @@ public class Settings implements Serializable { private final static String VARIABLE_ENABLE_BIGQUERY_STORAGE_CONNECTORS = "enable_bigquery_storage_connectors"; private final static String VARIABLE_CONNECTOR_IMAGE_VERSION = "testconnector_image_version"; + // BYOK + + private final static String VARIABLE_ENABLE_BRING_YOUR_OWN_KAFKA = "enable_bring_your_own_kafka"; + //OpenSearch Security private static final String VARIABLE_OPENSEARCH_SECURITY_ENABLED = "elastic_opendistro_security_enabled"; private static final String VARIABLE_OPENSEARCH_HTTPS_ENABLED = "elastic_https_enabled"; @@ -842,6 +846,9 @@ private void populateCache() { ENABLE_GCS_STORAGE_CONNECTORS); ENABLE_BIGQUERY_STORAGE_CONNECTORS = setBoolVar(VARIABLE_ENABLE_BIGQUERY_STORAGE_CONNECTORS, ENABLE_BIGQUERY_STORAGE_CONNECTORS); + + ENABLE_BRING_YOUR_OWN_KAFKA = setBoolVar(VARIABLE_ENABLE_BRING_YOUR_OWN_KAFKA, + ENABLE_BRING_YOUR_OWN_KAFKA); TESTCONNECTOR_IMAGE_VERSION = setStrVar(VARIABLE_CONNECTOR_IMAGE_VERSION, "0.1"); YARN_RUNTIME = setStrVar(VARIABLE_YARN_RUNTIME, YARN_RUNTIME); @@ -3350,6 +3357,13 @@ public synchronized boolean isBigqueryStorageConnectorsEnabled() { checkCache(); return ENABLE_BIGQUERY_STORAGE_CONNECTORS; } + + private boolean ENABLE_BRING_YOUR_OWN_KAFKA = false; + public synchronized boolean isBringYourOwnKafkaEnabled() { + checkCache(); + return ENABLE_BRING_YOUR_OWN_KAFKA; + } + // test connectors docker private String TESTCONNECTOR_IMAGE_VERSION = "0.1";