From 214334ffe66a023eadef8cd9cb09a593bf750ca2 Mon Sep 17 00:00:00 2001 From: matyhtf Date: Thu, 9 Jan 2025 17:47:20 +0800 Subject: [PATCH] Add `max_wait_time` setting for the process pool to forcibly terminate worker processes after they time out. --- ext-src/php_swoole_cxx.cc | 33 ++++++++++++++ ext-src/php_swoole_cxx.h | 27 +++-------- ext-src/php_swoole_private.h | 2 + ext-src/php_swoole_process.h | 2 + ext-src/swoole_process_pool.cc | 32 ++++++++++++- ext-src/swoole_server.cc | 2 +- ext-src/swoole_timer.cc | 17 +++++-- include/swoole_process_pool.h | 9 ++++ src/core/timer.cc | 5 +- src/os/process_pool.cc | 31 +++++++++---- tests/swoole_process_pool/max_wait_time.phpt | 48 ++++++++++++++++++++ 11 files changed, 171 insertions(+), 37 deletions(-) create mode 100644 tests/swoole_process_pool/max_wait_time.phpt diff --git a/ext-src/php_swoole_cxx.cc b/ext-src/php_swoole_cxx.cc index 99702b7266e..54bee9168ea 100644 --- a/ext-src/php_swoole_cxx.cc +++ b/ext-src/php_swoole_cxx.cc @@ -10,6 +10,12 @@ static const char *sw_known_strings[] = { SW_API zend_string **sw_zend_known_strings = nullptr; +SW_API zend_refcounted *sw_refcount_ptr; + +zend_refcounted *sw_get_refcount_ptr(zval *value) { + return (sw_refcount_ptr = value->value.counted); +} + //----------------------------------known string------------------------------------ namespace zend { void known_strings_init(void) { @@ -66,4 +72,31 @@ Variable call(const std::string &func_name, int argc, zval *argv) { } } // namespace function + +Callable::Callable(zval *_zfn) { + ZVAL_UNDEF(&zfn); + if (!zval_is_true(_zfn)) { + php_swoole_fatal_error(E_WARNING, "illegal callback function"); + return; + } + if (!sw_zend_is_callable_ex(_zfn, nullptr, 0, &fn_name, nullptr, &fcc, nullptr)) { + php_swoole_fatal_error(E_WARNING, "function '%s' is not callable", fn_name); + return; + } + zfn = *_zfn; + zval_add_ref(&zfn); +} + +Callable::~Callable() { + if (!ZVAL_IS_UNDEF(&zfn)) { + zval_ptr_dtor(&zfn); + } + if (fn_name) { + efree(fn_name); + } +} + +uint32_t Callable::refcount() { + return zval_refcount_p(&zfn); +} } // namespace zend diff --git a/ext-src/php_swoole_cxx.h b/ext-src/php_swoole_cxx.h index a63a8c4dbbf..c3b19b27ccb 100644 --- a/ext-src/php_swoole_cxx.h +++ b/ext-src/php_swoole_cxx.h @@ -593,18 +593,12 @@ class Callable { Callable() {} public: - Callable(zval *_zfn) { - ZVAL_UNDEF(&zfn); - if (!zval_is_true(_zfn)) { - php_swoole_fatal_error(E_WARNING, "illegal callback function"); - return; - } - if (!sw_zend_is_callable_ex(_zfn, nullptr, 0, &fn_name, nullptr, &fcc, nullptr)) { - php_swoole_fatal_error(E_WARNING, "function '%s' is not callable", fn_name); - return; - } - zfn = *_zfn; - zval_add_ref(&zfn); + Callable(zval *_zfn); + ~Callable(); + uint32_t refcount(); + + zend_refcounted *refcount_ptr() { + return sw_get_refcount_ptr(&zfn); } zend_fcall_info_cache *ptr() { @@ -629,15 +623,6 @@ class Callable { bool call(uint32_t argc, zval *argv, zval *retval) { return sw_zend_call_function_ex(&zfn, &fcc, argc, argv, retval) == SUCCESS; } - - ~Callable() { - if (!ZVAL_IS_UNDEF(&zfn)) { - zval_ptr_dtor(&zfn); - } - if (fn_name) { - efree(fn_name); - } - } }; #define _CONCURRENCY_HASHMAP_LOCK_(code) \ diff --git a/ext-src/php_swoole_private.h b/ext-src/php_swoole_private.h index ee931acb5e0..049c898e8c9 100644 --- a/ext-src/php_swoole_private.h +++ b/ext-src/php_swoole_private.h @@ -523,6 +523,8 @@ static inline size_t sw_active_thread_count(void) { } #endif +zend_refcounted *sw_get_refcount_ptr(zval *value); + void sw_php_exit(int status); void sw_php_print_backtrace(zend_long cid = 0, zend_long options = 0, diff --git a/ext-src/php_swoole_process.h b/ext-src/php_swoole_process.h index 0dd30a41292..7717ca7ead6 100644 --- a/ext-src/php_swoole_process.h +++ b/ext-src/php_swoole_process.h @@ -24,3 +24,5 @@ void php_swoole_process_clean(); int php_swoole_process_start(swoole::Worker *process, zval *zobject); swoole::Worker *php_swoole_process_get_worker(zval *zobject); void php_swoole_process_set_worker(zval *zobject, swoole::Worker *worker); + +swoole::ProcessPool *sw_process_pool(); diff --git a/ext-src/swoole_process_pool.cc b/ext-src/swoole_process_pool.cc index 30061694b7c..fcf02d3a94c 100644 --- a/ext-src/swoole_process_pool.cc +++ b/ext-src/swoole_process_pool.cc @@ -169,6 +169,9 @@ static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) { zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerPid"), getpid()); zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerId"), worker->id); + swoole_set_process_type(SW_PROCESS_WORKER); + SwooleG.enable_coroutine = pp->enable_coroutine; + if (pp->onWorkerStart) { zval args[2]; args[0] = *zobject; @@ -259,6 +262,9 @@ static void process_pool_onStart(ProcessPool *pool) { zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("master_pid"), getpid()); zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true); + swoole_set_process_type(SW_PROCESS_MASTER); + SwooleG.enable_coroutine = false; + if (pp->onStart == nullptr) { return; } @@ -312,6 +318,10 @@ static void process_pool_signal_handler(int sig) { } } +ProcessPool *sw_process_pool() { + return current_pool; +} + static PHP_METHOD(swoole_process_pool, __construct) { zval *zobject = ZEND_THIS; zend_long worker_num; @@ -321,15 +331,31 @@ static PHP_METHOD(swoole_process_pool, __construct) { // only cli env if (!SWOOLE_G(cli)) { + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); zend_throw_error(NULL, "%s can only be used in PHP CLI mode", SW_Z_OBJCE_NAME_VAL_P(zobject)); RETURN_FALSE; } if (sw_server()) { - zend_throw_error(NULL, "%s cannot use in server process", SW_Z_OBJCE_NAME_VAL_P(zobject)); + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); + zend_throw_error(NULL, "cannot create server and process pool instances simultaneously"); RETURN_FALSE; } + if (sw_process_pool()) { + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); + zend_throw_error(NULL, "A process pool instance has already been created and cannot be created again"); + RETURN_FALSE; + } + +#ifdef SW_THREAD + if (!tsrm_is_main_thread()) { + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); + zend_throw_exception_ex(swoole_exception_ce, -1, "This operation is only allowed in the main thread"); + RETURN_FALSE; + } +#endif + if (zend_parse_parameters_throw(ZEND_NUM_ARGS(), "l|llb", &worker_num, &ipc_type, &msgq_key, &enable_coroutine) == FAILURE) { RETURN_FALSE; @@ -390,6 +416,10 @@ static PHP_METHOD(swoole_process_pool, set) { if (php_swoole_array_get_value(vht, "max_package_size", ztmp)) { pool->set_max_packet_size(php_swoole_parse_to_size(ztmp)); } + if (php_swoole_array_get_value(vht, "max_wait_time", ztmp)) { + zend_long v = zval_get_long(ztmp); + pool->max_wait_time = SW_MAX(0, SW_MIN(v, UINT32_MAX)); + } } static PHP_METHOD(swoole_process_pool, on) { diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index 3e7a00fc726..da8d55951b9 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -2659,7 +2659,7 @@ static PHP_METHOD(swoole_server, start) { RETURN_FALSE; } - if (SwooleTG.reactor) { + if (sw_reactor()) { php_swoole_fatal_error( E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; diff --git a/ext-src/swoole_timer.cc b/ext-src/swoole_timer.cc index 750898df568..1011cb624bd 100644 --- a/ext-src/swoole_timer.cc +++ b/ext-src/swoole_timer.cc @@ -17,6 +17,7 @@ */ #include "php_swoole_cxx.h" +#include "php_swoole_process.h" #include "swoole_server.h" @@ -148,6 +149,18 @@ static void timer_callback(Timer *timer, TimerNode *tnode) { } } +static bool timer_if_use_reactor() { + auto server = sw_server(); + if (server) { + return server->is_user_worker() || (server->is_task_worker() && server->task_enable_coroutine); + } + auto process_pool = sw_process_pool(); + if (process_pool) { + return !process_pool->is_master(); + } + return true; +} + static void timer_add(INTERNAL_FUNCTION_PARAMETERS, bool persistent) { zend_long ms; Function *fci = (Function *) ecalloc(1, sizeof(Function)); @@ -166,9 +179,7 @@ static void timer_add(INTERNAL_FUNCTION_PARAMETERS, bool persistent) { RETURN_FALSE; } - // no server || user worker || task process with async mode - if (!sw_server() || sw_server()->is_user_worker() || - (sw_server()->is_task_worker() && sw_server()->task_enable_coroutine)) { + if (UNEXPECTED(!sw_reactor() && timer_if_use_reactor())) { php_swoole_check_reactor(); } diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 827f5ee0268..9418a19d644 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -327,6 +327,14 @@ struct ProcessPool { max_packet_size_ = _max_packet_size; } + bool is_master() { + return swoole_get_process_type() == SW_PROCESS_MASTER; + } + + bool is_worker() { + return swoole_get_process_type() == SW_PROCESS_WORKER; + } + void set_protocol(enum ProtocolType _protocol_type); void set_max_request(uint32_t _max_request, uint32_t _max_request_grace); @@ -339,6 +347,7 @@ struct ProcessPool { bool reload(); pid_t spawn(Worker *worker); void stop(Worker *worker); + void kill_all_workers(int signo = SIGKILL); swResultCode dispatch(EventData *data, int *worker_id); int response(const char *data, int length); swResultCode dispatch_blocking(EventData *data, int *dst_worker_id); diff --git a/src/core/timer.cc b/src/core/timer.cc index 748480aafcc..21798968cba 100644 --- a/src/core/timer.cc +++ b/src/core/timer.cc @@ -69,9 +69,7 @@ bool Timer::init() { } bool Timer::init_with_user_scheduler(const TimerScheduler &scheduler) { - set = [&scheduler](Timer *timer, long exec_msec) -> int { - return scheduler(timer, exec_msec); - }; + set = [&scheduler](Timer *timer, long exec_msec) -> int { return scheduler(timer, exec_msec); }; close = [&scheduler](Timer *timer) { scheduler(timer, -1); }; return true; } @@ -236,6 +234,7 @@ int Timer::select() { heap.pop(); map.erase(tnode->id); delete tnode; + tnode = nullptr; } if (!tnode || !tmp) { diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index 77233d2e509..634be9ac1ac 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -122,6 +122,7 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPCMode _ip main_loop = run_with_task_protocol; protocol_type_ = SW_PROTOCOL_TASK; max_packet_size_ = SW_INPUT_BUFFER_SIZE; + max_wait_time = SW_WORKER_MAX_WAIT_TIME; SW_LOOP_N(_worker_num) { workers[i].pool = this; @@ -444,6 +445,12 @@ void ProcessPool::stop(Worker *worker) { } } +void ProcessPool::kill_all_workers(int signo) { + SW_LOOP_N(worker_num) { + swoole_kill(workers[i].pid, signo); + } +} + void ProcessPool::shutdown() { uint32_t i; int status; @@ -462,10 +469,20 @@ void ProcessPool::shutdown() { continue; } } + if (max_wait_time) { + swoole_timer_add((long) max_wait_time * 1000, false, [this](Timer *, TimerNode *) { kill_all_workers(); }); + } for (i = 0; i < worker_num; i++) { worker = &workers[i]; - if (swoole_waitpid(worker->pid, &status, 0) < 0) { - swoole_sys_warning("waitpid(%d) failed", worker->pid); + SW_LOOP { + if (waitpid(worker->pid, &status, 0) < 0) { + if (errno == EINTR) { + sw_timer()->select(); + continue; + } + swoole_sys_warning("waitpid(%d) failed", worker->pid); + } + break; } } started = false; @@ -853,7 +870,6 @@ bool ProcessPool::detach() { int ProcessPool::wait() { pid_t new_pid, reload_worker_pid = 0; - int ret; while (running) { ExitStatus exit_status = wait_process(); @@ -892,7 +908,7 @@ int ProcessPool::wait() { } if (!reloading) { if (errno > 0 && errno != EINTR) { - swoole_sys_warning("[Manager] wait failed"); + swoole_sys_warning("wait() failed"); } continue; } else { @@ -913,7 +929,7 @@ int ProcessPool::wait() { if (onWorkerNotFound) { onWorkerNotFound(this, exit_status); } else { - swoole_warning("[Manager]unknown worker[pid=%d]", exit_status.get_pid()); + swoole_warning("unknown worker[pid=%d]", exit_status.get_pid()); } continue; } @@ -944,13 +960,12 @@ int ProcessPool::wait() { continue; } reload_worker_pid = reload_workers[reload_worker_i].pid; - ret = swoole_kill(reload_worker_pid, SIGTERM); - if (ret < 0) { + if (swoole_kill(reload_worker_pid, SIGTERM) < 0) { if (errno == ECHILD) { reload_worker_i++; goto _kill_worker; } - swoole_sys_warning("[Manager]swKill(%d) failed", reload_workers[reload_worker_i].pid); + swoole_sys_warning("kill(%d) failed", reload_workers[reload_worker_i].pid); continue; } } diff --git a/tests/swoole_process_pool/max_wait_time.phpt b/tests/swoole_process_pool/max_wait_time.phpt new file mode 100644 index 00000000000..b437dddc551 --- /dev/null +++ b/tests/swoole_process_pool/max_wait_time.phpt @@ -0,0 +1,48 @@ +--TEST-- +swoole_process_pool: max wait time +--SKIPIF-- + +--FILE-- +set([ + Constant::OPTION_ENABLE_COROUTINE => true, + Constant::OPTION_MAX_WAIT_TIME => 1, + ]); + + $pool->on('workerStart', function (Pool $pool, int $workerId) use ($atomic): void { + echo "workerStart: $workerId" . PHP_EOL; + $atomic->wait(-1); + }); + + $pool->on('start', function () use ($pool): void { + Timer::after(500, function () use ($pool): void { + $pool->shutdown(); + }); + echo 'start' . PHP_EOL; + }); + + $pool->on('shutdown', function () use ($atomic): void { + echo 'shutdown' . PHP_EOL; + }); + + $pool->start(); +})(); +?> +--EXPECTF-- +start +workerStart: %d +workerStart: %d +workerStart: %d +workerStart: %d +shutdown