Skip to content

Commit

Permalink
[FSTORE-840][APPEND] Use existing Kafka cluster (#1542)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks authored and SirOibaf committed Sep 10, 2023
1 parent 0f52723 commit b5ad739
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def get_storage_connector(project_id, featurestore_id, name)
end

def get_online_kafka_storage_connector(project_id, featurestore_id)
get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/kafka_connector"
get "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/storageconnectors/kafka_connector/byok"
end

def create_hopsfs_connector(project_id, featurestore_id, datasetName: "Resources")
Expand Down
86 changes: 78 additions & 8 deletions hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -937,32 +937,35 @@
describe "Operations for online kafka connector" do
context 'with valid project, featurestore service enabled' do
before :all do
@online_kafka_storage_connector_name = getVar('online_kafka_storage_connector_name')
@enable_bring_your_own_kafka = getVar('enable_bring_your_own_kafka')
@enable_kafka_storage_connectors = getVar('enable_kafka_storage_connectors')
setVar('enable_kafka_storage_connectors', "true")

@connector_name = "kafka_connector"

@cleanup = true
@debugOpt = false
with_valid_project
end

after :all do
setVar('online_kafka_storage_connector_name', @online_kafka_storage_connector_name[:value])
setVar('enable_bring_your_own_kafka', @enable_bring_your_own_kafka[:value])
setVar('enable_kafka_storage_connectors', @enable_kafka_storage_connectors[:value])
end

it "should get a online kafka storage connector with default values if it didn't exist" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_name = "kafka_connector"

setVar('enable_bring_your_own_kafka', "true")
create_session(project[:username], "Pass123")

json_result = get_online_kafka_storage_connector(project.id, featurestore_id)
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json.key?("id")).to be true
expect(parsed_json["featurestoreId"]).to eql(featurestore_id)
expect(parsed_json["name"]).to eql(connector_name)
expect(parsed_json["name"]).to eql(@connector_name)
expect(parsed_json["storageConnectorType"]).to eql("KAFKA")
expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091")
expect(parsed_json["securityProtocol"]).to eql("SSL")
Expand All @@ -978,7 +981,9 @@
it "should get already created online kafka storage connector" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
connector_name = "kafka_connector"

setVar('enable_bring_your_own_kafka', "true")
create_session(project[:username], "Pass123")

additional_data = {
bootstrapServers: "localhost:9091",
Expand All @@ -987,16 +992,15 @@
options: [{name: "option1", value: "value1"}]
}

create_kafka_connector(project.id, featurestore_id, additional_data, connector_name)
create_kafka_connector(project.id, featurestore_id, additional_data, @connector_name)
expect_status_details(201)

create_session(project[:username], "Pass123")
json_result = get_online_kafka_storage_connector(project.id, featurestore_id)
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json.key?("id")).to be true
expect(parsed_json["featurestoreId"]).to eql(featurestore_id)
expect(parsed_json["name"]).to eql(connector_name)
expect(parsed_json["name"]).to eql(@connector_name)
expect(parsed_json["storageConnectorType"]).to eql("KAFKA")
expect(parsed_json["bootstrapServers"]).to eql("localhost:9091")
expect(parsed_json["securityProtocol"]).to eql("SASL_SSL")
Expand All @@ -1005,6 +1009,72 @@
expect(parsed_json["options"][0]["name"]).to eql("option1")
expect(parsed_json["options"][0]["value"]).to eql("value1")
expect(parsed_json["externalKafka"]).to be true

delete "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/storageconnectors/#{@connector_name}"
expect_status_details(200)
end

it "should get a online kafka storage connector with default values if it didn't exist and byok is not enabled" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

setVar('enable_bring_your_own_kafka', "false")
create_session(project[:username], "Pass123")

json_result = get_online_kafka_storage_connector(project.id, featurestore_id)
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json.key?("id")).to be true
expect(parsed_json["featurestoreId"]).to eql(featurestore_id)
expect(parsed_json["name"]).to eql(@connector_name)
expect(parsed_json["storageConnectorType"]).to eql("KAFKA")
expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091")
expect(parsed_json["securityProtocol"]).to eql("SSL")
expect(parsed_json.key?("sslTruststoreLocation")).to be false
expect(parsed_json.key?("sslTruststorePassword")).to be false
expect(parsed_json.key?("sslKeystoreLocation")).to be false
expect(parsed_json.key?("sslKeystorePassword")).to be false
expect(parsed_json.key?("sslKeyPassword")).to be false
expect(parsed_json["sslEndpointIdentificationAlgorithm"]).to eql("")
expect(parsed_json["externalKafka"]).to be false
end

it "should get a online kafka storage connector with default values if it exists and byok is not enabled" do
project = get_project
featurestore_id = get_featurestore_id(project.id)

setVar('enable_bring_your_own_kafka', "false")
create_session(project[:username], "Pass123")

additional_data = {
bootstrapServers: "localhost:9091",
securityProtocol: "SASL_SSL",
sslEndpointIdentificationAlgorithm: "",
options: [{name: "option1", value: "value1"}]
}

create_kafka_connector(project.id, featurestore_id, additional_data, @connector_name)
expect_status_details(201)

json_result = get_online_kafka_storage_connector(project.id, featurestore_id)
parsed_json = JSON.parse(json_result)
expect_status_details(200)
expect(parsed_json.key?("id")).to be true
expect(parsed_json["featurestoreId"]).to eql(featurestore_id)
expect(parsed_json["name"]).to eql(@connector_name)
expect(parsed_json["storageConnectorType"]).to eql("KAFKA")
expect(parsed_json["bootstrapServers"]).to eql("10.0.2.15:9091")
expect(parsed_json["securityProtocol"]).to eql("SSL")
expect(parsed_json.key?("sslTruststoreLocation")).to be false
expect(parsed_json.key?("sslTruststorePassword")).to be false
expect(parsed_json.key?("sslKeystoreLocation")).to be false
expect(parsed_json.key?("sslKeystorePassword")).to be false
expect(parsed_json.key?("sslKeyPassword")).to be false
expect(parsed_json["sslEndpointIdentificationAlgorithm"]).to eql("")
expect(parsed_json["externalKafka"]).to be false

delete "#{ENV['HOPSWORKS_API']}/project/#{project.id}/featurestores/#{featurestore_id}/storageconnectors/#{@connector_name}"
expect_status_details(200)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public Response getOnlineFeaturestoreStorageConnector(@Context SecurityContext s
* @throws FeaturestoreException
*/
@GET
@Path("/kafka_connector")
@Path("/kafka_connector/byok")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorDTO;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.UserException;
Expand Down Expand Up @@ -102,6 +103,8 @@ public class FeaturestoreStorageConnectorController {
private KafkaBrokers kafkaBrokers;
@EJB
private FeaturestoreController featurestoreController;
@EJB
private Settings settings;

private static final String KAFKA_STORAGE_CONNECTOR_NAME = "kafka_connector";

Expand Down Expand Up @@ -165,7 +168,7 @@ public FeatureStoreKafkaConnectorDTO getKafkaConnector(Featurestore featureStore
featurestoreConnectorFacade.findByFeaturestoreName(featureStore, KAFKA_STORAGE_CONNECTOR_NAME);

FeatureStoreKafkaConnectorDTO kafkaConnectorDTO;
if (featurestoreConnector.isPresent()) {
if (featurestoreConnector.isPresent() && settings.isBringYourOwnKafkaEnabled()) {
// connector found
FeaturestoreConnector connector = featurestoreConnector.get();
if (!connector.getConnectorType().equals(FeaturestoreConnectorType.KAFKA)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ public class Settings implements Serializable {
private final static String VARIABLE_ENABLE_BIGQUERY_STORAGE_CONNECTORS = "enable_bigquery_storage_connectors";
private final static String VARIABLE_CONNECTOR_IMAGE_VERSION = "testconnector_image_version";

// BYOK

private final static String VARIABLE_ENABLE_BRING_YOUR_OWN_KAFKA = "enable_bring_your_own_kafka";

//OpenSearch Security
private static final String VARIABLE_OPENSEARCH_SECURITY_ENABLED = "elastic_opendistro_security_enabled";
private static final String VARIABLE_OPENSEARCH_HTTPS_ENABLED = "elastic_https_enabled";
Expand Down Expand Up @@ -842,6 +846,9 @@ private void populateCache() {
ENABLE_GCS_STORAGE_CONNECTORS);
ENABLE_BIGQUERY_STORAGE_CONNECTORS = setBoolVar(VARIABLE_ENABLE_BIGQUERY_STORAGE_CONNECTORS,
ENABLE_BIGQUERY_STORAGE_CONNECTORS);

ENABLE_BRING_YOUR_OWN_KAFKA = setBoolVar(VARIABLE_ENABLE_BRING_YOUR_OWN_KAFKA,
ENABLE_BRING_YOUR_OWN_KAFKA);

TESTCONNECTOR_IMAGE_VERSION = setStrVar(VARIABLE_CONNECTOR_IMAGE_VERSION, "0.1");
YARN_RUNTIME = setStrVar(VARIABLE_YARN_RUNTIME, YARN_RUNTIME);
Expand Down Expand Up @@ -3350,6 +3357,13 @@ public synchronized boolean isBigqueryStorageConnectorsEnabled() {
checkCache();
return ENABLE_BIGQUERY_STORAGE_CONNECTORS;
}

private boolean ENABLE_BRING_YOUR_OWN_KAFKA = false;
public synchronized boolean isBringYourOwnKafkaEnabled() {
checkCache();
return ENABLE_BRING_YOUR_OWN_KAFKA;
}

// test connectors docker
private String TESTCONNECTOR_IMAGE_VERSION = "0.1";

Expand Down

0 comments on commit b5ad739

Please sign in to comment.