Skip to content

Commit

Permalink
doris
Browse files Browse the repository at this point in the history
  • Loading branch information
nick2wang committed Sep 20, 2024
1 parent 07abe01 commit bd1d557
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 102 deletions.
5 changes: 5 additions & 0 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,8 @@ def get_engine(instance=None): # pragma: no cover
from .clickhouse import ClickHouseEngine

return ClickHouseEngine(instance=instance)

elif instance.db_type == "doris":
from .doris import DorisEngine

return DorisEngine(instance=instance)
188 changes: 188 additions & 0 deletions sql/engines/doris.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# -*- coding: UTF-8 -*-
from sql.utils.sql_utils import get_syntax_type, remove_comments
from sql.engines.mysql import MysqlEngine
from .models import ResultSet, ReviewResult, ReviewSet
from common.utils.timer import FuncTimer
from common.config import SysConfig
from MySQLdb.constants import FIELD_TYPE
import traceback
import MySQLdb
import pymysql
import sqlparse
import logging
import re


logger = logging.getLogger("default")


class DorisEngine(MysqlEngine):
name = "Doris"
info = "Doris engine"

auto_backup = False

@property
def server_version(self):
sql = "show frontends"
result = self.query(sql=sql)
version = result.rows[0][-1].split("-")[0]
return tuple([int(n) for n in version.split(".")[:3]])

def query(self, db_name=None, sql="", limit_num=0, close_conn=True, **kwargs):
"""返回 ResultSet"""
result_set = ResultSet(full_sql=sql)
try:
conn = self.get_connection(db_name=db_name)
cursor = conn.cursor()
cursor.execute(sql)
if int(limit_num) > 0:
rows = cursor.fetchmany(size=int(limit_num))
else:
rows = cursor.fetchall()
fields = cursor.description

result_set.column_list = [i[0] for i in fields] if fields else []
result_set.rows = rows
result_set.affected_rows = len(rows)
except Exception as e:
logger.warning(f"Doris语句执行报错,语句:{sql},错误信息{e}")
result_set.error = str(e).split("Stack trace")[0]
finally:
if close_conn:
self.close()
return result_set

forbidden_databases = [
"__internal_schema",
"INFORMATION_SCHEMA",
"information_schema",
]

def execute_check(self, db_name=None, sql=""):
"""上线单执行前的检查, 返回Review set"""
check_result = ReviewSet(full_sql=sql)
# 禁用/高危语句检查
line = 1
critical_ddl_regex = self.config.get("critical_ddl_regex", "")
p = re.compile(critical_ddl_regex)
check_result.syntax_type = 2 # TODO 工单类型 0、其他 1、DDL,2、DML
for statement in sqlparse.split(sql):
statement = sqlparse.format(statement, strip_comments=True)
# 禁用语句
if re.match(r"^select|^show|^explain", statement.lower()):
result = ReviewResult(
id=line,
errlevel=2,
stagestatus="驳回不支持语句",
errormessage="仅支持DML和DDL语句,查询语句请使用SQL查询功能!",
sql=statement,
)
# 高危语句
elif critical_ddl_regex and p.match(statement.strip().lower()):
result = ReviewResult(
id=line,
errlevel=2,
stagestatus="驳回高危SQL",
errormessage="禁止提交匹配" + critical_ddl_regex + "条件的语句!",
sql=statement,
)
# 驳回未带where数据修改语句,如确实需做全部删除或更新,显示的带上where 1=1
elif re.match(
r"^update((?!where).)*$|^delete((?!where).)*$", statement.lower()
):
result = ReviewResult(
id=line,
errlevel=2,
stagestatus="驳回未带where数据修改",
errormessage="数据修改需带where条件!",
sql=statement,
)
# 正常语句
else:
result = ReviewResult(
id=line,
errlevel=0,
stagestatus="Audit completed",
errormessage="None",
sql=statement,
affected_rows=0,
execute_time=0,
)
# 判断工单类型
if get_syntax_type(statement) == "DDL":
check_result.syntax_type = 1
check_result.rows += [result]
line += 1
# 统计警告和错误数量
for r in check_result.rows:
if r.errlevel == 1:
check_result.warning_count += 1
if r.errlevel == 2:
check_result.error_count += 1
return check_result

def execute_workflow(self, workflow):
return self.execute(
db_name=workflow.db_name, sql=workflow.sqlworkflowcontent.sql_content
)

def execute(self, db_name=None, sql="", close_conn=True):
"""执行sql语句 返回 Review set"""
execute_result = ReviewSet(full_sql=sql)
conn = self.get_connection(db_name=db_name)
rowid = 1
effect_row = 0
sql_list = sqlparse.split(sql)
for statement in sql_list:
try:
cursor = conn.cursor()
with FuncTimer() as t:
effect_row = cursor.execute(statement)
cursor.close()
execute_result.rows.append(
ReviewResult(
id=rowid,
errlevel=0,
stagestatus="Execute Successfully",
errormessage="None",
sql=statement,
affected_rows=effect_row,
execute_time=t.cost,
)
)
except Exception as e:
logger.warning(
f"{self.name} 命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}"
)
execute_result.error = str(e)
execute_result.rows.append(
ReviewResult(
id=rowid,
errlevel=2,
stagestatus="Execute Failed",
errormessage=f"异常信息:{e}",
sql=statement,
affected_rows=effect_row,
execute_time=t.cost,
)
)
break
rowid += 1
if execute_result.error:
for statement in sql_list[rowid:]:
execute_result.rows.append(
ReviewResult(
id=rowid + 1,
errlevel=2,
stagestatus="Audit Completed",
errormessage="前序语句失败, 未执行",
sql=statement,
affected_rows=0,
execute_time=0,
)
)
rowid += 1
if close_conn:
self.close()
return execute_result
9 changes: 8 additions & 1 deletion sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import traceback
import MySQLdb
import pymysql
import re

import schemaobject
Expand Down Expand Up @@ -98,6 +99,10 @@ def name(self):
def info(self):
return "MySQL engine"

def escape_string(self, value: str) -> str:
"""字符串参数转义"""
return pymysql.escape_string(value)

@property
def auto_backup(self):
"""是否支持备份"""
Expand Down Expand Up @@ -152,7 +157,7 @@ def get_all_databases(self):
row[0]
for row in result.rows
if row[0]
not in ("information_schema", "performance_schema", "mysql", "test", "sys")
not in ("information_schema", "performance_schema", "mysql", "test", "sys", "__internal_schema")
]
result.rows = db_list
return result
Expand Down Expand Up @@ -390,6 +395,8 @@ def reset_instance_user_pwd(self, user_host: str, reset_pwd: str, **kwargs):

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet"""
db_name = self.escape_string(db_name)
tb_name = self.escape_string(tb_name)
sql = f"""SELECT
COLUMN_NAME,
COLUMN_TYPE,
Expand Down
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class Meta:
("odps", "ODPS"),
("clickhouse", "ClickHouse"),
("goinception", "goInception"),
("doris", "Doris"),
)


Expand Down
1 change: 1 addition & 0 deletions sql/templates/instance.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<option value="phoenix">Phoenix</option>
<option value="odps">ODPS</option>
<option value="clickhouse">ClickHouse</option>
<option value="doris">Doris</option>
</select>
</div>
<div class="form-group">
Expand Down
43 changes: 10 additions & 33 deletions sql/templates/queryapplylist.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ <h4 class="modal-title" id="myModalLabel">申请数据库查询权限</h4>
<optgroup id="optgroup-phoenix" label="Phoenix"></optgroup>
<optgroup id="optgroup-odps" label="ODPS"></optgroup>
<optgroup id="optgroup-clickhouse" label="ClickHouse"></optgroup>
<optgroup id="optgroup-doris" label="Doris"></optgroup>
</select>
</div>
<div class="form-group">
Expand Down Expand Up @@ -157,40 +158,16 @@ <h4 class="modal-title text-danger">工单日志</h4>
},
success: function (data) {
if (data.status === 0) {
var result = data['data'];
$("#optgroup-mysql").empty();
$("#optgroup-mssql").empty();
$("#optgroup-redis").empty();
$("#optgroup-pgsql").empty();
$("#optgroup-oracle").empty();
$("#optgroup-mongo").empty();
$("#optgroup-phoenix").empty();
$("#optgroup-odps").empty();
$("#optgroup-clickhouse").empty();
for (var i = 0; i < result.length; i++) {
var instance = "<option value=\"" + result[i]['instance_name'] + "\">" + result[i]['instance_name'] + "</option>";
if (result[i]['db_type'] === 'mysql') {
$("#optgroup-mysql").append(instance);
} else if (result[i]['db_type'] === 'mssql') {
$("#optgroup-mssql").append(instance);
} else if (result[i]['db_type'] === 'redis') {
$("#optgroup-redis").append(instance);
} else if (result[i]['db_type'] === 'pgsql') {
$("#optgroup-pgsql").append(instance);
} else if (result[i]['db_type'] === 'oracle') {
$("#optgroup-oracle").append(instance);
} else if (result[i]['db_type'] === 'mongo') {
$("#optgroup-mongo").append(instance);
} else if (result[i]['db_type'] === 'phoenix') {
$("#optgroup-phoenix").append(instance);
} else if (result[i]['db_type'] === 'odps') {
$("#optgroup-odps").append(instance);
} else if (result[i]['db_type'] === 'clickhouse') {
$("#optgroup-clickhouse").append(instance);
$("optgroup[id^='optgroup']").empty();
let result = data['data']
const supportDb = ['mysql', 'mssql', 'redis', 'pgsql', 'oracle', 'mongo', 'phoenix','odps' ,'clickhouse', 'doris']
for (let i of result) {
let instance = "<option value=\"" + i.instance_name + "\" instance-id=" + i.id + ">" + i.instance_name + "</option>";
if (supportDb.indexOf(i.db_type) !== -1) {
$("#optgroup-" + i.db_type).append(instance);
}
}
$('#instance_name').selectpicker('render');
$('#instance_name').selectpicker('refresh');
$('#instance_name').selectpicker('render').selectpicker('refresh');
} else {
alert(data.msg);
}
Expand Down Expand Up @@ -399,7 +376,7 @@ <h4 class="modal-title text-danger">工单日志</h4>
// 仅MySQL支持限制到表级权限
var optgroup = $('#instance_name :selected').parent().attr('label');
$("#priv_type").empty();
if (optgroup === "MySQL") {
if (optgroup === "MySQL" || optgroup === "Doris") {
$('#priv_type').append("<option value=\"1\">DATABASE</option>");
$('#priv_type').append("<option value=\"2\" selected=\"selected\">TABLE</option>");
} else {
Expand Down
Loading

0 comments on commit bd1d557

Please sign in to comment.