From 7b19b32f2522c99591b3833a1a5933b949def0f2 Mon Sep 17 00:00:00 2001 From: Aaron Luna Date: Sat, 11 May 2024 10:16:39 -0700 Subject: [PATCH] feat: :sparkles: improve logging for same site requests --- app/core/rate_limit.py | 33 ++++++++++++++++++++------------- app/core/util.py | 13 +++++++++++++ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/app/core/rate_limit.py b/app/core/rate_limit.py index a112d64..c2f07fa 100644 --- a/app/core/rate_limit.py +++ b/app/core/rate_limit.py @@ -12,6 +12,7 @@ from app.core.util import ( dtaware_fromtimestamp, format_timedelta_str, + get_dict_report, get_duration_between_timestamps, get_time_until_timestamp, s, @@ -104,7 +105,25 @@ def is_exceeded(self, request: Request) -> Result[None]: def apply_rate_limit_to_request(self, request: Request, client_ip: str): if self.settings.is_test: return enable_rate_limit_feature_for_test(request) - return rate_limit_applies_to_route(request) and client_ip_is_external(request, client_ip) # pragma: no cover + return self.rate_limit_applies_to_route(request) and self.client_ip_is_external( + request, client_ip + ) # pragma: no cover + + def rate_limit_applies_to_route(self, request: Request) -> bool: # pragma: no cover + return bool(RATE_LIMIT_ROUTE_REGEX.search(request.url.path)) + + def client_ip_is_external(self, request: Request, client_ip: str) -> bool: # pragma: no cover + if client_ip in ["localhost", "127.0.0.1", "testserver"] or client_ip.startswith("172.17.0."): + return False + if "sec-fetch-site" in request.headers: + if request.headers["sec-fetch-site"] == "same-site": + self.logger.info(f"##### BYPASS RATE LIMITING (SAME SITE, IP: {client_ip}) #####") + for log in get_dict_report(request.headers): + self.logger.info(log) + return False + else: + return True + return True def get_allowed_at(self, tat: float) -> float: return (dtaware_fromtimestamp(tat) - self.delay_tolerance_ms).timestamp() @@ -148,18 +167,6 @@ def enable_rate_limit_feature_for_test(request: Request) -> bool: return False # pragma: no cover -def rate_limit_applies_to_route(request: Request) -> bool: # pragma: no cover - return bool(RATE_LIMIT_ROUTE_REGEX.search(request.url.path)) - - -def client_ip_is_external(request: Request, client_ip: str) -> bool: # pragma: no cover - if client_ip in ["localhost", "127.0.0.1", "testserver"] or client_ip.startswith("172.17.0."): - return False - if "sec-fetch-site" in request.headers: - return request.headers["sec-fetch-site"] != "same-site" - return True - - def get_time_portion(ts: float) -> str: return dtaware_fromtimestamp(ts).time().strftime("%I:%M:%S.%f %p") diff --git a/app/core/util.py b/app/core/util.py index ae2ded5..ceff917 100644 --- a/app/core/util.py +++ b/app/core/util.py @@ -77,3 +77,16 @@ def get_time_until_timestamp(ts: float) -> timedelta: def get_duration_between_timestamps(ts1: float, ts2: float) -> timedelta: return dtaware_fromtimestamp(ts2) - dtaware_fromtimestamp(ts1) + + +def get_dict_report(data: dict, title: str | None = None) -> list[str]: + def dots(key: str, max_len: int) -> str: + return "." * ((max_len - len(key)) + 2) + + report = [] + if title: + report.append([f"{'#' * 5} {title} {'#' * 5}"]) + max_key_len = max(len(str(key)) for key in data) + for key, value in data.items(): + report.append(f"{key}{dots(str(key), max_key_len)}: {value}") + return report