From 28577a374745c085e9fe0944bb18f3d4e910a9ad Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Fri, 19 May 2023 10:25:58 +1000 Subject: [PATCH 1/2] Add a name argument similar to threads --- .gitignore | 3 ++- src/extrainterpreters/base_interpreter.py | 8 +++++--- src/extrainterpreters/piped_interpreter.py | 7 ++----- src/extrainterpreters/simple_interpreter.py | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 6d4840d..7086807 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ *swp .eggs build/ - +*.py[co] +[.]venv/ \ No newline at end of file diff --git a/src/extrainterpreters/base_interpreter.py b/src/extrainterpreters/base_interpreter.py index b3bb5b4..7e1b8f2 100644 --- a/src/extrainterpreters/base_interpreter.py +++ b/src/extrainterpreters/base_interpreter.py @@ -14,10 +14,11 @@ class BaseInterpreter: - def __init__(self): + def __init__(self, name=None): # .intno and .id are both set to the interpreter id, # but .intno is set to None when the interpreter is closed. self.intno = self.id = None + self.name = name self.lock = threading.RLock() def start(self): @@ -25,6 +26,7 @@ def start(self): raise RuntimeError("Interpreter already started") with self.lock: self.intno = self.id = interpreters.create() + self.name = self.name or f"Subinterpreter #{self.intno}" running_interpreters[self.intno] = self self.thread = None self._create_channel() @@ -44,11 +46,11 @@ def close(self, *args): try: while time.monotonic() - self._started_at < _TTL: # subinterpreters need sometime to stabilize. - # shutting then imediatelly may lead to a segfault. + # shutting then immediately may lead to a segfault. time.sleep(0.002) if interpreters.is_running(self.intno): # TBD: close on "at exit" - # # but really, just enduser code running with "run_string΅ on other thread should + # # but really, just end user code running with "run_string" on other thread should # leave the sub-interpreter on this state. return interpreters.destroy(self.intno) diff --git a/src/extrainterpreters/piped_interpreter.py b/src/extrainterpreters/piped_interpreter.py index 40244c1..03545c4 100644 --- a/src/extrainterpreters/piped_interpreter.py +++ b/src/extrainterpreters/piped_interpreter.py @@ -41,8 +41,8 @@ class FuncData(StructBase): def _dispatcher(pipe, buffer): """the core running function in a PipedInterpreter - This is responsible for watching comunications with the parent - interpreter, dispathing execution, and place the return values + This is responsible for watching communications with the parent + interpreter, dispatching execution, and place the return values """ while True: @@ -113,6 +113,3 @@ def _close_channel(self): self.pipe.read(timeout=None) self.pipe.close() super()._close_channel() - - - diff --git a/src/extrainterpreters/simple_interpreter.py b/src/extrainterpreters/simple_interpreter.py index fec2fb2..7e69fda 100644 --- a/src/extrainterpreters/simple_interpreter.py +++ b/src/extrainterpreters/simple_interpreter.py @@ -114,9 +114,9 @@ class SimpleInterpreter(_BufferedInterpreter): This implementation uses a memory area (by default of 10MB), to send pickled objects back and fort at fixed offsets. """ - def __init__(self, target=None, args=(), kwargs=None): + def __init__(self, name=None, target=None, args=(), kwargs=None): kwargs = kwargs or {} - super().__init__() + super().__init__(name=name) self.target = target self._args = args self._kwargs = kwargs From 23d23803569eb908365161bb0b62b488655fa168 Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Fri, 19 May 2023 10:39:23 +1000 Subject: [PATCH 2/2] Initial sketch --- src/extrainterpreters/executor.py | 119 ++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 src/extrainterpreters/executor.py diff --git a/src/extrainterpreters/executor.py b/src/extrainterpreters/executor.py new file mode 100644 index 0000000..fddf6c0 --- /dev/null +++ b/src/extrainterpreters/executor.py @@ -0,0 +1,119 @@ +import concurrent.futures +from concurrent.futures.thread import _WorkItem, _worker +import itertools +import os +import queue +import threading +import weakref +from extrainterpreters import Interpreter + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False +# Lock that ensures that new workers are not created while the interpreter is +# shutting down. Must be held while mutating _threads_queues and _shutdown. +_global_shutdown_lock = threading.Lock() + + +class SubinterpreterPoolExecutor(concurrent.futures.Executor): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().__next__ + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=()): + """Initializes a new SubinterpreterPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. + """ + if max_workers is None: + # SubinterpreterPoolExecutor is often used to: + # * CPU bound task which releases GIL + # * I/O bound task (which releases GIL, of course) + # + # We use cpu_count + 4 for both types of tasks. + # But we limit it to 32 to avoid consuming surprisingly large resource + # on many core machine. + max_workers = min(32, (os.cpu_count() or 1) + 4) + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + + self._max_workers = max_workers + self._work_queue = queue.SimpleQueue() + self._idle_semaphore = threading.Semaphore(0) + self._threads = set() + self._broken = False + self._shutdown = False + self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("SubinterpreterPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs + + def submit(self, fn, /, *args, **kwargs): + with self._shutdown_lock, _global_shutdown_lock: + + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = concurrent.futures.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + + def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + interp_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = Interpreter(name=interp_name, + target=_worker, + args=(None, # weakref.ref(self, weakref_cb), # TODO : Pickleable alternative + self._work_queue, # TODO : Pickleable + self._initializer, # TODO: Pickleable + self._initargs)) + t.start() + self._threads.add(t) + _threads_queues[t] = self._work_queue + + def shutdown(self, wait=True, *, cancel_futures=False): + with self._shutdown_lock: + self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join()