-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
98 lines (69 loc) · 2.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os

import redis
from dotenv import load_dotenv
from flask import Flask, jsonify, request
from flask_httpauth import HTTPBasicAuth
from werkzeug.security import generate_password_hash, check_password_hash
# Use a singleton so the Redis connection pools are created only once per process.
# https://stackoverflow.com/questions/49398590/correct-way-of-using-redis-connection-pool-in-python
class Singleton(type):
    """
    Metaclass that hands out one shared instance per class.

    A class opts in with ``metaclass=Singleton``. Its first call constructs
    the instance; every later call returns that same object and ignores any
    arguments passed.
    """

    # Maps each class using this metaclass to its only instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, building it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        created = super().__call__(*args, **kwargs)
        registry[cls] = created
        return created
class Redis(metaclass=Singleton):
    """
    Singleton holder of Redis connection pools and clients, one per database.

    Connection settings come from the ``HOST``, ``PORT`` and ``PASS``
    environment variables. Pools for database indexes 0-15 are built at
    construction; the matching clients are built the first time ``conn`` is
    read.
    """

    def __init__(self):
        """Create one connection pool for each database index 0-15."""
        pools = {}
        for index in range(16):
            pools[index] = redis.ConnectionPool(
                host=os.environ['HOST'],
                port=os.environ['PORT'],
                password=os.environ['PASS'],
                db=index,
                decode_responses=True,
            )
        self.pool = pools

    @property
    def conn(self):
        """Return the db-index -> client mapping, creating it on first access."""
        if hasattr(self, '_conn'):
            return self._conn
        self.get_connection()
        return self._conn

    def get_connection(self):
        """Build ``self._conn`` with one client per pool in ``self.pool``."""
        clients_by_db = {}
        for index in range(16):
            clients_by_db[index] = redis.Redis(connection_pool=self.pool[index])
        self._conn = clients_by_db
class RedisProxyPipeline:
    """
    Queues a small set of Redis commands on a pipeline for one database.

    ``sadd`` and ``zrange`` are buffered on the pipeline and only sent when
    ``execute`` is called; ``info`` goes straight to the client.
    """

    def __init__(self, clients: Redis, db: str):
        """
        Pick the client for ``db`` and open a pipeline on it.

        ``db`` is converted with ``int`` and used as the key into
        ``clients.conn``.
        """
        db_index = int(db)
        self.client = clients.conn[db_index]
        self.pipe = self.client.pipeline()

    def info(self):
        """Return ``info()`` from the client directly, bypassing the pipeline."""
        return self.client.info()

    def sadd(self, arguments):
        """Queue SADD of ``arguments['values']`` onto the key ``arguments['name']``."""
        key = arguments['name']
        members = arguments['values']
        return self.pipe.sadd(key, *members)

    def zrange(self, arguments):
        """
        Queue ZRANGE on ``arguments['name']`` from ``start`` to ``end``.

        The optional ``byscore`` entry is forwarded and defaults to ``False``.
        """
        key = arguments['name']
        first = arguments['start']
        last = arguments['end']
        by_score = arguments.get('byscore', False)
        return self.pipe.zrange(key, first, last, byscore=by_score)

    def execute(self):
        """Send every queued command and return the pipeline's results."""
        return self.pipe.execute()
# Load variables from a .env file into the environment before they are read below.
load_dotenv()
app = Flask(__name__)
# Shared Redis holder; its constructor reads HOST, PORT and PASS from the environment.
clients = Redis()
# Basic-auth accounts: username -> password hash, taken from AUTH_USERNAME / AUTH_PASSWD.
users = {
os.environ['AUTH_USERNAME']: generate_password_hash(os.environ['AUTH_PASSWD'])
}
auth = HTTPBasicAuth()
@auth.verify_password
def verify_password(username, password):
    """
    Check HTTP basic-auth credentials against ``users``.

    Returns the username when it is known and the password matches its
    stored hash; otherwise returns ``None``.
    """
    if username not in users:
        return None
    stored_hash = users.get(username)
    if check_password_hash(stored_hash, password):
        return username
    return None
@app.route('/', methods=['POST'])
@auth.login_required
def pipeline():
    """
    Run a batch of allow-listed Redis commands in one pipeline.

    Expects a JSON body of the form
    ``{"db": <0-15>, "cmds": [{"cmd": "sadd" | "zrange", "args": {...}}, ...]}``.
    Commands whose ``cmd`` is not allow-listed are skipped without an error.

    Returns:
        The pipeline results as a JSON array on success; ``{}`` with status
        400 when ``db`` is outside 0-15; ``{"message": ...}`` with status 400
        when the request is malformed or Redis raises. If reading the JSON
        body itself raises, the Redis ``INFO`` of database 0 is returned
        instead.
    """
    try:
        req = request.get_json()
    except Exception:
        # Unparseable body: answer with server info rather than an error.
        return RedisProxyPipeline(clients, '0').info()

    try:
        if int(req['db']) not in range(16):
            return {}, 400
        proxy = RedisProxyPipeline(clients, req['db'])
        for command in req['cmds']:
            # Only allow-listed names are ever passed to getattr.
            if command['cmd'] in ('sadd', 'zrange'):
                getattr(proxy, command['cmd'])(command['args'])
        results = proxy.execute()
    except Exception as e:
        return {"message": str(e)}, 400

    # A bare list is only accepted as a view return value from Flask 2.2 on;
    # jsonify produces the same JSON array on older versions as well.
    return jsonify(results)
if __name__ == '__main__':
    # Run Flask's built-in server when executed as a script:
    # all interfaces, port 6380.
    app.run(port=6380, host='0.0.0.0')