Skip to content

Commit

Permalink
[HWORKS-734] Remove jdbc pool when initializing online feature store … (
Browse files Browse the repository at this point in the history
#1545)

* [HWORKS-734] Remove jdbc pool when initializing online feature store database for project

* Revert changes to hopsworks-common pom.xml

* Fix initialization of the jdbcString variable
  • Loading branch information
SirOibaf committed Sep 12, 2023
1 parent 826a161 commit d5a45a4
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.hops.hopsworks.common.featurestore.featureview.FeatureViewFacade;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class FeaturestoreController {
@EJB
private OnlineFeaturestoreController onlineFeaturestoreController;
@EJB
private OnlineFeaturestoreFacade onlineFeaturestoreFacade;
@EJB
private HiveController hiveController;
@EJB
private FeaturegroupFacade featuregroupFacade;
Expand Down Expand Up @@ -375,8 +378,7 @@ private FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
if (settings.isOnlineFeaturestore() &&
onlineFeaturestoreController.checkIfDatabaseExists(
onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) {
featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreController.getJdbcURL());
featurestoreDTO.setOnlineFeaturestoreSize(onlineFeaturestoreController.getDbSize(featurestore));
featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreFacade.getJdbcURL());
featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName());
featurestoreDTO.setOnlineEnabled(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class FeaturestoreDTO {
private String projectName;
private Integer projectId;
private String onlineFeaturestoreName;
private Double onlineFeaturestoreSize;
private String offlineFeaturestoreName;
private String hiveEndpoint;
private String mysqlServerEndpoint;
Expand All @@ -58,7 +57,6 @@ public FeaturestoreDTO(Featurestore featurestore) {
this.offlineFeaturestoreName = null;
this.hiveEndpoint = null;
this.mysqlServerEndpoint = null;
this.onlineFeaturestoreSize = 0.0;
this.onlineEnabled = false;
}

Expand Down Expand Up @@ -104,16 +102,7 @@ public String getOfflineFeaturestoreName() {
public void setOfflineFeaturestoreName(String offlineFeaturestoreName) {
this.offlineFeaturestoreName = offlineFeaturestoreName;
}

@XmlElement
public Double getOnlineFeaturestoreSize() {
return onlineFeaturestoreSize;
}

public void setOnlineFeaturestoreSize(Double onlineFeaturestoreSize) {
this.onlineFeaturestoreSize = onlineFeaturestoreSize;
}


@XmlElement
public String getHiveEndpoint() {
return hiveEndpoint;
Expand Down Expand Up @@ -186,7 +175,6 @@ public String toString() {
", projectName='" + projectName + '\'' +
", projectId=" + projectId +
", onlineFeaturestoreName='" + onlineFeaturestoreName + '\'' +
", onlineFeaturestoreSize=" + onlineFeaturestoreSize +
", offlineFeaturestoreName='" + offlineFeaturestoreName + '\'' +
", hiveEndpoint='" + hiveEndpoint + '\'' +
", mysqlServerEndpoint='" + mysqlServerEndpoint + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.calcite.sql.dialect.HiveSqlDialect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.javatuples.Pair;

import javax.annotation.PostConstruct;
import javax.ejb.EJB;
Expand All @@ -74,7 +73,6 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -371,43 +369,6 @@ public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users
cachedFeatureGroupFacade.remove(featuregroup.getCachedFeaturegroup());
}

/**
* Parses a ResultSet from a Hive query into a list of RowValueQueryResultDTOs
*
* @param rs resultset to parse
* @return list of parsed rows
* @throws SQLException
*/
public FeaturegroupPreview parseResultset(ResultSet rs) throws SQLException {
ResultSetMetaData rsmd = rs.getMetaData();
FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview();

while (rs.next()) {
FeaturegroupPreview.Row row = new FeaturegroupPreview.Row();

for (int i = 1; i <= rsmd.getColumnCount(); i++) {
Object columnValue = rs.getObject(i);
row.addValue(new Pair<>(parseColumnLabel(rsmd.getColumnLabel(i)),
columnValue == null ? null : columnValue.toString()));
}
featuregroupPreview.addRow(row);
}

return featuregroupPreview;
}

/**
* Column labels contain the table name as well. Remove it
* @param columnLabel
* @return
*/
private String parseColumnLabel(String columnLabel) {
if (columnLabel.contains(".")) {
return columnLabel.split("\\.")[1];
}
return columnLabel;
}

/**
* Opens a JDBC connection to HS2 using the given database and project-user and then executes a regular
* SQL query
Expand All @@ -430,7 +391,7 @@ private FeaturegroupPreview executeReadHiveQuery(String query, String databaseNa
conn = initConnection(databaseName, project, user);
stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
return parseResultset(rs);
return featurestoreUtils.parseResultset(rs);
} catch (SQLException e) {
//Hive throws a generic HiveSQLException not a specific AuthorizationException
if (e.getMessage().toLowerCase().contains("permission denied")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
Expand Down Expand Up @@ -93,8 +92,6 @@ public class OnlineFeaturegroupController {
@EJB
private ProjectController projectController;
@EJB
private CachedFeaturegroupController cachedFeaturegroupController;
@EJB
private ConstructorController constructorController;
@EJB
private FeaturegroupController featuregroupController;
Expand All @@ -120,27 +117,26 @@ protected OnlineFeaturegroupController(Settings settings) {
* @throws SQLException
* @throws FeaturestoreException
*/
public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws SQLException,
FeaturestoreException {
public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
//Drop data table
String query = "DROP TABLE " + featuregroup.getName() + "_" + featuregroup.getVersion() + ";";
onlineFeaturestoreController.executeUpdateJDBCQuery(query,
onlineFeaturestoreFacade.executeUpdateJDBCQuery(query,
onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()),
project, user);
}

public void createMySQLTable(Featurestore featurestore, String tableName, List<FeatureGroupFeatureDTO> features,
Project project, Users user)
throws FeaturestoreException, SQLException{
throws FeaturestoreException {
String dbName = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
String createStatement = buildCreateStatement(dbName, tableName, features);
onlineFeaturestoreController.executeUpdateJDBCQuery(createStatement, dbName, project, user);
onlineFeaturestoreFacade.executeUpdateJDBCQuery(createStatement, dbName, project, user);
}

public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup featureGroup,
List<FeatureGroupFeatureDTO> features, Project project, Users user)
throws KafkaException, SchemaException, ProjectException, FeaturestoreException, SQLException,
IOException, HopsSecurityException, ServiceException {
throws KafkaException, SchemaException, ProjectException, FeaturestoreException, IOException,
HopsSecurityException, ServiceException {
// check if onlinefs user is part of project
if (project.getProjectTeamCollection().stream().noneMatch(pt ->
pt.getUser().getUsername().equals(OnlineFeaturestoreController.ONLINEFS_USERNAME))) {
Expand All @@ -163,7 +159,7 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup feat
// The topic schema is registered for each feature group
public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup,
List<FeatureGroupFeatureDTO> features)
throws KafkaException, SchemaException, FeaturestoreException {
throws KafkaException, SchemaException, FeaturestoreException {
String avroSchema = avroSchemaConstructorController.constructSchema(featureGroup, features);
schemasController.validateSchema(project, avroSchema);

Expand All @@ -181,7 +177,7 @@ public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGr
}

public void deleteFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup)
throws KafkaException, SchemaException {
throws KafkaException, SchemaException {
String topicName = Utils.getFeatureGroupTopicName(featureGroup);
String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
// user might have deleted topic manually
Expand Down Expand Up @@ -291,9 +287,9 @@ public String buildAlterStatement(String tableName, String dbName, List<FeatureG

public void alterMySQLTableColumns(Featurestore featurestore, String tableName,
List<FeatureGroupFeatureDTO> featureDTOs, Project project, Users user)
throws FeaturestoreException, SQLException {
throws FeaturestoreException {
String dbName = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
onlineFeaturestoreController.executeUpdateJDBCQuery(buildAlterStatement(tableName, dbName, featureDTOs), dbName,
onlineFeaturestoreFacade.executeUpdateJDBCQuery(buildAlterStatement(tableName, dbName, featureDTOs), dbName,
project, user);
}

Expand Down Expand Up @@ -330,7 +326,7 @@ public String getOnlineType(FeatureGroupFeatureDTO featureGroupFeatureDTO) {
* @throws SQLException
*/
public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, int limit)
throws FeaturestoreException, SQLException {
throws FeaturestoreException {
String tbl = featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());

List<FeatureGroupFeatureDTO> features = featuregroupController.getFeatures(featuregroup, project, user);
Expand All @@ -352,10 +348,10 @@ public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Pro
SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null);
String db = onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
try {
return onlineFeaturestoreController.executeReadJDBCQuery(
return onlineFeaturestoreFacade.executeReadJDBCQuery(
select.toSqlString(new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
} catch(Exception e) {
return onlineFeaturestoreController.executeReadJDBCQuery(
return onlineFeaturestoreFacade.executeReadJDBCQuery(
select.toSqlString(new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
}
}
Expand Down
Loading

0 comments on commit d5a45a4

Please sign in to comment.