-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.py
172 lines (159 loc) · 6.18 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import re
import ast
import json
from fal import FalDbt
from typing import Tuple, List
def _remove_comments(str1: str = "") -> str:
"""
Remove comments/excessive spaces/"create table as"/"create view as" from the sql file
:param str1: the original sql
:return: the parsed sql
"""
# remove the /* */ comments
q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", str1)
# remove whole line -- and # comments
lines = [line for line in q.splitlines() if not re.match("^\s*(--|#)", line)]
# remove trailing -- and # comments
q = " ".join([re.split("--|#", line)[0] for line in lines])
# replace all spaces around commas
q = re.sub(r"\s*,\s*", ",", q)
# replace all multiple spaces to one space
str1 = re.sub("\s\s+", " ", q)
str1 = str1.replace("\n", " ").strip()
return str1
def _quote_interval(match) -> str:
    """
    Helper for dbt_preprocess_sql: wrap an unquoted INTERVAL value in single
    quotes, e.g. ``DATETIME_ADD(x,INTERVAL 1 DAY)`` ->
    ``DATETIME_ADD(x,INTERVAL '1' DAY)``. Already-quoted values pass through.
    """
    first_arg, value, unit = match.groups()
    # function name is everything before the first "("
    func = match.group(0).split("(", 1)[0]
    if not value.startswith("'") and not value.endswith("'"):
        return "{}({},INTERVAL '{}' {})".format(func, first_arg, value, unit)
    return match.group(0)


def dbt_preprocess_sql(node: dict = None) -> str:
    """
    Normalize a dbt node's compiled SQL for downstream lineage parsing.

    Strips comments and backticks, removes database qualifiers before the
    node's schema, casts the unit argument of DATETIME_DIFF to TEXT, and
    quotes unquoted INTERVAL values in DATETIME_ADD / DATETIME_SUB.

    :param node: dbt manifest node with "compiled_code" and "schema" keys
    :return: the normalized sql ("" when node is None)
    """
    if node is None:
        return ""
    ret_sql = _remove_comments(str1=node["compiled_code"])
    ret_sql = ret_sql.replace("`", "")
    # remove any database names in the query: db.schema.table -> schema.table.
    # re.escape guards against regex metacharacters in the schema name.
    schema = node["schema"]
    ret_sql = re.sub(
        r"[^ (,]*(\.{}\.)".format(re.escape(schema)),
        "{}.".format(schema),
        ret_sql,
    )
    # cast DATETIME_DIFF's unit argument to TEXT
    ret_sql = re.sub(
        r"DATETIME_DIFF\((.+?),\s?(.+?),\s?(DAY|MINUTE|SECOND|HOUR|YEAR)\)",
        r"DATETIME_DIFF(\1, \2, '\3'::TEXT)",
        ret_sql,
    )
    # normalize case so the quoting pass below matches
    ret_sql = re.sub("datetime_add", "DATETIME_ADD", ret_sql, flags=re.IGNORECASE)
    ret_sql = re.sub("datetime_sub", "DATETIME_SUB", ret_sql, flags=re.IGNORECASE)
    # quote the INTERVAL value of DATETIME_ADD / DATETIME_SUB when unquoted
    # (single re.sub with a callback replaces the duplicated findall+replace
    # loops, and also handles matches containing optional spaces)
    for func in ("DATETIME_ADD", "DATETIME_SUB"):
        ret_sql = re.sub(
            r"{}\(\s?(.+?),\s?INTERVAL\s?(.+?)\s?(DAY|MINUTE|SECOND|HOUR|YEAR)\)".format(func),
            _quote_interval,
            ret_sql,
        )
    return ret_sql
def dbt_find_column(table_name: str = "", engine: FalDbt = None) -> List:
"""
Find the columns for the base table in the database
:param engine: the connection engine
:param table_name: the base table name
:return: the list of columns in the base table
"""
cols_fal = engine.execute_sql(
"""SELECT attname AS col
FROM pg_attribute
WHERE attrelid = '{}'::regclass -- table name optionally schema-qualified
AND attnum > 0
AND NOT attisdropped
ORDER BY attnum;
;""".format(
table_name
)
)
return list(cols_fal["col"])
def dbt_produce_json(output_dict: dict = None, engine: FalDbt = None) -> dict:
    """
    Build the model/base-table dependency dict, write it to output.json and
    render it into index.html via _produce_html.

    :param output_dict: mapping of model name -> node info dict with
        "table_name", "tables" (upstream table names) and "columns" keys
    :param engine: the FalDbt connection engine (used to look up base-table
        columns)
    :return: the combined dict (base-table entries plus output_dict models)
    """
    # reverse lookup: physical table name -> model name
    table_to_model_dict = {val["table_name"]: key for key, val in output_dict.items()}
    dep_dict = {}
    for key, val in output_dict.items():
        # the original if/else branches here were identical; setdefault
        # expresses the same thing in one statement
        dep_dict.setdefault(key, {})["upstream_tables"] = val["tables"]
        for upstream in val["tables"]:
            # map an upstream table back to its model name when it is a model
            key_name = table_to_model_dict.get(upstream, upstream)
            entry = dep_dict.setdefault(key_name, {})
            entry.setdefault("downstream_tables", []).append(key)
    base_table_dict = {}
    for key, val in dep_dict.items():
        val.setdefault("upstream_tables", [])
        val.setdefault("downstream_tables", [])
        if key in output_dict:
            val["is_model"] = True
        else:
            # not a model: record it as a base table and fetch its columns
            base_table_dict[key] = {
                "tables": [""],
                "columns": {col: [""] for col in dbt_find_column(key, engine)},
                "table_name": str(key),
            }
            val["is_model"] = False
    base_table_dict.update(output_dict)
    with open("output.json", "w") as outfile:
        json.dump(base_table_dict, outfile)
    # json.dumps replaces the original str(...).replace("'", '"'), which
    # produced invalid JSON for apostrophes in data and for None/True/False
    _produce_html(output_json=json.dumps(base_table_dict))
    return base_table_dict
def _produce_html(output_json: str = ""):
# Creating the HTML file
file_html = open("index.html", "w", encoding="utf-8")
# Adding the input data to the HTML file
file_html.write('''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
</head>
<body>
<script>
window.inlineSource = `{}`;
</script>
<div id="main"></div>
<script type="text/javascript" src="vendor.js"></script><script type="text/javascript" src="app.js"></script></body>
</html>'''.format(output_json))
# Saving the data into the HTML file
file_html.close()
# Module is meant to be imported by the dbt/fal pipeline; no CLI entry point.
if __name__ == "__main__":
    pass