Skip to content

Commit

Permalink
[HWORKS-734][append] utilize single connection for online feature sto…
Browse files Browse the repository at this point in the history
…re setup (#1559)

* [HWORKS-734][append] utilize single connection for online feature store setup

* fixes
  • Loading branch information
robzor92 authored and SirOibaf committed Sep 21, 2023
1 parent 901846a commit a3c803a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,13 @@ public void unshare(Project project, Users user, Dataset dataset, String targetP
unshareFeatureStoreServiceDataset(user, project, targetProject, datasetSharedWith,
Settings.ServiceDataset.STATISTICS, dfso);
// Unshare the online feature store
onlineFeaturestoreController.unshareOnlineFeatureStore(targetProject, dataset.getFeatureStore());
try {
onlineFeaturestoreController.unshareOnlineFeatureStore(targetProject, dataset.getFeatureStore());
} catch (FeaturestoreException e) {
throw new DatasetException(
RESTCodes.DatasetErrorCode.DATASET_OPERATION_ERROR,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}
unshareDs(project, user, datasetSharedWith, dfso);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -312,14 +314,19 @@ private void createOnlineFeatureStore(Project project, Users user, Featurestore
return;
}

onlineFeaturestoreController.setupOnlineFeaturestore(user, featurestore);

// Create online feature store users for existing team members
for (ProjectTeam projectTeam : projectTeamFacade.findMembersByProject(project)) {
if (!projectTeam.getUser().equals(user)) {
onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(),
featurestore, projectTeam.getTeamRole());
try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
onlineFeaturestoreController.setupOnlineFeaturestore(user, featurestore, connection);
// Create online feature store users for existing team members
for (ProjectTeam projectTeam : projectTeamFacade.findMembersByProject(project)) {
if (!projectTeam.getUser().equals(user)) {
onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(),
featurestore, projectTeam.getTeamRole(), connection);
}
}
} catch(SQLException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,20 @@ public class OnlineFeaturestoreController {
* @param featurestore the featurestore metadata entity
* @throws FeaturestoreException
*/
public void setupOnlineFeaturestore(Users user, Featurestore featurestore)
public void setupOnlineFeaturestore(Users user, Featurestore featurestore, Connection connection)
throws FeaturestoreException {
if (!settings.isOnlineFeaturestore()) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED,
Level.FINE, "Online feature store service is not enabled for this Hopsworks instance");
}
String db = getOnlineFeaturestoreDbName(featurestore.getProject());
try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
// Create dataset
onlineFeaturestoreFacade.createOnlineFeaturestoreDatabase(db, connection);

// Create kafka offset table
onlineFeaturestoreFacade.createOnlineFeaturestoreKafkaOffsetTable(db, connection);
} catch (SQLException se) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, "Error closing connection", se.getMessage(), se);
}
onlineFeaturestoreFacade.createOnlineFeaturestoreDatabase(db, connection);

// Create kafka offset table
onlineFeaturestoreFacade.createOnlineFeaturestoreKafkaOffsetTable(db, connection);
// Create project owner database user
createDatabaseUser(user, featurestore, ProjectRoleTypes.DATA_OWNER.getRole());
createDatabaseUser(user, featurestore, ProjectRoleTypes.DATA_OWNER.getRole(), connection);
}

/**
Expand All @@ -115,7 +109,7 @@ public void setupOnlineFeaturestore(Users user, Featurestore featurestore)
* @param featurestore the feature store
* @throws FeaturestoreException
*/
public void createDatabaseUser(Users user, Featurestore featurestore, String projectRole)
public void createDatabaseUser(Users user, Featurestore featurestore, String projectRole, Connection connection)
throws FeaturestoreException {
String db = getOnlineFeaturestoreDbName(featurestore.getProject());
if (!checkIfDatabaseExists(db)) {
Expand All @@ -127,9 +121,9 @@ public void createDatabaseUser(Users user, Featurestore featurestore, String pro
//Generate random pw
String onlineFsPw = createOnlineFeaturestoreUserSecret(dbUser, user, featurestore.getProject());
//database is the same as the project name
onlineFeaturestoreFacade.createOnlineFeaturestoreUser(dbUser, onlineFsPw);
onlineFeaturestoreFacade.createOnlineFeaturestoreUser(dbUser, onlineFsPw, connection);

updateUserOnlineFeatureStoreDB(user, featurestore, projectRole);
updateUserOnlineFeatureStoreDB(user, featurestore, projectRole, connection);
}

/**
Expand Down Expand Up @@ -198,7 +192,8 @@ private String onlineDbUsername(String project, String user) {
* @param featurestore the feature store
* @throws FeaturestoreException
*/
public void updateUserOnlineFeatureStoreDB(Users user, Featurestore featurestore, String projectRole)
public void updateUserOnlineFeatureStoreDB(Users user, Featurestore featurestore, String projectRole,
Connection connection)
throws FeaturestoreException {
String db = getOnlineFeaturestoreDbName(featurestore.getProject());
if (!settings.isOnlineFeaturestore() || !checkIfDatabaseExists(db)) {
Expand All @@ -209,9 +204,9 @@ public void updateUserOnlineFeatureStoreDB(Users user, Featurestore featurestore
String dbuser = onlineDbUsername(featurestore.getProject(), user);

if (projectRole.equals(ProjectRoleTypes.DATA_OWNER.getRole())) {
onlineFeaturestoreFacade.grantDataOwnerPrivileges(db, dbuser);
onlineFeaturestoreFacade.grantDataOwnerPrivileges(db, dbuser, connection);
} else {
onlineFeaturestoreFacade.grantDataScientistPrivileges(db, dbuser);
onlineFeaturestoreFacade.grantDataScientistPrivileges(db, dbuser, connection);
}

try {
Expand Down Expand Up @@ -332,8 +327,15 @@ public void shareOnlineFeatureStore(Project project, Featurestore featurestore,
return;
}

for (ProjectTeam member : projectTeamFacade.findMembersByProject(project)) {
shareOnlineFeatureStoreUser(project, member.getUser(), member.getTeamRole(), featureStoreDb, permission);
try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
for (ProjectTeam member : projectTeamFacade.findMembersByProject(project)) {
shareOnlineFeatureStoreUser(project, member.getUser(), member.getTeamRole(), featureStoreDb, permission,
connection);
}
} catch (SQLException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}

Expand All @@ -347,14 +349,15 @@ public void shareOnlineFeatureStore(Project project, Featurestore featurestore,
* @throws FeaturestoreException
*/
public void shareOnlineFeatureStore(Project project, Users user, String role, Featurestore featurestore,
DatasetAccessPermission permission) throws FeaturestoreException {
DatasetAccessPermission permission, Connection conn)
throws FeaturestoreException {
String featureStoreDb = getOnlineFeaturestoreDbName(featurestore.getProject());
if (!checkIfDatabaseExists(featureStoreDb)) {
// Nothing to share
return;
}

shareOnlineFeatureStoreUser(project, user, role, featureStoreDb, permission);
shareOnlineFeatureStoreUser(project, user, role, featureStoreDb, permission, conn);
}

/**
Expand All @@ -367,18 +370,18 @@ public void shareOnlineFeatureStore(Project project, Users user, String role, Fe
* @throws FeaturestoreException
*/
private void shareOnlineFeatureStoreUser(Project project, Users user, String role,
String featureStoreDb, DatasetAccessPermission permission)
String featureStoreDb, DatasetAccessPermission permission, Connection conn)
throws FeaturestoreException {
String dbUser = onlineDbUsername(project, user);

if (permission == DatasetAccessPermission.READ_ONLY ||
(permission == DatasetAccessPermission.EDITABLE_BY_OWNERS &&
role.equals(ProjectRoleTypes.DATA_SCIENTIST.getRole()))) {
// Read Only
onlineFeaturestoreFacade.grantDataScientistPrivileges(featureStoreDb, dbUser);
onlineFeaturestoreFacade.grantDataScientistPrivileges(featureStoreDb, dbUser, conn);
} else {
// Write permissions
onlineFeaturestoreFacade.grantDataOwnerPrivileges(featureStoreDb, dbUser);
onlineFeaturestoreFacade.grantDataOwnerPrivileges(featureStoreDb, dbUser, conn);
}
}

Expand All @@ -391,16 +394,23 @@ private void shareOnlineFeatureStoreUser(Project project, Users user, String rol
* @param featurestore: feature store to remove access
* @throws FeaturestoreException
*/
public void unshareOnlineFeatureStore(Project project, Featurestore featurestore) {
public void unshareOnlineFeatureStore(Project project, Featurestore featurestore) throws FeaturestoreException {
String featureStoreDb = getOnlineFeaturestoreDbName(featurestore.getProject());
if (!checkIfDatabaseExists(featureStoreDb)) {
// Nothing to share
return;
}

for (ProjectTeam member : projectTeamFacade.findMembersByProject(project)) {
String dbUser = onlineDbUsername(project, member.getUser());
onlineFeaturestoreFacade.revokeUserPrivileges(featureStoreDb, dbUser);

try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
for (ProjectTeam member : projectTeamFacade.findMembersByProject(project)) {
String dbUser = onlineDbUsername(project, member.getUser());
onlineFeaturestoreFacade.revokeUserPrivileges(featureStoreDb, dbUser, connection);
}
} catch (SQLException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ public void removeOnlineFeaturestoreDatabase(String db, Connection connection) t
* @param user the database username
* @param pw the database user password
*/
public void createOnlineFeaturestoreUser(String user, String pw) throws FeaturestoreException {
public void createOnlineFeaturestoreUser(String user, String pw, Connection connection) throws FeaturestoreException {
try {
try (Connection connection = establishAdminConnection();
PreparedStatement pStmt = connection.prepareStatement("CREATE USER IF NOT EXISTS ? IDENTIFIED BY ?;");
try (PreparedStatement pStmt = connection.prepareStatement("CREATE USER IF NOT EXISTS ? IDENTIFIED BY ?;");
Statement stmt = connection.createStatement()) {
pStmt.setString(1, user);
pStmt.setString(2, pw);
Expand Down Expand Up @@ -166,9 +165,10 @@ public void removeOnlineFeaturestoreUser(String dbUser, Connection connection) t
* @param dbName name of the online featurestore database
* @param dbUser the database-username
*/
public void grantDataOwnerPrivileges(String dbName, String dbUser) throws FeaturestoreException {
public void grantDataOwnerPrivileges(String dbName, String dbUser, Connection conn) throws FeaturestoreException {
try {
grantUserPrivileges(dbUser, "GRANT ALL PRIVILEGES ON " + dbName + ".* TO " + dbUser + ";", dbName);
grantUserPrivileges(dbUser, "GRANT ALL PRIVILEGES ON " + dbName + ".* TO " + dbUser + ";", dbName,
conn);
} catch (SQLException se) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.ERROR_GRANTING_ONLINE_FEATURESTORE_USER_PRIVILEGES, Level.SEVERE,
Expand All @@ -182,30 +182,30 @@ public void grantDataOwnerPrivileges(String dbName, String dbUser) throws Featur
* @param dbName name of the online featurestore database
* @param dbUser the database-username
*/
public void grantDataScientistPrivileges(String dbName, String dbUser) throws FeaturestoreException {
public void grantDataScientistPrivileges(String dbName, String dbUser, Connection conn) throws FeaturestoreException {
try {
grantUserPrivileges(dbUser, "GRANT SELECT ON " + dbName + ".* TO " + dbUser + ";", dbName);
grantUserPrivileges(dbUser, "GRANT SELECT ON " + dbName + ".* TO " + dbUser + ";", dbName, conn);
} catch (SQLException se) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.ERROR_GRANTING_ONLINE_FEATURESTORE_USER_PRIVILEGES, Level.SEVERE,
"Error running the grant query", se.getMessage(), se);
}
}

private void grantUserPrivileges(String dbUser, String grantQuery, String dbName) throws FeaturestoreException,
private void grantUserPrivileges(String dbUser, String grantQuery, String dbName, Connection conn)
throws FeaturestoreException,
SQLException {
ResultSet resultSet = null;
try (Connection connection = establishAdminConnection();
PreparedStatement pStmt = connection.prepareStatement(
try (PreparedStatement pStmt = conn.prepareStatement(
"SELECT COUNT(*) FROM mysql.user WHERE User = ?")){
// If the user doesn't exist, the grant permissions will fail blocking the rest of the user assignments
// check if the user exists before executing the granting command
pStmt.setString(1, dbUser);
resultSet = pStmt.executeQuery();

if (resultSet.next() && resultSet.getInt(1) != 0) {
revokeUserPrivileges(dbName, dbUser, connection);
executeUpdate(grantQuery, connection);
revokeUserPrivileges(dbName, dbUser, conn);
executeUpdate(grantQuery, conn);
}
} finally {
if (resultSet != null) {
Expand Down Expand Up @@ -250,22 +250,6 @@ public List<FeatureGroupFeatureDTO> getMySQLFeatures(String tableName, String db

return featureGroupFeatureDTOS;
}

/**
* Revokes user privileges for a user on a specific online featurestore
*
* @param dbName name of the MYSQL database
* @param dbUser the database username to revoke privileges for
*/
public void revokeUserPrivileges(String dbName, String dbUser) {
try {
try (Connection connection = establishAdminConnection()) {
revokeUserPrivileges(dbName, dbUser, connection);
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in revoking the privileges", e);
}
}

public void revokeUserPrivileges(String dbName, String dbUser, Connection connection) {
ResultSet resultSet = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.dataset.DatasetController;
import io.hops.hopsworks.common.dataset.FolderNameValidator;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
import io.hops.hopsworks.common.kafka.KafkaController;
import io.hops.hopsworks.common.opensearch.OpenSearchController;
import io.hops.hopsworks.common.experiments.tensorboard.TensorBoardController;
Expand Down Expand Up @@ -179,6 +180,7 @@
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -287,6 +289,8 @@ public class ProjectController {
@EJB
private ProjectServiceFacade projectServiceFacade;
@EJB
private OnlineFeaturestoreFacade onlineFeaturestoreFacade;
@EJB
private ProjectQuotasController projectQuotasController;
@EJB
private KafkaController kafkaController;
Expand Down Expand Up @@ -1858,16 +1862,23 @@ public boolean addMember(ProjectTeam projectTeam, Project project, Users newMemb
if (projectServiceFacade.isServiceEnabledForProject(project, ProjectServiceEnum.FEATURESTORE) &&
settings.isOnlineFeaturestore()) {
Featurestore featurestore = featurestoreController.getProjectFeaturestore(project);
onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(),
featurestore, projectTeam.getTeamRole());

// give access to the shared online feature stores
for (DatasetSharedWith sharedDs : project.getDatasetSharedWithCollection()) {
if (sharedDs.getAccepted() && sharedDs.getDataset().getDsType() == DatasetType.FEATURESTORE) {
onlineFeaturestoreController
try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
onlineFeaturestoreController.createDatabaseUser(projectTeam.getUser(),
featurestore, projectTeam.getTeamRole(), connection);
// give access to the shared online feature stores

for (DatasetSharedWith sharedDs : project.getDatasetSharedWithCollection()) {
if (sharedDs.getAccepted() && sharedDs.getDataset().getDsType() == DatasetType.FEATURESTORE) {
onlineFeaturestoreController
.shareOnlineFeatureStore(project, newMember, projectTeam.getTeamRole(),
sharedDs.getDataset().getFeatureStore(), sharedDs.getPermission());
sharedDs.getDataset().getFeatureStore(), sharedDs.getPermission(), connection);
}
}
} catch (SQLException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}

Expand Down Expand Up @@ -2207,7 +2218,13 @@ public void updateMemberRole(Project project, Users user, String newRole) throws
// Update privileges for online feature store
if (projectServiceFacade.isServiceEnabledForProject(project, ProjectServiceEnum.FEATURESTORE)) {
Featurestore featurestore = featurestoreController.getProjectFeaturestore(project);
onlineFeaturestoreController.updateUserOnlineFeatureStoreDB(user, featurestore, newRole);
try (Connection connection = onlineFeaturestoreFacade.establishAdminConnection()) {
onlineFeaturestoreController.updateUserOnlineFeatureStoreDB(user, featurestore, newRole, connection);
} catch (SQLException e) {
throw new FeaturestoreException(
RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE,
Level.SEVERE, e.getMessage(), e.getMessage(), e);
}
}
}

Expand Down

0 comments on commit a3c803a

Please sign in to comment.