|
101 | 101 | ) |
102 | 102 |
|
103 | 103 |
|
| 104 | +if sys.version_info[:2] >= (3, 13): |
| 105 | + from queue import ( |
| 106 | + Queue as QueueWithShutdown, |
| 107 | + ShutDown as QueueShutDown, |
| 108 | + ) |
| 109 | +else: |
| 110 | + |
| 111 | + class QueueShutDown(Exception): |
| 112 | + """Queue has been shut down.""" |
| 113 | + |
| 114 | + class QueueWithShutdown(queue.Queue): |
| 115 | + """Add shutdown() similar to Python 3.13+ Queue.""" |
| 116 | + |
| 117 | + _queue_shut_down: bool = False |
| 118 | + |
| 119 | + def shutdown(self, immediate=False): |
| 120 | + if immediate: |
| 121 | + while True: |
| 122 | + try: |
| 123 | + self.get_nowait() |
| 124 | + except queue.Empty: |
| 125 | + break |
| 126 | + self._queue_shut_down = True |
| 127 | + |
| 128 | + def get(self, *args, **kwargs): |
| 129 | + if self._queue_shut_down: |
| 130 | + raise QueueShutDown |
| 131 | + return super().get(*args, **kwargs) |
| 132 | + |
| 133 | + |
104 | 134 | IS_WINDOWS = platform.system() == 'Windows' |
105 | 135 | """Flag indicating whether the app is running under Windows.""" |
106 | 136 |
|
@@ -1658,6 +1688,8 @@ def __init__( |
1658 | 1688 | self.reuse_port = reuse_port |
1659 | 1689 | self.clear_stats() |
1660 | 1690 |
|
| 1691 | + self._unservicable_conns = QueueWithShutdown() |
| 1692 | + |
1661 | 1693 | def clear_stats(self): |
1662 | 1694 | """Reset server stat counters..""" |
1663 | 1695 | self._start_time = None |
@@ -1866,8 +1898,39 @@ def prepare(self): # noqa: C901 # FIXME |
1866 | 1898 | self.ready = True |
1867 | 1899 | self._start_time = time.time() |
1868 | 1900 |
|
| 1901 | + def _serve_unservicable(self): |
| 1902 | + """Serve connections we can't handle a 503.""" |
| 1903 | + while self.ready: |
| 1904 | + try: |
| 1905 | + conn = self._unservicable_conns.get() |
| 1906 | + except QueueShutDown: |
| 1907 | + return |
| 1908 | + request = HTTPRequest(self, conn) |
| 1909 | + try: |
| 1910 | + request.simple_response('503 Service Unavailable') |
| 1911 | + except (OSError, errors.FatalSSLAlert): |
| 1912 | + # We're sending the 503 error to be polite, it it fails that's |
| 1913 | + # fine. |
| 1914 | + continue |
| 1915 | + except Exception as ex: |
| 1916 | + # We can't just raise an exception because that will kill this |
| 1917 | + # thread, and prevent 503 errors from being sent to future |
| 1918 | + # connections. |
| 1919 | + self.server.error_log( |
| 1920 | + repr(ex), |
| 1921 | + level=logging.ERROR, |
| 1922 | + traceback=True, |
| 1923 | + ) |
| 1924 | + conn.linger = True |
| 1925 | + conn.close() |
| 1926 | + |
1869 | 1927 | def serve(self): |
1870 | 1928 | """Serve requests, after invoking :func:`prepare()`.""" |
| 1929 | + # This thread will handle unservicable connections, as added to |
| 1930 | + # self._unservicable_conns queue. It will run forever, until |
| 1931 | + # self.stop() tells it to shut down. |
| 1932 | + threading.Thread(target=self._serve_unservicable).start() |
| 1933 | + |
1871 | 1934 | while self.ready and not self.interrupt: |
1872 | 1935 | try: |
1873 | 1936 | self._connections.run(self.expiration_interval) |
@@ -2162,8 +2225,7 @@ def process_conn(self, conn): |
2162 | 2225 | try: |
2163 | 2226 | self.requests.put(conn) |
2164 | 2227 | except queue.Full: |
2165 | | - # Just drop the conn. TODO: write 503 back? |
2166 | | - conn.close() |
| 2228 | + self._unservicable_conns.put(conn) |
2167 | 2229 |
|
2168 | 2230 | @property |
2169 | 2231 | def interrupt(self): |
@@ -2201,6 +2263,11 @@ def stop(self): # noqa: C901 # FIXME |
2201 | 2263 | return # already stopped |
2202 | 2264 |
|
2203 | 2265 | self.ready = False |
| 2266 | + |
| 2267 | + # This tells the thread that handles unservicable connections to shut |
| 2268 | + # down: |
| 2269 | + self._unservicable_conns.shutdown(immediate=True) |
| 2270 | + |
2204 | 2271 | if self._start_time is not None: |
2205 | 2272 | self._run_time += time.time() - self._start_time |
2206 | 2273 | self._start_time = None |
|
0 commit comments