From 49618bea69625d9e2a8dd962f43dcf376f575425 Mon Sep 17 00:00:00 2001 From: Kurt McKee Date: Thu, 25 Jul 2024 15:17:48 -0500 Subject: [PATCH 01/22] Close a temporary file acting as a fake socket This resolves a `ResourceWarning` caused by the test suite. --- tests/t.py | 3 +++ tests/test_http.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/t.py b/tests/t.py index 9b76e7deb..4551e9444 100644 --- a/tests/t.py +++ b/tests/t.py @@ -65,3 +65,6 @@ def send(self, data): def seek(self, offset, whence=0): self.tmp.seek(offset, whence) + + def close(self): + self.tmp.close() diff --git a/tests/test_http.py b/tests/test_http.py index f0ddc3bb2..658690d25 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -184,11 +184,14 @@ def test_socket_unreader_chunk(): fake_sock = t.FakeSocket(io.BytesIO(b'Lorem ipsum dolor')) sock_unreader = SocketUnreader(fake_sock, max_chunk=5) - assert sock_unreader.chunk() == b'Lorem' - assert sock_unreader.chunk() == b' ipsu' - assert sock_unreader.chunk() == b'm dol' - assert sock_unreader.chunk() == b'or' - assert sock_unreader.chunk() == b'' + try: + assert sock_unreader.chunk() == b'Lorem' + assert sock_unreader.chunk() == b' ipsu' + assert sock_unreader.chunk() == b'm dol' + assert sock_unreader.chunk() == b'or' + assert sock_unreader.chunk() == b'' + finally: + fake_sock.close() def test_length_reader_read(): From 62530ef41bea468ab91ca0cd63d9e40072fb5519 Mon Sep 17 00:00:00 2001 From: Kurt McKee Date: Thu, 25 Jul 2024 15:22:42 -0500 Subject: [PATCH 02/22] Close open sockets when an exception is encountered --- gunicorn/instrument/statsd.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gunicorn/instrument/statsd.py b/gunicorn/instrument/statsd.py index 2c54b2e72..e993b46da 100644 --- a/gunicorn/instrument/statsd.py +++ b/gunicorn/instrument/statsd.py @@ -32,11 +32,14 @@ def __init__(self, cfg): else: address_family = socket.AF_INET + self.sock = None try: self.sock = socket.socket(address_family, socket.SOCK_DGRAM) self.sock.connect(cfg.statsd_host) except Exception: - self.sock = None + if self.sock is not None: + self.sock.close() + self.sock = None self.dogstatsd_tags = cfg.dogstatsd_tags From dc197ffbf91c8dc1de28388e02e01f371d849d66 Mon Sep 17 00:00:00 2001 From: Kurt McKee Date: Thu, 25 Jul 2024 16:54:09 -0500 Subject: [PATCH 03/22] Close open files to prevent `ResourceWarning` in the test suite --- tests/test_reload.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tests/test_reload.py b/tests/test_reload.py index f5b182583..5ea0549f1 100644 --- a/tests/test_reload.py +++ b/tests/test_reload.py @@ -39,9 +39,12 @@ def test_reload_on_syntax_error(): log = mock.Mock() worker = MyWorker(age=0, ppid=0, sockets=[], app=app, timeout=0, cfg=cfg, log=log) - worker.init_process() - reloader.start.assert_called_with() - reloader.add_extra_file.assert_called_with('syntax_error_filename') + try: + worker.init_process() + reloader.start.assert_called_with() + reloader.add_extra_file.assert_called_with('syntax_error_filename') + finally: + worker.tmp.close() def test_start_reloader_after_load_wsgi(): @@ -56,13 +59,16 @@ def test_start_reloader_after_load_wsgi(): log = mock.Mock() worker = MyWorker(age=0, ppid=0, sockets=[], app=app, timeout=0, cfg=cfg, log=log) - worker.load_wsgi = mock.Mock() - mock_parent = mock.Mock() - mock_parent.attach_mock(worker.load_wsgi, 'load_wsgi') - mock_parent.attach_mock(reloader.start, 'reloader_start') - - 
worker.init_process() - mock_parent.assert_has_calls([ - mock.call.load_wsgi(), - mock.call.reloader_start(), - ]) + try: + worker.load_wsgi = mock.Mock() + mock_parent = mock.Mock() + mock_parent.attach_mock(worker.load_wsgi, 'load_wsgi') + mock_parent.attach_mock(reloader.start, 'reloader_start') + + worker.init_process() + mock_parent.assert_has_calls([ + mock.call.load_wsgi(), + mock.call.reloader_start(), + ]) + finally: + worker.tmp.close() From 7756175f54e3d5da52572ad4d0e984e69b246944 Mon Sep 17 00:00:00 2001 From: Kurt McKee Date: Thu, 25 Jul 2024 16:54:53 -0500 Subject: [PATCH 04/22] Escalate warnings to errors during testing This ensures that unclosed resources will be caught in CI --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index eaca1eac0..24fae1d6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,9 @@ main = "gunicorn.app.pasterapp:serve" norecursedirs = ["examples", "lib", "local", "src"] testpaths = ["tests/"] addopts = "--assert=plain --cov=gunicorn --cov-report=xml" +filterwarnings = [ + "error", +] [tool.setuptools] zip-safe = false From 2096e428357fb26bed917d456a0f6ba741284cb8 Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Tue, 13 Aug 2024 22:24:36 +0200 Subject: [PATCH 05/22] config: reload-extra without reload --- docs/source/settings.rst | 9 ++++++++- gunicorn/config.py | 9 ++++++++- gunicorn/reloader.py | 15 ++++++++++++--- gunicorn/workers/base.py | 4 ++-- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/docs/source/settings.rst b/docs/source/settings.rst index e1e91fa76..824b42fee 100644 --- a/docs/source/settings.rst +++ b/docs/source/settings.rst @@ -82,6 +82,10 @@ The default behavior is to attempt inotify with a fallback to file system polling. Generally, inotify should be preferred if available because it consumes less system resources. +.. note:: + If the application fails to load while this option is used, + the (potentially sensitive!) traceback will be shared in + the response to subsequent HTTP requests. .. note:: In order to use the inotify reloader, you must have the ``inotify`` package installed. @@ -114,10 +118,13 @@ Valid engines are: **Default:** ``[]`` -Extends :ref:`reload` option to also watch and reload on additional files +Alternative or extension to :ref:`reload` option to (also) watch +and reload on additional files (e.g., templates, configurations, specifications, etc.). .. versionadded:: 19.8 +.. versionchanged:: 23.FIXME + Option no longer silently ignored if used without :ref:`reload`. .. _spew: diff --git a/gunicorn/config.py b/gunicorn/config.py index 402a26b68..e37238d81 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -921,6 +921,10 @@ class Reload(Setting): system polling. Generally, inotify should be preferred if available because it consumes less system resources. + .. note:: + If the application fails to load while this option is used, + the (potentially sensitive!) traceback will be shared in + the response to subsequent HTTP requests. .. note:: In order to use the inotify reloader, you must have the ``inotify`` package installed. @@ -956,10 +960,13 @@ class ReloadExtraFiles(Setting): validator = validate_list_of_existing_files default = [] desc = """\ - Extends :ref:`reload` option to also watch and reload on additional files + Alternative or extension to :ref:`reload` option to (also) watch + and reload on additional files (e.g., templates, configurations, specifications, etc.). .. versionadded:: 19.8 + .. 
versionchanged:: 23.FIXME + Option no longer silently ignored if used without :ref:`reload`. """ diff --git a/gunicorn/reloader.py b/gunicorn/reloader.py index 1c67f2a7d..51fd15c88 100644 --- a/gunicorn/reloader.py +++ b/gunicorn/reloader.py @@ -14,17 +14,21 @@ class Reloader(threading.Thread): - def __init__(self, extra_files=None, interval=1, callback=None): + def __init__(self, extra_files=None, interval=1, callback=None, auto_detect=False): super().__init__() self.daemon = True self._extra_files = set(extra_files or ()) self._interval = interval self._callback = callback + self._auto_detect = auto_detect def add_extra_file(self, filename): self._extra_files.add(filename) def get_files(self): + if not self._auto_detect: + return self._extra_files + fnames = [ COMPILED_EXT_RE.sub('py', module.__file__) for module in tuple(sys.modules.values()) @@ -71,12 +75,13 @@ class InotifyReloader(threading.Thread): | inotify.constants.IN_MOVE_SELF | inotify.constants.IN_MOVED_FROM | inotify.constants.IN_MOVED_TO) - def __init__(self, extra_files=None, callback=None): + def __init__(self, extra_files=None, callback=None, auto_detect=False): super().__init__() self.daemon = True self._callback = callback self._dirs = set() self._watcher = Inotify() + self._auto_detect = auto_detect for extra_file in extra_files: self.add_extra_file(extra_file) @@ -91,6 +96,9 @@ def add_extra_file(self, filename): self._dirs.add(dirname) def get_dirs(self): + if not self._auto_detect: + return set() + fnames = [ os.path.dirname(os.path.abspath(COMPILED_EXT_RE.sub('py', module.__file__))) for module in tuple(sys.modules.values()) @@ -100,6 +108,7 @@ def get_dirs(self): return set(fnames) def run(self): + # FIXME: _watchers/_dirs inconsistent - latter gets reset self._dirs = self.get_dirs() for dirname in self._dirs: @@ -117,7 +126,7 @@ def run(self): else: class InotifyReloader: - def __init__(self, extra_files=None, callback=None): + def __init__(self, extra_files=None, callback=None, auto_detect=False): raise ImportError('You must have the inotify module installed to ' 'use the inotify reloader') diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py index 93c465c98..8869472dc 100644 --- a/gunicorn/workers/base.py +++ b/gunicorn/workers/base.py @@ -119,7 +119,7 @@ def init_process(self): self.init_signals() # start the reloader - if self.cfg.reload: + if self.cfg.reload or self.cfg.reload_extra_files: def changed(fname): self.log.info("Worker reloading: %s modified", fname) self.alive = False @@ -130,7 +130,7 @@ def changed(fname): reloader_cls = reloader_engines[self.cfg.reload_engine] self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files, - callback=changed) + callback=changed, auto_detect=self.cfg.reload) self.load_wsgi() if self.reloader: From 56b3e4231c1889cb4db05667c7d6dfec736284d7 Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Tue, 13 Aug 2024 23:38:18 +0200 Subject: [PATCH 06/22] style: re-verbosify HTTP commentary --- gunicorn/http/message.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/gunicorn/http/message.py b/gunicorn/http/message.py index 59ce0bf4b..5279a07bd 100644 --- a/gunicorn/http/message.py +++ b/gunicorn/http/message.py @@ -178,8 +178,9 @@ def set_body_reader(self): elif name == "TRANSFER-ENCODING": # T-E can be a list # https://datatracker.ietf.org/doc/html/rfc9112#name-transfer-encoding - vals = [v.strip() for v in value.split(',')] - for val in vals: + te_split_at_comma = [v.strip() for v in value.split(',')] + # N.B. 
we might have split in the middle of a quoted transfer-parameter
+                for val in te_split_at_comma:
                     if val.lower() == "chunked":
                         # DANGER: transfer codings stack, and stacked chunking is never intended
                         if chunked:
@@ -187,7 +188,7 @@ def set_body_reader(self):
                         chunked = True
                     elif val.lower() == "identity":
                         # does not do much, could still plausibly desync from what the proxy does
-                        # safe option: nuke it, its never needed
+                        # safe option: reject, it's never needed
                         if chunked:
                             raise InvalidHeader("TRANSFER-ENCODING", req=self)
                     elif val.lower() in ('compress', 'deflate', 'gzip'):
@@ -196,6 +197,8 @@ def set_body_reader(self):
                             raise InvalidHeader("TRANSFER-ENCODING", req=self)
                         self.force_close()
                     else:
+                        # DANGER: this not only rejects unknown encodings, but also
+                        # leftovers from not splitting at transfer-coding boundary
                         raise UnsupportedTransferCoding(value)
 
         if chunked:
             # two potentially dangerous cases:
             #  a) CL + TE (TE overrides CL.. only safe if the recipient sees it that way too)
             #  b) chunked HTTP/1.0 (always faulty)
             if self.version < (1, 1):
-                # framing wonky, see RFC 9112 Section 6.1
+                # framing is faulty
+                # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-16
                 raise InvalidHeader("TRANSFER-ENCODING", req=self)
             if content_length is not None:
                 # we cannot be certain the message framing we understood matches proxy intent
                 # -> whatever happens next, remaining input must not be trusted
+                # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-15
                 raise InvalidHeader("CONTENT-LENGTH", req=self)
             self.body = Body(ChunkedReader(self, self.unreader))
         elif content_length is not None:

From 6d332ba4bc6bd83634a973e1cddc8fee6f99588b Mon Sep 17 00:00:00 2001
From: Raghu Udiyar
Date: Thu, 20 Aug 2020 14:48:00 +0530
Subject: [PATCH 07/22] Add socket backlog metric

If all the workers are busy or max connections is reached, new
connections will queue in the socket backlog, which defaults to 2048.
The `gunicorn.backlog` metric provides visibility into this queue and
gives an idea of concurrency and worker saturation.

This also adds a distinction between the `timer` and `histogram` statsd
metric types, which, although treated the same, can be different; e.g.
in this case histogram is not a timer:
https://github.com/b/statsd_spec#timers
---
 gunicorn/arbiter.py           |  7 +++++++
 gunicorn/instrument/statsd.py | 10 ++++++++--
 gunicorn/sock.py              | 17 +++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index 1eaf453d5..49d7cb43a 100644
--- a/gunicorn/arbiter.py
+++ b/gunicorn/arbiter.py
@@ -583,6 +583,13 @@ def manage_workers(self):
                            "value": active_worker_count,
                            "mtype": "gauge"})

+        backlog = sum([sock.get_backlog() for sock in self.LISTENERS])
+        if backlog:
+            self.log.debug("socket backlog: {0}".format(backlog),
+                           extra={"metric": "gunicorn.backlog",
+                                  "value": backlog,
+                                  "mtype": "histogram"})
+
     def spawn_worker(self):
         self.worker_age += 1
         worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
diff --git a/gunicorn/instrument/statsd.py b/gunicorn/instrument/statsd.py
index 7bc4e6ffd..708a1d6bf 100644
--- a/gunicorn/instrument/statsd.py
+++ b/gunicorn/instrument/statsd.py
@@ -17,6 +17,7 @@
 GAUGE_TYPE = "gauge"
 COUNTER_TYPE = "counter"
 HISTOGRAM_TYPE = "histogram"
+TIMER_TYPE = "timer"


 class Statsd(Logger):
@@ -80,6 +81,8 @@ def log(self, lvl, msg, *args, **kwargs):
                     self.increment(metric, value)
                 elif typ == HISTOGRAM_TYPE:
                     self.histogram(metric, value)
+                elif typ == TIMER_TYPE:
+                    self.timer(metric, value)
                 else:
                     pass

@@ -101,7 +104,7 @@ def access(self, resp, req, environ, request_time):
             status = status.decode('utf-8')
         if isinstance(status, str):
             status = int(status.split(None, 1)[0])
-        self.histogram("gunicorn.request.duration", duration_in_ms)
+        self.timer("gunicorn.request.duration", duration_in_ms)
         self.increment("gunicorn.requests", 1)
         self.increment("gunicorn.request.status.%d" % status, 1)

@@ -116,9 +119,12 @@ def increment(self, name, value, sampling_rate=1.0):
     def decrement(self, name, value, sampling_rate=1.0):
         self._sock_send("{0}{1}:-{2}|c|@{3}".format(self.prefix, name, value, sampling_rate))

-    def histogram(self, name, value):
+    def timer(self, name, value):
         self._sock_send("{0}{1}:{2}|ms".format(self.prefix, name, value))

+    def histogram(self, name, value):
+        self._sock_send("{0}{1}:{2}|h".format(self.prefix, name, value))
+
     def _sock_send(self, msg):
         try:
             if isinstance(msg, str):
diff --git a/gunicorn/sock.py b/gunicorn/sock.py
index eb2b6fa9c..b26bd4991 100644
--- a/gunicorn/sock.py
+++ b/gunicorn/sock.py
@@ -9,8 +9,10 @@
 import stat
 import sys
 import time
+import struct

 from gunicorn import util

+PLATFORM = sys.platform

 class BaseSocket:
@@ -70,6 +72,9 @@ def close(self):

         self.sock = None

+    def get_backlog(self):
+        return 0
+

 class TCPSocket(BaseSocket):

@@ -88,6 +93,18 @@ def set_options(self, sock, bound=False):
             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
         return super().set_options(sock, bound=bound)

+    def get_backlog(self):
+        if self.sock and PLATFORM == "linux":
+            # tcp_info struct from include/uapi/linux/tcp.h
+            fmt = 'B'*8+'I'*24
+            try:
+                tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP,
+                                  socket.TCP_INFO, 104)
+                # 12 is tcpi_unacked
+                return struct.unpack(fmt, tcp_info_struct)[12]
+            except AttributeError:
+                pass
+        return 0

 class TCP6Socket(TCPSocket):

From 4d532639070c112685c8fbd990c6842edffd97c8 Mon Sep 17 00:00:00 2001
From: Raghu Udiyar
Date: Sun, 17 Jan 2021 19:55:25 +0530
Subject: [PATCH 08/22] Do not emit backlog metric if it's unsupported

---
 gunicorn/arbiter.py | 6 +++++-
 gunicorn/sock.py    | 8 +++++---
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index
49d7cb43a..f5727c283 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -583,7 +583,11 @@ def manage_workers(self): "value": active_worker_count, "mtype": "gauge"}) - backlog = sum([sock.get_backlog() for sock in self.LISTENERS]) + backlog = sum([ + sock.get_backlog() + for sock in self.LISTENERS + if sock.get_backlog() is not None + ]) if backlog: self.log.debug("socket backlog: {0}".format(backlog), extra={"metric": "gunicorn.backlog", diff --git a/gunicorn/sock.py b/gunicorn/sock.py index b26bd4991..a87ca9733 100644 --- a/gunicorn/sock.py +++ b/gunicorn/sock.py @@ -73,7 +73,7 @@ def close(self): self.sock = None def get_backlog(self): - return 0 + return None class TCPSocket(BaseSocket): @@ -99,12 +99,14 @@ def get_backlog(self): fmt = 'B'*8+'I'*24 try: tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP, - socket.TCP_INFO, 104) + socket.TCP_INFO, 104) # 12 is tcpi_unacked return struct.unpack(fmt, tcp_info_struct)[12] except AttributeError: pass - return 0 + + return None + class TCP6Socket(TCPSocket): From aa73f3c9aa03209174a0c1a45895e71f8e958dbe Mon Sep 17 00:00:00 2001 From: Raghu Udiyar Date: Wed, 15 Nov 2023 10:43:26 +0530 Subject: [PATCH 09/22] Avoid calling get_backlog twice Fix failing lint tests --- gunicorn/arbiter.py | 8 ++++---- gunicorn/sock.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index f5727c283..8537f1e85 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -583,11 +583,11 @@ def manage_workers(self): "value": active_worker_count, "mtype": "gauge"}) - backlog = sum([ - sock.get_backlog() + backlog = sum( + sock.get_backlog() or 0 for sock in self.LISTENERS - if sock.get_backlog() is not None - ]) + ) + if backlog: self.log.debug("socket backlog: {0}".format(backlog), extra={"metric": "gunicorn.backlog", diff --git a/gunicorn/sock.py b/gunicorn/sock.py index a87ca9733..13eeece71 100644 --- a/gunicorn/sock.py +++ b/gunicorn/sock.py @@ -96,7 +96,7 @@ def set_options(self, sock, bound=False): def get_backlog(self): if self.sock and PLATFORM == "linux": # tcp_info struct from include/uapi/linux/tcp.h - fmt = 'B'*8+'I'*24 + fmt = 'B' * 8 + 'I' * 24 try: tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_INFO, 104) From 1fcadcb01670e4e4e21c93d5bc8b39897fdadff4 Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Wed, 14 Aug 2024 23:01:38 +0200 Subject: [PATCH 10/22] Revert "let's exception not bubble" This reverts commit 40232284934c32939c0e4e78caad1987c3773e08. We use sys.exit. On purpose. We should therefore not be catching SystemExit. 
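
To illustrate (a minimal sketch, not part of the patch itself):
sys.exit() works by raising SystemExit, which inherits from
BaseException but deliberately not from Exception:

    import sys

    def worker():
        sys.exit(0)  # raises SystemExit

    try:
        worker()
    except Exception as e:
        # application errors land here; SystemExit propagates past us,
        # so the interpreter can still shut the process down
        print("handled app error:", e)

With "except BaseException", the SystemExit raised by sys.exit() would
be swallowed and treated as if it were an application error.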
--- gunicorn/workers/base_async.py | 2 +- gunicorn/workers/sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gunicorn/workers/base_async.py b/gunicorn/workers/base_async.py index 9466d6aaa..082be66fc 100644 --- a/gunicorn/workers/base_async.py +++ b/gunicorn/workers/base_async.py @@ -81,7 +81,7 @@ def handle(self, listener, client, addr): self.log.debug("Ignoring socket not connected") else: self.log.debug("Ignoring EPIPE") - except BaseException as e: + except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client) diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index 4c029f912..754ae08fe 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -153,7 +153,7 @@ def handle(self, listener, client, addr): self.log.debug("Ignoring socket not connected") else: self.log.debug("Ignoring EPIPE") - except BaseException as e: + except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client) From 8617c39de9ec527111404d07b3ae9abd2e426d9d Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Wed, 14 Aug 2024 23:47:38 +0200 Subject: [PATCH 11/22] workaround: reintroduce gevent.Timeout handling This is probably wrong. But less wrong than handling *all* BaseException. Reports of this happening may be the result of some *other* async Timeout in the wsgi app bubbling through to us. Gevent docs promise: "[..] if *exception* is the literal ``False``, the timeout is still raised, but the context manager suppresses it, so the code outside the with-block won't see it." --- gunicorn/workers/base.py | 2 ++ gunicorn/workers/base_async.py | 3 +++ gunicorn/workers/geventlet.py | 2 ++ gunicorn/workers/ggevent.py | 2 ++ gunicorn/workers/sync.py | 2 ++ 5 files changed, 11 insertions(+) diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py index 93c465c98..1a0176d17 100644 --- a/gunicorn/workers/base.py +++ b/gunicorn/workers/base.py @@ -29,6 +29,8 @@ class Worker: + WORKAROUND_BASE_EXCEPTIONS = () # none + SIGNALS = [getattr(signal, "SIG%s" % x) for x in ( "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split() )] diff --git a/gunicorn/workers/base_async.py b/gunicorn/workers/base_async.py index 082be66fc..bc790912a 100644 --- a/gunicorn/workers/base_async.py +++ b/gunicorn/workers/base_async.py @@ -81,6 +81,9 @@ def handle(self, listener, client, addr): self.log.debug("Ignoring socket not connected") else: self.log.debug("Ignoring EPIPE") + except self.WORKAROUND_BASE_EXCEPTIONS as e: + self.log.warning("Catched async exception (compat workaround). 
If this is not a bug in your app, please file a report.") + self.handle_error(req, client, addr, e) except Exception as e: self.handle_error(req, client, addr, e) finally: diff --git a/gunicorn/workers/geventlet.py b/gunicorn/workers/geventlet.py index 087eb61ec..87f9c4e1f 100644 --- a/gunicorn/workers/geventlet.py +++ b/gunicorn/workers/geventlet.py @@ -123,6 +123,8 @@ def patch_sendfile(): class EventletWorker(AsyncWorker): + WORKAROUND_BASE_EXCEPTIONS = (eventlet.Timeout, ) + def patch(self): hubs.use_hub() eventlet.monkey_patch() diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index b9b9b4408..538dd13c9 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -31,6 +31,8 @@ class GeventWorker(AsyncWorker): + WORKAROUND_BASE_EXCEPTIONS = (gevent.Timeout, ) + server_class = None wsgi_handler = None diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index 754ae08fe..e972d9c3a 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -110,6 +110,8 @@ def run_for_multiple(self, timeout): return def run(self): + assert len(self.WORKAROUND_BASE_EXCEPTIONS) == 0 + # if no timeout is given the worker will never wait and will # use the CPU for nothing. This minimal timeout prevent it. timeout = self.timeout or 0.5 From ef94875043fea78ff158067f8d6da506fad698ca Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Wed, 14 Aug 2024 23:58:17 +0200 Subject: [PATCH 12/22] style: line break --- gunicorn/workers/base_async.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gunicorn/workers/base_async.py b/gunicorn/workers/base_async.py index bc790912a..bb6d560d6 100644 --- a/gunicorn/workers/base_async.py +++ b/gunicorn/workers/base_async.py @@ -82,7 +82,8 @@ def handle(self, listener, client, addr): else: self.log.debug("Ignoring EPIPE") except self.WORKAROUND_BASE_EXCEPTIONS as e: - self.log.warning("Catched async exception (compat workaround). If this is not a bug in your app, please file a report.") + self.log.warning("Catched async exception (compat workaround). " + "If this is not a bug in your app, please file a report.") self.handle_error(req, client, addr, e) except Exception as e: self.handle_error(req, client, addr, e) From d5aa52e2fb58f2d5f4de432137e10f8439ce3ef8 Mon Sep 17 00:00:00 2001 From: Raghu Udiyar Date: Wed, 7 Aug 2024 16:35:36 +0530 Subject: [PATCH 13/22] Enable only on Linux platforms, and add config flag --- docs/source/settings.rst | 13 +++++++++++++ gunicorn/arbiter.py | 19 +++++++++---------- gunicorn/config.py | 14 ++++++++++++++ gunicorn/sock.py | 31 +++++++++++++++++-------------- 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/docs/source/settings.rst b/docs/source/settings.rst index e1e91fa76..283a2eaee 100644 --- a/docs/source/settings.rst +++ b/docs/source/settings.rst @@ -461,6 +461,19 @@ if not provided). .. versionadded:: 19.2 +.. _enable-backlog-metric: + +``enable_backlog_metric`` +~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Command line:** ``--enable-backlog-metric`` + +**Default:** ``False`` + +Enable socket backlog metric (only supported on Linux). + +.. 
versionadded:: 23.1 + Process Naming -------------- diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 8537f1e85..373d1b534 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -583,16 +583,15 @@ def manage_workers(self): "value": active_worker_count, "mtype": "gauge"}) - backlog = sum( - sock.get_backlog() or 0 - for sock in self.LISTENERS - ) - - if backlog: - self.log.debug("socket backlog: {0}".format(backlog), - extra={"metric": "gunicorn.backlog", - "value": backlog, - "mtype": "histogram"}) + if self.cfg.enable_backlog_metric: + backlog = sum(sock.get_backlog() or 0 + for sock in self.LISTENERS) + + if backlog >= 0: + self.log.debug("socket backlog: {0}".format(backlog), + extra={"metric": "gunicorn.backlog", + "value": backlog, + "mtype": "histogram"}) def spawn_worker(self): self.worker_age += 1 diff --git a/gunicorn/config.py b/gunicorn/config.py index 402a26b68..df0f3a36c 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -1693,6 +1693,20 @@ class StatsdPrefix(Setting): """ +class BacklogMetric(Setting): + name = "enable_backlog_metric" + section = "Logging" + cli = ["--enable-backlog-metric"] + validator = validate_bool + default = False + action = "store_true" + desc = """\ + Enable socket backlog metric (only supported on Linux). + + .. versionadded:: 23.1 + """ + + class Procname(Setting): name = "proc_name" section = "Process Naming" diff --git a/gunicorn/sock.py b/gunicorn/sock.py index 13eeece71..b7e64deb7 100644 --- a/gunicorn/sock.py +++ b/gunicorn/sock.py @@ -73,7 +73,7 @@ def close(self): self.sock = None def get_backlog(self): - return None + return -1 class TCPSocket(BaseSocket): @@ -93,19 +93,22 @@ def set_options(self, sock, bound=False): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return super().set_options(sock, bound=bound) - def get_backlog(self): - if self.sock and PLATFORM == "linux": - # tcp_info struct from include/uapi/linux/tcp.h - fmt = 'B' * 8 + 'I' * 24 - try: - tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP, - socket.TCP_INFO, 104) - # 12 is tcpi_unacked - return struct.unpack(fmt, tcp_info_struct)[12] - except AttributeError: - pass - - return None + if PLATFORM == "linux": + def get_backlog(self): + if self.sock: + # tcp_info struct from include/uapi/linux/tcp.h + fmt = 'B' * 8 + 'I' * 24 + try: + tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP, + socket.TCP_INFO, 104) + # 12 is tcpi_unacked + return struct.unpack(fmt, tcp_info_struct)[12] + except AttributeError: + pass + return 0 + else: + def get_backlog(self): + return -1 class TCP6Socket(TCPSocket): From 5b33c013d359b43d7bf951028f7e6daea415f7f5 Mon Sep 17 00:00:00 2001 From: Richard Eklycke Date: Fri, 2 Feb 2024 11:28:54 +0100 Subject: [PATCH 14/22] arbiter: Handle SIGCHLD in normal/main process context ... as opposed to in signal context. This is beneficial, since it means that we can, in a signal safe way, print messages about why e.g. a worker stopped its execution. And since handle_sigchld() logs what it does anyway, don't bother printing out that we're handling SIGCHLD. If workers are killed at rapid pace, we won't get as many SIGCHLD as there are workers killed anyway. 
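
For illustration, a minimal sketch of this pattern (names are
illustrative, not the exact arbiter API): the handler only records the
signal, and all logging and child reaping happens in the main loop:

    import queue
    import signal

    sig_queue = queue.SimpleQueue()

    def handle_signal(signum, frame):
        # signal context: only async-signal-safe work, no logging
        sig_queue.put(signum)

    signal.signal(signal.SIGCHLD, handle_signal)

    while True:
        try:
            signum = sig_queue.get(timeout=1)
        except queue.Empty:
            continue
        # normal process context: safe to log and reap children here
        print("handling", signal.Signals(signum).name)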
--- gunicorn/arbiter.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 1eaf453d5..465b9db11 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -40,12 +40,9 @@ class Arbiter: # I love dynamic languages SIG_QUEUE = [] - SIGNALS = [getattr(signal, "SIG%s" % x) - for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()] - SIG_NAMES = dict( - (getattr(signal, name), name[3:].lower()) for name in dir(signal) - if name[:3] == "SIG" and name[3] != "_" - ) + SIGNALS = [getattr(signal.Signals, "SIG%s" % x) + for x in "CHLD HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()] + SIG_NAMES = dict((sig, sig.name[3:].lower()) for sig in SIGNALS) def __init__(self, app): os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE @@ -185,7 +182,6 @@ def init_signals(self): # initialize all signals for s in self.SIGNALS: signal.signal(s, self.signal) - signal.signal(signal.SIGCHLD, self.handle_chld) def signal(self, sig, frame): if len(self.SIG_QUEUE) < 5: @@ -219,7 +215,8 @@ def run(self): if not handler: self.log.error("Unhandled signal: %s", signame) continue - self.log.info("Handling signal: %s", signame) + if sig != signal.SIGCHLD: + self.log.info("Handling signal: %s", signame) handler() self.wakeup() except (StopIteration, KeyboardInterrupt): @@ -236,10 +233,9 @@ def run(self): self.pidfile.unlink() sys.exit(-1) - def handle_chld(self, sig, frame): + def handle_chld(self): "SIGCHLD handling" self.reap_workers() - self.wakeup() def handle_hup(self): """\ @@ -391,7 +387,10 @@ def stop(self, graceful=True): # instruct the workers to exit self.kill_workers(sig) # wait until the graceful timeout - while self.WORKERS and time.time() < limit: + while True: + self.reap_workers() + if not self.WORKERS or time.time() >= limit: + break time.sleep(0.1) self.kill_workers(signal.SIGKILL) From d653ebc07cd8e633444f067170f755c4af6edfb1 Mon Sep 17 00:00:00 2001 From: Richard Eklycke Date: Fri, 2 Feb 2024 12:36:36 +0100 Subject: [PATCH 15/22] arbiter: Remove PIPE and only use SIG_QUEUE instead Since we can use something from queue.*, we can make it blocking as well, removing the need for two different data structures. --- gunicorn/arbiter.py | 90 +++++++++++---------------------------------- 1 file changed, 22 insertions(+), 68 deletions(-) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 465b9db11..3aa31c980 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -4,11 +4,11 @@ import errno import os import random -import select import signal import sys import time import traceback +import queue from gunicorn.errors import HaltServer, AppImportError from gunicorn.pidfile import Pidfile @@ -36,10 +36,9 @@ class Arbiter: LISTENERS = [] WORKERS = {} - PIPE = [] # I love dynamic languages - SIG_QUEUE = [] + SIG_QUEUE = queue.SimpleQueue() SIGNALS = [getattr(signal.Signals, "SIG%s" % x) for x in "CHLD HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()] SIG_NAMES = dict((sig, sig.name[3:].lower()) for sig in SIGNALS) @@ -167,16 +166,6 @@ def init_signals(self): Initialize master signal handling. Most of the signals are queued. Child signals only wake up the master. 
""" - # close old PIPE - for p in self.PIPE: - os.close(p) - - # initialize the pipe - self.PIPE = pair = os.pipe() - for p in pair: - util.set_non_blocking(p) - util.close_on_exec(p) - self.log.close_on_exec() # initialize all signals @@ -184,9 +173,8 @@ def init_signals(self): signal.signal(s, self.signal) def signal(self, sig, frame): - if len(self.SIG_QUEUE) < 5: - self.SIG_QUEUE.append(sig) - self.wakeup() + """ Note: Signal handler! No logging allowed. """ + self.SIG_QUEUE.put(sig) def run(self): "Main master loop." @@ -199,26 +187,23 @@ def run(self): while True: self.maybe_promote_master() - sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None - if sig is None: - self.sleep() - self.murder_workers() - self.manage_workers() - continue - - if sig not in self.SIG_NAMES: - self.log.info("Ignoring unknown signal: %s", sig) - continue + try: + sig = self.SIG_QUEUE.get(timeout=1) + except queue.Empty: + sig = None + + if sig: + signame = self.SIG_NAMES.get(sig) + handler = getattr(self, "handle_%s" % signame, None) + if not handler: + self.log.error("Unhandled signal: %s", signame) + continue + if sig != signal.SIGCHLD: + self.log.info("Handling signal: %s", signame) + handler() - signame = self.SIG_NAMES.get(sig) - handler = getattr(self, "handle_%s" % signame, None) - if not handler: - self.log.error("Unhandled signal: %s", signame) - continue - if sig != signal.SIGCHLD: - self.log.info("Handling signal: %s", signame) - handler() - self.wakeup() + self.murder_workers() + self.manage_workers() except (StopIteration, KeyboardInterrupt): self.halt() except HaltServer as inst: @@ -322,16 +307,6 @@ def maybe_promote_master(self): # reset proctitle util._setproctitle("master [%s]" % self.proc_name) - def wakeup(self): - """\ - Wake up the arbiter by writing to the PIPE - """ - try: - os.write(self.PIPE[1], b'.') - except OSError as e: - if e.errno not in [errno.EAGAIN, errno.EINTR]: - raise - def halt(self, reason=None, exit_status=0): """ halt arbiter """ self.stop() @@ -346,25 +321,6 @@ def halt(self, reason=None, exit_status=0): self.cfg.on_exit(self) sys.exit(exit_status) - def sleep(self): - """\ - Sleep until PIPE is readable or we timeout. - A readable PIPE means a signal occurred. - """ - try: - ready = select.select([self.PIPE[0]], [], [], 1.0) - if not ready[0]: - return - while os.read(self.PIPE[0], 1): - pass - except OSError as e: - # TODO: select.error is a subclass of OSError since Python 3.3. - error_number = getattr(e, 'errno', e.args[0]) - if error_number not in [errno.EAGAIN, errno.EINTR]: - raise - except KeyboardInterrupt: - sys.exit() - def stop(self, graceful=True): """\ Stop workers @@ -387,11 +343,9 @@ def stop(self, graceful=True): # instruct the workers to exit self.kill_workers(sig) # wait until the graceful timeout - while True: - self.reap_workers() - if not self.WORKERS or time.time() >= limit: - break + while self.WORKERS and time.time() < limit: time.sleep(0.1) + self.reap_workers() self.kill_workers(signal.SIGKILL) From 052448a64fb6f4cd54e757647e958ff889554211 Mon Sep 17 00:00:00 2001 From: Richard Eklycke Date: Thu, 1 Feb 2024 22:19:05 +0100 Subject: [PATCH 16/22] arbiter: Use waitpid() facilities to handle worker exit status This change is meant to handle the return value of waitpid() in a way that is more in line with the man page of said syscall. The changes can be summarized as follows: * Use os.WIFEXITED and os.WIFSIGNALED to determine what caused waitpid() to return, and exactly how a worker may have exited. 
* In case of normal termination, use os.WEXITSTATUS() to read the exit status (instead of using a hand rolled bit shift). A redundant log was removed in this code path. * In case of termination by a signal, use os.WTERMSIG() to determine the signal which caused the worker to terminate. This was buggy before, since the WCOREFLAG (0x80) could cause e.g. a SIGSEGV (code 11) to be reported as "code 139", meaning "code (0x80 | 11)". * Since waitpid() isn't called with WSTOPPED nor WCONTINUED, there's no need to have any os.WIFSTOPPED or os.WIFCONTINUED handling. --- gunicorn/arbiter.py | 58 ++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 3aa31c980..b4fc1808d 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -470,44 +470,38 @@ def reap_workers(self): break if self.reexec_pid == wpid: self.reexec_pid = 0 - else: - # A worker was terminated. If the termination reason was - # that it could not boot, we'll shut it down to avoid - # infinite start/stop cycles. - exitcode = status >> 8 - if exitcode != 0: - self.log.error('Worker (pid:%s) exited with code %s', wpid, exitcode) + continue + + if os.WIFEXITED(status): + # A worker was normally terminated. If the termination + # reason was that it could not boot, we'll halt the server + # to avoid infinite start/stop cycles. + exitcode = os.WEXITSTATUS(status) + log = self.log.error if exitcode != 0 else self.log.debug + log('Worker (pid:%s) exited with code %s', wpid, exitcode) if exitcode == self.WORKER_BOOT_ERROR: reason = "Worker failed to boot." raise HaltServer(reason, self.WORKER_BOOT_ERROR) if exitcode == self.APP_LOAD_ERROR: reason = "App failed to load." raise HaltServer(reason, self.APP_LOAD_ERROR) - - if exitcode > 0: - # If the exit code of the worker is greater than 0, - # let the user know. - self.log.error("Worker (pid:%s) exited with code %s.", - wpid, exitcode) - elif status > 0: - # If the exit code of the worker is 0 and the status - # is greater than 0, then it was most likely killed - # via a signal. - try: - sig_name = signal.Signals(status).name - except ValueError: - sig_name = "code {}".format(status) - msg = "Worker (pid:{}) was sent {}!".format( - wpid, sig_name) - - # Additional hint for SIGKILL - if status == signal.SIGKILL: - msg += " Perhaps out of memory?" - self.log.error(msg) - - worker = self.WORKERS.pop(wpid, None) - if not worker: - continue + elif os.WIFSIGNALED(status): + # A worker was terminated by a signal. + sig = os.WTERMSIG(status) + try: + sig_name = signal.Signals(sig).name + except ValueError: + sig_name = "signal {}".format(sig) + msg = "Worker (pid:{}) was terminated by {}!".format( + wpid, sig_name) + + # Additional hint for SIGKILL + if sig == signal.SIGKILL: + msg += " Perhaps out of memory?" + self.log.error(msg) + + worker = self.WORKERS.pop(wpid, None) + if worker: worker.tmp.close() self.cfg.child_exit(self, worker) except OSError as e: From b3db5b90a23806ac413dec800f3c27e22c8fe848 Mon Sep 17 00:00:00 2001 From: Richard Eklycke Date: Sat, 3 Feb 2024 17:08:16 +0100 Subject: [PATCH 17/22] arbiter: Reinstall SIGCHLD as required by some UNIXes According to the python signal documentation[1], SIGCHLD is handled differently from other signals. Specifically, if the underlying implementation resets the SIGCHLD signal handler, then python won't reinstall it (as it does for other signals). 
This behavior doesn't seem to exist on either Linux or Mac, but perhaps
one could argue that it's good practice anyway.

[1] https://docs.python.org/3/library/signal.html
---
 gunicorn/arbiter.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index b4fc1808d..efc6769ba 100644
--- a/gunicorn/arbiter.py
+++ b/gunicorn/arbiter.py
@@ -176,6 +176,10 @@ def signal(self, sig, frame):
         """ Note: Signal handler! No logging allowed. """
         self.SIG_QUEUE.put(sig)

+        # Some UNIXes require SIGCHLD to be reinstalled, see python signal docs
+        if sig == signal.SIGCHLD:
+            signal.signal(sig, self.signal)
+
     def run(self):
         "Main master loop."
         self.start()

From 64387d1715d9da1c19d427e51834df2dfdd52201 Mon Sep 17 00:00:00 2001
From: Richard Eklycke
Date: Sun, 4 Feb 2024 23:06:54 +0100
Subject: [PATCH 18/22] arbiter: clean up main loop

* Look up handlers in __init__() to induce a run-time error early on
  if something is wrong.

* Since we now know that all handlers exist, we can simplify the main
  loop in arbiter, in such a way that we don't need to call wakeup().
  So after this commit, the queue in the arbiter is only used to
  deliver which signal was sent.
---
 gunicorn/arbiter.py | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index efc6769ba..256cba66f 100644
--- a/gunicorn/arbiter.py
+++ b/gunicorn/arbiter.py
@@ -41,7 +41,6 @@ class Arbiter:
     SIG_QUEUE = queue.SimpleQueue()
     SIGNALS = [getattr(signal.Signals, "SIG%s" % x)
                for x in "CHLD HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
-    SIG_NAMES = dict((sig, sig.name[3:].lower()) for sig in SIGNALS)

     def __init__(self, app):
         os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE
@@ -71,6 +70,11 @@ def __init__(self, app):
             0: sys.executable
         }

+        self.SIG_HANDLERS = dict(
+            (sig, getattr(self, "handle_%s" % sig.name[3:].lower()))
+            for sig in self.SIGNALS
+        )
+
     def _get_num_workers(self):
         return self._num_workers

@@ -193,18 +197,11 @@ def run(self):

             try:
                 sig = self.SIG_QUEUE.get(timeout=1)
-            except queue.Empty:
-                sig = None
-
-            if sig:
-                signame = self.SIG_NAMES.get(sig)
-                handler = getattr(self, "handle_%s" % signame, None)
-                if not handler:
-                    self.log.error("Unhandled signal: %s", signame)
-                    continue
                 if sig != signal.SIGCHLD:
-                    self.log.info("Handling signal: %s", signame)
-                handler()
+                    self.log.info("Handling signal: %s", signal.Signals(sig).name)
+                self.SIG_HANDLERS[sig]()
+            except queue.Empty:
+                pass

             self.murder_workers()
             self.manage_workers()

From 7ecea2d54437ce9025aacd7f52a361d96cab4125 Mon Sep 17 00:00:00 2001
From: Richard Eklycke
Date: Sun, 18 Aug 2024 12:26:37 +0200
Subject: [PATCH 19/22] arbiter: Add Arbiter.wakeup() method

It accepts an optional "due_to_signal" argument which can be used to
tell if the wakeup was made because a signal handler needs to be
executed or not.
---
 gunicorn/arbiter.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index 256cba66f..b918e6e80 100644
--- a/gunicorn/arbiter.py
+++ b/gunicorn/arbiter.py
@@ -41,6 +41,7 @@ class Arbiter:
     SIG_QUEUE = queue.SimpleQueue()
     SIGNALS = [getattr(signal.Signals, "SIG%s" % x)
                for x in "CHLD HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
+    WAKEUP_REQUEST = signal.NSIG

     def __init__(self, app):
         os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE
@@ -178,12 +179,18 @@ def init_signals(self):

     def signal(self, sig, frame):
         """ Note: Signal handler! No logging allowed.
""" - self.SIG_QUEUE.put(sig) + self.wakeup(due_to_signal=sig) # Some UNIXes require SIGCHLD to be reinstalled, see python signal docs if sig == signal.SIGCHLD: signal.signal(sig, self.signal) + def wakeup(self, due_to_signal=None): + """\ + Wake up the main master loop. + """ + self.SIG_QUEUE.put(due_to_signal or self.WAKEUP_REQUEST) + def run(self): "Main master loop." self.start() @@ -197,9 +204,10 @@ def run(self): try: sig = self.SIG_QUEUE.get(timeout=1) - if sig != signal.SIGCHLD: - self.log.info("Handling signal: %s", signal.Signals(sig).name) - self.SIG_HANDLERS[sig]() + if sig != self.WAKEUP_REQUEST: + if sig != signal.SIGCHLD: + self.log.info("Handling signal: %s", signal.Signals(sig).name) + self.SIG_HANDLERS[sig]() except queue.Empty: pass From 497ad24ceb20c40a6f3c49281170f392ad0f68d4 Mon Sep 17 00:00:00 2001 From: Richard Eklycke Date: Sun, 18 Feb 2024 00:29:44 +0100 Subject: [PATCH 20/22] workers/gthread: Remove locks + one event queue + general cleanup The main purpose is to remove complexity from gthread by: * Removing the lock for handling self._keep and self.poller. This is possible since we now do all such manipulation on the main thread instead. When a connection is done, it posts a callback through the PollableMethodCaller which gets executed on the main thread. * Having a single event queue (self.poller), as opposed to also managing a set of futures. This fixes #3146 (although there are more minimal ways of doing it). There are other more minor things as well: * Renaming some variables, e.g. self._keep to self.keepalived_conns. * Remove self-explanatory comments (what the code does, not why). * Just decide that socket is blocking. * Use time.monotonic() for timeouts in gthread. Some complexity has been added to the shutdown sequence, but hopefully for good reason: it's to make sure that all already accepted connections are served within the grace period. --- gunicorn/workers/gthread.py | 261 ++++++++++++++++-------------------- 1 file changed, 114 insertions(+), 147 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 7a23228cd..b47ddaef5 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -13,6 +13,7 @@ from concurrent import futures import errno import os +import queue import selectors import socket import ssl @@ -21,7 +22,6 @@ from collections import deque from datetime import datetime from functools import partial -from threading import RLock from . import base from .. 
import http @@ -40,44 +40,64 @@ def __init__(self, cfg, sock, client, server): self.timeout = None self.parser = None - self.initialized = False - - # set the socket to non blocking - self.sock.setblocking(False) def init(self): - self.initialized = True - self.sock.setblocking(True) - if self.parser is None: # wrap the socket if needed if self.cfg.is_ssl: self.sock = sock.ssl_wrap_socket(self.sock, self.cfg) - # initialize the parser self.parser = http.RequestParser(self.cfg, self.sock, self.client) - def set_timeout(self): - # set the timeout - self.timeout = time.time() + self.cfg.keepalive + def is_initialized(self): + return bool(self.parser) + + def set_keepalive_timeout(self): + self.timeout = time.monotonic() + self.cfg.keepalive def close(self): util.close(self.sock) +class PollableMethodQueue(object): + + def __init__(self): + self.fds = [] + self.method_queue = None + + def init(self): + self.fds = os.pipe() + self.method_queue = queue.SimpleQueue() + + def close(self): + for fd in self.fds: + os.close(fd) + + def get_fd(self): + return self.fds[0] + + def defer(self, callback, *args): + self.method_queue.put(partial(callback, *args)) + os.write(self.fds[1], b'0') + + def run_callbacks(self, max_callbacks_at_a_time=10): + zeroes = os.read(self.fds[0], max_callbacks_at_a_time) + for _ in range(0, len(zeroes)): + method = self.method_queue.get() + method() + + class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads - # initialise the pool - self.tpool = None + self.thread_pool = None self.poller = None - self._lock = None - self.futures = deque() - self._keep = deque() + self.keepalived_conns = deque() self.nr_conns = 0 + self.method_queue = PollableMethodQueue() @classmethod def check_config(cls, cfg, log): @@ -88,100 +108,67 @@ def check_config(cls, cfg, log): "Check the number of worker connections and threads.") def init_process(self): - self.tpool = self.get_thread_pool() + self.thread_pool = self.get_thread_pool() self.poller = selectors.DefaultSelector() - self._lock = RLock() + self.method_queue.init() super().init_process() def get_thread_pool(self): """Override this method to customize how the thread pool is created""" return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + def handle_exit(self, sig, frame): + if self.alive: + self.alive = False + self.method_queue.defer(lambda: None) # To wake up poller.select() + def handle_quit(self, sig, frame): - self.alive = False - # worker_int callback - self.cfg.worker_int(self) - self.tpool.shutdown(False) - time.sleep(0.1) - sys.exit(0) - - def _wrap_future(self, fs, conn): - fs.conn = conn - self.futures.append(fs) - fs.add_done_callback(self.finish_request) - - def enqueue_req(self, conn): - conn.init() - # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + self.thread_pool.shutdown(False) + super().handle_quit(sig, frame) + + def set_accept_enabled(self, enabled): + for sock in self.sockets: + if enabled: + self.poller.register(sock, selectors.EVENT_READ, self.accept) + else: + self.poller.unregister(sock) - def accept(self, server, listener): + def accept(self, listener): try: sock, client = listener.accept() - # initialize the connection object - conn = TConn(self.cfg, sock, client, server) - self.nr_conns += 1 - # wait until socket is readable - with self._lock: - 
self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + sock.setblocking(True) # Explicitly set behavior since it differs per OS + conn = TConn(self.cfg, sock, client, listener.getsockname()) + + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) except OSError as e: if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise def on_client_socket_readable(self, conn, client): - with self._lock: - # unregister the client from the poller - self.poller.unregister(client) + self.poller.unregister(client) - if conn.initialized: - # remove the connection from keepalive - try: - self._keep.remove(conn) - except ValueError: - # race condition - return + if conn.is_initialized(): + self.keepalived_conns.remove(conn) + conn.init() - # submit the connection to a worker - self.enqueue_req(conn) + fs = self.thread_pool.submit(self.handle, conn) + fs.add_done_callback( + lambda fut: self.method_queue.defer(self.finish_request, conn, fut)) def murder_keepalived(self): - now = time.time() - while True: - with self._lock: - try: - # remove the connection from the queue - conn = self._keep.popleft() - except IndexError: - break - - delta = conn.timeout - now + now = time.monotonic() + while self.keepalived_conns: + delta = self.keepalived_conns[0].timeout - now if delta > 0: - # add the connection back to the queue - with self._lock: - self._keep.appendleft(conn) break - else: - self.nr_conns -= 1 - # remove the socket from the poller - with self._lock: - try: - self.poller.unregister(conn.sock) - except OSError as e: - if e.errno != errno.EBADF: - raise - except KeyError: - # already removed by the system, continue - pass - except ValueError: - # already removed by the system continue - pass - - # close the socket - conn.close() + + conn = self.keepalived_conns.popleft() + self.poller.unregister(conn.sock) + self.nr_conns -= 1 + conn.close() def is_parent_alive(self): # If our parent changed then we shut down. @@ -190,39 +177,23 @@ def is_parent_alive(self): return False return True + def wait_for_and_dispatch_events(self, timeout): + for key, _ in self.poller.select(timeout): + callback = key.data + callback(key.fileobj) + def run(self): - # init listeners, add them to the event loop - for sock in self.sockets: - sock.setblocking(False) - # a race condition during graceful shutdown may make the listener - # name unavailable in the request handler so capture it once here - server = sock.getsockname() - acceptor = partial(self.accept, server) - self.poller.register(sock, selectors.EVENT_READ, acceptor) + self.set_accept_enabled(True) + self.poller.register(self.method_queue.get_fd(), + selectors.EVENT_READ, + self.method_queue.run_callbacks) while self.alive: # notify the arbiter we are alive self.notify() - # can we accept more connections? 
- if self.nr_conns < self.worker_connections: - # wait for an event - events = self.poller.select(1.0) - for key, _ in events: - callback = key.data - callback(key.fileobj) - - # check (but do not wait) for finished requests - result = futures.wait(self.futures, timeout=0, - return_when=futures.FIRST_COMPLETED) - else: - # wait for a request to finish - result = futures.wait(self.futures, timeout=1.0, - return_when=futures.FIRST_COMPLETED) - - # clean up finished requests - for fut in result.done: - self.futures.remove(fut) + new_connections_accepted = self.nr_conns < self.worker_connections + self.wait_for_and_dispatch_events(timeout=1) if not self.is_parent_alive(): break @@ -230,57 +201,53 @@ def run(self): # handle keepalive timeouts self.murder_keepalived() - self.tpool.shutdown(False) + new_connections_still_accepted = self.nr_conns < self.worker_connections + if new_connections_accepted != new_connections_still_accepted: + self.set_accept_enabled(new_connections_still_accepted) + + # Don't accept any new connections, as we're about to shut down + if self.nr_conns < self.worker_connections: + self.set_accept_enabled(False) + + # ... but try handle all already accepted connections within the grace period + graceful_timeout = time.monotonic() + self.cfg.graceful_timeout + while self.nr_conns > 0: + time_remaining = max(graceful_timeout - time.monotonic(), 0) + if time_remaining == 0: + break + self.wait_for_and_dispatch_events(timeout=time_remaining) + + self.thread_pool.shutdown(wait=False) self.poller.close() + self.method_queue.close() for s in self.sockets: s.close() - futures.wait(self.futures, timeout=self.cfg.graceful_timeout) - - def finish_request(self, fs): - if fs.cancelled(): - self.nr_conns -= 1 - fs.conn.close() - return - + def finish_request(self, conn, fs): try: - (keepalive, conn) = fs.result() - # if the connection should be kept alived add it - # to the eventloop and record it + keepalive = not fs.cancelled() and fs.result() if keepalive and self.alive: - # flag the socket as non blocked - conn.sock.setblocking(False) - - # register the connection - conn.set_timeout() - with self._lock: - self._keep.append(conn) - - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + conn.set_keepalive_timeout() + self.keepalived_conns.append(conn) + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) else: self.nr_conns -= 1 conn.close() except Exception: - # an exception happened, make sure to close the - # socket. self.nr_conns -= 1 - fs.conn.close() + conn.close() def handle(self, conn): - keepalive = False req = None try: req = next(conn.parser) if not req: - return (False, conn) + return False # handle the request - keepalive = self.handle_request(req, conn) - if keepalive: - return (keepalive, conn) + return self.handle_request(req, conn) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. 
%s", e) @@ -307,7 +274,7 @@ def handle(self, conn): except Exception as e: self.handle_error(req, conn.sock, conn.client, e) - return (False, conn) + return False def handle_request(self, req, conn): environ = {} @@ -327,7 +294,7 @@ def handle_request(self, req, conn): if not self.alive or not self.cfg.keepalive: resp.force_close() - elif len(self._keep) >= self.max_keepalived: + elif len(self.keepalived_conns) >= self.max_keepalived: resp.force_close() respiter = self.wsgi(environ, resp.start_response) From e5236b14c728b7206a9d5ea6b065549f98fb8f4b Mon Sep 17 00:00:00 2001 From: "Paul J. Dorn" Date: Fri, 16 Aug 2024 23:13:55 +0200 Subject: [PATCH 21/22] test: setup nginx proxy --- .github/workflows/tox.yml | 5 + tests/test_nginx.py | 457 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 462 insertions(+) create mode 100644 tests/test_nginx.py diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml index 759800eb1..b6be4e082 100644 --- a/.github/workflows/tox.yml +++ b/.github/workflows/tox.yml @@ -54,6 +54,11 @@ jobs: cache-dependency-path: requirements_test.txt check-latest: true allow-prereleases: ${{ matrix.unsupported }} + - name: Add test utils + if: matrix.os == 'ubuntu-latest' + run: | + sudo systemctl mask nginx.service + sudo apt install nginx openssl - name: Install Dependencies run: | python -m pip install --upgrade pip diff --git a/tests/test_nginx.py b/tests/test_nginx.py new file mode 100644 index 000000000..69742edb7 --- /dev/null +++ b/tests/test_nginx.py @@ -0,0 +1,457 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# hint: can see stdout as the (complex) test progresses using: +# python -B -m pytest -s -vvvv --ff \ +# --override-ini=addopts=--strict-markers --exitfirst \ +# -- tests/test_nginx.py + +import importlib +import os +import secrets +import signal +import subprocess +import sys +import time +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING +from filelock import FileLock + +import pytest + +if TYPE_CHECKING: + import http.client + from typing import Any, NamedTuple, Self + +CMD_OPENSSL = Path("/usr/bin/openssl") +CMD_NGINX = Path("/usr/sbin/nginx") + +pytestmark = pytest.mark.skipif( + not CMD_OPENSSL.is_file() or not CMD_NGINX.is_file(), + reason="need %s and %s" % (CMD_OPENSSL, CMD_NGINX), +) + +STDOUT = 0 +STDERR = 1 + +TEST_SIMPLE = [ + pytest.param("sync"), + "eventlet", + "gevent", + "gevent_wsgi", + "gevent_pywsgi", + # "tornado", + "gthread", + # "aiohttp.GunicornWebWorker", # different app signature + # "aiohttp.GunicornUVLoopWebWorker", # " +] # type: list[str|NamedTuple] + +WORKER_DEPENDS = { + "aiohttp.GunicornWebWorker": ["aiohttp"], + "aiohttp.GunicornUVLoopWebWorker": ["aiohttp", "uvloop"], + "uvicorn.workers.UvicornWorker": ["uvicorn"], # deprecated + "uvicorn.workers.UvicornH11Worker": ["uvicorn"], # deprecated + "uvicorn_worker.UvicornWorker": ["uvicorn_worker"], + "uvicorn_worker.UvicornH11Worker": ["uvicorn_worker"], + "eventlet": ["eventlet"], + "gevent": ["gevent"], + "gevent_wsgi": ["gevent"], + "gevent_pywsgi": ["gevent"], + "tornado": ["tornado"], +} +DEP_WANTED = set(sum(WORKER_DEPENDS.values(), start=[])) # type: set[str] +DEP_INSTALLED = set() # type: set[str] + +for dependency in DEP_WANTED: + try: + importlib.import_module(dependency) + DEP_INSTALLED.add(dependency) + except ImportError: + pass + +for worker_name, worker_needs in WORKER_DEPENDS.items(): + missing = list(pkg for pkg in worker_needs if 
pkg not in DEP_INSTALLED) + if missing: + for T in (TEST_SIMPLE,): + if worker_name not in T: + continue + T.remove(worker_name) + skipped_worker = pytest.param( + worker_name, marks=pytest.mark.skip("%s not installed" % (missing[0])) + ) + T.append(skipped_worker) + +WORKER_COUNT = 2 +GRACEFUL_TIMEOUT = 3 +APP_IMPORT_NAME = "testsyntax" +APP_FUNC_NAME = "myapp" +HTTP_HOST = "local.test" + +PY_APPLICATION = f""" +import time +def {APP_FUNC_NAME}(environ, start_response): + body = b"response body from app" + response_head = [ + ("Content-Type", "text/plain"), + ("Content-Length", "%d" % len(body)), + ] + start_response("200 OK", response_head) + time.sleep(0.02) + return iter([body]) +""" + +# used in string.format() - duplicate {{ and }} +NGINX_CONFIG_TEMPLATE = """ +pid {pid_path}; +worker_processes 1; +error_log stderr notice; +events {{ + worker_connections 1024; +}} +worker_shutdown_timeout 1; +http {{ + default_type application/octet-stream; + access_log /dev/stdout combined; + upstream upstream_gunicorn {{ + server {gunicorn_upstream} fail_timeout=0; + }} + + server {{ listen {server_bind} default_server; return 400; }} + server {{ + listen {server_bind}; client_max_body_size 4G; + server_name {server_name}; + root {static_dir}; + location / {{ try_files $uri @proxy_to_app; }} + + location @proxy_to_app {{ + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Host $http_host; + proxy_http_version 1.1; + proxy_redirect off; + proxy_pass {proxy_method}://upstream_gunicorn; + }} + }} +}} +""" + + +class SubProcess: + GRACEFUL_SIGNAL = signal.SIGTERM + + def __enter__(self): + # type: () -> Self + self.run() + return self + + def __exit__(self, *exc): + # type: (*Any) -> None + if self.p is None: + return + self.p.send_signal(signal.SIGKILL) + stdout, stderr = self.p.communicate(timeout=1 + GRACEFUL_TIMEOUT) + ret = self.p.returncode + assert stdout == b"", stdout + assert ret == 0, (ret, stdout, stderr) + + def read_stdio(self, *, key, timeout_sec, wait_for_keyword, expect=None): + # type: (int, int, str, set[str]|None) -> str + # try: + # stdout, stderr = self.p.communicate(timeout=timeout) + # except subprocess.TimeoutExpired: + buf = ["", ""] + seen_keyword = 0 + unseen_keywords = list(expect or []) + poll_per_second = 20 + assert key in {0, 1}, key + assert self.p is not None # this helps static type checkers + assert self.p.stdout is not None # this helps static type checkers + assert self.p.stderr is not None # this helps static type checkers + for _ in range(timeout_sec * poll_per_second): + print("parsing", buf, "waiting for", wait_for_keyword, unseen_keywords) + for fd, file in enumerate([self.p.stdout, self.p.stderr]): + read = file.read(64 * 1024) + if read is not None: + buf[fd] += read.decode("utf-8", "surrogateescape") + if seen_keyword or wait_for_keyword in buf[key]: + seen_keyword += 1 + for additional_keyword in tuple(unseen_keywords): + for somewhere in buf: + if additional_keyword in somewhere: + unseen_keywords.remove(additional_keyword) + # gathered all the context we wanted + if seen_keyword and not unseen_keywords: + break + # not seen expected output? wait for % of original timeout + # .. 
maybe we will still see better error context that way + if seen_keyword > (0.5 * timeout_sec * poll_per_second): + break + # retcode = self.p.poll() + # if retcode is not None: + # break + time.sleep(1.0 / poll_per_second) + # assert buf[abs(key - 1)] == "" + assert wait_for_keyword in buf[key], (wait_for_keyword, *buf) + assert not unseen_keywords, (unseen_keywords, *buf) + return buf[key] + + def run(self): + # type: () -> None + self.p = subprocess.Popen( + self._argv, + bufsize=0, # allow read to return short + cwd=self.temp_path, + shell=False, + close_fds=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + os.set_blocking(self.p.stdout.fileno(), False) + os.set_blocking(self.p.stderr.fileno(), False) + assert self.p.stdout is not None # this helps static type checkers + + def graceful_quit(self, expect=None): + # type: (set[str]|None) -> str + if self.p is None: + raise AssertionError("called graceful_quit() when not running") + self.p.send_signal(self.GRACEFUL_SIGNAL) + # self.p.kill() + stdout = self.p.stdout.read(64 * 1024) or b"" + stderr = self.p.stderr.read(64 * 1024) or b"" + try: + o, e = self.p.communicate(timeout=GRACEFUL_TIMEOUT) + stdout += o + stderr += e + except subprocess.TimeoutExpired: + pass + assert stdout == b"" + self.p.stdout.close() + self.p.stderr.close() + exitcode = self.p.poll() # will return None if running + assert exitcode == 0, (exitcode, stdout, stderr) + print("output after signal: ", stdout, stderr, exitcode) + self.p = None + ret = stderr.decode("utf-8", "surrogateescape") + for keyword in expect or (): + assert keyword in ret, (keyword, ret) + return ret + + +class NginxProcess(SubProcess): + GRACEFUL_SIGNAL = signal.SIGQUIT + + def __init__( + self, + *, + temp_path, + config, + ): + assert isinstance(temp_path, Path) + self.conf_path = (temp_path / ("%s.nginx" % APP_IMPORT_NAME)).absolute() + self.p = None # type: subprocess.Popen[bytes] | None + self.temp_path = temp_path + with open(self.conf_path, "w+") as f: + f.write(config) + self._argv = [ + CMD_NGINX, + "-e", + "stderr", + "-c", + "%s" % self.conf_path, + ] + + +def generate_dummy_ssl_cert(cert_path, key_path): + # dummy self-signed cert + subprocess.check_output( + [ + CMD_OPENSSL, + "req", + "-new", + "-newkey", + "ed25519", + "-outform", + "PEM", + "-subj", + "/C=DE", + "-addext", + "subjectAltName=DNS:%s" % (HTTP_HOST), + "-days", + "1", + "-nodes", + "-x509", + "-keyout", + "%s" % (key_path), + "-out", + "%s" % (cert_path), + ], + shell=False, + ) + + +@pytest.fixture(scope="session") +def dummy_ssl_cert(tmp_path_factory): + base_tmp_dir = tmp_path_factory.getbasetemp().parent + crt = base_tmp_dir / "dummy.crt" + key = base_tmp_dir / "dummy.key" + print(crt, key) + # generate once, reuse for all tests + with FileLock("%s.lock" % crt): + if not crt.is_file(): + generate_dummy_ssl_cert(crt, key) + return crt, key + + +class GunicornProcess(SubProcess): + def __init__( + self, + *, + temp_path, + server_bind, + read_size=1024, + ssl_files=None, + worker_class="sync", + ): + self.conf_path = Path(os.devnull) + self.p = None # type: subprocess.Popen[bytes] | None + assert isinstance(temp_path, Path) + self.temp_path = temp_path + self.py_path = (temp_path / ("%s.py" % APP_IMPORT_NAME)).absolute() + with open(self.py_path, "w+") as f: + f.write(PY_APPLICATION) + + ssl_opt = [] + if ssl_files is not None: + cert_path, key_path = ssl_files + ssl_opt = [ + "--do-handshake-on-connect", + 
"--certfile=%s" % cert_path, + "--keyfile=%s" % key_path, + ] + + self._argv = [ + sys.executable, + "-m", + "gunicorn", + "--config=%s" % self.conf_path, + "--log-level=debug", + "--worker-class=%s" % worker_class, + "--workers=%d" % WORKER_COUNT, + # unsupported at the time this test was submitted + # "--buf-read-size=%d" % read_size, + "--enable-stdio-inheritance", + "--access-logfile=-", + "--disable-redirect-access-to-syslog", + "--graceful-timeout=%d" % (GRACEFUL_TIMEOUT,), + "--bind=%s" % server_bind, + "--reuse-port", + *ssl_opt, + "--", + f"{APP_IMPORT_NAME}:{APP_FUNC_NAME}", + ] + + +class Client: + def __init__(self, host_port): + # type: (str) -> None + self._host_port = host_port + + def __enter__(self): + # type: () -> Self + import http.client + + self.conn = http.client.HTTPConnection(self._host_port, timeout=2) + return self + + def __exit__(self, *exc): + self.conn.close() + + def get(self, path): + # type: () -> http.client.HTTPResponse + self.conn.request("GET", path, headers={"Host": HTTP_HOST}, body="GETBODY!") + return self.conn.getresponse() + + +# @pytest.mark.parametrize("read_size", [50+secrets.randbelow(2048)]) +@pytest.mark.parametrize("ssl", [False, True], ids=["plain", "ssl"]) +@pytest.mark.parametrize("worker_class", TEST_SIMPLE) +def test_nginx_proxy(*, ssl, worker_class, dummy_ssl_cert, read_size=1024): + # avoid ports <= 6144 which may be in use by CI runner + fixed_port = 1024 * 6 + secrets.randbelow(1024 * 9) + # FIXME: should also test inherited socket (LISTEN_FDS) + # FIXME: should also test non-inherited (named) UNIX socket + gunicorn_bind = "[::1]:%d" % fixed_port + + # syntax matches between nginx conf and http client + nginx_bind = "[::1]:%d" % (fixed_port + 1) + + static_dir = "/run/gunicorn/nonexist" + # gunicorn_upstream = "unix:/run/gunicorn/for-nginx.sock" + # syntax "[ipv6]:port" matches between gunicorn and nginx + gunicorn_upstream = gunicorn_bind + + with TemporaryDirectory(suffix="_temp_py") as tempdir_name, Client( + nginx_bind + ) as client: + temp_path = Path(tempdir_name) + nginx_config = NGINX_CONFIG_TEMPLATE.format( + server_bind=nginx_bind, + pid_path="%s" % (temp_path / "nginx.pid"), + gunicorn_upstream=gunicorn_upstream, + server_name=HTTP_HOST, + static_dir=static_dir, + proxy_method="https" if ssl else "http", + ) + + with GunicornProcess( + server_bind=gunicorn_bind, + worker_class=worker_class, + read_size=read_size, + ssl_files=dummy_ssl_cert if ssl else None, + temp_path=temp_path, + ) as server, NginxProcess( + config=nginx_config, + temp_path=temp_path, + ) as proxy: + proxy.read_stdio( + key=STDERR, + timeout_sec=4, + wait_for_keyword="start worker processes", + ) + + server.read_stdio( + key=STDERR, + wait_for_keyword="Arbiter booted", + timeout_sec=4, + expect={ + "Booting worker", + }, + ) + + for num_request in range(5): + path = "/pytest/%d" % (num_request) + response = client.get(path) + assert response.status == 200 + assert response.read() == b"response body from app" + + # using 1.1 to not fail on tornado reporting for 1.0 + # nginx sees our HTTP/1.1 request + proxy.read_stdio( + key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path + ) + # gunicorn sees the HTTP/1.1 request from nginx + server.read_stdio( + key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path + ) + + server.graceful_quit( + expect={ + "Handling signal: term", + "Shutting down: Master", + }, + ) + proxy.graceful_quit() From d3461ede73ff5f34eac008504706f765a264d122 Mon Sep 17 00:00:00 2001 From: "Paul J. 
From d3461ede73ff5f34eac008504706f765a264d122 Mon Sep 17 00:00:00 2001
From: "Paul J. Dorn"
Date: Sat, 24 Aug 2024 02:47:46 +0200
Subject: [PATCH 22/22] test: measure request rate

---
 .github/workflows/tox.yml |   2 +-
 tests/test_wrk.py         | 416 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 417 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_wrk.py

diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml
index b6be4e082..dbb099783 100644
--- a/.github/workflows/tox.yml
+++ b/.github/workflows/tox.yml
@@ -58,7 +58,7 @@ jobs:
         if: matrix.os == 'ubuntu-latest'
         run: |
           sudo systemctl mask nginx.service
-          sudo apt install nginx openssl
+          sudo apt install nginx openssl wrk
       - name: Install Dependencies
         run: |
           python -m pip install --upgrade pip
diff --git a/tests/test_wrk.py b/tests/test_wrk.py
new file mode 100644
index 000000000..f89dc0131
--- /dev/null
+++ b/tests/test_wrk.py
@@ -0,0 +1,416 @@
+#
+# This file is part of gunicorn released under the MIT license.
+# See the NOTICE for more information.
+
+# hint: can see stdout as the (complex) test progresses using:
+# python -B -m pytest -s -vvvv --ff \
+#   --override-ini=addopts=--strict-markers --exitfirst \
+#   -- tests/test_wrk.py
+
+import importlib
+import os
+import secrets
+import shutil
+import signal
+import subprocess
+import sys
+import re
+import time
+from itertools import chain
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import TYPE_CHECKING
+
+import pytest
+
+if TYPE_CHECKING:
+    import http.client
+    from typing import Any, NamedTuple, Self
+
+# path may be /usr/local/bin for packages ported from other OS
+CMD_OPENSSL = shutil.which("openssl")
+CMD_WRK = shutil.which("wrk")
+
+RATE = re.compile(r"^Requests/sec: *([0-9]+(?:\.[0-9]+)?)$", re.MULTILINE)
+
+pytestmark = pytest.mark.skipif(
+    CMD_OPENSSL is None or CMD_WRK is None,
+    reason="need openssl and wrk binaries",
+)
+
+STDOUT = 0
+STDERR = 1
+
+TEST_SIMPLE = [
+    pytest.param("sync"),
+    "eventlet",
+    "gevent",
+    "gevent_wsgi",
+    "gevent_pywsgi",
+    # "tornado",
+    "gthread",
+    # "aiohttp.GunicornWebWorker",  # different app signature
+    # "aiohttp.GunicornUVLoopWebWorker",  # "
+]  # type: list[str|NamedTuple]
+
+WORKER_DEPENDS = {
+    "aiohttp.GunicornWebWorker": ["aiohttp"],
+    "aiohttp.GunicornUVLoopWebWorker": ["aiohttp", "uvloop"],
+    "uvicorn.workers.UvicornWorker": ["uvicorn"],  # deprecated
+    "uvicorn.workers.UvicornH11Worker": ["uvicorn"],  # deprecated
+    "uvicorn_worker.UvicornWorker": ["uvicorn_worker"],
+    "uvicorn_worker.UvicornH11Worker": ["uvicorn_worker"],
+    "eventlet": ["eventlet"],
+    "gevent": ["gevent"],
+    "gevent_wsgi": ["gevent"],
+    "gevent_pywsgi": ["gevent"],
+    "tornado": ["tornado"],
+}
+DEP_WANTED = set(chain(*WORKER_DEPENDS.values()))  # type: set[str]
+DEP_INSTALLED = set()  # type: set[str]
+
+for dependency in DEP_WANTED:
+    try:
+        importlib.import_module(dependency)
+        DEP_INSTALLED.add(dependency)
+    except ImportError:
+        pass
+
+for worker_name, worker_needs in WORKER_DEPENDS.items():
+    missing = list(pkg for pkg in worker_needs if pkg not in DEP_INSTALLED)
+    if missing:
+        for T in (TEST_SIMPLE,):
+            if worker_name not in T:
+                continue
+            T.remove(worker_name)
+            skipped_worker = pytest.param(
+                worker_name, marks=pytest.mark.skip("%s not installed" % (missing[0]))
+            )
+            T.append(skipped_worker)
+
+WORKER_COUNT = 2
+GRACEFUL_TIMEOUT = 10
+APP_IMPORT_NAME = "testsyntax"
+APP_FUNC_NAME = "myapp"
+HTTP_HOST = "local.test"
+
+PY_APPLICATION = f"""
+import time
+def {APP_FUNC_NAME}(environ, start_response):
+    body = b"response body from app"
+    response_head = [
+        ("Content-Type", "text/plain"),
+ ("Content-Length", "%d" % len(body)), + ] + start_response("200 OK", response_head) + time.sleep(0.1) + return iter([body]) +""" + +class SubProcess: + GRACEFUL_SIGNAL = signal.SIGTERM + + def __enter__(self): + # type: () -> Self + self.run() + return self + + def __exit__(self, *exc): + # type: (*Any) -> None + if self.p is None: + return + self.p.send_signal(signal.SIGKILL) + stdout, stderr = self.p.communicate(timeout=1 + GRACEFUL_TIMEOUT) + ret = self.p.returncode + assert stdout[-512:] == b"", stdout + assert ret == 0, (ret, stdout, stderr) + + def read_stdio(self, *, key, timeout_sec, wait_for_keyword, expect=None): + # type: (int, int, str, set[str]|None) -> str + # try: + # stdout, stderr = self.p.communicate(timeout=timeout) + # except subprocess.TimeoutExpired: + buf = ["", ""] + seen_keyword = 0 + unseen_keywords = list(expect or []) + poll_per_second = 20 + assert key in {0, 1}, key + assert self.p is not None # this helps static type checkers + assert self.p.stdout is not None # this helps static type checkers + assert self.p.stderr is not None # this helps static type checkers + for _ in range(timeout_sec * poll_per_second): + keep_reading = False + for fd, file in enumerate([self.p.stdout, self.p.stderr]): + read = file.read(64 * 1024) + if read is not None: + buf[fd] += read.decode("utf-8", "surrogateescape") + keep_reading = True + if seen_keyword or wait_for_keyword in buf[key]: + seen_keyword += 1 + for additional_keyword in tuple(unseen_keywords): + for somewhere in buf: + if additional_keyword in somewhere: + unseen_keywords.remove(additional_keyword) + # gathered all the context we wanted + if seen_keyword and not unseen_keywords: + if not keep_reading: + break + # not seen expected output? wait for % of original timeout + # .. 
maybe we will still see better error context that way + if seen_keyword > (0.5 * timeout_sec * poll_per_second): + break + # retcode = self.p.poll() + # if retcode is not None: + # break + time.sleep(1.0 / poll_per_second) + # assert buf[abs(key - 1)] == "" + assert wait_for_keyword in buf[key], (wait_for_keyword, *buf) + assert not unseen_keywords, (unseen_keywords, *buf) + return buf[key] + + def run(self): + # type: () -> None + self.p = subprocess.Popen( + self._argv, + bufsize=0, # allow read to return short + cwd=self.temp_path, + shell=False, + close_fds=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + os.set_blocking(self.p.stdout.fileno(), False) + os.set_blocking(self.p.stderr.fileno(), False) + assert self.p.stdout is not None # this helps static type checkers + + def graceful_quit(self, expect=None, ignore=None): + # type: (set[str]|None) -> str + if self.p is None: + raise AssertionError("called graceful_quit() when not running") + self.p.send_signal(self.GRACEFUL_SIGNAL) + # self.p.kill() + stdout = self.p.stdout.read(64 * 1024) or b"" + stderr = self.p.stderr.read(64 * 1024) or b"" + try: + o, e = self.p.communicate(timeout=GRACEFUL_TIMEOUT) + stdout += o + stderr += e + except subprocess.TimeoutExpired: + pass + out = stdout.decode("utf-8", "surrogateescape") + for line in out.split("\n"): + if any(i in line for i in (ignore or ())): + continue + assert line == "" + exitcode = self.p.poll() # will return None if running + self.p.stdout.close() + self.p.stderr.close() + assert exitcode == 0, (exitcode, stdout, stderr) + # print("output after signal: ", stdout, stderr, exitcode) + self.p = None + ret = stderr.decode("utf-8", "surrogateescape") + for keyword in expect or (): + assert keyword in ret, (keyword, ret) + return ret + + +def generate_dummy_ssl_cert(cert_path, key_path): + # dummy self-signed cert + subprocess.check_output( + [ + CMD_OPENSSL, + "req", + "-new", + "-newkey", + # "ed25519", + # OpenBSD 7.5 / LibreSSL 3.9.0 / Python 3.10.13 + # ssl.SSLError: [SSL: UNKNOWN_CERTIFICATE_TYPE] unknown certificate type (_ssl.c:3900) + # workaround: use RSA keys for testing + "rsa", + "-outform", + "PEM", + "-subj", + "/C=DE", + "-addext", + "subjectAltName=DNS:%s" % (HTTP_HOST), + "-days", + "1", + "-nodes", + "-x509", + "-keyout", + "%s" % (key_path), + "-out", + "%s" % (cert_path), + ], + shell=False, + ) + + +@pytest.fixture(scope="session") +def dummy_ssl_cert(tmp_path_factory): + base_tmp_dir = tmp_path_factory.getbasetemp().parent + crt = base_tmp_dir / "dummy.crt" + key = base_tmp_dir / "dummy.key" + print(crt, key) + # generate once, reuse for all tests + # with FileLock("%s.lock" % crt): + if not crt.is_file(): + generate_dummy_ssl_cert(crt, key) + return crt, key + + +class GunicornProcess(SubProcess): + def __init__( + self, + *, + temp_path, + server_bind, + read_size=1024, + ssl_files=None, + worker_class="sync", + ): + self.conf_path = Path(os.devnull) + self.p = None # type: subprocess.Popen[bytes] | None + assert isinstance(temp_path, Path) + self.temp_path = temp_path + self.py_path = (temp_path / ("%s.py" % APP_IMPORT_NAME)).absolute() + with open(self.py_path, "w+") as f: + f.write(PY_APPLICATION) + + ssl_opt = [] + if ssl_files is not None: + cert_path, key_path = ssl_files + ssl_opt = [ + "--do-handshake-on-connect", + "--certfile=%s" % cert_path, + "--keyfile=%s" % key_path, + ] + thread_opt = [] + if worker_class != "sync": + thread_opt = ["--threads=50"] 
+
+        self._argv = [
+            sys.executable,
+            "-m",
+            "gunicorn",
+            "--config=%s" % self.conf_path,
+            "--log-level=info",
+            "--worker-class=%s" % worker_class,
+            "--workers=%d" % WORKER_COUNT,
+            # unsupported at the time this test was submitted
+            # "--buf-read-size=%d" % read_size,
+            "--enable-stdio-inheritance",
+            "--access-logfile=-",
+            "--disable-redirect-access-to-syslog",
+            "--graceful-timeout=%d" % (GRACEFUL_TIMEOUT,),
+            "--bind=%s" % server_bind,
+            "--reuse-port",
+            *thread_opt,
+            *ssl_opt,
+            "--",
+            f"{APP_IMPORT_NAME}:{APP_FUNC_NAME}",
+        ]
+
+
+class Client:
+    def __init__(self, url_base):
+        # type: (str) -> None
+        self._url_base = url_base
+        self._env = os.environ.copy()
+        self._env["LC_ALL"] = "C"
+
+    def __enter__(self):
+        # type: () -> Self
+        return self
+
+    def __exit__(self, *exc):
+        pass
+
+    def get(self, path):
+        # type: (str) -> str
+        assert path.startswith("/")
+        threads = 10
+        connections = 100
+        out = subprocess.check_output(
+            [
+                CMD_WRK,
+                "-t", "%d" % threads,
+                "-c", "%d" % connections,
+                "-d5s",
+                "%s%s" % (self._url_base, path),
+            ],
+            shell=False,
+            env=self._env,
+        )
+
+        return out.decode("utf-8", "replace")
+
+
+# @pytest.mark.parametrize("read_size", [50+secrets.randbelow(2048)])
+@pytest.mark.parametrize("ssl", [False, True], ids=["plain", "ssl"])
+@pytest.mark.parametrize("worker_class", TEST_SIMPLE)
+def test_wrk(*, ssl, worker_class, dummy_ssl_cert, read_size=1024):
+
+    if worker_class == "eventlet" and ssl:
+        pytest.skip("eventlet worker does not catch errors in ssl.wrap_socket")
+
+    # avoid ports <= 6144 which may be in use by CI runner
+    fixed_port = 1024 * 6 + secrets.randbelow(1024 * 9)
+    # FIXME: should also test inherited socket (LISTEN_FDS)
+    # FIXME: should also test non-inherited (named) UNIX socket
+    gunicorn_bind = "[::1]:%d" % fixed_port
+
+    proxy_method = "https" if ssl else "http"
+
+    with TemporaryDirectory(suffix="_temp_py") as tempdir_name, Client(
+        proxy_method + "://" + gunicorn_bind
+    ) as client:
+        temp_path = Path(tempdir_name)
+
+        with GunicornProcess(
+            server_bind=gunicorn_bind,
+            worker_class=worker_class,
+            read_size=read_size,
+            ssl_files=dummy_ssl_cert if ssl else None,
+            temp_path=temp_path,
+        ) as server:
+            server.read_stdio(
+                key=STDERR,
+                wait_for_keyword="[INFO] Starting gunicorn",
+                timeout_sec=4,
+                expect={
+                    "[INFO] Booting worker",
+                },
+            )
+
+            path = "/pytest/basic"
+            out = client.get(path)
+            print("##############\n" + out)
+
+            extract = RATE.search(out)
+            assert extract is not None, out
+            rate = float(extract.groups()[0])
+            if worker_class == "sync":
+                assert rate > 5
+            else:
+                assert rate > 50
+
+            server.read_stdio(
+                key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path
+            )
+            if ssl:
+                pass
+                #server.read_stdio(
+                #    key=STDERR,
+                #    wait_for_keyword="[DEBUG] ssl connection closed",
+                #    timeout_sec=4,
+                #)
+
+            server.graceful_quit(
+                ignore={"GET %s HTTP/1.1" % path, "Ignoring connection epipe", "Ignoring connection reset"},
+                expect={
+                    "[INFO] Handling signal: term",
+                },
+            )
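
A closing note on the rate check: the RATE regular expression is the load-bearing
piece of test_wrk, since the test only passes when wrk's closing summary contains
a parsable throughput line. A self-contained sanity check of that regex follows;
the sample text mimics the shape of a wrk summary, and its numbers are made up,
not captured output.

    import re

    RATE = re.compile(r"^Requests/sec: *([0-9]+(?:\.[0-9]+)?)$", re.MULTILINE)

    # made-up summary in the shape wrk prints; numbers are illustrative only
    sample = """\
    Running 5s test @ http://[::1]:7777/pytest/basic
      10 threads and 100 connections
    Requests/sec:   1234.56
    Transfer/sec:    189.11KB
    """

    match = RATE.search(sample)
    assert match is not None, sample
    rate = float(match.group(1))
    assert rate > 50  # the threshold test_wrk applies to non-sync workers
    print(rate)

Note that the MULTILINE flag is what lets the anchors match the summary line in
the middle of wrk's multi-line report; without it, the search would fail on any
real run.
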