Skip to content

Commit

Permalink
[Refactor][Web]Add catalog (DataLinkDC#2386)
Browse files Browse the repository at this point in the history
* optimize_version

* optimize_version

* add_structure

* add_catalog

* add_catalog

* add_catalog

* add_catalog

* add_catalog

* add_catalog

* add_catalog
  • Loading branch information
zackyoungh authored Oct 16, 2023
1 parent 3cbe7ed commit 28b7440
Show file tree
Hide file tree
Showing 22 changed files with 740 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.FlinkColumn;
import org.dinky.data.model.Column;
import org.dinky.data.model.Schema;
import org.dinky.data.result.IResult;
import org.dinky.data.result.Result;
Expand Down Expand Up @@ -176,7 +176,7 @@ public Result<Schema> getMSSchemaInfo(@RequestBody StudioMetaStoreDTO studioMeta
paramType = "query"),
@ApiImplicitParam(name = "table", value = "table", required = true, dataType = "String", paramType = "query")
})
public Result<List<FlinkColumn>> getMSFlinkColumns(
public Result<List<Column>> getMSFlinkColumns(
@RequestParam Integer envId,
@RequestParam String catalog,
@RequestParam String database,
Expand Down
52 changes: 2 additions & 50 deletions dinky-admin/src/main/java/org/dinky/service/StudioService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.dinky.data.dto.StudioDDLDTO;
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.FlinkColumn;
import org.dinky.data.model.Column;
import org.dinky.data.model.Schema;
import org.dinky.data.result.IResult;
import org.dinky.data.result.SelectResult;
Expand All @@ -41,67 +41,19 @@
*/
public interface StudioService {

    /**
     * Execute a DDL statement and return the result.
     *
     * @param studioDDLDTO A {@link StudioDDLDTO} object representing the DDL statement to execute.
     * @return An {@link IResult} object representing the result of the DDL statement execution.
     */
    IResult executeDDL(StudioDDLDTO studioDDLDTO);

    /**
     * Get common SQL data based on the specified task ID.
     *
     * @param taskId The ID of the task to get the common SQL data for.
     * @return A {@link JdbcSelectResult} object representing the common SQL data for the specified task ID.
     */
    JdbcSelectResult getCommonSqlData(Integer taskId);

    /**
     * Get job data based on the specified job ID.
     *
     * @param jobId The ID of the job to get the job data for.
     * @return A {@link SelectResult} object representing the job data for the specified job ID.
     */
    SelectResult getJobData(String jobId);

    /**
     * Get the lineage information for a specified studio CAD.
     *
     * @param studioCADTO A {@link StudioCADTO} object representing the studio CAD to get the lineage information for.
     * @return A {@link LineageResult} object representing the lineage information for the specified studio CAD.
     */
    LineageResult getLineage(StudioCADTO studioCADTO);

    /**
     * Get a list of Flink jobs based on the specified cluster ID.
     *
     * @param clusterId The ID of the cluster to get the Flink jobs for.
     * @return A list of {@link JsonNode} objects representing the Flink jobs for the specified cluster ID.
     */
    List<JsonNode> listFlinkJobs(Integer clusterId);

    /**
     * Get MS catalogs based on the specified studio meta store DTO.
     *
     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the studio meta store DTO to get the MS catalogs for.
     * @return A list of {@link Catalog} objects representing the MS catalogs for the specified studio meta store DTO.
     */
    List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO);

    /**
     * Get the schema information for a specified MS catalog.
     *
     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the MS catalog to get the schema information for.
     * @return A {@link Schema} object representing the schema information for the specified MS catalog.
     */
    Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO);

    /**
     * Get a list of Flink columns based on the specified MS catalog.
     *
     * @param studioMetaStoreDTO A {@link StudioMetaStoreDTO} object representing the MS catalog to get the Flink columns for.
     * @return A list of {@link Column} objects representing the Flink columns for the specified MS catalog.
     */
    List<Column> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.Cluster;
import org.dinky.data.model.Column;
import org.dinky.data.model.DataBase;
import org.dinky.data.model.FlinkColumn;
import org.dinky.data.model.Schema;
import org.dinky.data.model.Table;
import org.dinky.data.result.DDLResult;
import org.dinky.data.result.IResult;
import org.dinky.data.result.ResultPool;
import org.dinky.data.result.SelectResult;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.JobConfig;
Expand All @@ -49,6 +50,7 @@
import org.dinky.service.StudioService;
import org.dinky.service.TaskService;
import org.dinky.sql.FlinkQuery;
import org.dinky.utils.FlinkTableMetadataUtil;
import org.dinky.utils.RunTimeUtil;

import java.util.ArrayList;
Expand All @@ -60,10 +62,14 @@
import com.fasterxml.jackson.databind.JsonNode;

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.cache.Cache;
import cn.hutool.cache.CacheUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/** StudioServiceImpl */
/**
* StudioServiceImpl
*/
@Service
@RequiredArgsConstructor
@Slf4j
Expand All @@ -72,6 +78,7 @@ public class StudioServiceImpl implements StudioService {
private final ClusterInstanceService clusterInstanceService;
private final DataBaseService dataBaseService;
private final TaskService taskService;
private final Cache<String, JobManager> jobManagerCache = CacheUtil.newTimedCache(1000 * 60 * 2);

private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {
String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
Expand Down Expand Up @@ -153,116 +160,65 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
catalogs.add(defaultCatalog);
}
} else {
studioMetaStoreDTO.setStatement(FlinkQuery.showCatalogs());
IResult result = executeMSFlinkSql(studioMetaStoreDTO);

if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
ddlResult.getColumns().stream().findFirst().ifPresent(key -> {
for (Map<String, Object> item : ddlResult.getRowData()) {
catalogs.add(Catalog.build(item.get(key).toString()));
}
});

for (Catalog catalog : catalogs) {
String statement = FlinkQuery.useCatalog(catalog.getName())
+ FlinkQuery.separator()
+ FlinkQuery.showDatabases();
studioMetaStoreDTO.setStatement(statement);
IResult tableResult = executeMSFlinkSql(studioMetaStoreDTO);
DDLResult tableDDLResult = (DDLResult) tableResult;
tableDDLResult.getColumns().stream().findFirst().ifPresent(key -> {
List<Map<String, Object>> rowData = tableDDLResult.getRowData();
List<Schema> schemas = new ArrayList<>();
for (Map<String, Object> item : rowData) {
schemas.add(Schema.build(item.get(key).toString()));
}
catalog.setSchemas(schemas);
});
}
}
String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
CustomTableEnvironment customTableEnvironment =
jobManager.getExecutor().getCustomTableEnvironment();
catalogs.addAll(FlinkTableMetadataUtil.getCatalog(customTableEnvironment));
}
return catalogs;
}

/**
 * Build the schema information (tables plus, for Flink dialects, catalog metadata)
 * for the database named in the request.
 *
 * <p>For common (JDBC-style) SQL dialects the tables are listed through the
 * configured {@link Driver}. For Flink dialects the metadata is resolved through
 * the cached Flink table environment via {@link FlinkTableMetadataUtil}.
 *
 * @param studioMetaStoreDTO carries the dialect, env settings, catalog and database name
 * @return a {@link Schema} populated with the tables found for the database
 */
@Override
public Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO) {
    String database = studioMetaStoreDTO.getDatabase();
    Schema schema = Schema.build(database);
    List<Table> tables = new ArrayList<>();
    if (Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) {
        DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
        // Best-effort: if the data source no longer exists, return an empty table list.
        if (Asserts.isNotNull(dataBase)) {
            Driver driver = Driver.build(dataBase.getDriverConfig());
            tables.addAll(driver.listTables(database));
        }
    } else {
        // Reuse a cached JobManager for this env SQL to avoid rebuilding the
        // Flink environment on every metadata request.
        String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
        JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
        CustomTableEnvironment customTableEnvironment =
                jobManager.getExecutor().getCustomTableEnvironment();
        // Populates both `schema` and `tables` in place.
        FlinkTableMetadataUtil.setSchemaInfo(
                customTableEnvironment, studioMetaStoreDTO.getCatalog(), database, schema, tables);
    }
    schema.setTables(tables);
    return schema;
}

@Override
public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
List<FlinkColumn> columns = new ArrayList<>();
public List<Column> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
List<Column> columns = new ArrayList<>();
if (!Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog())
+ FlinkQuery.separator()
+ FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase())
+ FlinkQuery.separator();

// desc tables
String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
studioMetaStoreDTO.setStatement(tableStatement);
IResult result = executeMSFlinkSql(studioMetaStoreDTO);
if (result instanceof DDLResult) {
DDLResult ddlResult = (DDLResult) result;
List<Map<String, Object>> rowData = ddlResult.getRowData();
int i = 1;
for (Map<String, Object> item : rowData) {
FlinkColumn column = FlinkColumn.build(
i,
item.get(FlinkQuery.columnName()).toString(),
item.get(FlinkQuery.columnType()).toString(),
item.get(FlinkQuery.columnKey()).toString(),
item.get(FlinkQuery.columnNull()).toString(),
item.get(FlinkQuery.columnExtras()).toString(),
item.get(FlinkQuery.columnWatermark()).toString());
columns.add(column);
i++;
}
}
String catalogName = studioMetaStoreDTO.getCatalog();
String database = studioMetaStoreDTO.getDatabase();
String tableName = studioMetaStoreDTO.getTable();
String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
JobManager jobManager = getJobManager(studioMetaStoreDTO, envSql);
CustomTableEnvironment customTableEnvironment =
jobManager.getExecutor().getCustomTableEnvironment();
columns.addAll(
FlinkTableMetadataUtil.getColumnList(customTableEnvironment, catalogName, database, tableName));
}
return columns;
}

/**
 * Obtain the {@link JobManager} for the given environment SQL, serving it from
 * {@code jobManagerCache} when present and otherwise building a fresh one and
 * executing the env DDL before caching it under {@code envSql}.
 *
 * @param studioMetaStoreDTO supplies the job configuration for a cache miss
 * @param envSql the environment DDL; also the cache key
 * @return a ready-to-use, possibly cached, job manager
 */
private JobManager getJobManager(StudioMetaStoreDTO studioMetaStoreDTO, String envSql) {
    return jobManagerCache.get(envSql, () -> {
        JobManager built = JobManager.build(studioMetaStoreDTO.getJobConfig());
        built.executeDDL(envSql);
        return built;
    });
}

private List<String> showInfo(StudioMetaStoreDTO studioMetaStoreDTO, String baseStatement, String statement) {
List<String> infos = new ArrayList<>();
studioMetaStoreDTO.setStatement(baseStatement + statement);
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/resources/db/db-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ INSERT INTO `dinky_sys_menu` VALUES (32, 1, '作业监控', '/home/jobOverView',
INSERT INTO `dinky_sys_menu` VALUES (33, 1, '数据开发', '/home/devOverView', 'DevOverView', 'home:devOverView', 'AimOutlined', 'F', 0, 3, '2023-08-15 16:54:47', '2023-09-26 14:49:00', null);
INSERT INTO `dinky_sys_menu` VALUES (34, 5, '项目列表', '/datastudio/left/project', null, 'datastudio:left:project', 'ConsoleSqlOutlined', 'F', 0, 5, '2023-09-01 18:00:39', '2023-09-26 14:49:31', null);
INSERT INTO `dinky_sys_menu` VALUES (35, 5, '元数据', '/datastudio/left/metadata', null, 'datastudio:left:metadata', 'TableOutlined', 'F', 0, 7, '2023-09-01 18:01:09', '2023-09-26 14:49:42', null);
-- Menu id 36: the data-studio left-panel catalog entry (path renamed from /structure to /catalog in this commit).
INSERT INTO `dinky_sys_menu` VALUES (36, 5, 'catalog', '/datastudio/left/catalog', null, 'datastudio:left:structure', 'DatabaseOutlined', 'F', 0, 6, '2023-09-01 18:01:30', '2023-09-26 14:49:54', null);
INSERT INTO `dinky_sys_menu` VALUES (37, 5, '作业配置', '/datastudio/right/jobConfig', null, 'datastudio:right:jobConfig', 'SettingOutlined', 'F', 0, 8, '2023-09-01 18:02:15', '2023-09-26 14:50:24', null);
INSERT INTO `dinky_sys_menu` VALUES (38, 5, '执行配置', '/datastudio/right/executeConfig', null, 'datastudio:right:executeConfig', 'ExperimentOutlined', 'F', 0, 9, '2023-09-01 18:03:08', '2023-09-26 14:50:54', null);
INSERT INTO `dinky_sys_menu` VALUES (39, 5, '版本历史', '/datastudio/right/historyVision', null, 'datastudio:right:historyVision', 'HistoryOutlined', 'F', 0, 10, '2023-09-01 18:03:29', '2023-09-26 14:51:03', null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public interface TableEnvironmentInstance {
TableEnvironment getTableEnvironment();

// No-op by default; implementations override to install a custom SQL parser.
// (Stray empty statement `;` after the body removed — it was a redundant token.)
default void injectParser(CustomParser parser) {}

// No-op by default; implementations override to install a custom extended-operation executor.
// (Stray empty statement `;` after the body removed — it was a redundant token.)
default void injectExtendedExecutor(CustomExtendedOperationExecutor extendedExecutor) {}
}
Loading

0 comments on commit 28b7440

Please sign in to comment.