[WIP] Interpreter Pool Executor #4

Draft · wants to merge 2 commits into main
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,4 +4,5 @@
*swp
.eggs
build/

*.py[co]
[.]venv/
8 changes: 5 additions & 3 deletions src/extrainterpreters/base_interpreter.py
@@ -14,17 +14,19 @@

class BaseInterpreter:

-    def __init__(self):
+    def __init__(self, name=None):
        # .intno and .id are both set to the interpreter id,
        # but .intno is set to None when the interpreter is closed.
        self.intno = self.id = None
+        self.name = name
Owner: could you also add name to __repr__ in the same batch?
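
A possible shape for that change, sketched here for illustration (this method is not part of the diff):

    def __repr__(self):
        # hypothetical sketch: report the new .name along with the running state
        state = "closed" if self.intno is None else f"running as #{self.intno}"
        return f"<{self.__class__.__name__} name={self.name!r}, {state}>"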

        self.lock = threading.RLock()

    def start(self):
        if self.intno is not None:
            raise RuntimeError("Interpreter already started")
        with self.lock:
            self.intno = self.id = interpreters.create()
+            self.name = self.name or f"Subinterpreter #{self.intno}"
            running_interpreters[self.intno] = self
            self.thread = None
            self._create_channel()
@@ -44,11 +44,11 @@ def close(self, *args):
        try:
            while time.monotonic() - self._started_at < _TTL:
                # subinterpreters need some time to stabilize.
-                # shutting then imediatelly may lead to a segfault.
+                # shutting then immediately may lead to a segfault.
                time.sleep(0.002)
            if interpreters.is_running(self.intno):
                # TBD: close on "at exit"
-                # # but really, just enduser code running with "run_string΅ on other thread should
+                # # but really, just end user code running with "run_string" on other thread should
                # leave the sub-interpreter in this state.
                return
            interpreters.destroy(self.intno)
119 changes: 119 additions & 0 deletions src/extrainterpreters/executor.py
@@ -0,0 +1,119 @@
import concurrent.futures
from concurrent.futures.thread import _WorkItem, _worker
import itertools
import os
import queue
import threading
import weakref
from extrainterpreters import Interpreter

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()


class SubinterpreterPoolExecutor(concurrent.futures.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new SubinterpreterPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.
        """
        if max_workers is None:
            # SubinterpreterPoolExecutor is often used for:
            #  * CPU-bound tasks which release the GIL
            #  * I/O-bound tasks (which release the GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we cap it at 32 to avoid consuming surprisingly large
            # resources on many-core machines.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("SubinterpreterPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = concurrent.futures.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

    def _adjust_thread_count(self):
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            interp_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = Interpreter(name=interp_name,
                            target=_worker,
                            args=(None,  # weakref.ref(self, weakref_cb), # TODO : Pickleable alternative
                                  self._work_queue,   # TODO : Pickleable
                                  self._initializer,  # TODO: Pickleable
                                  self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
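
For reference, a minimal usage sketch, assuming the class keeps the concurrent.futures.Executor API it subclasses (submit/result, context-manager shutdown); fib is illustrative, and per the TODOs above, anything submitted must be pickleable to cross into the subinterpreter:

    from extrainterpreters.executor import SubinterpreterPoolExecutor

    def fib(n):
        # a top-level function, so it can be pickled into the worker interpreter
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    with SubinterpreterPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fib, 25) for _ in range(8)]
        print([f.result() for f in futures])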
7 changes: 2 additions & 5 deletions src/extrainterpreters/piped_interpreter.py
@@ -41,8 +41,8 @@ class FuncData(StructBase):
def _dispatcher(pipe, buffer):
Owner: this is in a very early stage - but the concept looks fine: a looping listener function in each interpreter, tied to a queue which will send it different tasks - which could be running a function, spinning a thread, and so on.

I am thinking of doing away with the "PipedInterpreter" altogether, and having an optional post-start call which would start such a listener function. The InterpreterPoolExecutor would do it automatically.

Just let me know if you have a different idea to approach this.
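
A rough sketch of that idea; the task-tuple protocol and all names below are made up for illustration, not taken from this PR:

    import threading

    def listener(task_queue, result_queue):
        # hypothetical per-interpreter listener loop
        while True:
            task = task_queue.get()
            if task is None:                 # sentinel: stop listening
                break
            kind, func, args = task
            if kind == "run":                # run a function inline
                result_queue.put(func(*args))
            elif kind == "thread":           # spin a thread inside this interpreter
                threading.Thread(target=func, args=args).start()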

"""the core running function in a PipedInterpreter

This is responsible for watching comunications with the parent
interpreter, dispathing execution, and place the return values
This is responsible for watching communications with the parent
Owner: as you can see, my code editor does not spell-check English - :-) I always have a hard time with doubled consonants.

+    interpreter, dispatching execution, and place the return values
"""

while True:
@@ -113,6 +113,3 @@ def _close_channel(self):
        self.pipe.read(timeout=None)
        self.pipe.close()
        super()._close_channel()



4 changes: 2 additions & 2 deletions src/extrainterpreters/simple_interpreter.py
@@ -114,9 +114,9 @@ class SimpleInterpreter(_BufferedInterpreter):
    This implementation uses a memory area (by default 10MB) to send pickled objects back and forth at fixed offsets.
    """

-    def __init__(self, target=None, args=(), kwargs=None):
+    def __init__(self, name=None, target=None, args=(), kwargs=None):
        kwargs = kwargs or {}
-        super().__init__()
+        super().__init__(name=name)
        self.target = target
        self._args = args
        self._kwargs = kwargs
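
With name threaded through the constructors, naming an interpreter could look like this (a sketch based only on the signatures in this diff; run_me is illustrative):

    from extrainterpreters import Interpreter

    def run_me():
        return 42

    interp = Interpreter(name="worker-1", target=run_me)
    interp.start()  # an unnamed interpreter falls back to "Subinterpreter #<id>"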