From d5a45a45203c4c43f2d6d390acc70ea4f30af231 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Tue, 12 Sep 2023 17:53:09 +0200 Subject: [PATCH] =?UTF-8?q?[HWORKS-734]=20Remove=20jdbc=20pool=20when=20in?= =?UTF-8?q?itializing=20online=20feature=20store=20=E2=80=A6=20(#1545)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HWORKS-734] Remove jdbc pool when initializing online feature store database for project * Revert changes to hopsworks-common pom.xml * Fix initialization of the jdbcString variable --- .../featurestore/FeaturestoreController.java | 6 +- .../common/featurestore/FeaturestoreDTO.java | 14 +- .../cached/CachedFeaturegroupController.java | 41 +-- .../online/OnlineFeaturegroupController.java | 30 +- .../online/OnlineFeaturestoreController.java | 165 +-------- .../online/OnlineFeaturestoreFacade.java | 332 ++++++++++++------ .../featurestore/utils/FeaturestoreUtils.java | 43 +++ .../hops/hopsworks/common/util/Settings.java | 19 +- 8 files changed, 301 insertions(+), 349 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java index 3889295d3c..fd041e976f 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreController.java @@ -22,6 +22,7 @@ import io.hops.hopsworks.common.featurestore.featureview.FeatureViewFacade; import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController; import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade; +import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO; @@ -80,6 +81,8 @@ public class FeaturestoreController { @EJB private OnlineFeaturestoreController onlineFeaturestoreController; @EJB + private OnlineFeaturestoreFacade onlineFeaturestoreFacade; + @EJB private HiveController hiveController; @EJB private FeaturegroupFacade featuregroupFacade; @@ -375,8 +378,7 @@ private FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) { if (settings.isOnlineFeaturestore() && onlineFeaturestoreController.checkIfDatabaseExists( onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) { - featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreController.getJdbcURL()); - featurestoreDTO.setOnlineFeaturestoreSize(onlineFeaturestoreController.getDbSize(featurestore)); + featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreFacade.getJdbcURL()); featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName()); featurestoreDTO.setOnlineEnabled(true); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreDTO.java index 2909eed70b..910382b3b6 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/FeaturestoreDTO.java @@ -35,7 +35,6 @@ public class FeaturestoreDTO { private String projectName; private Integer projectId; private String onlineFeaturestoreName; - private Double onlineFeaturestoreSize; private String offlineFeaturestoreName; private String hiveEndpoint; private String mysqlServerEndpoint; @@ -58,7 +57,6 @@ public FeaturestoreDTO(Featurestore featurestore) { this.offlineFeaturestoreName = null; this.hiveEndpoint = null; this.mysqlServerEndpoint = null; - this.onlineFeaturestoreSize = 0.0; this.onlineEnabled = false; } @@ -104,16 +102,7 @@ public String getOfflineFeaturestoreName() { public void setOfflineFeaturestoreName(String offlineFeaturestoreName) { this.offlineFeaturestoreName = offlineFeaturestoreName; } - - @XmlElement - public Double getOnlineFeaturestoreSize() { - return onlineFeaturestoreSize; - } - - public void setOnlineFeaturestoreSize(Double onlineFeaturestoreSize) { - this.onlineFeaturestoreSize = onlineFeaturestoreSize; - } - + @XmlElement public String getHiveEndpoint() { return hiveEndpoint; @@ -186,7 +175,6 @@ public String toString() { ", projectName='" + projectName + '\'' + ", projectId=" + projectId + ", onlineFeaturestoreName='" + onlineFeaturestoreName + '\'' + - ", onlineFeaturestoreSize=" + onlineFeaturestoreSize + ", offlineFeaturestoreName='" + offlineFeaturestoreName + '\'' + ", hiveEndpoint='" + hiveEndpoint + '\'' + ", mysqlServerEndpoint='" + mysqlServerEndpoint + '\'' + diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java index cda49181e1..eb61e5f9eb 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java @@ -62,7 +62,6 @@ import org.apache.calcite.sql.dialect.HiveSqlDialect; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; -import org.javatuples.Pair; import javax.annotation.PostConstruct; import javax.ejb.EJB; @@ -74,7 +73,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -371,43 +369,6 @@ public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users cachedFeatureGroupFacade.remove(featuregroup.getCachedFeaturegroup()); } - /** - * Parses a ResultSet from a Hive query into a list of RowValueQueryResultDTOs - * - * @param rs resultset to parse - * @return list of parsed rows - * @throws SQLException - */ - public FeaturegroupPreview parseResultset(ResultSet rs) throws SQLException { - ResultSetMetaData rsmd = rs.getMetaData(); - FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview(); - - while (rs.next()) { - FeaturegroupPreview.Row row = new FeaturegroupPreview.Row(); - - for (int i = 1; i <= rsmd.getColumnCount(); i++) { - Object columnValue = rs.getObject(i); - row.addValue(new Pair<>(parseColumnLabel(rsmd.getColumnLabel(i)), - columnValue == null ? null : columnValue.toString())); - } - featuregroupPreview.addRow(row); - } - - return featuregroupPreview; - } - - /** - * Column labels contain the table name as well. Remove it - * @param columnLabel - * @return - */ - private String parseColumnLabel(String columnLabel) { - if (columnLabel.contains(".")) { - return columnLabel.split("\\.")[1]; - } - return columnLabel; - } - /** * Opens a JDBC connection to HS2 using the given database and project-user and then executes a regular * SQL query @@ -430,7 +391,7 @@ private FeaturegroupPreview executeReadHiveQuery(String query, String databaseNa conn = initConnection(databaseName, project, user); stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(query); - return parseResultset(rs); + return featurestoreUtils.parseResultset(rs); } catch (SQLException e) { //Hive throws a generic HiveSQLException not a specific AuthorizationException if (e.getMessage().toLowerCase().contains("permission denied")) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java index 14cf6fcbbc..99d32df0ee 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java @@ -21,7 +21,6 @@ import io.hops.hopsworks.common.dao.kafka.TopicDTO; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController; import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade; @@ -93,8 +92,6 @@ public class OnlineFeaturegroupController { @EJB private ProjectController projectController; @EJB - private CachedFeaturegroupController cachedFeaturegroupController; - @EJB private ConstructorController constructorController; @EJB private FeaturegroupController featuregroupController; @@ -120,27 +117,26 @@ protected OnlineFeaturegroupController(Settings settings) { * @throws SQLException * @throws FeaturestoreException */ - public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws SQLException, - FeaturestoreException { + public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException { //Drop data table String query = "DROP TABLE " + featuregroup.getName() + "_" + featuregroup.getVersion() + ";"; - onlineFeaturestoreController.executeUpdateJDBCQuery(query, + onlineFeaturestoreFacade.executeUpdateJDBCQuery(query, onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()), project, user); } public void createMySQLTable(Featurestore featurestore, String tableName, List features, Project project, Users user) - throws FeaturestoreException, SQLException{ + throws FeaturestoreException { String dbName = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()); String createStatement = buildCreateStatement(dbName, tableName, features); - onlineFeaturestoreController.executeUpdateJDBCQuery(createStatement, dbName, project, user); + onlineFeaturestoreFacade.executeUpdateJDBCQuery(createStatement, dbName, project, user); } public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup featureGroup, List features, Project project, Users user) - throws KafkaException, SchemaException, ProjectException, FeaturestoreException, SQLException, - IOException, HopsSecurityException, ServiceException { + throws KafkaException, SchemaException, ProjectException, FeaturestoreException, IOException, + HopsSecurityException, ServiceException { // check if onlinefs user is part of project if (project.getProjectTeamCollection().stream().noneMatch(pt -> pt.getUser().getUsername().equals(OnlineFeaturestoreController.ONLINEFS_USERNAME))) { @@ -163,7 +159,7 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup feat // The topic schema is registered for each feature group public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup, List features) - throws KafkaException, SchemaException, FeaturestoreException { + throws KafkaException, SchemaException, FeaturestoreException { String avroSchema = avroSchemaConstructorController.constructSchema(featureGroup, features); schemasController.validateSchema(project, avroSchema); @@ -181,7 +177,7 @@ public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGr } public void deleteFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup) - throws KafkaException, SchemaException { + throws KafkaException, SchemaException { String topicName = Utils.getFeatureGroupTopicName(featureGroup); String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); // user might have deleted topic manually @@ -291,9 +287,9 @@ public String buildAlterStatement(String tableName, String dbName, List featureDTOs, Project project, Users user) - throws FeaturestoreException, SQLException { + throws FeaturestoreException { String dbName = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()); - onlineFeaturestoreController.executeUpdateJDBCQuery(buildAlterStatement(tableName, dbName, featureDTOs), dbName, + onlineFeaturestoreFacade.executeUpdateJDBCQuery(buildAlterStatement(tableName, dbName, featureDTOs), dbName, project, user); } @@ -330,7 +326,7 @@ public String getOnlineType(FeatureGroupFeatureDTO featureGroupFeatureDTO) { * @throws SQLException */ public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, int limit) - throws FeaturestoreException, SQLException { + throws FeaturestoreException { String tbl = featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion()); List features = featuregroupController.getFeatures(featuregroup, project, user); @@ -352,10 +348,10 @@ public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Pro SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null); String db = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()); try { - return onlineFeaturestoreController.executeReadJDBCQuery( + return onlineFeaturestoreFacade.executeReadJDBCQuery( select.toSqlString(new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user); } catch(Exception e) { - return onlineFeaturestoreController.executeReadJDBCQuery( + return onlineFeaturestoreFacade.executeReadJDBCQuery( select.toSqlString(new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java index 5929336c95..b1fcef07b0 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreController.java @@ -15,17 +15,12 @@ */ package io.hops.hopsworks.common.featurestore.online; -import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.dao.project.team.ProjectTeamFacade; -import io.hops.hopsworks.common.dao.user.UserFacade; import io.hops.hopsworks.common.dao.user.security.secrets.SecretsFacade; import io.hops.hopsworks.common.featurestore.FeaturestoreConstants; import io.hops.hopsworks.common.featurestore.OptionDTO; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade; import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil; -import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.common.security.secrets.SecretsController; import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.exceptions.FeaturestoreException; @@ -42,25 +37,19 @@ import io.hops.hopsworks.persistence.entity.user.security.secrets.SecretId; import io.hops.hopsworks.persistence.entity.user.security.secrets.VisibilityType; import io.hops.hopsworks.restutils.RESTCodes; -import io.hops.hopsworks.servicediscovery.HopsworksService; -import io.hops.hopsworks.servicediscovery.tags.MysqlTags; import org.apache.commons.lang3.RandomStringUtils; -import javax.annotation.PostConstruct; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import static io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade.MYSQL_DRIVER; + /** * Class controlling the interaction with the online featurestore databases in Hopsworks and the associated * business logic. @@ -70,9 +59,7 @@ public class OnlineFeaturestoreController { private static final Logger LOGGER = Logger.getLogger(OnlineFeaturestoreController.class.getName()); - private static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver"; - private static final String MYSQL_JDBC = "jdbc:mysql://"; - private static final String MYSQL_PROPERTIES = "?useSSL=false&allowPublicKeyRetrieval=true"; + public static final String ONLINEFS_USERNAME = "onlinefs"; @EJB @@ -84,137 +71,12 @@ public class OnlineFeaturestoreController { @EJB private OnlineFeaturestoreFacade onlineFeaturestoreFacade; @EJB - private CachedFeaturegroupController cachedFeaturegroupController; - @EJB private ProjectTeamFacade projectTeamFacade; @EJB - private UserFacade userFacade; - @EJB - private ServiceDiscoveryController serviceDiscoveryController; - @EJB private FeaturestoreConnectorFacade featurestoreConnectorFacade; @EJB private StorageConnectorUtil storageConnectorUtil; - - @PostConstruct - public void init() { - try { - // Load MySQL JDBC Driver - Class.forName(MYSQL_DRIVER); - } catch (ClassNotFoundException e) { - LOGGER.log(Level.SEVERE, "Could not load the MySQL JDBC driver: " + MYSQL_DRIVER, e); - } - } - - /** - * Initializes a JDBC connection MySQL Server using an online featurestore user and password - * - * @param databaseName name of the MySQL database to open a connection to - * @param project the project of the user making the request - * @param user the user making the request - * @return conn the JDBC connection - * @throws FeaturestoreException - */ - private Connection initConnection(String databaseName, Project project, Users user) throws FeaturestoreException { - String jdbcString = ""; - String dbUsername = onlineDbUsername(project, user); - String password = ""; - try { - password = secretsController.get(user, dbUsername).getPlaintext(); - } catch (UserException e) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR, - Level.SEVERE, "Problem getting secrets for the JDBC connection to the online FS"); - } - try { - return DriverManager.getConnection(getJdbcURL(databaseName), dbUsername, password); - } catch (SQLException | ServiceDiscoveryException e) { - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, Level.SEVERE, - "project: " + project.getName() + ", database: " + databaseName + ", db user:" + dbUsername + - ", jdbcString: " + jdbcString, e.getMessage(), e); - } - } - - /** - * Runs a update/create SQL query against an online featurestore database, impersonating the user making the request - * - * @param query the update/create query to run - * @param databaseName the name of the database to run the query against - * @param project the project of the online featurestore - * @param user the user to run the query as - * @throws FeaturestoreException - * @throws SQLException - */ - public void executeUpdateJDBCQuery(String query, String databaseName, Project project, Users user) - throws FeaturestoreException, SQLException { - //Re-create the connection every time since the connection is database and user-specific - Statement stmt = null; - Connection conn = null; - //Run Query - try { - conn = initConnection(databaseName, project, user); - stmt = conn.createStatement(); - stmt.executeUpdate(query); - } catch (SQLException e) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.MYSQL_JDBC_UPDATE_STATEMENT_ERROR, Level.SEVERE, - "project: " + project.getName() + ", Online featurestore database: " + databaseName + " jdbc query: " + query, - e.getMessage(), e); - } finally { - if (stmt != null) { - stmt.close(); - } - closeConnection(conn); - } - } - - /** - * Runs a Read-SQL query against an online featurestore database, impersonating the user making the request - * - * @param query the read query - * @param databaseName the name of the MySQL database - * @param project the project that owns the online featurestore - * @param user the user making the request - * @return parsed resultset - * @throws SQLException - * @throws FeaturestoreException - */ - public FeaturegroupPreview executeReadJDBCQuery(String query, String databaseName, Project project, Users user) - throws SQLException, FeaturestoreException { - Connection conn = null; - Statement stmt = null; - try { - //Re-create the connection every time since the connection is database and user-specific - conn = initConnection(databaseName, project, user); - stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(query); - return cachedFeaturegroupController.parseResultset(rs); - } catch (SQLException e) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.MYSQL_JDBC_READ_QUERY_ERROR, Level.SEVERE, - "project: " + project.getName() + ", mysql database: " + databaseName + " jdbc query: " + query, - e.getMessage(), e); - } finally { - if (stmt != null) { - stmt.close(); - } - closeConnection(conn); - } - } - /** - * Checks if the JDBC connection to MySQL Server is open, and if so closes it. - * - * @param conn the JDBC connection - */ - private void closeConnection(Connection conn) { - try { - if (conn != null) { - conn.close(); - } - } catch (SQLException e) { - LOGGER.log(Level.WARNING, "Error closing MySQL JDBC connection: " + e); - } - } - /** * Sets up the online feature store database for a new project and creating a database-user for the project-owner * @@ -527,17 +389,6 @@ public void unshareOnlineFeatureStore(Project project, Featurestore featurestore } } - /** - * Gets the size of an online featurestore database. I.e the size of a MySQL-cluster database. - * - * @param featurestore the feature store for which to compute the online size - * @return the size in MB - */ - public Double getDbSize(Featurestore featurestore) { - String onlineName = getOnlineFeaturestoreDbName(featurestore.getProject()); - return onlineFeaturestoreFacade.getDbSize(onlineName); - } - /** * Checks if a mysql database exists * @@ -547,14 +398,4 @@ public Double getDbSize(Featurestore featurestore) { public Boolean checkIfDatabaseExists(String dbName) { return onlineFeaturestoreFacade.checkIfDatabaseExists(dbName); } - - public String getJdbcURL() throws ServiceDiscoveryException { - return getJdbcURL(""); - } - - private String getJdbcURL(String dbName) throws ServiceDiscoveryException { - return MYSQL_JDBC + serviceDiscoveryController - .constructServiceAddressWithPort(HopsworksService.MYSQL.getNameWithTag(MysqlTags.onlinefs)) - + "/" + dbName + MYSQL_PROPERTIES; - } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java index b3c7e0899e..5d7017b2cd 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/online/OnlineFeaturestoreFacade.java @@ -16,16 +16,28 @@ package io.hops.hopsworks.common.featurestore.online; +import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; +import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils; +import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.security.secrets.SecretsController; +import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.exceptions.UserException; +import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.restutils.RESTCodes; +import io.hops.hopsworks.servicediscovery.HopsworksService; +import io.hops.hopsworks.servicediscovery.tags.MysqlTags; -import javax.annotation.Resource; +import javax.annotation.PostConstruct; +import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; -import javax.sql.DataSource; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -47,75 +59,29 @@ public class OnlineFeaturestoreFacade { private static final Logger LOGGER = Logger.getLogger(OnlineFeaturestoreFacade.class.getName()); - @Resource(name = "jdbc/featurestore") - private DataSource featureStoreDataSource; + public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver"; + public static final String MYSQL_JDBC = "jdbc:mysql://"; + public static final String MYSQL_PROPERTIES = "?useSSL=false&allowPublicKeyRetrieval=true"; - /** - * Gets the size of an online featurestore database. I.e the size of a MySQL-cluster database. - * - * @param dbName the name of the database - * @return the size in MB - */ - public Double getDbSize(String dbName) { - try { - ResultSet resultSet = null; - try (Connection connection = featureStoreDataSource.getConnection(); - PreparedStatement pStmt = connection.prepareStatement("SELECT " + - "ROUND(SUM(`tables`.`data_length` + `index_length`) / 1024 / 1024, 1) AS 'size_mb' " + - "FROM information_schema.`tables` " + - "WHERE `tables`.`table_schema`=? GROUP BY `tables`.`table_schema`")) { - pStmt.setString(1, dbName); - resultSet = pStmt.executeQuery(); - if (resultSet.next()) { - return resultSet.getDouble("size_mb"); - } - } finally { - if (resultSet != null) { - resultSet.close(); - } - } - } catch (SQLException se) { - LOGGER.log(Level.SEVERE, "Could not get database size", se); - } + @EJB + private ServiceDiscoveryController serviceDiscoveryController; + @EJB + private Settings settings; + @EJB + private OnlineFeaturestoreController onlineFeaturestoreController; + @EJB + private SecretsController secretsController; + @EJB + private FeaturestoreUtils featurestoreUtils; - return 0.0; - } - - /** - * Gets the features of a online featuregroup from the MySQL metadata - * - * @param tableName the name of the table of the online featuregroup - * @param db the name of the mysql database - * @return list of featureDTOs with name,type,comment - */ - public List getMySQLFeatures(String tableName, String db) throws FeaturestoreException { - ArrayList featureGroupFeatureDTOS = new ArrayList<>(); + @PostConstruct + public void init() { try { - ResultSet resultSet = null; - try (Connection connection = featureStoreDataSource.getConnection(); - PreparedStatement pStmt = connection.prepareStatement( - "SELECT `COLUMNS`.`COLUMN_NAME`,`COLUMNS`.`COLUMN_TYPE`, `COLUMNS`.`COLUMN_COMMENT` " + - "FROM INFORMATION_SCHEMA.`COLUMNS` " + - "WHERE `COLUMNS`.`TABLE_NAME`=? AND `COLUMNS`.`TABLE_SCHEMA`=?;")) { - pStmt.setString(1, tableName); - pStmt.setString(2, db); - resultSet = pStmt.executeQuery(); - while (resultSet.next()) { - featureGroupFeatureDTOS.add(new FeatureGroupFeatureDTO(resultSet.getString(1), - resultSet.getString(2), - resultSet.getString(3))); - } - } finally { - if (resultSet != null) { - resultSet.close(); - } - } - } catch (SQLException se) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_ONLINE_FEATURES, Level.SEVERE, - "Error reading features from schema", se.getMessage(), se); + // Load MySQL JDBC Driver + Class.forName(MYSQL_DRIVER); + } catch (ClassNotFoundException e) { + LOGGER.log(Level.SEVERE, "Could not load the MySQL JDBC driver: " + MYSQL_DRIVER, e); } - - return featureGroupFeatureDTOS; } /** @@ -159,7 +125,7 @@ public void removeOnlineFeaturestoreDatabase(String db) throws FeaturestoreExcep */ public void createOnlineFeaturestoreUser(String user, String pw) throws FeaturestoreException { try { - try (Connection connection = featureStoreDataSource.getConnection(); + try (Connection connection = establishAdminConnection(); PreparedStatement pStmt = connection.prepareStatement("CREATE USER IF NOT EXISTS ? IDENTIFIED BY ?;")) { pStmt.setString(1, user); pStmt.setString(2, pw); @@ -174,37 +140,23 @@ public void createOnlineFeaturestoreUser(String user, String pw) throws Features } /** - * Revokes user privileges for a user on a specific online featurestore + * Removes a database user for an online featurestore * - * @param dbName name of the MYSQL database - * @param dbUser the database username to revoke privileges for + * @param dbUser the database-username */ - public void revokeUserPrivileges(String dbName, String dbUser) { - ResultSet resultSet = null; + public void removeOnlineFeaturestoreUser(String dbUser) throws FeaturestoreException { + //Prepared statements with parameters can only be done for + //WHERE/HAVING Clauses, not names of tables or databases try { - try (Connection connection = featureStoreDataSource.getConnection(); - PreparedStatement pStmt = connection.prepareStatement( - "SELECT COUNT(*) FROM information_schema.SCHEMA_PRIVILEGES WHERE GRANTEE = ? AND TABLE_SCHEMA = ?")){ - // If the grant does not exists, MySQL returns a 1141 error which JPA catches and logs it together - // with the stack trace, polluting the logs. To avoid this we first query the information_schema - // to check that the grant exists, if so, we remove it - String grantee = "'" + dbUser + "'@'%'"; - pStmt.setString(1, grantee); - pStmt.setString(2, dbName); - - resultSet = pStmt.executeQuery(); - if (resultSet.next() && resultSet.getInt(1) != 0) { - //Prepared statements with parameters can only be done for - //WHERE/HAVING Clauses, not names of tables or databases - executeUpdate("REVOKE ALL PRIVILEGES ON " + dbName + ".* FROM " + dbUser + ";"); - } - } finally { - if (resultSet != null) { - resultSet.close(); - } + try (Connection connection = establishAdminConnection(); + PreparedStatement pStmt = connection.prepareStatement("DROP USER IF EXISTS ?")) { + pStmt.setString(1, dbUser); + pStmt.executeUpdate(); } - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Exception in revoking the privileges", e); + } catch (SQLException se) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_DELETING_ONLINE_FEATURESTORE_USER, + Level.SEVERE, "An error occurred when trying to delete the MySQL database user for an online feature store", + se.getMessage(), se); } } @@ -240,9 +192,9 @@ public void grantDataScientistPrivileges(String dbName, String dbUser) throws Fe } } - private void grantUserPrivileges(String dbUser, String grantQuery) throws SQLException { + private void grantUserPrivileges(String dbUser, String grantQuery) throws FeaturestoreException, SQLException { ResultSet resultSet = null; - try (Connection connection = featureStoreDataSource.getConnection(); + try (Connection connection = establishAdminConnection(); PreparedStatement pStmt = connection.prepareStatement( "SELECT COUNT(*) FROM mysql.user WHERE User = ?")){ // If the user doesn't exist, the grant permissions will fail blocking the rest of the user assignments @@ -261,26 +213,78 @@ private void grantUserPrivileges(String dbUser, String grantQuery) throws SQLExc } /** - * Removes a database user for an online featurestore + * Gets the features of a online featuregroup from the MySQL metadata * - * @param dbUser the database-username + * @param tableName the name of the table of the online featuregroup + * @param db the name of the mysql database + * @return list of featureDTOs with name,type,comment */ - public void removeOnlineFeaturestoreUser(String dbUser) throws FeaturestoreException { - //Prepared statements with parameters can only be done for - //WHERE/HAVING Clauses, not names of tables or databases + public List getMySQLFeatures(String tableName, String db) throws FeaturestoreException { + ArrayList featureGroupFeatureDTOS = new ArrayList<>(); try { - try (Connection connection = featureStoreDataSource.getConnection(); - PreparedStatement pStmt = connection.prepareStatement("DROP USER IF EXISTS ?")) { - pStmt.setString(1, dbUser); - pStmt.executeUpdate(); + ResultSet resultSet = null; + try (Connection connection = establishAdminConnection(); + PreparedStatement pStmt = connection.prepareStatement( + "SELECT `COLUMNS`.`COLUMN_NAME`,`COLUMNS`.`COLUMN_TYPE`, `COLUMNS`.`COLUMN_COMMENT` " + + "FROM INFORMATION_SCHEMA.`COLUMNS` " + + "WHERE `COLUMNS`.`TABLE_NAME`=? AND `COLUMNS`.`TABLE_SCHEMA`=?;")) { + pStmt.setString(1, tableName); + pStmt.setString(2, db); + resultSet = pStmt.executeQuery(); + while (resultSet.next()) { + featureGroupFeatureDTOS.add(new FeatureGroupFeatureDTO(resultSet.getString(1), + resultSet.getString(2), + resultSet.getString(3))); + } + } finally { + if (resultSet != null) { + resultSet.close(); + } } } catch (SQLException se) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_DELETING_ONLINE_FEATURESTORE_USER, - Level.SEVERE, "An error occurred when trying to delete the MySQL database user for an online feature store", - se.getMessage(), se); + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_ONLINE_FEATURES, Level.SEVERE, + "Error reading features from schema", se.getMessage(), se); } + + return featureGroupFeatureDTOS; } + /** + * Revokes user privileges for a user on a specific online featurestore + * + * @param dbName name of the MYSQL database + * @param dbUser the database username to revoke privileges for + */ + public void revokeUserPrivileges(String dbName, String dbUser) { + ResultSet resultSet = null; + try { + try (Connection connection = establishAdminConnection(); + PreparedStatement pStmt = connection.prepareStatement( + "SELECT COUNT(*) FROM information_schema.SCHEMA_PRIVILEGES WHERE GRANTEE = ? AND TABLE_SCHEMA = ?")){ + // If the grant does not exists, MySQL returns a 1141 error which JPA catches and logs it together + // with the stack trace, polluting the logs. To avoid this we first query the information_schema + // to check that the grant exists, if so, we remove it + String grantee = "'" + dbUser + "'@'%'"; + pStmt.setString(1, grantee); + pStmt.setString(2, dbName); + + resultSet = pStmt.executeQuery(); + if (resultSet.next() && resultSet.getInt(1) != 0) { + //Prepared statements with parameters can only be done for + //WHERE/HAVING Clauses, not names of tables or databases + executeUpdate("REVOKE ALL PRIVILEGES ON " + dbName + ".* FROM " + dbUser + ";"); + } + } finally { + if (resultSet != null) { + resultSet.close(); + } + } + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Exception in revoking the privileges", e); + } + } + + /** * Checks if a mysql database exists * @@ -290,7 +294,7 @@ public void removeOnlineFeaturestoreUser(String dbUser) throws FeaturestoreExcep public Boolean checkIfDatabaseExists(String dbName) { try { ResultSet resultSet = null; - try (Connection connection = featureStoreDataSource.getConnection(); + try (Connection connection = establishAdminConnection(); PreparedStatement pStmt = connection.prepareStatement( "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE `SCHEMA_NAME`=?")) { pStmt.setString(1, dbName); @@ -301,12 +305,61 @@ public Boolean checkIfDatabaseExists(String dbName) { resultSet.close(); } } - } catch (SQLException se) { + } catch (SQLException | FeaturestoreException se) { LOGGER.log(Level.SEVERE, "Error checking if database exists", se); return false; } } + /** + * Runs a update/create SQL query against an online featurestore database, impersonating the user making the request + * + * @param query the update/create query to run + * @param databaseName the name of the database to run the query against + * @param project the project of the online featurestore + * @param user the user to run the query as + * @throws FeaturestoreException + * @throws SQLException + */ + public void executeUpdateJDBCQuery(String query, String databaseName, Project project, Users user) + throws FeaturestoreException{ + //Re-create the connection every time since the connection is database and user-specific + //Run Query + try (Connection conn = establishUserConnection(databaseName, project, user); + Statement stmt = conn.createStatement()) { + stmt.executeUpdate(query); + } catch (SQLException e) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.MYSQL_JDBC_UPDATE_STATEMENT_ERROR, Level.SEVERE, + "project: " + project.getName() + ", Online featurestore database: " + databaseName + " jdbc query: " + query, + e.getMessage(), e); + } + } + + /** + * Runs a Read-SQL query against an online featurestore database, impersonating the user making the request + * + * @param query the read query + * @param databaseName the name of the MySQL database + * @param project the project that owns the online featurestore + * @param user the user making the request + * @return parsed resultset + * @throws SQLException + * @throws FeaturestoreException + */ + public FeaturegroupPreview executeReadJDBCQuery(String query, String databaseName, Project project, Users user) + throws FeaturestoreException { + try (Connection conn = establishUserConnection(databaseName, project, user); + Statement stmt = conn.createStatement()) { + //Re-create the connection every time since the connection is database and user-specific + ResultSet rs = stmt.executeQuery(query); + return featurestoreUtils.parseResultset(rs); + } catch (SQLException e) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.MYSQL_JDBC_READ_QUERY_ERROR, Level.SEVERE, + "project: " + project.getName() + ", mysql database: " + databaseName + " jdbc query: " + query, + e.getMessage(), e); + } + } + /** * Create a Kafka Offset table in Online Featurestore Database. * @@ -330,11 +383,64 @@ public void createOnlineFeaturestoreKafkaOffsetTable(String db) throws Featurest } } - private void executeUpdate(String query) throws SQLException { - try (Connection connection = featureStoreDataSource.getConnection(); + private void executeUpdate(String query) throws SQLException, FeaturestoreException { + try (Connection connection = establishAdminConnection(); Statement stmt = connection.createStatement()) { stmt.executeUpdate(query); } } + private Connection establishAdminConnection() throws FeaturestoreException { + try { + return DriverManager.getConnection(getJdbcURL(), + settings.getVariableFeaturestoreDbAdminUser(), + settings.getVariableFeaturestoreDbAdminPwd()); + } catch (SQLException | ServiceDiscoveryException e) { + throw new FeaturestoreException( + RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, + Level.SEVERE, e.getMessage(), e.getMessage(), e); + } + } + + /** + * Initializes a JDBC connection MySQL Server using an online featurestore user and password + * + * @param databaseName name of the MySQL database to open a connection to + * @param project the project of the user making the request + * @param user the user making the request + * @return conn the JDBC connection + * @throws FeaturestoreException + */ + private Connection establishUserConnection(String databaseName, Project project, Users user) + throws FeaturestoreException { + String dbUsername = onlineFeaturestoreController.onlineDbUsername(project, user); + String password; + try { + password = secretsController.get(user, dbUsername).getPlaintext(); + } catch (UserException e) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_SECRETS_ERROR, + Level.SEVERE, "Problem getting secrets for the JDBC connection to the online FS"); + } + + String jdbcString = ""; + try { + jdbcString = getJdbcURL(databaseName); + return DriverManager.getConnection(jdbcString, dbUsername, password); + } catch (SQLException | ServiceDiscoveryException e) { + throw new FeaturestoreException( + RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE, Level.SEVERE, + "project: " + project.getName() + ", database: " + databaseName + ", db user:" + dbUsername + + ", jdbcString: " + jdbcString, e.getMessage(), e); + } + } + + public String getJdbcURL() throws ServiceDiscoveryException { + return getJdbcURL(""); + } + + public String getJdbcURL(String dbName) throws ServiceDiscoveryException { + return MYSQL_JDBC + serviceDiscoveryController + .constructServiceAddressWithPort(HopsworksService.MYSQL.getNameWithTag(MysqlTags.onlinefs)) + + "/" + dbName + MYSQL_PROPERTIES; + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/utils/FeaturestoreUtils.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/utils/FeaturestoreUtils.java index 2bb2e9f9d1..d4b137f012 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/utils/FeaturestoreUtils.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/utils/FeaturestoreUtils.java @@ -21,6 +21,7 @@ import io.hops.hopsworks.common.api.ResourceRequest; import io.hops.hopsworks.common.constants.auth.AllowedRoles; import io.hops.hopsworks.common.dao.project.team.ProjectTeamFacade; +import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; import io.hops.hopsworks.common.hdfs.DistributedFsService; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.exceptions.FeaturestoreException; @@ -36,10 +37,14 @@ import io.hops.hopsworks.restutils.RESTCodes; import io.hops.hopsworks.servicediscovery.HopsworksService; import io.hops.hopsworks.servicediscovery.tags.NamenodeTags; +import org.javatuples.Pair; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ws.rs.core.UriBuilder; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; @@ -257,4 +262,42 @@ public UriBuilder trainingDatasetURI(UriBuilder uriBuilder, Project accessProjec .path(ResourceRequest.Name.VERSION.toString().toLowerCase()) .path(Integer.toString(trainingDataset.getVersion())); } + + + /** + * Parses a ResultSet from a Hive query into a list of RowValueQueryResultDTOs + * + * @param rs resultset to parse + * @return list of parsed rows + * @throws SQLException + */ + public FeaturegroupPreview parseResultset(ResultSet rs) throws SQLException { + ResultSetMetaData rsmd = rs.getMetaData(); + FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview(); + + while (rs.next()) { + FeaturegroupPreview.Row row = new FeaturegroupPreview.Row(); + + for (int i = 1; i <= rsmd.getColumnCount(); i++) { + Object columnValue = rs.getObject(i); + row.addValue(new Pair<>(parseColumnLabel(rsmd.getColumnLabel(i)), + columnValue == null ? null : columnValue.toString())); + } + featuregroupPreview.addRow(row); + } + + return featuregroupPreview; + } + + /** + * Column labels contain the table name as well. Remove it + * @param columnLabel + * @return + */ + private String parseColumnLabel(String columnLabel) { + if (columnLabel.contains(".")) { + return columnLabel.split("\\.")[1]; + } + return columnLabel; + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java index 4b66e21722..7ab56e692c 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java @@ -310,6 +310,8 @@ public class Settings implements Serializable { private static final String VARIABLE_FEATURESTORE_DEFAULT_QUOTA = "featurestore_default_quota"; private static final String VARIABLE_FEATURESTORE_DEFAULT_STORAGE_FORMAT = "featurestore_default_storage_format"; private static final String VARIABLE_FEATURESTORE_JDBC_URL = "featurestore_jdbc_url"; + private static final String VARIABLE_FEATURESTORE_DB_ADMIN_USER = "featurestore_db_admin_user"; + private static final String VARIABLE_FEATURESTORE_DB_ADMIN_PWD = "featurestore_db_admin_pwd"; private static final String VARIABLE_ONLINE_FEATURESTORE = "featurestore_online_enabled"; private static final String VARIABLE_FG_PREVIEW_LIMIT = "fg_preview_limit"; private static final String VARIABLE_ONLINE_FEATURESTORE_TS = "featurestore_online_tablespace"; @@ -808,6 +810,8 @@ private void populateCache() { FEATURESTORE_DB_DEFAULT_STORAGE_FORMAT = setStrVar(VARIABLE_FEATURESTORE_DEFAULT_STORAGE_FORMAT, FEATURESTORE_DB_DEFAULT_STORAGE_FORMAT); FEATURESTORE_JDBC_URL = setStrVar(VARIABLE_FEATURESTORE_JDBC_URL, FEATURESTORE_JDBC_URL); + FEATURESTORE_DB_ADMIN_USER = setStrVar(VARIABLE_FEATURESTORE_DB_ADMIN_USER, FEATURESTORE_DB_ADMIN_USER); + FEATURESTORE_DB_ADMIN_PWD = setStrVar(VARIABLE_FEATURESTORE_DB_ADMIN_PWD, FEATURESTORE_DB_ADMIN_PWD); ONLINE_FEATURESTORE = setBoolVar(VARIABLE_ONLINE_FEATURESTORE, ONLINE_FEATURESTORE); ONLINE_FEATURESTORE_TS = setStrVar(VARIABLE_ONLINE_FEATURESTORE_TS, ONLINE_FEATURESTORE_TS); ONLINEFS_THREAD_NUMBER = setIntVar(VARIABLE_ONLINEFS_THREAD_NUMBER, ONLINEFS_THREAD_NUMBER); @@ -3421,12 +3425,23 @@ public Boolean isHopsUtilInsecure() { } private String FEATURESTORE_JDBC_URL = "jdbc:mysql://onlinefs.mysql.service.consul:3306/"; - public synchronized String getFeaturestoreJdbcUrl() { checkCache(); return FEATURESTORE_JDBC_URL; } - + + private String FEATURESTORE_DB_ADMIN_USER = ""; + public synchronized String getVariableFeaturestoreDbAdminUser() { + checkCache(); + return FEATURESTORE_DB_ADMIN_USER; + } + + private String FEATURESTORE_DB_ADMIN_PWD = ""; + public synchronized String getVariableFeaturestoreDbAdminPwd() { + checkCache(); + return FEATURESTORE_DB_ADMIN_PWD; + } + private Boolean REQUESTS_VERIFY = false; /**