Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limit HTTP service for external rate limits #1460

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions binderhub/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from .health import HealthHandler
from .launcher import Launcher
from .log import log_request
from .ratelimit import RateLimiter
from .ratelimit import RequestRateLimiter
from .repoproviders import RepoProvider
from .registry import DockerRegistry
from .main import MainHandler, ParameterizedMainHandler, LegacyRedirectHandler
Expand Down Expand Up @@ -329,6 +329,18 @@ def _valid_badge_base_url(self, proposal):
config=True,
)

# URL of an optional external rate-limit service.
# When this is set, BaseHandler._check_rate_limit POSTs to the service instead
# of consulting in-process limiters; when it is empty, initialize() creates the
# in-process "request" and "repo" limiters.
rate_limit_url = Unicode(
config=True,
help="""Use external rate-limiter service

Allows shared rate-limit state across a federation of BinderHub instances
""",
)

# Credential for the external rate-limit service; BaseHandler._check_rate_limit
# sends it as "Authorization: Bearer <token>" on every request to rate_limit_url.
rate_limit_token = Unicode(
config=True, help="""Token used to access external rate limit service"""
)

log_tail_lines = Integer(
100,
help="""
Expand Down Expand Up @@ -788,7 +800,8 @@ def initialize(self, *args, **kwargs):
"per_repo_quota": self.per_repo_quota,
"per_repo_quota_higher": self.per_repo_quota_higher,
"repo_providers": self.repo_providers,
"rate_limiter": RateLimiter(parent=self),
"rate_limit_url": self.rate_limit_url,
"rate_limit_token": self.rate_limit_token,
"use_registry": self.use_registry,
"build_class": self.build_class,
"registry": registry,
Expand All @@ -814,6 +827,12 @@ def initialize(self, *args, **kwargs):
"normalized_origin": self.normalized_origin,
}
)
if not self.rate_limit_url:
self.tornado_settings["rate_limiters"] = {
"request": RequestRateLimiter(parent=self),
"repo": RepoRateLimiter(parent=self),
}

if self.auth_enabled:
self.tornado_settings['cookie_secret'] = os.urandom(32)
if self.cors_allow_origin:
Expand Down
69 changes: 51 additions & 18 deletions binderhub/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import jwt
from http.client import responses
from tornado import web
from tornado.httpclient import AsyncHTTPClient, HTTPClientError
from tornado.log import app_log
from jupyterhub.services.auth import HubOAuthenticated, HubOAuth

from . import __version__ as binder_version
from .ratelimit import RateLimitExceeded
from .utils import ip_in_networks
from .utils import ip_in_networks, url_path_join


class BaseHandler(HubOAuthenticated, web.RequestHandler):
Expand Down Expand Up @@ -98,12 +99,51 @@ def check_build_token(self, build_token, provider_spec):
self._have_build_token = True
return decoded

def check_rate_limit(self):
rate_limiter = self.settings["rate_limiter"]
if rate_limiter.limit == 0:
# no limit enabled
return
async def _check_rate_limit(self, which, key, quota=None):
    """Check one rate limit and publish its state as response headers

    The check goes to the external rate-limit service when
    ``settings["rate_limit_url"]`` is set, otherwise to the in-process
    limiter stored in ``settings["rate_limiters"][which]``.

    which: name of the limiter. Used as a URL path segment for the external
        service and as the key into ``settings["rate_limiters"]``.
    key: identifies what is being limited. It is joined into the service URL
        as-is, so callers must pass a URL-safe value.
    quota: optional per-key limit that takes precedence over the limiter's
        configured default.
    Returns:
        The limit dict (containing at least "remaining", "reset" and
        "limit"), or None when no limit applies or the external service
        could not be checked.
    Raises: RateLimitExceeded if the request would exceed the rate limit.
    """
    if self.settings["rate_limit_url"]:
        # check with remote rate-limiter service
        # defined in binderhub/ratelimitapp
        if quota:
            body = json.dumps({"quota": quota})
        else:
            body = ""
        try:
            response = await AsyncHTTPClient().fetch(
                url_path_join(self.settings["rate_limit_url"], which, key),
                method="POST",
                body=body,
                headers={
                    "Authorization": f"Bearer {self.settings['rate_limit_token']}",
                },
            )
            limit = json.loads(response.body)["limit"]
        except HTTPClientError as e:
            if e.code == 429:
                # turn remote 429 back into RateLimitExceeded
                response = json.loads(e.response.body)
                raise RateLimitExceeded(response["message"])
            else:
                # fail open: an unavailable limiter service must not block launches
                app_log.warning(f"Failed to check external rate limit: {e}")
                return
    else:
        # check with internal rate limiter
        rate_limiter = self.settings["rate_limiters"][which]
        if not (quota or rate_limiter.limit):
            # neither a per-key quota nor a default limit is set: nothing to
            # enforce. A quota must still be honoured when the limiter's own
            # default is 0, matching `quota or self.limit` in increment().
            return

        limit = rate_limiter.increment(key, quota)

    app_log.debug(f"Rate limit for {which}/{key}: {limit}")

    self.set_header("x-ratelimit-remaining", str(limit["remaining"]))
    self.set_header("x-ratelimit-reset", str(limit["reset"]))
    self.set_header("x-ratelimit-limit", str(limit["limit"]))

    return limit

async def check_request_rate_limit(self):
if self.settings['auth_enabled'] and self.current_user:
# authenticated, no limit
# TODO: separate authenticated limit
Expand All @@ -116,19 +156,12 @@ def check_rate_limit(self):

# rate limit is applied per-ip
request_ip = self.request.remote_ip
try:
limit = rate_limiter.increment(request_ip)
except RateLimitExceeded:
raise web.HTTPError(
429,
f"Rate limit exceeded. Try again in {rate_limiter.period_seconds} seconds.",
)
else:
app_log.debug(f"Rate limit for {request_ip}: {limit}")
return self._check_rate_limit("request", request_ip)

self.set_header("x-ratelimit-remaining", str(limit["remaining"]))
self.set_header("x-ratelimit-reset", str(limit["reset"]))
self.set_header("x-ratelimit-limit", str(rate_limiter.limit))
def check_repo_rate_limit(self, repo_url, quota):
    """Check the "repo" rate limit for a repository

    repo_url: URL of the repository. It is fully percent-encoded so that it
        can be used as a single rate-limit key.
    quota: limit for this repository, forwarded unchanged.
    Returns: whatever ``_check_rate_limit`` returns for the "repo" limiter
        (an awaitable, since that method is a coroutine function).
    """
    # safe="" also escapes "/", so the whole URL stays one key
    escaped_repo = urllib.parse.quote(repo_url, safe="")
    return self._check_rate_limit("repo", escaped_repo, quota)

def get_current_user(self):
if not self.settings['auth_enabled']:
Expand Down
67 changes: 33 additions & 34 deletions binderhub/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import docker
from tornado import gen
from tornado.httpclient import HTTPClientError
from tornado.web import Finish, authenticated
from tornado.web import Finish, HTTPError, authenticated
from tornado.queues import Queue
from tornado.iostream import StreamClosedError
from tornado.ioloop import IOLoop
Expand All @@ -23,6 +23,7 @@

from .base import BaseHandler
from .build import ProgressEvent
from .ratelimit import RateLimitExceeded
from .utils import KUBE_REQUEST_TIMEOUT

# Separate buckets for builds and launches.
Expand Down Expand Up @@ -242,7 +243,6 @@ async def get(self, provider_prefix, _unescaped_spec):
# verify the build token and rate limit
build_token = self.get_argument("build_token", None)
self.check_build_token(build_token, f"{provider_prefix}/{spec}")
self.check_rate_limit()

# Verify if the provider is valid for EventSource.
# EventSource cannot handle HTTP errors, so we must validate and send
Expand Down Expand Up @@ -280,6 +280,30 @@ async def get(self, provider_prefix, _unescaped_spec):
'repo': repo_url,
}

# check request (client ip) rate limit
try:
await self.check_request_rate_limit()
except RateLimitExceeded as e:
LAUNCH_COUNT.labels(
status="request_quota",
).inc()
raise HTTPError(429, str(e))

# check repo rate limit
repo_config = provider.repo_config(self.settings)
# TODO: put busy users in a queue rather than fail?
# That would be hard to do without in-memory state.
try:
await self.check_repo_rate_limit(repo_url, quota=repo_config.get("quota"))
except RateLimitExceeded as e:
LAUNCH_COUNT.labels(
status="repo_quota",
**self.repo_metric_labels,
).inc()
app_log.error(str(e))
await self.fail(f"Too many users running {self.repo_url}! Try again soon.")
return

try:
ref = await provider.get_resolved_ref()
except Exception as e:
Expand Down Expand Up @@ -511,20 +535,8 @@ async def get(self, provider_prefix, _unescaped_spec):

async def launch(self, provider):
"""Ask JupyterHub to launch the image."""
# Load the spec-specific configuration if it has been overridden
repo_config = provider.repo_config(self.settings)

# the image name (without tag) is unique per repo
# use this to count the number of pods running with a given repo
# if we added annotations/labels with the repo name via KubeSpawner
# we could do this better
image_no_tag = self.image_name.rsplit(':', 1)[0]

# TODO: put busy users in a queue rather than fail?
# That would be hard to do without in-memory state.
repo_quota = repo_config.get("quota")
pod_quota = self.settings["pod_quota"]
if pod_quota is not None or repo_quota:
if pod_quota is not None:
# Fetch info on currently running users *only* if quotas are set
matching_pods = 0

Expand Down Expand Up @@ -559,25 +571,12 @@ async def launch(self, provider):
matching_pods += 1
break

if repo_quota and matching_pods >= repo_quota:
LAUNCH_COUNT.labels(
status="repo_quota",
**self.repo_metric_labels,
).inc()
app_log.error(
f"{self.repo_url} has exceeded quota: {matching_pods}/{repo_quota} ({total_pods} total)"
)
await self.fail(
f"Too many users running {self.repo_url}! Try again soon."
)
return

if matching_pods >= 0.5 * repo_quota:
log = app_log.warning
else:
log = app_log.info
log("Launching pod for %s: %s other pods running this repo (%s total)",
self.repo_url, matching_pods, total_pods)
app_log.info(
"Launching pod for %s: %s other pods running this repo (%s total)",
self.repo_url,
matching_pods,
total_pods,
)

await self.emit({
'phase': 'launching',
Expand Down
55 changes: 48 additions & 7 deletions binderhub/ratelimit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from traitlets import Integer, Dict, Float, default
from traitlets.config import LoggingConfigurable


class RateLimitExceeded(Exception):
"""Exception raised when rate limit is exceeded"""

Expand Down Expand Up @@ -66,11 +65,17 @@ def time():
"""Mostly here to enable override in tests"""
return time.time()

def increment(self, key):
def increment(self, key, quota=None):
"""Check rate limit for a key

key: key for recording rate limit. Each key tracks a different rate limit.
Returns: {"remaining": int_remaining, "reset": int_timestamp}
Returns:
{
"remaining": int_remaining,
"limit": int_total_limit,
"reset": int_timestamp,
"reset_in": int_seconds_remaining,
}
Raises: RateLimitExceeded if the request would exceed the rate limit.
"""
now = int(self.time())
Expand All @@ -79,17 +84,53 @@ def increment(self, key):

if key not in self._limits or self._limits[key]["reset"] < now:
# no limit recorded, or reset expired
max_limit = quota or self.limit
self._limits[key] = {
"remaining": self.limit,
"remaining": max_limit,
"limit": max_limit,
"reset": now + self.period_seconds,
}
limit = self._limits[key]
# keep decrementing, so we have a track of excess requests
# which indicate abuse
limit["remaining"] -= 1
reset_in = int(limit["reset"] - now)
if limit["remaining"] < 0:
seconds_until_reset = int(limit["reset"] - now)
raise RateLimitExceeded(
f"Rate limit exceeded (by {-limit['remaining']}) for {key!r}, reset in {seconds_until_reset}s."
f"Rate limit exceeded (by {-limit['remaining']}) for {key!r}, reset in {reset_in}s."
)
return limit
limit_copy = limit.copy()
limit_copy["reset_in"] = reset_in
return limit_copy


# classes for storing separate config


class RequestRateLimiter(RateLimiter):
    """Rate limiter for incoming client requests

    Exists as its own class so it can be configured separately from other
    limiters. Launch requests are counted per client ip.
    """


class RepoRateLimiter(RateLimiter):
    """RateLimiter subclass for repo launches

    Exists as its own class so it can be configured separately from other
    limiters. Rate limit is applied to launch requests by repo.
    """

    @default("limit")
    def _default_limit(self):
        # default: no limit (a limit of 0 means the limiter enforces nothing)
        return 0


def main():
    """Entry point: hand off to ``ratelimitapp.main``"""
    # imported inside the function, under an alias so that this function's
    # own name is not shadowed
    from .ratelimitapp import main as run_ratelimit_app

    run_ratelimit_app()


if __name__ == "__main__":
    main()
Loading