From f5cfce98e8d57c6af2f0ab469ecf979f400f199a Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Mon, 11 Mar 2024 13:58:22 +0100 Subject: [PATCH 1/4] py: use subprocess to read with timeout --- py/src/gnumake_tokenpool/tokenpool.py | 179 ++++++++++++++++---------- 1 file changed, 111 insertions(+), 68 deletions(-) diff --git a/py/src/gnumake_tokenpool/tokenpool.py b/py/src/gnumake_tokenpool/tokenpool.py index 8aaf691..008a438 100644 --- a/py/src/gnumake_tokenpool/tokenpool.py +++ b/py/src/gnumake_tokenpool/tokenpool.py @@ -1,6 +1,5 @@ -import sys, os, stat, select, signal, time, re +import sys, os, stat, select, time, re, subprocess, _compat_pickle -from contextlib import contextmanager from datetime import datetime from typing import List, Any, Iterator, Never @@ -15,6 +14,52 @@ class InvalidToken(Exception): pass +def _parse_exception(_bytes): + print("_bytes", repr(_bytes)) + e_msg = _bytes.strip() + e_trace = b"" + e_name = b"" + parts = e_msg.rsplit(b"\n", 1) + print("parts", repr(parts)) + if len(parts) == 2: + e_trace, e_msg = parts + parts = e_msg.split(b": ", 1) + print("parts", repr(parts)) + if len(parts) == 2: + e_name, e_msg = parts + e_class = Exception + if re.fullmatch(rb"[A-Z][A-Za-z]+", e_name): + # eval is evil... + # e_class_2 = eval(e_name) + # if issubclass(e_class_2, BaseException): + # e_class = e_class_2 + # also check e_name versus list of built-in exceptions + # https://docs.python.org/3/library/exceptions.html + e_name = e_name.decode("ascii") + if ( + e_name in _compat_pickle.PYTHON2_EXCEPTIONS or + e_name in _compat_pickle.PYTHON3_OSERROR_EXCEPTIONS or + e_name in _compat_pickle.PYTHON3_IMPORTERROR_EXCEPTIONS or + e_name in _compat_pickle.MULTIPROCESSING_EXCEPTIONS or + e_name == "OSError" or + e_name == "ImportError" + ): + e_class = eval(e_name) + try: + e_msg = e_msg.decode("utf8") + except UnicodeDecodeError: + pass + try: + e_trace = e_trace.decode("utf8") + except UnicodeDecodeError: + pass + try: + text = _bytes.decode("utf8").strip() + except UnicodeDecodeError: + text = repr(stderr) + return e_class, e_msg + "\n\nexception parsed from inner exception:\n\n" + text + + class JobClient: "jobclient for the gnumake jobserver" @@ -27,11 +72,9 @@ def __init__( max_load: int | None = None, debug: bool | None = None, debug2: bool | None = None, - use_cysignals: bool | None = None, ): self._fdRead: int | None = None - self._fdReadDup: int | None = None self._fdWrite: int | None = None self._fifoPath = None self._fdFifo = None @@ -51,16 +94,6 @@ def __init__( self._log = self._get_log(self._debug) self._log2 = self._get_log(self._debug2) - if use_cysignals is not False: - try: - from cysignals.pysignals import changesignal - except ImportError: - if use_cysignals: - raise - else: - self._log("init: using cysignals.pysignals.changesignal") - self._changesignal = changesignal # type: ignore - makeFlags = os.environ.get("MAKEFLAGS", "") if makeFlags: self._log(f"init: MAKEFLAGS: {makeFlags}") @@ -181,7 +214,11 @@ def maxLoad(self) -> int | None: return self._maxLoad - def acquire(self) -> int | None: + def acquire( + self, + # a successful read takes about 0.1 to 0.2 seconds + timeout=0.5, + ) -> int | None: # http://make.mad-scientist.net/papers/jobserver-implementation/ # check if fdRead is readable @@ -190,50 +227,66 @@ def acquire(self) -> int | None: self._log2(f"acquire failed: fd is empty") return None - # handle race condition: - # between select and read, another process can read from the pipe. - # when the pipe is empty, read can block forever. - # by closing fdReadDup, we interrupt read - if self._fdRead and not self._fdReadDup: - self._fdReadDup = os.dup(self._fdRead) - - if not self._fdReadDup: - self._log(f"acquire: failed to duplicate fd") - return None - - def read_timeout_handler(_signum: int, _frame: Any) -> None: + # pipe one byte from stdin to stdout + # test: printf + | python -c "import os; os.write(1, os.read(0, 1))" + read_byte_py = "\n".join(( + #'raise OSError("test: asdf")', # test exception + 'import sys', + 'import os', + #'sys.exit(123)', # test error + #'import time; time.sleep(999)', # test timeout + 'try:', + #' _bytes = os.read(' + str(self._fdRead) + ', 1)', + ' _bytes = os.read(0, 1)', # read from stdin + 'except BlockingIOError as e:', + ' if e.errno == 11:', # Resource temporarily unavailable + ' import sys', + ' sys.exit(e.errno)', + ' raise e', # unexpected error + 'except OSError as e:', + ' if e.errno == 9:', # EBADF: Bad file descriptor = pipe is closed + ' sys.exit(e.errno)', + ' raise e', # unexpected error + 'os.write(1, _bytes)', # write to stdout + )) + + args = [ + sys.executable, # python + "-c", read_byte_py, + ] + + self._log(f"acquire: read with timeout {timeout} ...") + buffer = b"" + t1 = time.time() + try: + proc = subprocess.run( + args, + capture_output=True, + timeout=timeout, + stdin=self._fdRead, + #check=True, # raise CalledProcessError + ) + buffer = proc.stdout + t2 = time.time() + self._log(f"acquire: read done after {t2 - t1} seconds") + except subprocess.TimeoutExpired: self._log(f"acquire: read timeout") - assert self._fdReadDup - os.close(self._fdReadDup) - - # SIGALRM = timer has fired = read timeout - with self._changesignal(signal.SIGALRM, read_timeout_handler): - try: - # Set SA_RESTART to limit EINTR occurrences. - # by default, signal.signal clears the SA_RESTART flag. - # TODO is this necessary? - signal.siginterrupt(signal.SIGALRM, False) - - read_timeout = 0.1 - signal.setitimer(signal.ITIMER_REAL, read_timeout) # set timer for SIGALRM. unix only - - # blocking read - self._log(f"acquire: read with timeout {read_timeout} ...") - buffer = b"" - try: - buffer = os.read(self._fdReadDup, 1) - except BlockingIOError as e: - if e.errno == 11: # Resource temporarily unavailable - self._log2(f"acquire failed: fd is empty 2") - return None # jobserver is full, try again later - raise e # unexpected error - except OSError as e: - if e.errno == 9: # EBADF: Bad file descriptor = pipe is closed - self._log(f"acquire: read failed: {e}") - return None # jobserver is full, try again later - raise e # unexpected error - finally: - signal.setitimer(signal.ITIMER_REAL, 0) # clear timer. unix only + return None + #raise TimeoutError + #except subprocess.CalledProcessError as proc: + if proc.returncode != 0: + if proc.returncode == 11: # Resource temporarily unavailable + self._log2(f"acquire failed: fd is empty 2") + return None # jobserver is full, try again later + if proc.returncode == 9: # EBADF: Bad file descriptor = pipe is closed + #self._log(f"acquire: read failed: {e}") + self._log(f"acquire: read failed: Bad file descriptor") + return None # jobserver is full, try again later + if proc.returncode == 1: + e_class, e_msg = _parse_exception(proc.stderr) + raise e_class(e_msg) # unexpected error + e_msg = f"read_byte_py process returned {proc.returncode}. stdout={repr(proc.stdout)}. stderr={repr(proc.stderr)}" + raise Exception(e_msg) # unexpected error #if len(buffer) == 0: # return None @@ -315,13 +368,3 @@ def _get_stat(self, fd: int) -> os.stat_result: self._log(f"init failed: fd {fd} stat failed: {e}") raise NoJobServer() raise e # unexpected error - - @staticmethod - @contextmanager - def _changesignal(sig: int, action: Any) -> Iterator[None]: - old_sig_handler = signal.signal(sig, action) - try: - yield - finally: - # clear signal handler - signal.signal(sig, old_sig_handler) From ab5c501da15f159e01791b1535766852b618dc79 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Mon, 11 Mar 2024 19:53:35 +0100 Subject: [PATCH 2/4] fixup: move _read_byte_py to global string --- py/src/gnumake_tokenpool/tokenpool.py | 50 ++++++++++++++------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/py/src/gnumake_tokenpool/tokenpool.py b/py/src/gnumake_tokenpool/tokenpool.py index 008a438..724b183 100644 --- a/py/src/gnumake_tokenpool/tokenpool.py +++ b/py/src/gnumake_tokenpool/tokenpool.py @@ -14,6 +14,31 @@ class InvalidToken(Exception): pass +# pipe one byte from stdin to stdout +# test: printf + | python -c "import os; os.write(1, os.read(0, 1))" +# raise OSError("test: asdf") # test exception +# sys.exit(123) # test error +# import time; time.sleep(999) # test timeout +# if e.errno == 11: # Resource temporarily unavailable +# if e.errno == 9: # EBADF: Bad file descriptor = pipe is closed +_read_byte_py = """ +import os +try: + _bytes = os.read(0, 1) +except BlockingIOError as e: + if e.errno == 11: + import sys + sys.exit(e.errno) + raise e +except OSError as e: + if e.errno == 9: + import sys + sys.exit(e.errno) + raise e +os.write(1, _bytes) +""" + + def _parse_exception(_bytes): print("_bytes", repr(_bytes)) e_msg = _bytes.strip() @@ -227,32 +252,9 @@ def acquire( self._log2(f"acquire failed: fd is empty") return None - # pipe one byte from stdin to stdout - # test: printf + | python -c "import os; os.write(1, os.read(0, 1))" - read_byte_py = "\n".join(( - #'raise OSError("test: asdf")', # test exception - 'import sys', - 'import os', - #'sys.exit(123)', # test error - #'import time; time.sleep(999)', # test timeout - 'try:', - #' _bytes = os.read(' + str(self._fdRead) + ', 1)', - ' _bytes = os.read(0, 1)', # read from stdin - 'except BlockingIOError as e:', - ' if e.errno == 11:', # Resource temporarily unavailable - ' import sys', - ' sys.exit(e.errno)', - ' raise e', # unexpected error - 'except OSError as e:', - ' if e.errno == 9:', # EBADF: Bad file descriptor = pipe is closed - ' sys.exit(e.errno)', - ' raise e', # unexpected error - 'os.write(1, _bytes)', # write to stdout - )) - args = [ sys.executable, # python - "-c", read_byte_py, + "-c", _read_byte_py, ] self._log(f"acquire: read with timeout {timeout} ...") From 092cf3514fd56d8289cb08371cdc664cff4a84f3 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Tue, 12 Mar 2024 12:59:14 +0100 Subject: [PATCH 3/4] move code to read_byte.py --- py/src/gnumake_tokenpool/read_byte.py | 24 ++++++++++++++++++++++ py/src/gnumake_tokenpool/tokenpool.py | 29 +++------------------------ 2 files changed, 27 insertions(+), 26 deletions(-) create mode 100644 py/src/gnumake_tokenpool/read_byte.py diff --git a/py/src/gnumake_tokenpool/read_byte.py b/py/src/gnumake_tokenpool/read_byte.py new file mode 100644 index 0000000..200b4e5 --- /dev/null +++ b/py/src/gnumake_tokenpool/read_byte.py @@ -0,0 +1,24 @@ +# pipe one byte from stdin to stdout + +# raise OSError("test: asdf") # test exception +# import sys; sys.exit(123) # test error +# import time; time.sleep(999) # test timeout + +import os + +try: + _bytes = os.read(0, 1) +except BlockingIOError as e: + if e.errno == 11: + # Resource temporarily unavailable + import sys + sys.exit(e.errno) + raise e +except OSError as e: + if e.errno == 9: + # Bad file descriptor = pipe is closed + import sys + sys.exit(e.errno) + raise e + +os.write(1, _bytes) diff --git a/py/src/gnumake_tokenpool/tokenpool.py b/py/src/gnumake_tokenpool/tokenpool.py index 724b183..79338c8 100644 --- a/py/src/gnumake_tokenpool/tokenpool.py +++ b/py/src/gnumake_tokenpool/tokenpool.py @@ -14,31 +14,6 @@ class InvalidToken(Exception): pass -# pipe one byte from stdin to stdout -# test: printf + | python -c "import os; os.write(1, os.read(0, 1))" -# raise OSError("test: asdf") # test exception -# sys.exit(123) # test error -# import time; time.sleep(999) # test timeout -# if e.errno == 11: # Resource temporarily unavailable -# if e.errno == 9: # EBADF: Bad file descriptor = pipe is closed -_read_byte_py = """ -import os -try: - _bytes = os.read(0, 1) -except BlockingIOError as e: - if e.errno == 11: - import sys - sys.exit(e.errno) - raise e -except OSError as e: - if e.errno == 9: - import sys - sys.exit(e.errno) - raise e -os.write(1, _bytes) -""" - - def _parse_exception(_bytes): print("_bytes", repr(_bytes)) e_msg = _bytes.strip() @@ -108,6 +83,8 @@ def __init__( self._fileRead = None self._fileWrite = None + self._read_byte_py_path = os.path.dirname(__file__) + "/read_byte.py" + self._debug = bool(os.environ.get("DEBUG_JOBCLIENT")) self._debug2 = bool(os.environ.get("DEBUG_JOBCLIENT_2")) # more verbose @@ -254,7 +231,7 @@ def acquire( args = [ sys.executable, # python - "-c", _read_byte_py, + self._read_byte_py_path, ] self._log(f"acquire: read with timeout {timeout} ...") From 016495ec45d31cf85cd75c2cec5bfd236cc46f1c Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Tue, 12 Mar 2024 12:59:58 +0100 Subject: [PATCH 4/4] disable debug prints --- py/src/gnumake_tokenpool/tokenpool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py/src/gnumake_tokenpool/tokenpool.py b/py/src/gnumake_tokenpool/tokenpool.py index 79338c8..fa14f97 100644 --- a/py/src/gnumake_tokenpool/tokenpool.py +++ b/py/src/gnumake_tokenpool/tokenpool.py @@ -15,16 +15,16 @@ class InvalidToken(Exception): def _parse_exception(_bytes): - print("_bytes", repr(_bytes)) + #print("_bytes", repr(_bytes)) e_msg = _bytes.strip() e_trace = b"" e_name = b"" parts = e_msg.rsplit(b"\n", 1) - print("parts", repr(parts)) + #print("parts", repr(parts)) if len(parts) == 2: e_trace, e_msg = parts parts = e_msg.split(b": ", 1) - print("parts", repr(parts)) + #print("parts", repr(parts)) if len(parts) == 2: e_name, e_msg = parts e_class = Exception