# app.py
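"""Caching reverse proxy for a self-hosted github-readme-stats Vercel
deployment. Responses are served from Redis when possible, then from a
disk cache, and are refreshed from Vercel in the background."""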
import os
import ssl
import urllib.parse
from base64 import b64decode
from base64 import b64encode
from contextlib import AbstractAsyncContextManager
from types import SimpleNamespace
from types import TracebackType
from typing import AsyncIterator
from typing import Optional
from typing import Type
from urllib.parse import urlparse
from urllib.parse import urlunparse
import aiohttp
import certifi
import orjson
import redis.asyncio as redis
from dotenv import load_dotenv
from multidict import CIMultiDict
from multidict import CIMultiDictProxy
from sanic import HTTPResponse
from sanic import Request
from sanic import Sanic
from sanic import redirect
from sanic.log import logger
# from cache import CACHE_LOCK
from cache import DISK_CACHE
from utils import parse_kv_pairs
from worker import do_vercel_get
from worker import flush_redis
load_dotenv()
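# Diskcache key tracking the running app version; a mismatch at startup
# triggers a full cache flush (see main_process_start below).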
DISKCACHE_VERSION_KEY = "cache_version"
class AppContext(SimpleNamespace):
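    """Typed application context: holds the Vercel base URL, a lazily
    created aiohttp session, the Redis client, and an SSL context."""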
def __init__(self, vercel_url: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self._vercel_url: str = vercel_url or ""
self._vercel_url_components: urllib.parse.ParseResult = urlparse(self._vercel_url)
self._aiohttp_session: Optional[aiohttp.ClientSession] = None
self._redis: Optional[redis.Redis] = None
self.ssl_context = ssl.create_default_context(cafile=certifi.where())
@property
def vercel_url(self) -> str:
return self._vercel_url
@property
def vercel_url_components(self) -> urllib.parse.ParseResult:
return self._vercel_url_components
@property
def aiohttp_session(self) -> aiohttp.ClientSession:
if not self._aiohttp_session:
self._aiohttp_session = aiohttp.ClientSession(
base_url=self.vercel_url,
auto_decompress=False,
)
return self._aiohttp_session
@aiohttp_session.setter
def aiohttp_session(self, session: aiohttp.ClientSession):
self._aiohttp_session = session
@property
def redis(self) -> "redis.Redis":
if not self._redis:
            raise ValueError("redis instance not set")
return self._redis
@redis.setter
def redis(self, redis_: "redis.Redis"):
self._redis = redis_
class CustomSanic(Sanic):
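    """Sanic subclass whose ctx attribute is typed as AppContext."""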
def __init__(self, *args, ctx: Optional[AppContext] = None, **kwargs):
super().__init__(*args, **kwargs)
self.ctx: AppContext = ctx or AppContext()
VERCEL_URL = "https://github-readme-stats-tuokri.vercel.app"
app = CustomSanic(
name=__name__,
ctx=AppContext(
vercel_url=VERCEL_URL,
),
)
app.config.AUTO_EXTEND = False
app.config.LOGGING = False
app.ctx.cache = DISK_CACHE
# app.ctx.cache_lock = CACHE_LOCK
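# Diskcache operations are blocking, so run them in the default thread pool
# executor to keep the event loop responsive.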
def _set_cache(key: str, value: bytes):
# with app.ctx.cache_lock:
app.ctx.cache[key] = value
async def set_disk_cache(key: str, value: bytes):
    await app.loop.run_in_executor(None, _set_cache, key, value)
def _get_cache(key: str) -> dict:
data = {}
# with app.ctx.cache_lock:
cached = app.ctx.cache.get(key)
if cached:
data = orjson.loads(cached)
if "headers" in data:
data["headers"] = CIMultiDict(data["headers"])
return data
async def get_disk_cache(key: str) -> dict:
    return await app.loop.run_in_executor(None, _get_cache, key)
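# do_vercel_get and flush_redis are imported from worker; the .delay() calls
# suggest Celery-style tasks that enqueue the work on a background worker.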
async def _do_vercel_get(vercel_url: str, vercel_route: str):
logger.info("scheduling Vercel get")
try:
do_vercel_get.delay(vercel_url, vercel_route)
except Exception as e:
logger.error("failed to schedule Vercel get: %s: %s",
type(e).__name__, e)
finally:
app.purge_tasks()
async def _schedule_vercel_get_task(vercel_url: str, vercel_route: str):
t = app.add_task(_do_vercel_get(vercel_url, vercel_route))
logger.info("added task %s", t)
class VercelSession(AbstractAsyncContextManager["VercelSession"]):
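    """Async context manager resolving a Vercel response for a request:
    first from Redis, then from the disk cache (scheduling a background
    refresh), and only then by fetching from Vercel directly."""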
def __init__(self, request: Request) -> None:
self._resp: Optional[aiohttp.ClientResponse] = None
self._vercel_route = urlunparse((
"",
"",
request.path,
None,
request.query_string,
None,
))
self._headers = CIMultiDictProxy(CIMultiDict())
self._payload = b""
@property
def headers(self) -> CIMultiDictProxy:
return self._headers
async def __aenter__(self) -> "VercelSession":
await self._get()
return self
async def __aexit__(
self,
__exc_type: Type[BaseException] | None,
__exc_value: BaseException | None,
__traceback: TracebackType | None,
) -> bool | None:
return None
async def _redis_set(self, data: bytes):
try:
await app.ctx.redis.set(self._vercel_route, data)
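            # Derive the Redis TTL from the upstream cache-control max-age
            # when present (parse_kv_pairs is assumed to split the header
            # into a key-value dict); otherwise default to 7500 seconds.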
ttl = 7500
cc = self._headers.get("cache-control")
if cc:
kvs = parse_kv_pairs(cc)
try:
ttl = int(kvs.get("max-age", 7500))
except ValueError:
pass
await app.ctx.redis.expire(self._vercel_route, ttl)
except redis.ConnectionError as ce:
logger.error("error setting redis data: %s: %s",
type(ce).__name__, ce, exc_info=False)
async def _redis_get(self):
try:
redis_data = await app.ctx.redis.get(self._vercel_route)
if redis_data:
redis_dict = orjson.loads(redis_data)
self._headers = CIMultiDict(redis_dict.get("headers"))
self._payload = b64decode(redis_dict.get("payload"))
except redis.ConnectionError as ce:
logger.error("error getting redis data: %s: %s",
type(ce).__name__, ce)
async def _diskcache_set(self, data: bytes):
try:
await set_disk_cache(self._vercel_route, data)
except Exception as e:
logger.info("error getting diskcache data: %s: %s",
type(e).__name__, e)
async def _diskcache_get(self):
try:
disk_data = await get_disk_cache(self._vercel_route)
if disk_data:
self._headers = CIMultiDict(disk_data.get("headers"))
self._payload = b64decode(disk_data.get("payload"))
except Exception as e:
logger.info("error setting diskcache data: %s: %s",
type(e).__name__, e)
async def _vercel_get(self):
logger.info("performing vercel get immediately")
        async with app.ctx.aiohttp_session.get(
                url=self._vercel_route,
                ssl=app.ctx.ssl_context) as resp:
self._headers = resp.headers.copy()
# print(f"_vercel_get headers: {pformat(self._headers)}")
self._payload = b""
async for chunk in resp.content.iter_chunked(4096):
self._payload += chunk
dump_headers = {str(k): str(v) for k, v in self._headers.items()}
dump_payload = b64encode(self._payload).decode("utf-8")
json_dump = orjson.dumps({
"headers": dump_headers,
"payload": dump_payload,
})
await self._redis_set(json_dump)
await self._diskcache_set(json_dump)
logger.info("vercel get results cached")
async def _get(self):
await self._redis_get()
if self._headers and self._payload:
return
logger.info("redis cache miss for: '%s'", self._vercel_route)
await self._diskcache_get()
# Schedule always to refresh cache in the background.
await _schedule_vercel_get_task(app.ctx.vercel_url, self._vercel_route)
if self._headers and self._payload:
return
logger.info("diskcache miss for '%s'", self._vercel_route)
# Not found in any caches, gotta do a Vercel get immediately.
await self._vercel_get()
async def iter_chunked(self) -> AsyncIterator[bytes]:
# TODO: stream directly from Redis or Diskcache
# here instead of caching the payload in VercelSession
# instance variables?
yield self._payload
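# Shared handler body: resolve the response through VercelSession and stream
# the cached (or freshly fetched) payload back to the client.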
async def vercel_get(request: Request):
if not request.url:
raise PermissionError
async with VercelSession(request) as vs:
response = await request.respond(
headers={
"content-encoding": vs.headers.get("content-encoding", ""),
},
content_type=vs.headers.get("content-type", ""),
)
async for data in vs.iter_chunked():
await response.send(data)
@app.before_server_start
async def before_server_start(_app: CustomSanic):
_app.ctx.aiohttp_session = aiohttp.ClientSession(
base_url=_app.ctx.vercel_url,
auto_decompress=False,
)
    _app.ctx.redis = redis.StrictRedis.from_url(
        os.environ["REDIS_URL"],
        # connection_pool=REDIS_POOL,
    )
logger.info("created redis instance: %s", _app.ctx.redis)
try:
await _app.ctx.redis.ping()
except Exception as e:
logger.error(f"error connecting to redis: %s", e)
logger.exception(e)
# noinspection PyBroadException
@app.after_server_stop
async def after_server_stop(_app: CustomSanic):
try:
await _app.ctx.aiohttp_session.close()
except Exception:
pass
try:
await _app.ctx.redis.close()
except Exception:
pass
@app.get("/api/<_endpoint:(.+)>/")
async def api_endpoint(request: Request, _endpoint: str) -> Optional[HTTPResponse]:
try:
await vercel_get(request)
except PermissionError:
return HTTPResponse(status=400)
@app.get("/api/")
async def api_root(request: Request) -> Optional[HTTPResponse]:
try:
await vercel_get(request)
except PermissionError:
return HTTPResponse(status=400)
@app.get("/")
async def root(*_) -> HTTPResponse:
return redirect("https://github.com/tuokri/github-readme-stats-cache")
@app.exception(Exception)
async def on_exception(request: Request, exc: Exception) -> HTTPResponse:
    logger.error("error on request: %s: %s: %s",
                 request, type(exc).__name__, exc)
    logger.exception(exc)
return HTTPResponse(status=500)
@app.after_server_start
async def after_server_start(*_):
# Pre-warm caches.
# TODO: probably don't want to run this in each worker.
# TODO: hard-coding these is kinda tedious.
do_vercel_get.delay(
VERCEL_URL,
("/api?username=tuokri&count_private=true&theme=default&"
"show_icons=true&include_all_commits=true"),
)
do_vercel_get.delay(
VERCEL_URL,
("/api?username=tuokri&count_private=true&theme=synthwave&"
"show_icons=true&include_all_commits=true"),
)
do_vercel_get.delay(
VERCEL_URL,
("/api/top-langs/?username=tuokri&layout=compact&"
"theme=default&langs_count=10&count_private=true&"
"size_weight=0.6&count_weight=0.4&"
"exclude_repo=github-readme-stats,DPP,mumble,UnrealEngine,"
"pyspellchecker,ftp-tail,SquadJS,CnC_Remastered_Collection,UDK-Lite"),
)
do_vercel_get.delay(
VERCEL_URL,
("/api/top-langs/?username=tuokri&layout=compact&"
"theme=synthwave&langs_count=10&count_private=true&"
"size_weight=0.6&count_weight=0.4&"
"exclude_repo=github-readme-stats,DPP,mumble,UnrealEngine,"
"pyspellchecker,ftp-tail,SquadJS,CnC_Remastered_Collection,UDK-Lite"),
)
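# Runs once in the main process (not per worker): flush Redis and the disk
# cache when the application version has changed since the last run.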
@app.main_process_start
async def main_process_start(*_):
from _version import __version__
cache_version = app.ctx.cache.get(DISKCACHE_VERSION_KEY)
logger.info("cache_version: %s", cache_version)
logger.info("__version__: %s", __version__)
if cache_version != __version__:
logger.info("version changed, clearing cache")
flush_redis.delay()
app.ctx.cache.clear(retry=True)
app.ctx.cache.set(DISKCACHE_VERSION_KEY, __version__)
if __name__ == "__main__":
    app.run(dev=True, workers=4)