-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.py
139 lines (111 loc) · 3.45 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import traceback
from base64 import b64encode
from traceback import print_exc
from urllib.parse import urlparse
from urllib.parse import urlunparse
import celery
import orjson
import redis
import requests
import urllib3
from celery import Task
from celery.utils.log import get_task_logger
from dotenv import load_dotenv
from cache import DISK_CACHE
from utils import parse_kv_pairs
# Module-level wiring for the worker: logger, environment, Celery app,
# Redis client and the disk-cache alias used by the tasks below.
logger = get_task_logger(__name__)
# Runs before REDIS_URL is read below; presumably so the value can be
# supplied by a .env file — TODO confirm against deployment.
load_dotenv()
# Redis key that flush_redis() uses as its "flushed recently" marker.
REDIS_FLUSHED_RECENTLY_KEY = "redis_flushed_recently"
# Mandatory setting: a missing REDIS_URL raises KeyError at import time.
REDIS_URL = os.environ["REDIS_URL"]
# The same Redis URL serves as both the Celery broker and the result backend.
app = celery.Celery(
"worker",
broker=REDIS_URL,
backend=REDIS_URL,
)
# Direct client on the same Redis URL, used by the helpers and tasks below.
redis_instance = redis.StrictRedis.from_url(REDIS_URL)
# Alias for the cache object imported from the local `cache` module.
cache = DISK_CACHE
# cache_lock = CACHE_LOCK
class BaseTask(Task):
    """Base class for this worker's Celery tasks.

    On construction it pings the module-level Redis client and prints the
    outcome. A failed ping is reported but never raised, so a Redis outage
    does not prevent the task class from being created.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the task, then report whether Redis answers a PING."""
        super().__init__(*args, **kwargs)
        try:
            resp = redis_instance.ping()
            # Report the client that was actually pinged (the original
            # interpolated the `redis` module object by mistake).
            print(f"{redis_instance}: ping response: {resp}")
        except Exception as e:
            print(f"error connecting to redis: {e}")
            # Three-argument form: the single-argument call is only accepted
            # on Python 3.10 and later.
            traceback.print_exception(type(e), e, e.__traceback__)

    def run(self, *args, **kwargs):
        """Delegate to the parent ``run`` and pass its result through.

        NOTE(review): tasks declared with ``@app.task(base=BaseTask)`` are
        expected to supply their own ``run``; confirm this override is
        still needed.
        """
        return super().run(*args, **kwargs)
def set_redis(key: str, value: bytes, ttl: int):
    """Store ``value`` under ``key`` in Redis with a time-to-live, best-effort.

    The value and its expiry are written with a single ``SET ... EX``
    command, so the key can never be left behind without a TTL (which could
    happen when a separate ``EXPIRE`` call failed after a successful
    ``SET``).

    Args:
        key: Redis key to write.
        value: Payload to store.
        ttl: Lifetime in seconds. A value of zero or less removes the key,
            matching the effect of ``EXPIRE`` with a non-positive timeout.

    Any exception is printed via ``print_exc`` and swallowed: a Redis write
    failure must not fail the calling task.
    """
    # noinspection PyBroadException
    try:
        if ttl > 0:
            redis_instance.set(key, value, ex=ttl)
        else:
            # SET rejects a non-positive EX; the original SET + EXPIRE pair
            # ended with the key deleted in this case, so delete explicitly.
            redis_instance.delete(key)
    except Exception:
        print_exc()
def set_cache(key: str, value: bytes):
    """Write ``value`` to the module-level ``cache`` under ``key``.

    This is a best-effort write: if ``cache.set`` raises, the traceback is
    printed and the error is not propagated to the caller.
    """
    # noinspection PyBroadException
    try:
        # with cache_lock:
        cache.set(key, value)
    except Exception:
        traceback.print_exc()
    return None
# Redis time-to-live (seconds) used when the response has no usable
# Cache-Control max-age.
_DEFAULT_TTL_SECONDS = 7500


def _ttl_from_cache_control(cache_control):
    """Return the ``max-age`` of a Cache-Control header value as an int.

    Falls back to ``_DEFAULT_TTL_SECONDS`` when the header is missing or
    empty, has no ``max-age`` entry, or the entry is not an integer.
    """
    if not cache_control:
        return _DEFAULT_TTL_SECONDS
    kvs = parse_kv_pairs(cache_control)
    try:
        return int(kvs.get("max-age", _DEFAULT_TTL_SECONDS))
    except (TypeError, ValueError):
        # TypeError covers a non-string value (e.g. None) for max-age.
        return _DEFAULT_TTL_SECONDS


@app.task(base=BaseTask, ignore_result=True)
def do_vercel_get(vercel_url: str, vercel_route: str):
    """Fetch a route from a Vercel deployment and cache the raw response.

    The request URL combines the scheme and host of ``vercel_url`` with the
    path and query of ``vercel_route``. The response headers and the
    undecoded body (base64-encoded) are serialised to JSON with ``orjson``
    and stored under the key ``<path>?<query>`` in both the disk cache and
    Redis. The Redis entry expires after the response's Cache-Control
    ``max-age``, or after 7500 seconds when none is usable.

    Args:
        vercel_url: URL whose scheme and netloc identify the deployment.
        vercel_route: URL or path whose path and query select the resource.

    Raises:
        requests.RequestException: if the HTTP request fails or times out.

    NOTE(review): the HTTP status code is not checked, so error responses
    are cached like successful ones — confirm this is intended.
    """
    logger.info("do_vercel_get: '%s' '%s'",
                vercel_url, vercel_route)
    vercel_url_parts = urlparse(vercel_url)
    vercel_route_parts = urlparse(vercel_route)
    # scheme, netloc, url, params, query, fragment
    url = urlunparse((
        vercel_url_parts.scheme,
        vercel_url_parts.netloc,
        vercel_route_parts.path,
        "",
        vercel_route_parts.query,
        "",
    ))
    # The context manager releases the connection even if streaming fails.
    with requests.get(url, timeout=30, stream=True) as resp:
        headers = resp.headers
        # Read the body without content decoding so the stored bytes stay
        # consistent with the stored headers; join once instead of growing
        # a bytes object chunk by chunk.
        data: bytes = b"".join(
            resp.raw.stream(1024, decode_content=False)
        )
    dump_headers = {str(k): str(v) for k, v in headers.items()}
    dump_payload = b64encode(data).decode("utf-8")
    json_dump = orjson.dumps({
        "headers": dump_headers,
        "payload": dump_payload,
    })
    # The key is derived from the assembled URL, not the raw route, so it
    # carries the same path normalisation as the URL that was requested.
    url_parts = urlparse(url)
    key = urlunparse((
        "",
        "",
        url_parts.path,
        "",
        url_parts.query,
        "",
    ))
    set_cache(key, json_dump)
    ttl = _ttl_from_cache_control(headers.get("cache-control"))
    set_redis(key, json_dump, ttl)
@app.task(base=BaseTask, ignore_result=True)
def flush_redis():
    """Flush the current Redis database, at most once every 30 seconds.

    A marker key (``REDIS_FLUSHED_RECENTLY_KEY``) with a 30-second expiry
    throttles the flush. The marker is claimed atomically with
    ``SET NX EX``, so two workers cannot both pass the check. Because
    ``FLUSHDB`` removes the marker along with everything else, it is
    written again in the same transaction as the flush; otherwise the
    throttle would never take effect.

    NOTE(review): ``redis_instance`` is created from the same ``REDIS_URL``
    that this module passes to Celery as broker and backend, so the flush
    also removes Celery's own keys in that database — confirm this is
    intended.
    """
    claimed = redis_instance.set(
        REDIS_FLUSHED_RECENTLY_KEY, 1, nx=True, ex=30
    )
    if not claimed:
        # Another flush ran (or is running) within the last 30 seconds.
        return
    print("flushing redis")
    with redis_instance.pipeline(transaction=True) as pipe:
        pipe.flushdb(asynchronous=True)
        # Restore the marker that the flush has just wiped.
        pipe.set(REDIS_FLUSHED_RECENTLY_KEY, 1, ex=30)
        pipe.execute()