diff --git a/py/src/gnumake_tokenpool/read_byte.py b/py/src/gnumake_tokenpool/read_byte.py new file mode 100644 index 0000000..200b4e5 --- /dev/null +++ b/py/src/gnumake_tokenpool/read_byte.py @@ -0,0 +1,24 @@ +# pipe one byte from stdin to stdout + +# raise OSError("test: asdf") # test exception +# import sys; sys.exit(123) # test error +# import time; time.sleep(999) # test timeout + +import os + +try: + _bytes = os.read(0, 1) +except BlockingIOError as e: + if e.errno == 11: + # Resource temporarily unavailable + import sys + sys.exit(e.errno) + raise e +except OSError as e: + if e.errno == 9: + # Bad file descriptor = pipe is closed + import sys + sys.exit(e.errno) + raise e + +os.write(1, _bytes) diff --git a/py/src/gnumake_tokenpool/tokenpool.py b/py/src/gnumake_tokenpool/tokenpool.py index 8aaf691..fa14f97 100644 --- a/py/src/gnumake_tokenpool/tokenpool.py +++ b/py/src/gnumake_tokenpool/tokenpool.py @@ -1,6 +1,5 @@ -import sys, os, stat, select, signal, time, re +import sys, os, stat, select, time, re, subprocess, _compat_pickle -from contextlib import contextmanager from datetime import datetime from typing import List, Any, Iterator, Never @@ -15,6 +14,52 @@ class InvalidToken(Exception): pass +def _parse_exception(_bytes): + #print("_bytes", repr(_bytes)) + e_msg = _bytes.strip() + e_trace = b"" + e_name = b"" + parts = e_msg.rsplit(b"\n", 1) + #print("parts", repr(parts)) + if len(parts) == 2: + e_trace, e_msg = parts + parts = e_msg.split(b": ", 1) + #print("parts", repr(parts)) + if len(parts) == 2: + e_name, e_msg = parts + e_class = Exception + if re.fullmatch(rb"[A-Z][A-Za-z]+", e_name): + # eval is evil... + # e_class_2 = eval(e_name) + # if issubclass(e_class_2, BaseException): + # e_class = e_class_2 + # also check e_name versus list of built-in exceptions + # https://docs.python.org/3/library/exceptions.html + e_name = e_name.decode("ascii") + if ( + e_name in _compat_pickle.PYTHON2_EXCEPTIONS or + e_name in _compat_pickle.PYTHON3_OSERROR_EXCEPTIONS or + e_name in _compat_pickle.PYTHON3_IMPORTERROR_EXCEPTIONS or + e_name in _compat_pickle.MULTIPROCESSING_EXCEPTIONS or + e_name == "OSError" or + e_name == "ImportError" + ): + e_class = eval(e_name) + try: + e_msg = e_msg.decode("utf8") + except UnicodeDecodeError: + pass + try: + e_trace = e_trace.decode("utf8") + except UnicodeDecodeError: + pass + try: + text = _bytes.decode("utf8").strip() + except UnicodeDecodeError: + text = repr(stderr) + return e_class, e_msg + "\n\nexception parsed from inner exception:\n\n" + text + + class JobClient: "jobclient for the gnumake jobserver" @@ -27,11 +72,9 @@ def __init__( max_load: int | None = None, debug: bool | None = None, debug2: bool | None = None, - use_cysignals: bool | None = None, ): self._fdRead: int | None = None - self._fdReadDup: int | None = None self._fdWrite: int | None = None self._fifoPath = None self._fdFifo = None @@ -40,6 +83,8 @@ def __init__( self._fileRead = None self._fileWrite = None + self._read_byte_py_path = os.path.dirname(__file__) + "/read_byte.py" + self._debug = bool(os.environ.get("DEBUG_JOBCLIENT")) self._debug2 = bool(os.environ.get("DEBUG_JOBCLIENT_2")) # more verbose @@ -51,16 +96,6 @@ def __init__( self._log = self._get_log(self._debug) self._log2 = self._get_log(self._debug2) - if use_cysignals is not False: - try: - from cysignals.pysignals import changesignal - except ImportError: - if use_cysignals: - raise - else: - self._log("init: using cysignals.pysignals.changesignal") - self._changesignal = changesignal # type: ignore - makeFlags = os.environ.get("MAKEFLAGS", "") if makeFlags: self._log(f"init: MAKEFLAGS: {makeFlags}") @@ -181,7 +216,11 @@ def maxLoad(self) -> int | None: return self._maxLoad - def acquire(self) -> int | None: + def acquire( + self, + # a successful read takes about 0.1 to 0.2 seconds + timeout=0.5, + ) -> int | None: # http://make.mad-scientist.net/papers/jobserver-implementation/ # check if fdRead is readable @@ -190,50 +229,43 @@ def acquire(self) -> int | None: self._log2(f"acquire failed: fd is empty") return None - # handle race condition: - # between select and read, another process can read from the pipe. - # when the pipe is empty, read can block forever. - # by closing fdReadDup, we interrupt read - if self._fdRead and not self._fdReadDup: - self._fdReadDup = os.dup(self._fdRead) - - if not self._fdReadDup: - self._log(f"acquire: failed to duplicate fd") - return None + args = [ + sys.executable, # python + self._read_byte_py_path, + ] - def read_timeout_handler(_signum: int, _frame: Any) -> None: + self._log(f"acquire: read with timeout {timeout} ...") + buffer = b"" + t1 = time.time() + try: + proc = subprocess.run( + args, + capture_output=True, + timeout=timeout, + stdin=self._fdRead, + #check=True, # raise CalledProcessError + ) + buffer = proc.stdout + t2 = time.time() + self._log(f"acquire: read done after {t2 - t1} seconds") + except subprocess.TimeoutExpired: self._log(f"acquire: read timeout") - assert self._fdReadDup - os.close(self._fdReadDup) - - # SIGALRM = timer has fired = read timeout - with self._changesignal(signal.SIGALRM, read_timeout_handler): - try: - # Set SA_RESTART to limit EINTR occurrences. - # by default, signal.signal clears the SA_RESTART flag. - # TODO is this necessary? - signal.siginterrupt(signal.SIGALRM, False) - - read_timeout = 0.1 - signal.setitimer(signal.ITIMER_REAL, read_timeout) # set timer for SIGALRM. unix only - - # blocking read - self._log(f"acquire: read with timeout {read_timeout} ...") - buffer = b"" - try: - buffer = os.read(self._fdReadDup, 1) - except BlockingIOError as e: - if e.errno == 11: # Resource temporarily unavailable - self._log2(f"acquire failed: fd is empty 2") - return None # jobserver is full, try again later - raise e # unexpected error - except OSError as e: - if e.errno == 9: # EBADF: Bad file descriptor = pipe is closed - self._log(f"acquire: read failed: {e}") - return None # jobserver is full, try again later - raise e # unexpected error - finally: - signal.setitimer(signal.ITIMER_REAL, 0) # clear timer. unix only + return None + #raise TimeoutError + #except subprocess.CalledProcessError as proc: + if proc.returncode != 0: + if proc.returncode == 11: # Resource temporarily unavailable + self._log2(f"acquire failed: fd is empty 2") + return None # jobserver is full, try again later + if proc.returncode == 9: # EBADF: Bad file descriptor = pipe is closed + #self._log(f"acquire: read failed: {e}") + self._log(f"acquire: read failed: Bad file descriptor") + return None # jobserver is full, try again later + if proc.returncode == 1: + e_class, e_msg = _parse_exception(proc.stderr) + raise e_class(e_msg) # unexpected error + e_msg = f"read_byte_py process returned {proc.returncode}. stdout={repr(proc.stdout)}. stderr={repr(proc.stderr)}" + raise Exception(e_msg) # unexpected error #if len(buffer) == 0: # return None @@ -315,13 +347,3 @@ def _get_stat(self, fd: int) -> os.stat_result: self._log(f"init failed: fd {fd} stat failed: {e}") raise NoJobServer() raise e # unexpected error - - @staticmethod - @contextmanager - def _changesignal(sig: int, action: Any) -> Iterator[None]: - old_sig_handler = signal.signal(sig, action) - try: - yield - finally: - # clear signal handler - signal.signal(sig, old_sig_handler)