From f3791ea84e9a5b495c5c27d58f66138fa0117dc3 Mon Sep 17 00:00:00 2001 From: kennethmhc Date: Fri, 2 Feb 2024 11:43:23 +0100 Subject: [PATCH] [HWORKS-919] Create database on the online feature store on-demand (#1458) --- .../src/test/ruby/spec/featurestore_spec.rb | 125 ++++++++++++++- .../test/ruby/spec/storage_connector_spec.rb | 147 ++++++++++-------- .../FeaturestoreStorageConnectorService.java | 7 +- .../user/security/secrets/SecretsFacade.java | 1 + .../featurestore/FeaturestoreController.java | 27 ---- .../online/OnlineFeaturegroupController.java | 9 ++ .../online/OnlineFeaturestoreController.java | 117 ++++++++++---- .../online/OnlineFeaturestoreFacade.java | 32 ++-- ...eaturestoreStorageConnectorController.java | 2 + .../common/project/ProjectController.java | 16 +- .../security/secrets/SecretsController.java | 21 ++- .../TestOnlineFeatureGroupController.java | 2 + .../persistence/entity/project/Project.java | 16 +- .../hops/hopsworks/restutils/RESTCodes.java | 4 +- 14 files changed, 369 insertions(+), 157 deletions(-) diff --git a/hopsworks-IT/src/test/ruby/spec/featurestore_spec.rb b/hopsworks-IT/src/test/ruby/spec/featurestore_spec.rb index abd0dfa0f9..584ef12c8a 100644 --- a/hopsworks-IT/src/test/ruby/spec/featurestore_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/featurestore_spec.rb @@ -112,16 +112,22 @@ end end end - - describe "ensure currect tables are created in online feature store" do + + describe "ensure correct tables are created in online feature store" do context 'with valid project and online feature store enabled' do before :all do if getVar("featurestore_online_enabled") == false skip "Online Feature Store not enabled, skip online featurestore tests" end with_valid_project + # setup online feature store by creating a online feature group (see HWORKS-919) + project = get_project + featurestore_id = get_featurestore_id(project.id) + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) end - + it "should make sure that kafka_offsets table is created in featurestores" do project = get_project tables = Tables.where(TABLE_SCHEMA:project[:projectname], TABLE_NAME:"kafka_offsets") @@ -137,6 +143,12 @@ skip "Online Feature Store not enabled, skip online featurestore tests" end with_valid_project + # setup online feature store by creating a online feature group (see HWORKS-919) + project = get_project + featurestore_id = get_featurestore_id(project.id) + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) end it "should grant all privileges to the project owner" do @@ -211,6 +223,11 @@ update_project({projectId: no_fs_project[:id], projectName: no_fs_project[:projectname], services: ["FEATURESTORE"]}) + # setup online feature store by creating a online feature group (see HWORKS-919) + featurestore_id = get_featurestore_id(no_fs_project.id) + json_result, featuregroup_name = create_cached_featuregroup(no_fs_project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) # Project owner should have the correct permissions on the online feature store # online fs username are capped to 30 chars @@ -234,6 +251,16 @@ it "should not remove users from other projects - see HOPSWORKS-2856" do demo_project = create_project(projectName = "demo") demo_demo_project = create_project(projectName = "demo_demo") + # setup online feature store by creating a online feature group (see HWORKS-919) + featurestore_id = get_featurestore_id(demo_project.id) + json_result, featuregroup_name = create_cached_featuregroup(demo_project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + # setup online feature store by creating a online feature group (see HWORKS-919) + featurestore_id = get_featurestore_id(demo_demo_project.id) + json_result, featuregroup_name = create_cached_featuregroup(demo_demo_project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) delete_project(demo_project) @@ -252,7 +279,18 @@ skip "Online Feature Store not enabled, skip online featurestore tests" end with_valid_project + # setup online feature store by creating a online feature group (see HWORKS-919) + project = get_project + featurestore_id = get_featurestore_id(project.id) + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) @shared_project = create_project + # setup online feature store by creating a online feature group (see HWORKS-919) + featurestore_id = get_featurestore_id(@shared_project.id) + json_result, featuregroup_name = create_cached_featuregroup(@shared_project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) @shared_user_do = create_user @shared_user_ds = create_user add_member_to_project(@shared_project, @shared_user_do[:email], 'Data owner') @@ -320,5 +358,86 @@ end end end + + describe "check permission when shared online feature store has not been initialised" do + before :all do + if getVar("featurestore_online_enabled") == false + skip "Online Feature Store not enabled, skip online featurestore tests" + end + @project_a = create_project + @user_a_do = create_user + @user_a_ds = create_user + add_member_to_project(@project_a, @user_a_do[:email], 'Data owner') + add_member_to_project(@project_a, @user_a_ds[:email], 'Data scientist') + # setup online feature store by creating a online feature group (see HWORKS-919) + # before adding member + @project_b = create_project + featurestore_id = get_featurestore_id(@project_b.id) + json_result, featuregroup_name = create_cached_featuregroup(@project_b.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + @user_b_do = create_user + @user_b_ds = create_user + add_member_to_project(@project_b, @user_b_do[:email], 'Data owner') + add_member_to_project(@project_b, @user_b_ds[:email], 'Data scientist') + @project_c = create_project + @user_c_do = create_user + @user_c_ds = create_user + add_member_to_project(@project_c, @user_c_do[:email], 'Data owner') + add_member_to_project(@project_c, @user_c_ds[:email], 'Data scientist') + # setup online feature store by creating a online feature group (see HWORKS-919) + # after adding member + featurestore_id = get_featurestore_id(@project_c.id) + json_result, featuregroup_name = create_cached_featuregroup(@project_c.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + end + + it "Project C should be able to share with project A (has not been initialised)" do + featurestore = "#{@project_c['projectname'].downcase}_featurestore.db" + # Project C should be able to share with project A without error, + # but privileges will not be added to information_schema.SCHEMA_PRIVILEGES yet. + share_dataset(@project_c, featurestore, @project_a['projectname'], datasetType: "&type=FEATURESTORE") + accept_dataset(@project_a, "#{@project_c['projectname']}::#{featurestore}", + datasetType: "&type=FEATURESTORE") + end + + it "Project A (has not been initialised) should be able to share with project B (A )" do + featurestore = "#{@project_a['projectname'].downcase}_featurestore.db" + # Project A should be able to share with project B without error + # but privileges will not be added to information_schema.SCHEMA_PRIVILEGES yet. + share_dataset(@project_a, featurestore, @project_b['projectname'], datasetType: "&type=FEATURESTORE") + accept_dataset(@project_b, "#{@project_a['projectname']}::#{featurestore}", + datasetType: "&type=FEATURESTORE") + end + + it "After project A has initialised online feature store, privileges should be set properly" do + # Project B should have access to project A, and project A should have access to project C + # setup online feature store in project A by creating a online feature group + featurestore_id = get_featurestore_id(@project_a.id) + json_result, featuregroup_name = create_cached_featuregroup(@project_a.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + + # make sure project B get access to project A + online_db_name_ds = "#{@project_b[:projectname]}_#{@user_b_ds[:username]}"[0..30] + grantee_ds = "'#{online_db_name_ds}'@'%'" + online_db_name_do = "#{@project_b[:projectname]}_#{@user_b_do[:username]}"[0..30] + grantee_do = "'#{online_db_name_do}'@'%'" + privileges_ds = SchemaPrivileges.where(TABLE_SCHEMA:@project_a[:projectname], GRANTEE:grantee_ds) + privileges_do = SchemaPrivileges.where(TABLE_SCHEMA:@project_a[:projectname], GRANTEE:grantee_do) + expect(privileges_ds.length).to eq(1) + expect(privileges_do.length).to eq(1) + + # make sure project A cannot access to project C + online_db_name_ds = "#{@project_a[:projectname]}_#{@user_a_ds[:username]}"[0..30] + grantee_ds = "'#{online_db_name_ds}'@'%'" + online_db_name_do = "#{@project_a[:projectname]}_#{@user_a_do[:username]}"[0..30] + grantee_do = "'#{online_db_name_do}'@'%'" + + privileges_ds = SchemaPrivileges.where(TABLE_SCHEMA:@project_c[:projectname], GRANTEE:grantee_ds) + privileges_do = SchemaPrivileges.where(TABLE_SCHEMA:@project_c[:projectname], GRANTEE:grantee_do) + + expect(privileges_ds.length).to eq(1) + expect(privileges_do.length).to eq(1) + end + end end end \ No newline at end of file diff --git a/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb b/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb index 695d441802..0d14edf877 100644 --- a/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/storage_connector_spec.rb @@ -18,9 +18,89 @@ describe "Create, delete and update operations on storage connectors in a specific featurestore" do - describe 'with valid project, featurestore service enabled' do + describe "online feature store storage connector" do + context "with valid project, featurestore service enabled, and with storage connector flags enabled" do + before :all do + with_valid_project + # setup online feature store by creating a online feature group (see HWORKS-919) + project = get_project + featurestore_id = get_featurestore_id(project.id) + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + end + + after :each do + create_session(@project[:username], "Pass123") + end + + it "online storage connector connection string should contain the IP of the mysql" do + # Storage connector looks like this: [project name]_[username]_onlinefeaturestore + connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" + featurestore_id = get_featurestore_id(@project['id']) + connector_json = get_storage_connector(@project['id'], featurestore_id, connector_name) + connector = JSON.parse(connector_json) + + expect(connector['connectionString']).to match(/jdbc:mysql:\/\/\d{1,3}\.\d{1,3}\.\d{1,3}.\d{1,3}/) + end + + it "online storage connector connection string should be stored with consul name in the database" do + # Storage connector looks like this: [project name]_[username]_onlinefeaturestore + connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" + connector_db = FeatureStoreConnector.find_by(name: connector_name) + jdbc_connector_db = FeatureStoreJDBCConnector.find_by(id: connector_db.jdbc_id) + + expect(jdbc_connector_db[:connection_string]).to start_with("jdbc:mysql://onlinefs.mysql.service.consul") + end + + it "online storage connector should contain the driver class and isolationLevel" do + connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" + featurestore_id = get_featurestore_id(@project['id']) + connector_json = get_storage_connector(@project['id'], featurestore_id, connector_name) + connector = JSON.parse(connector_json) + + arguments_hash = connector['arguments'] + puts arguments_hash + expect(arguments_hash.find{ |item| item['name'] == 'driver' }['value']).to eql("com.mysql.cj.jdbc.Driver") + expect(arguments_hash.find{ |item| item['name'] == 'isolationLevel' }['value']).to eql("NONE") + end + + it "should get online storage connector from base project when accessing a shared feature store" do + project = get_project + base_featurestore_id = get_featurestore_id(project.id) + reset_session - context "with storage connector flags enabled" do + #create another project + projectname = "project_#{short_random_id}" + shared_fs_project = create_project_by_name(projectname) + shared_fs_id = get_featurestore_id(shared_fs_project.id) + # login with user for project and share dataset + create_session(shared_fs_project[:username], "Pass123") + featurestore = "#{shared_fs_project[:projectname].downcase}_featurestore.db" + share_dataset(shared_fs_project, featurestore, project[:projectname], datasetType: "&type=FEATURESTORE") + reset_session + #login with user for shared_fs_project and accept dataset + create_session(project[:username],"Pass123") + accept_dataset(project, "#{shared_fs_project[:projectname]}::#{featurestore}", datasetType: "&type=FEATURESTORE") + + base_project_connector = "#{project['projectname']}_#{@user['username']}_onlinefeaturestore" + connector_name = "onlinefeaturestore" + + #connector from shared fs should use base project connector + connector_json = get_storage_connector(project['id'], shared_fs_id, connector_name) + connector = JSON.parse(connector_json) + expect(connector["name"]).to eql(base_project_connector) + + #connector from base should use base project connector + connector_json = get_storage_connector(project['id'], base_featurestore_id, connector_name) + connector = JSON.parse(connector_json) + expect(connector["name"]).to eql(base_project_connector) + end + end + end + + describe "other storage connectors" do + context "with valid project, featurestore service enabled, and with storage connector flags enabled" do before :all do with_valid_project end @@ -454,69 +534,6 @@ expect(parsed_json2["storageConnectorType"] == "JDBC").to be true expect(parsed_json2["connectionString"] == "jdbc://test3").to be true end - - it "online storage connector connection string should contain the IP of the mysql" do - # Storage connector looks like this: [project name]_[username]_onlinefeaturestore - connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" - featurestore_id = get_featurestore_id(@project['id']) - connector_json = get_storage_connector(@project['id'], featurestore_id, connector_name) - connector = JSON.parse(connector_json) - - expect(connector['connectionString']).to match(/jdbc:mysql:\/\/\d{1,3}\.\d{1,3}\.\d{1,3}.\d{1,3}/) - end - - it "online storage connector connection string should be stored with consul name in the database" do - # Storage connector looks like this: [project name]_[username]_onlinefeaturestore - connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" - connector_db = FeatureStoreConnector.find_by(name: connector_name) - jdbc_connector_db = FeatureStoreJDBCConnector.find_by(id: connector_db.jdbc_id) - - expect(jdbc_connector_db[:connection_string]).to start_with("jdbc:mysql://onlinefs.mysql.service.consul") - end - - it "online storage connector should contain the driver class and isolationLevel" do - connector_name = "#{@project['projectname']}_#{@user['username']}_onlinefeaturestore" - featurestore_id = get_featurestore_id(@project['id']) - connector_json = get_storage_connector(@project['id'], featurestore_id, connector_name) - connector = JSON.parse(connector_json) - - arguments_hash = connector['arguments'] - puts arguments_hash - expect(arguments_hash.find{ |item| item['name'] == 'driver' }['value']).to eql("com.mysql.cj.jdbc.Driver") - expect(arguments_hash.find{ |item| item['name'] == 'isolationLevel' }['value']).to eql("NONE") - end - - it "should get online storage connector from base project when accessing a shared feature store" do - project = get_project - base_featurestore_id = get_featurestore_id(project.id) - reset_session - - #create another project - projectname = "project_#{short_random_id}" - shared_fs_project = create_project_by_name(projectname) - shared_fs_id = get_featurestore_id(shared_fs_project.id) - # login with user for project and share dataset - create_session(shared_fs_project[:username], "Pass123") - featurestore = "#{shared_fs_project[:projectname].downcase}_featurestore.db" - share_dataset(shared_fs_project, featurestore, project[:projectname], datasetType: "&type=FEATURESTORE") - reset_session - #login with user for shared_fs_project and accept dataset - create_session(project[:username],"Pass123") - accept_dataset(project, "#{shared_fs_project[:projectname]}::#{featurestore}", datasetType: "&type=FEATURESTORE") - - base_project_connector = "#{project['projectname']}_#{@user['username']}_onlinefeaturestore" - connector_name = "onlinefeaturestore" - - #connector from shared fs should use base project connector - connector_json = get_storage_connector(project['id'], shared_fs_id, connector_name) - connector = JSON.parse(connector_json) - expect(connector["name"]).to eql(base_project_connector) - - #connector from base should use base project connector - connector_json = get_storage_connector(project['id'], base_featurestore_id, connector_name) - connector = JSON.parse(connector_json) - expect(connector["name"]).to eql(base_project_connector) - end end context "with storage connector flags disabled" do diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java index 4b5594efc3..75b6fa965c 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/storageconnector/FeaturestoreStorageConnectorService.java @@ -283,12 +283,7 @@ public Response getOnlineFeaturestoreStorageConnector(@Context SecurityContext s throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster."); } - if (!onlineFeaturestoreController.checkIfDatabaseExists( - onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store," + - " talk to an administrator."); - } + // continue even if featureStoreDb does not exist, see HWORKS-919 Users user = jWTHelper.getUserPrincipal(sc); FeaturestoreStorageConnectorDTO featurestoreJdbcConnectorDTO = storageConnectorController.getOnlineFeaturestoreConnector(user, project); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/user/security/secrets/SecretsFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/user/security/secrets/SecretsFacade.java index 7cd81c83fa..268d29058d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/user/security/secrets/SecretsFacade.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/user/security/secrets/SecretsFacade.java @@ -42,6 +42,7 @@ public Secret findById(SecretId id) { public void persist(Secret secret) { entityManager.persist(secret); + entityManager.flush(); } public void update(Secret secret) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java index 86575e788b..d4dfdf1d1d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java @@ -46,7 +46,6 @@ import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType; import io.hops.hopsworks.persistence.entity.project.Project; -import io.hops.hopsworks.persistence.entity.project.team.ProjectTeam; import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag; import io.hops.hopsworks.restutils.RESTCodes; @@ -58,8 +57,6 @@ import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; -import java.sql.Connection; -import java.sql.SQLException; import java.util.Collection; import java.util.Date; import java.util.List; @@ -276,8 +273,6 @@ public Featurestore createProjectFeatureStore(Project project, Users user, Strin activityFacade.persistActivity(ActivityFacade.ADDED_FEATURESTORE_STORAGE_CONNECTOR + project.getName(), project, project.getOwner(), ActivityFlag.SERVICE); - createOnlineFeatureStore(project, user, featurestore); - return featurestore; } @@ -307,28 +302,6 @@ public void createStorageConnectorResourceDirectory(Project project, Users user) } } - private void createOnlineFeatureStore(Project project, Users user, Featurestore featurestore) - throws FeaturestoreException { - if (!settings.isOnlineFeaturestore()) { - return; - } - - try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) { - onlineFeaturestoreController.setupOnlineFeaturestore(user, featurestore, connection); - // Create online feature store users for existing team members - for (ProjectTeam projectTeam : projectTeamFacade.findMembersByProject(project)) { - if (!projectTeam.getUser().equals(user)) { - onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(), - featurestore, projectTeam.getTeamRole(), connection); - } - } - } catch(SQLException e) { - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, - Level.SEVERE, e.getMessage(), e.getMessage(), e); - } - } - public FeaturestoreStorageConnectorDTO hopsfsTrainingDatasetConnector(Dataset hopsfsDataset) { String name = hopsfsDataset.getName(); String description = "HOPSFS backend for storing Training Datasets of the Hopsworks Feature Store"; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java index c78d3dc39c..409ca680e5 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java @@ -142,6 +142,9 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup feat List features, Project project, Users user) throws KafkaException, SchemaException, ProjectException, FeaturestoreException, IOException, HopsSecurityException, ServiceException { + // online feature store is created only when creating the first feature group. + createOnlineFeatureStore(project, featureStore, user); + // check if onlinefs user is part of project checkOnlineFsUserExist(project); @@ -153,6 +156,12 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup feat } } + void createOnlineFeatureStore(Project project, Featurestore featurestore, Users user) + throws FeaturestoreException { + // Create online feature store users for existing team members + onlineFeaturestoreController.setupOnlineFeaturestore(project, featurestore, user); + } + void checkOnlineFsUserExist(Project project) throws ServiceException, HopsSecurityException, IOException, ProjectException { if (project.getProjectTeamCollection().stream().noneMatch(pt -> diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java index c9186b59db..1af6c9c431 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java @@ -15,17 +15,23 @@ */ package io.hops.hopsworks.common.featurestore.online; +import com.google.common.base.Strings; +import io.hops.hopsworks.common.dao.project.team.ProjectTeamFacade; import io.hops.hopsworks.common.dao.user.security.secrets.SecretsFacade; import io.hops.hopsworks.common.featurestore.FeaturestoreConstants; import io.hops.hopsworks.common.featurestore.OptionDTO; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade; import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil; +import io.hops.hopsworks.common.project.ProjectController; import io.hops.hopsworks.common.security.secrets.SecretsController; import io.hops.hopsworks.common.util.ProjectUtils; import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.exceptions.ProjectException; import io.hops.hopsworks.exceptions.UserException; import io.hops.hopsworks.persistence.entity.dataset.DatasetAccessPermission; +import io.hops.hopsworks.persistence.entity.dataset.DatasetSharedWith; +import io.hops.hopsworks.persistence.entity.dataset.DatasetType; import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector; import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType; @@ -49,6 +55,7 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade.MYSQL_DRIVER; @@ -78,28 +85,66 @@ public class OnlineFeaturestoreController { private StorageConnectorUtil storageConnectorUtil; @EJB private ProjectUtils projectUtils; + @EJB + private ProjectController projectController; + @EJB + private ProjectTeamFacade projectTeamFacade; /** * Sets up the online feature store database for a new project and creating a database-user for the project-owner * - * @param user the project owner + * @param project project * @param featurestore the featurestore metadata entity * @throws FeaturestoreException */ - public void setupOnlineFeaturestore(Users user, Featurestore featurestore, Connection connection) + public void setupOnlineFeaturestore(Project project, Featurestore featurestore, Users user) throws FeaturestoreException { + + try { + if (projectController.findProjectById(project.getId()).getOnlineFeatureStoreAvailable()) { + return; + } + } catch (ProjectException e) { + // Should not happen but skip setup if project is not found. + if (e.getErrorCode() == RESTCodes.ProjectErrorCode.PROJECT_NOT_FOUND) { + return; + } + } + if (!settings.isOnlineFeaturestore()) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, - Level.FINE, "Online feature store service is not enabled for this Hopsworks instance"); + Level.FINE, "Online feature store service is not enabled for this Hopsworks instance"); } - String db = getOnlineFeaturestoreDbName(featurestore.getProject()); - // Create dataset - onlineFeaturestoreFacade.createOnlineFeaturestoreDatabase(db, connection); - // Create kafka offset table - onlineFeaturestoreFacade.createOnlineFeaturestoreKafkaOffsetTable(db, connection); - // Create project owner database user - createDatabaseUser(user, featurestore, ProjectRoleTypes.DATA_OWNER.getRole(), connection); + try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) { + String db = getOnlineFeaturestoreDbName(featurestore.getProject()); + // Create kafka offset table + onlineFeaturestoreFacade.createOnlineFeaturestoreDatabaseIfNotExist(db, connection); + // Create kafka offset table + onlineFeaturestoreFacade.createOnlineFeaturestoreKafkaOffsetTable(db, connection); + + // Create online feature store users for existing team members + for (ProjectTeam projectTeam : projectTeamFacade.findMembersByProject(project)) { + createDatabaseUser(projectTeam.getUser(), featurestore, projectTeam.getTeamRole(), connection); + } + } catch(SQLException e) { + throw new FeaturestoreException( + RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, + Level.SEVERE, e.getMessage(), e.getMessage(), e); + } + + projectController.setOnlineFeatureStoreAvailable(project); + } + + private void shareOnlineFeatureStore(Project project, Users user, String role, Connection connection) + throws FeaturestoreException { + List sharedFeatureStores = project.getDatasetSharedWithCollection().stream() + .filter(ds -> ds.getAccepted() && ds.getDataset().getDsType() == DatasetType.FEATURESTORE) + .collect(Collectors.toList()); + for (DatasetSharedWith shared : sharedFeatureStores) { + String featureStoreDb = getOnlineFeaturestoreDbName(shared.getDataset().getProject()); + shareOnlineFeatureStoreUser(project, user, role, featureStoreDb, shared.getPermission(), connection); + } } /** @@ -119,11 +164,13 @@ public void createDatabaseUser(Users user, Featurestore featurestore, String pro String dbUser = onlineDbUsername(featurestore.getProject(), user); //Generate random pw - String onlineFsPw = createOnlineFeaturestoreUserSecret(dbUser, user, featurestore.getProject()); + String onlineFsPw = getOrCreateUserSecret(dbUser, user, featurestore.getProject()); //database is the same as the project name onlineFeaturestoreFacade.createOnlineFeaturestoreUser(dbUser, onlineFsPw, connection); updateUserOnlineFeatureStoreDB(user, featurestore, projectRole, connection); + + shareOnlineFeatureStore(featurestore.getProject(), user, projectRole, connection); } /** @@ -135,17 +182,37 @@ public void createDatabaseUser(Users user, Featurestore featurestore, String pro * @return the password * @throws FeaturestoreException */ - private String createOnlineFeaturestoreUserSecret(String dbuser, Users user, Project project) - throws FeaturestoreException { - String onlineFsPw = RandomStringUtils.randomAlphabetic(FeaturestoreConstants.ONLINE_FEATURESTORE_PW_LENGTH); + private String getOrCreateUserSecret(String dbuser, Users user, Project project) throws FeaturestoreException { + String password = ""; try { - secretsController.delete(user, dbuser); //Delete if the secret already exsits - secretsController.add(user, dbuser, onlineFsPw, VisibilityType.PRIVATE, project.getId()); + password = secretsController.get(user, dbuser).getPlaintext(); } catch (UserException e) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR, - Level.SEVERE, "Problem adding online featurestore password to hopsworks secretsmgr"); + if (e.getErrorCode() != RESTCodes.UserErrorCode.SECRET_EMPTY) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR, + Level.SEVERE, "Error retrieving existing online feature store password from secret manager"); + } + } + + if (!Strings.isNullOrEmpty(password)) { + // Password was found, return it + return password; + } + + try { + password = RandomStringUtils.randomAlphabetic(FeaturestoreConstants.ONLINE_FEATURESTORE_PW_LENGTH); + secretsController.add(user, dbuser, password, VisibilityType.PRIVATE, project.getId()); + } catch (UserException e) { + // Secret may be created concurrently when multiple online feature groups are created concurrently. + // If secret already exists due to race condition, get it again. + if (e.getErrorCode() == RESTCodes.UserErrorCode.SECRET_EXISTS) { + return getOrCreateUserSecret(dbuser, user, project); + } else { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR, + Level.SEVERE, "Error adding online feature store password to secret manager"); + } } - return onlineFsPw; + + return password; } /** @@ -322,11 +389,7 @@ public void removeOnlineFeaturestoreUser(Featurestore featurestore, Users user) public void shareOnlineFeatureStore(Project project, Featurestore featurestore, DatasetAccessPermission permission) throws FeaturestoreException { String featureStoreDb = getOnlineFeaturestoreDbName(featurestore.getProject()); - if (!checkIfDatabaseExists(featureStoreDb)) { - // Nothing to share - return; - } - + // do not skip sharing even if featureStoreDb does not exist, see HWORKS-919 try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) { for (ProjectTeam member : projectUtils.getProjectTeamCollection(project)) { shareOnlineFeatureStoreUser(project, member.getUser(), member.getTeamRole(), featureStoreDb, permission, @@ -352,11 +415,7 @@ public void shareOnlineFeatureStore(Project project, Users user, String role, Fe DatasetAccessPermission permission, Connection conn) throws FeaturestoreException { String featureStoreDb = getOnlineFeaturestoreDbName(featurestore.getProject()); - if (!checkIfDatabaseExists(featureStoreDb)) { - // Nothing to share - return; - } - + // do not skip sharing even if featureStoreDb does not exist, see HWORKS-919 shareOnlineFeatureStoreUser(project, user, role, featureStoreDb, permission, conn); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java index 09c37ec3b4..04f02f8bb1 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java @@ -89,15 +89,27 @@ public void init() { * * @param db name of the database */ - public void createOnlineFeaturestoreDatabase(String db, Connection connection) throws FeaturestoreException { + public void createOnlineFeaturestoreDatabaseIfNotExist(String db, Connection connection) + throws FeaturestoreException { + int numRetry = 0; + int maxRetries = 3; //Prepared statements with parameters can only be done for //WHERE/HAVING Clauses, not names of tables or databases - //Don't add 'IF EXISTS', this call should fail if the database already exists - try { - executeUpdate("CREATE DATABASE " + db + ";", connection); - } catch (SQLException se) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_CREATING_ONLINE_FEATURESTORE_DB, - Level.SEVERE, "Error running create query", se.getMessage(), se); + while (numRetry < maxRetries) { + try { + executeUpdate("CREATE DATABASE IF NOT EXISTS " + db + ";", connection); + break; + } catch (SQLException se) { + // Retry on deadlock, it is caused by concurrent creation of database + if (se.getMessage().contains("Deadlock found when trying to get lock")) { + numRetry++; + if (numRetry < maxRetries) { + continue; + } + } + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_CREATING_ONLINE_FEATURESTORE_DB, + Level.SEVERE, "Error running create query", se.getMessage(), se); + } } } @@ -274,8 +286,10 @@ public void revokeUserPrivileges(String dbName, String dbUser, Connection connec resultSet.close(); } } - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Exception in revoking the privileges", e); + } catch (SQLException e) { + if (!e.getMessage().contains("There is no such grant defined for user")) { + LOGGER.log(Level.SEVERE, "Exception in revoking the privileges", e); + } } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java index 7d5107d8f9..7f8a0df03c 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/storageconnectors/FeaturestoreStorageConnectorController.java @@ -438,6 +438,8 @@ public FeaturestoreStorageConnectorDTO getOnlineFeaturestoreConnector(Users user throws FeaturestoreException { Featurestore userFeatureStore = featurestoreController.getProjectFeaturestore(userProject); String dbUsername = onlineFeaturestoreController.onlineDbUsername(userProject, user); + // Accessing online feature store from a shared project requires online feature store being setup in users' project. + onlineFeaturestoreController.setupOnlineFeaturestore(userProject, userFeatureStore, user); Optional featurestoreConnector = featurestoreConnectorFacade .findByFeaturestoreName(userFeatureStore, dbUsername + FeaturestoreConstants.ONLINE_FEATURE_STORE_CONNECTOR_SUFFIX); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index c07a38e1f2..3d18e90088 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -613,6 +613,7 @@ private Project createProjectDbMetadata(String projectName, Users user, ProjectD project.setKafkaMaxNumTopics(settings.getKafkaMaxNumTopics()); project.setDescription(projectDTO.getDescription()); project.setTopicName(projectDTO.getFeatureStoreTopic()); + project.setOnlineFeatureStoreAvailable(false); project.setCreationStatus(CreationStatus.ONGOING); //Persist project object @@ -1047,6 +1048,12 @@ public boolean updateProjectDescription(Project project, String projectDescr, return false; } + public void setOnlineFeatureStoreAvailable(Project project) { + project.setOnlineFeatureStoreAvailable(true); + projectFacade.mergeProject(project); + projectFacade.flushEm(); + } + //Set the project owner as project master in ProjectTeam table private void addProjectOwner(Project project, Users user) { ProjectTeamPK stp = new ProjectTeamPK(project.getId(), user.getEmail()); @@ -1881,15 +1888,6 @@ public boolean addMember(ProjectTeam projectTeam, Project project, Users newMemb try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) { onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(), featurestore, projectTeam.getTeamRole(), connection); - // give access to the shared online feature stores - - for (DatasetSharedWith sharedDs : project.getDatasetSharedWithCollection()) { - if (sharedDs.getAccepted() && sharedDs.getDataset().getDsType() == DatasetType.FEATURESTORE) { - onlineFeaturestoreController - .shareOnlineFeatureStore(project, newMember, projectTeam.getTeamRole(), - sharedDs.getDataset().getFeatureStore(), sharedDs.getPermission(), connection); - } - } } catch (SQLException e) { throw new FeaturestoreException( RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/secrets/SecretsController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/secrets/SecretsController.java index d899088569..430b75b2c8 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/secrets/SecretsController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/security/secrets/SecretsController.java @@ -88,13 +88,22 @@ public class SecretsController { public Secret add(Users user, String secretName, String secret, VisibilityType visibilityType, Integer projectIdScope) throws UserException { SecretId secretId = new SecretId(user.getUid(), secretName); - if(secretsFacade.findById(secretId) != null) { - throw new UserException(RESTCodes.UserErrorCode.SECRET_EXISTS, Level.FINE, - "Secret already exists", "Secret with name " + secretName + " already exists for user " + user.getUsername()); - } Secret storedSecret = validateAndCreateSecret(secretId, user, secret, visibilityType, projectIdScope); - secretsFacade.persist(storedSecret); - return storedSecret; + try { + secretsFacade.persist(storedSecret); + return storedSecret; + } catch (EJBException e) { + Throwable rootCause = getRootCause(e); + if (rootCause instanceof SQLIntegrityConstraintViolationException + && rootCause.getMessage().contains("Duplicate entry")) { + throw new UserException(RESTCodes.UserErrorCode.SECRET_EXISTS, Level.FINE, + "Secret already exists", "Secret with name " + secretName + " already exists for user " + + user.getUsername()); + } else { + throw new UserException(RESTCodes.UserErrorCode.SECRET_CREATION_FAILED, Level.FINE, + "Failed to create secret for user " + user.getUsername(), e.getMessage(), e); + } + } } /** diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestOnlineFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestOnlineFeatureGroupController.java index 678194c117..931e4738d1 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestOnlineFeatureGroupController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestOnlineFeatureGroupController.java @@ -212,6 +212,7 @@ public void testSetupOnlineFeatureGroupWithEmbedding() throws Exception { doNothing().when(onlineFeaturegroupController).checkOnlineFsUserExist(eq(project)); doNothing().when(onlineFeaturegroupController) .createFeatureGroupKafkaTopic(eq(project), eq(featureGroup), eq(features)); + doNothing().when(onlineFeaturegroupController).createOnlineFeatureStore(any(), any(), any()); // Act onlineFeaturegroupController.setupOnlineFeatureGroup(featureStore, featureGroup, features, project, user); @@ -237,6 +238,7 @@ public void testSetupOnlineFeatureGroupWithoutEmbedding() throws Exception { doNothing().when(onlineFeaturegroupController).checkOnlineFsUserExist(eq(project)); doNothing().when(onlineFeaturegroupController) .createFeatureGroupKafkaTopic(eq(project), eq(featureGroup), eq(features)); + doNothing().when(onlineFeaturegroupController).createOnlineFeatureStore(any(), any(), any()); // Act onlineFeaturegroupController.setupOnlineFeatureGroup(featureStore, featureGroup, features, project, user); diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java index 37d66c73ea..b50a8c507f 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java @@ -196,7 +196,11 @@ public class Project implements Serializable { @Column(name = "topic_name") private String topicName; - + + @Basic(optional = false) + @Column(name = "online_feature_store_available") + private Boolean onlineFeatureStoreAvailable; + @Basic(optional = false) @NotNull @Enumerated(EnumType.ORDINAL) @@ -304,7 +308,15 @@ public CreationStatus getCreationStatus() { public void setCreationStatus(CreationStatus creationStatus) { this.creationStatus = creationStatus; } - + + public Boolean getOnlineFeatureStoreAvailable() { + return onlineFeatureStoreAvailable; + } + + public void setOnlineFeatureStoreAvailable(Boolean onlineFeatureStoreAvailable) { + this.onlineFeatureStoreAvailable = onlineFeatureStoreAvailable; + } + @Override public int hashCode() { int hash = 0; diff --git a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java index 15869c952d..cd7f67d72e 100644 --- a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java +++ b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java @@ -990,7 +990,9 @@ public enum UserErrorCode implements RESTErrorCode { USER_ACCOUNT_HANDLER_REMOVE_ERROR(62, "Error occurred during user account remove handler.", Response.Status.INTERNAL_SERVER_ERROR), OPERATION_NOT_ALLOWED(63, "Operation not allowed on user", Response.Status.BAD_REQUEST), - ACCOUNT_REJECTION_FAILED(64, "Account rejection failed", Response.Status.BAD_REQUEST); + ACCOUNT_REJECTION_FAILED(64, "Account rejection failed", Response.Status.BAD_REQUEST), + SECRET_CREATION_FAILED(65, "Secret creation failed", Response.Status.INTERNAL_SERVER_ERROR); + private Integer code; private String message;