-
Notifications
You must be signed in to change notification settings - Fork 34
/
qless.py
64 lines (55 loc) · 1.99 KB
/
qless.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
'''Some helper functions for running tests. This should not be confused with
the python qless bindings.'''
try:
import simplejson as json
except ImportError:
import json
json.JSONDecodeError = ValueError
class QlessRecorder(object):
    '''A context-manager to capture anything that goes back and forth

    Wraps a client object: the contents of `qless.lua` are registered as a
    script on that client, and a pubsub handle obtained from the client is
    used to record published messages while the recorder is active as a
    context manager. Recorded messages are collected in `self.log`.
    '''

    __name__ = 'QlessRecorder'

    def __init__(self, client):
        '''Register the lua script with `client` and prepare the pubsub.

        `client` must provide `pubsub()`, `register_script()` and
        `flushdb()`. The script source is read from `qless.lua`, resolved
        relative to the current working directory.
        '''
        self._client = client
        self._pubsub = self._client.pubsub()
        with open('qless.lua') as fin:
            self._lua = self._client.register_script(fin.read())
        # Record any log messages that we've seen
        self.log = []

    def raw(self, *args, **kwargs):
        '''Submit raw data to the lua script, untransformed'''
        return self._lua(*args, **kwargs)

    def __call__(self, *args):
        '''Invoke the lua script with no keys, and some simple transforms

        Any dict or list argument is JSON-encoded before being passed on;
        every other argument is forwarded unchanged. The script's result is
        JSON-decoded when possible, otherwise it is returned as-is.
        '''
        transformed = [
            json.dumps(arg) if isinstance(arg, (dict, list)) else arg
            for arg in args
        ]
        result = self._lua([], transformed)
        try:
            return json.loads(result)
        except (json.JSONDecodeError, TypeError):
            # Either the result is not valid JSON, or it is not something
            # json.loads accepts at all (e.g. None); hand it back untouched
            return result

    def flush(self):
        '''Flush the database'''
        self._client.flushdb()

    def __enter__(self):
        '''Start recording: reset the log and subscribe to every channel'''
        self.log = []
        self._pubsub.psubscribe('*')
        # Consume the first message from the listener before handing control
        # back (presumably the subscription confirmation -- verify against
        # the client library). The builtin next() is used rather than the
        # iterator's .next() method, which only exists on Python 2.
        next(self._pubsub.listen())
        return self

    def __exit__(self, typ, val, traceback):
        '''Stop recording and collect the messages seen into `self.log`

        Returns None, so an exception raised inside the `with` body is not
        suppressed.
        '''
        # Send the kill signal to our pubsub listener
        self._pubsub.punsubscribe('*')
        for message in self._pubsub.listen():
            # A separate local keeps the `typ` argument (the exception type)
            # from being overwritten
            kind = message.pop('type')
            # Only get subscribe messages
            if kind == 'pmessage':
                # And pop the pattern attribute
                message.pop('pattern')
                self.log.append(message)
            elif kind == 'punsubscribe':
                break