pythongh-110693: Pending Calls Machinery Cleanups (pythongh-118296)
This does some cleanup in preparation for later changes.
ericsnowcurrently authored Apr 26, 2024
1 parent d5df252 commit 09c2947
Showing 7 changed files with 314 additions and 112 deletions.
6 changes: 5 additions & 1 deletion Include/internal/pycore_ceval.h
@@ -48,8 +48,12 @@ extern void _PyEval_SignalReceived(void);
#define _Py_PENDING_MAINTHREADONLY 1
#define _Py_PENDING_RAWFREE 2

typedef int _Py_add_pending_call_result;
#define _Py_ADD_PENDING_SUCCESS 0
#define _Py_ADD_PENDING_FULL -1

// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
PyInterpreterState *interp,
_Py_pending_call_func func,
void *arg,
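
As a usage sketch (not code from this commit): callers of _PyEval_AddPendingCall can now distinguish "queued" from "queue full" instead of decoding a bare int. The helper name below is invented, and the final flags argument is an assumption based on the _Py_PENDING_* constants above, since the rest of the parameter list is cut off in this hunk.

    /* Hypothetical caller; assumes the declarations above are in scope. */
    static int
    queue_for_main_thread(PyInterpreterState *interp,
                          _Py_pending_call_func func, void *arg)
    {
        _Py_add_pending_call_result res =
            _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
        if (res == _Py_ADD_PENDING_FULL) {
            return 0;   /* queue is full; the caller may retry later */
        }
        return 1;       /* res == _Py_ADD_PENDING_SUCCESS */
    }
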
52 changes: 43 additions & 9 deletions Include/internal/pycore_ceval_state.h
@@ -14,28 +14,56 @@ extern "C" {

typedef int (*_Py_pending_call_func)(void *);

struct _pending_call {
_Py_pending_call_func func;
void *arg;
int flags;
};

#define PENDINGCALLSARRAYSIZE 32

#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
/* For interpreter-level pending calls, we want to avoid spending too
much time on pending calls in any one thread, so we apply a limit. */
#if MAXPENDINGCALLS > 100
# define MAXPENDINGCALLSLOOP 100
#else
# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
#endif

#define MAXPENDINGCALLS_MAIN PENDINGCALLSARRAYSIZE
/* For the main thread, we want to make sure all pending calls are
run at once, for the sake of prompt signal handling. This is
unlikely to cause any problems since there should be very few
pending calls for the main thread. */
#define MAXPENDINGCALLSLOOP_MAIN 0

struct _pending_calls {
int busy;
PyMutex mutex;
/* Request for running pending calls. */
int32_t calls_to_do;
#define NPENDINGCALLS 32
struct _pending_call {
_Py_pending_call_func func;
void *arg;
int flags;
} calls[NPENDINGCALLS];
int32_t npending;
/* The maximum allowed number of pending calls.
If the queue fills up to this point then _PyEval_AddPendingCall()
will return _Py_ADD_PENDING_FULL. */
int32_t max;
/* We don't want a flood of pending calls to interrupt any one thread
for too long, so we keep a limit on the number handled per pass.
A value of 0 means there is no limit (other than the maximum
size of the list of pending calls). */
int32_t maxloop;
struct _pending_call calls[PENDINGCALLSARRAYSIZE];
int first;
int last;
int next;
};


typedef enum {
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
PERF_STATUS_OK = 1, // Perf trampoline is ready to be executed
} perf_status_t;


#ifdef PY_HAVE_PERF_TRAMPOLINE
struct code_arena_st;

@@ -48,6 +76,7 @@ struct trampoline_api_st {
};
#endif


struct _ceval_runtime_state {
struct {
#ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,10 +91,15 @@ struct _ceval_runtime_state {
#endif
} perf;
/* Pending calls to be made only on the main thread. */
// The signal machinery falls back on this
// so it must be especially stable and efficient.
// For example, we use a preallocated array
// for the list of pending calls.
struct _pending_calls pending_mainthread;
PyMutex sys_trace_profile_mutex;
};


#ifdef PY_HAVE_PERF_TRAMPOLINE
# define _PyEval_RUNTIME_PERF_INIT \
{ \
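
The new max/maxloop fields separate two concerns: how many calls may sit in the queue at once, and how many are handled per pass. The sketch below illustrates only the second idea; it is not the interpreter's implementation, the function name is invented, and locking and error handling are omitted. It assumes the struct and PENDINGCALLSARRAYSIZE definitions above are in scope.

    /* Hypothetical drain loop: handle at most `maxloop` queued calls per pass.
       A maxloop of 0 means no cap, which is what the main-thread queue uses
       so signal handling is never deferred to a later pass. */
    static void
    drain_pending_sketch(struct _pending_calls *pending)
    {
        int32_t handled = 0;
        while (pending->npending > 0) {
            if (pending->maxloop != 0 && handled >= pending->maxloop) {
                break;  /* leave the rest for a later pass */
            }
            /* Pop the oldest entry from the preallocated ring buffer. */
            struct _pending_call call = pending->calls[pending->first];
            pending->first = (pending->first + 1) % PENDINGCALLSARRAYSIZE;
            pending->npending--;
            call.func(call.arg);   /* result and flags handling omitted */
            handled++;
        }
    }
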
8 changes: 8 additions & 0 deletions Include/internal/pycore_runtime_init.h
@@ -114,6 +114,10 @@ extern PyTypeObject _PyExc_MemoryError;
.autoTSSkey = Py_tss_NEEDS_INIT, \
.parser = _parser_runtime_state_INIT, \
.ceval = { \
.pending_mainthread = { \
.max = MAXPENDINGCALLS_MAIN, \
.maxloop = MAXPENDINGCALLSLOOP_MAIN, \
}, \
.perf = _PyEval_RUNTIME_PERF_INIT, \
}, \
.gilstate = { \
@@ -166,6 +170,10 @@ extern PyTypeObject _PyExc_MemoryError;
.imports = IMPORTS_INIT, \
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
.pending = { \
.max = MAXPENDINGCALLS, \
.maxloop = MAXPENDINGCALLSLOOP, \
}, \
}, \
.gc = { \
.enabled = 1, \
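
For reference, with PENDINGCALLSARRAYSIZE left at 32, the two initializers above resolve to concrete limits as checked below. These static_assert lines are illustrative only and not part of the commit; they assume C11 static_assert and the macros from pycore_ceval_state.h being in scope.

    #include <assert.h>  /* C11 static_assert */

    static_assert(MAXPENDINGCALLS == 32, "per-interpreter queue capacity");
    static_assert(MAXPENDINGCALLSLOOP == 32, "32 <= 100, so the per-pass cap equals the capacity");
    static_assert(MAXPENDINGCALLS_MAIN == 32, "main-thread queue capacity");
    static_assert(MAXPENDINGCALLSLOOP_MAIN == 0, "0 means every queued call runs in one pass");
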
85 changes: 70 additions & 15 deletions Lib/test/test_capi/test_misc.py
@@ -1172,6 +1172,12 @@ class MyType:
self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')


def test_gen_get_code(self):
def genf(): yield
gen = genf()
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)


@requires_limited_api
class TestHeapTypeRelative(unittest.TestCase):
"""Test API for extending opaque types (PEP 697)"""
@@ -1452,7 +1458,7 @@ class TestPendingCalls(unittest.TestCase):
# about when pending calls get run. This is especially relevant
# here for creating deterministic tests.

def pendingcalls_submit(self, l, n):
def main_pendingcalls_submit(self, l, n):
def callback():
#this function can be interrupted by thread switching so let's
#use an atomic operation
@@ -1467,12 +1473,27 @@ def callback():
if _testcapi._pending_threadfunc(callback):
break

def pendingcalls_wait(self, l, n, context = None):
def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
def callback():
#this function can be interrupted by thread switching so let's
#use an atomic operation
l.append(None)

if main:
return _testcapi._pending_threadfunc(callback, n,
blocking=False,
ensure_added=ensure)
else:
return _testinternalcapi.pending_threadfunc(callback, n,
blocking=False,
ensure_added=ensure)

def pendingcalls_wait(self, l, numadded, context = None):
#now, stick around until l[0] has grown to 10
count = 0
while len(l) != n:
while len(l) != numadded:
#this busy loop is where we expect to be interrupted to
#run our callbacks. Note that callbacks are only run on the
#run our callbacks. Note that some callbacks are only run on the
#main thread
if False and support.verbose:
print("(%i)"%(len(l),),)
@@ -1482,12 +1503,12 @@ def pendingcalls_wait(self, l, n, context = None):
continue
count += 1
self.assertTrue(count < 10000,
"timeout waiting for %i callbacks, got %i"%(n, len(l)))
"timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
if False and support.verbose:
print("(%i)"%(len(l),))

@threading_helper.requires_working_threading()
def test_pendingcalls_threaded(self):
def test_main_pendingcalls_threaded(self):

#do every callback on a separate thread
n = 32 #total callbacks
@@ -1501,15 +1522,15 @@ class foo(object):pass
context.lock = threading.Lock()
context.event = threading.Event()

threads = [threading.Thread(target=self.pendingcalls_thread,
threads = [threading.Thread(target=self.main_pendingcalls_thread,
args=(context,))
for i in range(context.nThreads)]
with threading_helper.start_threads(threads):
self.pendingcalls_wait(context.l, n, context)

def pendingcalls_thread(self, context):
def main_pendingcalls_thread(self, context):
try:
self.pendingcalls_submit(context.l, context.n)
self.main_pendingcalls_submit(context.l, context.n)
finally:
with context.lock:
context.nFinished += 1
@@ -1519,20 +1540,54 @@ def pendingcalls_thread(self, context):
if nFinished == context.nThreads:
context.event.set()

def test_pendingcalls_non_threaded(self):
def test_main_pendingcalls_non_threaded(self):
#again, just using the main thread, likely they will all be dispatched at
#once. It is ok to ask for too many, because we loop until we find a slot.
#the loop can be interrupted to dispatch.
#there are only 32 dispatch slots, so we go for twice that!
l = []
n = 64
self.pendingcalls_submit(l, n)
self.main_pendingcalls_submit(l, n)
self.pendingcalls_wait(l, n)

def test_gen_get_code(self):
def genf(): yield
gen = genf()
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
def test_max_pending(self):
with self.subTest('main-only'):
maxpending = 32

l = []
added = self.pendingcalls_submit(l, 1, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, 1)

l = []
added = self.pendingcalls_submit(l, maxpending, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

l = []
added = self.pendingcalls_submit(l, maxpending+1, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

with self.subTest('not main-only'):
# Per-interpreter pending calls has the same low limit
# on how many may be pending at a time.
maxpending = 32

l = []
added = self.pendingcalls_submit(l, 1, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, 1)

l = []
added = self.pendingcalls_submit(l, maxpending, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

l = []
added = self.pendingcalls_submit(l, maxpending+1, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

class PendingTask(types.SimpleNamespace):

53 changes: 42 additions & 11 deletions Modules/_testcapimodule.c
@@ -819,25 +819,55 @@ static int _pending_callback(void *arg)
* run from any python thread.
*/
static PyObject *
pending_threadfunc(PyObject *self, PyObject *arg)
pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
{
static char *kwlist[] = {"callback", "num",
"blocking", "ensure_added", NULL};
PyObject *callable;
int r;
if (PyArg_ParseTuple(arg, "O", &callable) == 0)
unsigned int num = 1;
int blocking = 0;
int ensure_added = 0;
if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
"O|I$pp:_pending_threadfunc", kwlist,
&callable, &num, &blocking, &ensure_added))
{
return NULL;
}

/* create the reference for the callback while we hold the lock */
Py_INCREF(callable);
for (unsigned int i = 0; i < num; i++) {
Py_INCREF(callable);
}

Py_BEGIN_ALLOW_THREADS
r = Py_AddPendingCall(&_pending_callback, callable);
Py_END_ALLOW_THREADS
PyThreadState *save_tstate = NULL;
if (!blocking) {
save_tstate = PyEval_SaveThread();
}

unsigned int num_added = 0;
for (; num_added < num; num_added++) {
if (ensure_added) {
int r;
do {
r = Py_AddPendingCall(&_pending_callback, callable);
} while (r < 0);
}
else {
if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
break;
}
}
}

if (!blocking) {
PyEval_RestoreThread(save_tstate);
}

if (r<0) {
for (unsigned int i = num_added; i < num; i++) {
Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
Py_RETURN_FALSE;
}
Py_RETURN_TRUE;
/* The callable is decref'ed above in each added _pending_callback(). */
return PyLong_FromUnsignedLong((unsigned long)num_added);
}

/* Test PyOS_string_to_double. */
Expand Down Expand Up @@ -3232,7 +3262,8 @@ static PyMethodDef TestMethods[] = {
{"_spawn_pthread_waiter", spawn_pthread_waiter, METH_NOARGS},
{"_end_spawned_pthread", end_spawned_pthread, METH_NOARGS},
#endif
{"_pending_threadfunc", pending_threadfunc, METH_VARARGS},
{"_pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
METH_VARARGS|METH_KEYWORDS},
#ifdef HAVE_GETTIMEOFDAY
{"profile_int", profile_int, METH_NOARGS},
#endif