-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_connector.py
103 lines (80 loc) · 2.67 KB
/
db_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json
import sqlite3
import ibm_db
import psycopg2
import pandas as pd
from collections import defaultdict
class db_connector:
    """Common interface shared by the database connector classes.

    Every method here is a no-op placeholder that returns ``None``;
    subclasses override the ones they support.
    """

    def __init__(self):
        """Create the connector; the base class holds no state."""

    def do_query(self, query):
        """Run ``query`` against the database; does nothing in the base class."""

    def close_connection(self):
        """Release the database connection; does nothing in the base class."""
class sqlite_connector(db_connector):
    """Connector for a SQLite database that returns results as DataFrames."""

    def __init__(self, db_path):
        """Open the SQLite database and create a cursor on it.

        Args:
            db_path: Database location, passed unchanged to
                ``sqlite3.connect``.
        """
        self.db_path = db_path
        self.con = sqlite3.connect(self.db_path)
        # Decode TEXT values leniently: bytes that cannot be decoded are
        # dropped instead of raising while rows are being fetched.
        self.con.text_factory = lambda b: b.decode(errors='ignore')
        self.cur = self.con.cursor()

    def do_query(self, query):
        """Execute ``query``, commit, and return its result set.

        Args:
            query: SQL text to execute.

        Returns:
            A DataFrame with one column per result column when the
            statement produces a result set, otherwise an empty DataFrame.
        """
        res = self.cur.execute(query)
        # ``description`` is None for statements that yield no result set
        # (e.g. INSERT / CREATE TABLE).
        if self.cur.description is not None:
            columns = [desc[0] for desc in self.cur.description]
            rows = res.fetchall()
            result_df = pd.DataFrame(data=rows, columns=columns)
        else:
            result_df = pd.DataFrame()
        self.con.commit()
        return result_df

    def close_connection(self):
        """Close the cursor and the SQLite connection.

        Without this override the inherited no-op would leave the
        connection open for the lifetime of the object.
        """
        self.cur.close()
        self.con.close()
class db2_connector(db_connector):
    """Connector for IBM Db2 built on the ``ibm_db`` driver."""

    def __init__(self):
        """Connect using the settings stored in ``./.local/db2.json``.

        The JSON document must contain a ``db2_connect_params`` entry; its
        value is handed to ``ibm_db.connect`` as the connection string, with
        empty user and password arguments.
        """
        # ``with`` closes the file even when the JSON fails to parse.
        with open('./.local/db2.json') as f:
            db2_params = json.load(f)
        self.con = ibm_db.connect(db2_params['db2_connect_params'], "", "")

    def do_query(self, query):
        """Execute ``query`` and collect every fetched row into a DataFrame.

        Args:
            query: SQL text passed to ``ibm_db.exec_immediate``.

        Returns:
            A DataFrame with one column per key of the fetched row
            dictionaries; empty when no row is fetched.
        """
        stmt = ibm_db.exec_immediate(self.con, query)
        results = defaultdict(list)
        row = ibm_db.fetch_assoc(stmt)
        while row:
            for column, value in row.items():
                results[column].append(value)
            # Advance to the next row. Without this re-fetch the loop would
            # append the first row forever.
            row = ibm_db.fetch_assoc(stmt)
        return pd.DataFrame(results)

    def close_connection(self):
        """Close the Db2 connection."""
        ibm_db.close(self.con)
class postgresql_connector(db_connector):
    """Connector for a PostgreSQL server on localhost:5432 via ``psycopg2``."""

    def __init__(self):
        """Connect with the credentials stored in ``./.local/postgresql.json``.

        The JSON document must contain ``username`` and ``password`` entries.
        Host and port are fixed to ``localhost`` and ``5432``.
        """
        # ``with`` closes the file even when the JSON fails to parse.
        with open('./.local/postgresql.json') as f:
            postgresql_params = json.load(f)
        self.con = psycopg2.connect(
            host="localhost",
            port=5432,
            user=postgresql_params['username'],
            password=postgresql_params['password'],
        )
        self.cur = self.con.cursor()

    def do_query(self, query):
        """Execute ``query``, commit, and return its result set.

        Args:
            query: SQL text to execute.

        Returns:
            A DataFrame with one column per result column, in cursor order,
            when the statement produces a result set (headers are kept even
            for zero rows); otherwise an empty DataFrame.
        """
        # NOTE(review): the commit before executing presumably ends any
        # transaction left open by an earlier call -- confirm intent.
        self.con.commit()
        self.cur.execute(query)
        # ``description`` is None when the statement returned no result set.
        if self.cur.description is None:
            self.con.commit()
            return pd.DataFrame()
        columns = [d[0] for d in self.cur.description]
        rows = self.cur.fetchall()
        # Build from row tuples with explicit headers: this keeps duplicate
        # column names as separate columns and preserves headers for an
        # empty result, instead of merging values through a name-keyed dict.
        result_df = pd.DataFrame(rows, columns=columns)
        self.con.commit()
        return result_df

    def close_connection(self):
        """Close the cursor and then the connection."""
        self.cur.close()
        self.con.close()