Skip to content

Commit 71f7254

Browse files
committed
When the connection queue is full, respond with a 503 error
The 503 responses are run in a thread
1 parent fdaa24d commit 71f7254

File tree

2 files changed

+105
-2
lines changed

2 files changed

+105
-2
lines changed

cheroot/server.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,38 @@
101101
)
102102

103103

104+
if sys.version_info[:2] >= (3, 13):
105+
from queue import (
106+
Queue as QueueWithShutdown,
107+
ShutDown as QueueShutDown,
108+
)
109+
else:
110+
111+
class QueueShutDown(Exception):
112+
"""Queue has been shut down."""
113+
114+
class QueueWithShutdown(queue.Queue):
115+
"""Equivalent to Python 3.13+ Queue."""
116+
117+
# Unique identifier used to indicate the queue is shut down().
118+
_SHUT_DOWN_TOKEN = object()
119+
120+
def shutdown(self, immediate=False):
121+
if immediate:
122+
while True:
123+
try:
124+
self.get_nowait()
125+
except queue.Empty:
126+
break
127+
self.put(self._SHUT_DOWN_TOKEN)
128+
129+
def get(self, *args, **kwargs):
130+
result = super(QueueWithShutdown, self).get(*args, **kwargs)
131+
if result is self._SHUT_DOWN_TOKEN:
132+
raise QueueShutDown
133+
return result
134+
135+
104136
IS_WINDOWS = platform.system() == 'Windows'
105137
"""Flag indicating whether the app is running under Windows."""
106138

@@ -1658,6 +1690,8 @@ def __init__(
16581690
self.reuse_port = reuse_port
16591691
self.clear_stats()
16601692

1693+
self._unservicable_conns = QueueWithShutdown()
1694+
16611695
def clear_stats(self):
16621696
"""Reset server stat counters.."""
16631697
self._start_time = None
@@ -1866,8 +1900,39 @@ def prepare(self): # noqa: C901 # FIXME
18661900
self.ready = True
18671901
self._start_time = time.time()
18681902

1903+
def _serve_unservicable(self):
1904+
"""Serve connections we can't handle a 503."""
1905+
while self.ready:
1906+
try:
1907+
conn = self._unservicable_conns.get()
1908+
except QueueShutDown:
1909+
return
1910+
request = HTTPRequest(self, conn)
1911+
try:
1912+
request.simple_response('503 Service Unavailable')
1913+
except (socket.error, errors.FatalSSLAlert):
1914+
# We're sending the 503 error to be polite, it it fails that's
1915+
# fine.
1916+
continue
1917+
except Exception as ex:
1918+
# We can't just raise an exception because that will kill this
1919+
# thread, and prevent 503 errors from being sent to future
1920+
# connections.
1921+
self.server.error_log(
1922+
repr(ex),
1923+
level=logging.ERROR,
1924+
traceback=True,
1925+
)
1926+
conn.linger = True
1927+
conn.close()
1928+
18691929
def serve(self):
18701930
"""Serve requests, after invoking :func:`prepare()`."""
1931+
# This thread will handle unservicable connections, as added to
1932+
# self._unservicable_conns queue. It will run forever, until
1933+
# self.stop() tells it to shut down.
1934+
threading.Thread(target=self._serve_unservicable).start()
1935+
18711936
while self.ready and not self.interrupt:
18721937
try:
18731938
self._connections.run(self.expiration_interval)
@@ -2162,8 +2227,7 @@ def process_conn(self, conn):
21622227
try:
21632228
self.requests.put(conn)
21642229
except queue.Full:
2165-
# Just drop the conn. TODO: write 503 back?
2166-
conn.close()
2230+
self._unservicable_conns.put(conn)
21672231

21682232
@property
21692233
def interrupt(self):
@@ -2201,6 +2265,11 @@ def stop(self): # noqa: C901 # FIXME
22012265
return # already stopped
22022266

22032267
self.ready = False
2268+
2269+
# This tells the thread that handles unservicable connections to shut
2270+
# down:
2271+
self._unservicable_conns.shutdown(immediate=True)
2272+
22042273
if self._start_time is not None:
22052274
self._run_time += time.time() - self._start_time
22062275
self._start_time = None

cheroot/test/test_server.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import types
99
import urllib.parse # noqa: WPS301
1010
import uuid
11+
from http import HTTPStatus
1112

1213
import pytest
1314

@@ -570,3 +571,36 @@ def test_threadpool_multistart_validation(monkeypatch):
570571
match='Threadpools can only be started once.',
571572
):
572573
tp.start()
574+
575+
576+
def test_overload_results_in_suitable_http_error(request):
577+
"""A server that can't keep up with requests returns a 503 HTTP error."""
578+
localhost = '127.0.0.1'
579+
httpserver = HTTPServer(
580+
bind_addr=(localhost, EPHEMERAL_PORT),
581+
gateway=Gateway,
582+
)
583+
# Can only handle on request in parallel:
584+
httpserver.requests = ThreadPool(
585+
min=1,
586+
max=1,
587+
accepted_queue_size=1,
588+
accepted_queue_timeout=0,
589+
server=httpserver,
590+
)
591+
592+
httpserver.prepare()
593+
serve_thread = threading.Thread(target=httpserver.serve)
594+
serve_thread.start()
595+
request.addfinalizer(httpserver.stop)
596+
# Stop the thread pool to ensure the queue fills up:
597+
httpserver.requests.stop()
598+
599+
_host, port = httpserver.bind_addr
600+
601+
# Use up the very limited thread pool queue we've set up, so future
602+
# requests fail:
603+
httpserver.requests._queue.put(None)
604+
605+
response = requests.get(f'http://{localhost}:{port}', timeout=20)
606+
assert response.status_code == HTTPStatus.SERVICE_UNAVAILABLE

0 commit comments

Comments
 (0)