-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
130 lines (104 loc) · 3.43 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import datetime
import json
import rethinkdb as r
from tornado import ioloop
# Connection settings for the RethinkDB server used by setup() and by the
# module-level DB instance created at the bottom of this file.
RDB_HOST = 'localhost'
RDB_PORT = 28015  # NOTE(review): presumably the RethinkDB client-driver port; confirm
RDB_NAME = 'iOSTest'  # database that setup() creates and that holds the tables
def time_now():
    """Return the current time as a timezone-aware datetime at offset +08:00."""
    utc_plus_8 = r.make_timezone('+08:00')
    return datetime.datetime.now(tz=utc_plus_8)
def setup():
    """Create the database and its tables, then switch the driver to Tornado.

    Opens a connection to RDB_HOST:RDB_PORT, issues ``db_create`` for
    RDB_NAME and ``table_create`` for the "tasks" and "devices" tables
    (both keyed on 'id'), and closes the connection. Each statement is
    best-effort: an ``r.RqlRuntimeError`` is swallowed so that the remaining
    statements still run.

    Finally sets the driver loop type to 'tornado'. This happens only when
    the bootstrap above did not raise.
    """
    conn = r.connect(RDB_HOST, RDB_PORT)

    def safe_run(rql):
        """Run *rql* on the bootstrap connection; return False on RqlRuntimeError."""
        try:
            return rql.run(conn)
        except r.RqlRuntimeError:
            # Presumably "database/table already exists"; treated as non-fatal.
            return False

    try:
        print("database init db and tables")
        safe_run(r.db_create(RDB_NAME))
        safe_run(r.db(RDB_NAME).table_create("tasks", primary_key='id'))
        safe_run(r.db(RDB_NAME).table_create('devices', primary_key='id'))
        print("database init done")
    finally:
        # Release the connection even when an unexpected error escapes
        # safe_run (anything other than RqlRuntimeError).
        conn.close()

    # Done last: the blocking connect()/run() calls above are issued before
    # the loop type changes.
    r.set_loop_type('tornado')
class DB(object):
    """Async helper for the "tasks" and "devices" RethinkDB tables.

    Every operation opens its own connection to the configured server and
    closes it when the operation finishes.
    """

    def __init__(self, host='localhost', port=28015, db='iOSTest'):
        """Remember the connection settings; no connection is opened here."""
        self._host = host
        self._port = port
        self._db = db

    @staticmethod
    def _apply_filter(resql, filter=None):
        """Narrow *resql* with *filter* and return the resulting query.

        A callable receives the query and must return a query; any other
        truthy value is handed to ``resql.filter``; a falsy value leaves the
        query untouched.
        """
        if callable(filter):
            return filter(resql)
        if filter:
            return resql.filter(filter)
        return resql

    async def run(self, resql):
        """Run *resql* on a fresh connection and return the awaited result.

        The connection is closed before this method returns.
        NOTE(review): a result that is a lazy cursor is presumably unusable
        after the close; ``_get_all`` iterates inside the connection scope.
        """
        conn = await r.connect(self._host, self._port, self._db)
        try:
            return await resql.run(conn)
        finally:
            # NOTE(review): in Tornado mode close() may return a Future that
            # should be awaited; confirm against the driver version in use.
            conn.close()

    async def update_or_insert(self,
                               table_name: str,
                               data: dict,
                               primary_key='id') -> (str, bool):
        """
        Update the document identified by ``data[primary_key]``, or insert
        ``data`` when no such document exists.

        Args:
            table_name: table to write to
            data: document fields; on insert a "createdAt" timestamp is added
                to this dict in place
            primary_key: name of the key field; a non-default key must be
                present in data (AssertionError otherwise)

        Return:
            (id, inserted)
        """
        if primary_key != 'id':
            assert primary_key in data

        # update first
        if primary_key in data:
            doc_id = data[primary_key]
            ret = await self.run(r.table(table_name).get(doc_id).update(data))
            if not ret['skipped']:
                return doc_id, False

        # insert data into table
        data["createdAt"] = time_now()
        ret = await self.run(r.table(table_name).insert(data))
        assert ret['errors'] == 0

        # get id
        if "generated_keys" in ret:
            return ret["generated_keys"][0], True
        # Inserted under the caller-supplied key: this is still an insert.
        return data[primary_key], True

    async def task_save(self, task: dict) -> str:
        """
        Save task into db

        Returns:
            saved task id
        """
        task_id, _ = await self.update_or_insert("tasks", task)
        return task_id

    async def device_save(self, device: dict, primary_key='id'):
        """
        Save device into the "devices" table.

        When ``device`` carries ``primary_key`` the matching document is
        updated; if no such document exists (the update was skipped), or the
        key is absent, ``device`` is inserted instead.

        Raises:
            AssertionError: the update or the insert reported errors
        """
        devices = r.table('devices')
        if primary_key in device:
            ret = await self.run(
                devices.get(device[primary_key]).update(device))
            assert ret['errors'] == 0
            if not ret['skipped']:
                return

        ret = await self.run(devices.insert(device))
        assert ret['errors'] == 0

    async def _get_all(self, table_name: str, filter=None):
        """
        Yield the documents of table_name one by one.

        Args:
            filter: rethinkdb filter or function return resql

        Return
            Async Generator

        Required Python 3.6
        """
        resql = self._apply_filter(r.table(table_name), filter)

        conn = await r.connect(self._host, self._port, self._db)
        try:
            cursor = await resql.run(conn)
            while (await cursor.fetch_next()):
                item = await cursor.next()
                yield item
        finally:
            # Runs when iteration ends or the generator is closed early.
            conn.close()

    async def task_all(self, filter=None):
        """
        Fetch tasks ordered by "createdAt", newest first.

        Args:
            filter: rethinkdb filter or function return resql, applied
                before ordering; None keeps every task

        Return
            the awaited result of the ordered query (not an async generator)
        """
        resql = self._apply_filter(r.table("tasks"), filter)
        return await self.run(resql.order_by(r.desc("createdAt")))

    def device_all(self, filter=None):
        """Return an async generator over the "devices" table (see _get_all)."""
        return self._get_all('devices', filter)
# Shared module-level DB instance configured from the RDB_* constants above.
db = DB(RDB_HOST, RDB_PORT, RDB_NAME)