diff --git a/ext-src/php_swoole_cxx.h b/ext-src/php_swoole_cxx.h index 4031a9707c7..403057bd436 100644 --- a/ext-src/php_swoole_cxx.h +++ b/ext-src/php_swoole_cxx.h @@ -737,6 +737,12 @@ static inline void array_unset(zval *arg, const char *key, size_t l_key) { zend_hash_str_del(Z_ARRVAL_P(arg), key, l_key); } +static inline zend_long object_get_long(zval *obj, zend_string *key) { + static zval rv; + zval *property = zend_read_property_ex(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, 1, &rv); + return property ? zval_get_long(property) : 0; +} + static inline zend_long object_get_long(zval *obj, const char *key, size_t l_key) { static zval rv; zval *property = zend_read_property(Z_OBJCE_P(obj), Z_OBJ_P(obj), key, l_key, 1, &rv); diff --git a/ext-src/swoole_process_pool.cc b/ext-src/swoole_process_pool.cc index f3c0e444c1d..30061694b7c 100644 --- a/ext-src/swoole_process_pool.cc +++ b/ext-src/swoole_process_pool.cc @@ -34,8 +34,10 @@ static Worker *current_worker = nullptr; struct ProcessPoolObject { ProcessPool *pool; zend::Callable *onStart; + zend::Callable *onShutdown; zend::Callable *onWorkerStart; zend::Callable *onWorkerStop; + zend::Callable *onWorkerExit; zend::Callable *onMessage; zend_bool enable_coroutine; zend_bool enable_message_bus; @@ -86,6 +88,12 @@ static void process_pool_free_object(zend_object *object) { if (pp->onStart) { sw_callable_free(pp->onStart); } + if (pp->onWorkerExit) { + sw_callable_free(pp->onWorkerExit); + } + if (pp->onShutdown) { + sw_callable_free(pp->onShutdown); + } zend_object_std_dtor(object); } @@ -141,7 +149,11 @@ void php_swoole_process_pool_minit(int module_number) { swoole_process_pool, process_pool_create_object, process_pool_free_object, ProcessPoolObject, std); zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("master_pid"), -1, ZEND_ACC_PUBLIC); + zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerPid"), -1, ZEND_ACC_PUBLIC); + zend_declare_property_long(swoole_process_pool_ce, ZEND_STRL("workerId"), -1, ZEND_ACC_PUBLIC); zend_declare_property_null(swoole_process_pool_ce, ZEND_STRL("workers"), ZEND_ACC_PUBLIC); + zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("workerRunning"), -1, ZEND_ACC_PUBLIC); + zend_declare_property_bool(swoole_process_pool_ce, ZEND_STRL("running"), -1, ZEND_ACC_PUBLIC); } static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) { @@ -152,19 +164,22 @@ static void process_pool_onWorkerStart(ProcessPool *pool, Worker *worker) { current_pool = pool; current_worker = worker; - if (pp->onMessage) { - swoole_signal_set(SIGTERM, process_pool_signal_handler); - } + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true); + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), true); + zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerPid"), getpid()); + zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerId"), worker->id); - if (!pp->onWorkerStart) { - return; + if (pp->onWorkerStart) { + zval args[2]; + args[0] = *zobject; + ZVAL_LONG(&args[1], worker->id); + if (UNEXPECTED(!zend::function::call(pp->onWorkerStart->ptr(), 2, args, nullptr, pp->enable_coroutine))) { + php_swoole_error(E_WARNING, "%s->onWorkerStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject)); + } } - zval args[2]; - args[0] = *zobject; - ZVAL_LONG(&args[1], worker->id); - if (UNEXPECTED(!zend::function::call(pp->onWorkerStart->ptr(), 2, args, nullptr, pp->enable_coroutine))) { - php_swoole_error(E_WARNING, "%s->onWorkerStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject)); + if (!swoole_signal_isset(SIGTERM) && (pp->onMessage || pp->enable_coroutine)) { + swoole_signal_set(SIGTERM, process_pool_signal_handler); } } @@ -201,6 +216,9 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) { ProcessPoolObject *pp = process_pool_fetch_object(zobject); zval args[2]; + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false); + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false); + if (pp->onWorkerStop == nullptr) { return; } @@ -213,6 +231,63 @@ static void process_pool_onWorkerStop(ProcessPool *pool, Worker *worker) { } } +static void process_pool_onWorkerExit(ProcessPool *pool, Worker *worker) { + zval *zobject = (zval *) pool->ptr; + ProcessPoolObject *pp = process_pool_fetch_object(zobject); + zval args[2]; + + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false); + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false); + + if (pp->onWorkerExit == nullptr) { + return; + } + + args[0] = *zobject; + ZVAL_LONG(&args[1], worker->id); + + if (UNEXPECTED(!zend::function::call(pp->onWorkerExit->ptr(), 2, args, nullptr, false))) { + php_swoole_error(E_WARNING, "%s->onWorkerExit handler error", SW_Z_OBJCE_NAME_VAL_P(zobject)); + } +} + +static void process_pool_onStart(ProcessPool *pool) { + zval *zobject = (zval *) pool->ptr; + ProcessPoolObject *pp = process_pool_fetch_object(zobject); + zval args[1]; + + zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("master_pid"), getpid()); + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), true); + + if (pp->onStart == nullptr) { + return; + } + + args[0] = *zobject; + if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, false))) { + php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(zobject)); + } +} + +static void process_pool_onShutdown(ProcessPool *pool) { + zval *zobject = (zval *) pool->ptr; + ProcessPoolObject *pp = process_pool_fetch_object(zobject); + zval args[1]; + + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("running"), false); + zend_update_property_bool(swoole_process_pool_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("workerRunning"), false); + + if (pp->onShutdown == nullptr) { + return; + } + + args[0] = *zobject; + + if (UNEXPECTED(!zend::function::call(pp->onShutdown->ptr(), 1, args, nullptr, false))) { + php_swoole_error(E_WARNING, "%s->onShutdown handler error", SW_Z_OBJCE_NAME_VAL_P(zobject)); + } +} + static void process_pool_signal_handler(int sig) { if (!current_pool) { return; @@ -343,7 +418,8 @@ static PHP_METHOD(swoole_process_pool, on) { pp->onWorkerStart = sw_callable_create(zfn); } else if (SW_STRCASEEQ(name, l_name, "Message")) { if (pool->ipc_mode == SW_IPC_NONE) { - php_swoole_fatal_error(E_WARNING, "cannot set onMessage event with ipc_type=0"); + zend_throw_exception( + swoole_exception_ce, "cannot set `onMessage` event with ipc_type=0", SW_ERROR_INVALID_PARAMS); RETURN_FALSE; } if (pp->onMessage) { @@ -355,11 +431,21 @@ static PHP_METHOD(swoole_process_pool, on) { sw_callable_free(pp->onWorkerStop); } pp->onWorkerStop = sw_callable_create(zfn); + } else if (SW_STRCASEEQ(name, l_name, "WorkerExit")) { + if (pp->onWorkerExit) { + sw_callable_free(pp->onWorkerExit); + } + pp->onWorkerExit = sw_callable_create(zfn); } else if (SW_STRCASEEQ(name, l_name, "Start")) { if (pp->onStart) { sw_callable_free(pp->onStart); } pp->onStart = sw_callable_create(zfn); + } else if (SW_STRCASEEQ(name, l_name, "Shutdown")) { + if (pp->onShutdown) { + sw_callable_free(pp->onShutdown); + } + pp->onShutdown = sw_callable_create(zfn); } else { php_swoole_error(E_WARNING, "unknown event type[%s]", name); RETURN_FALSE; @@ -489,31 +575,33 @@ static PHP_METHOD(swoole_process_pool, start) { } } + if (pp->onWorkerExit && !pp->enable_coroutine) { + zend_throw_exception( + swoole_exception_ce, "cannot set `onWorkerExit` without enable_coroutine", SW_ERROR_INVALID_PARAMS); + RETURN_FALSE; + } + if (pp->onMessage) { pool->onMessage = process_pool_onMessage; } else { pool->main_loop = nullptr; } + current_pool = pool; + + pool->onStart = process_pool_onStart; + pool->onShutdown = process_pool_onShutdown; pool->onWorkerStart = process_pool_onWorkerStart; pool->onWorkerStop = process_pool_onWorkerStop; - zend_update_property_long(swoole_process_pool_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("master_pid"), getpid()); + if (pp->enable_coroutine && pp->onWorkerExit) { + pool->onWorkerExit = process_pool_onWorkerExit; + } if (pool->start() < 0) { RETURN_FALSE; } - current_pool = pool; - - if (pp->onStart) { - zval args[1]; - args[0] = *ZEND_THIS; - if (UNEXPECTED(!zend::function::call(pp->onStart->ptr(), 1, args, nullptr, 0))) { - php_swoole_error(E_WARNING, "%s->onStart handler error", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS)); - } - } - pool->wait(); pool->shutdown(); @@ -618,10 +706,13 @@ static PHP_METHOD(swoole_process_pool, stop) { } static PHP_METHOD(swoole_process_pool, shutdown) { - zval *retval = - sw_zend_read_property_ex(swoole_process_pool_ce, ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_STR_MASTER_PID), 0); - long pid = zval_get_long(retval); - RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0); + long pid = zend::object_get_long(ZEND_THIS, SW_ZSTR_KNOWN(SW_ZEND_STR_MASTER_PID)); + if (pid > 0) { + RETURN_BOOL(swoole_kill(pid, SIGTERM) == 0); + } else { + zend_throw_exception(swoole_exception_ce, "invalid master pid", SW_ERROR_INVALID_PARAMS); + RETURN_FALSE; + } } static PHP_METHOD(swoole_process_pool, __destruct) {} diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 003dacc3bfd..827f5ee0268 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -263,9 +263,12 @@ struct ProcessPool { uint8_t scheduler_warning; time_t warning_time; + void (*onStart)(ProcessPool *pool); + void (*onShutdown)(ProcessPool *pool); int (*onTask)(ProcessPool *pool, Worker *worker, EventData *task); void (*onWorkerStart)(ProcessPool *pool, Worker *worker); void (*onMessage)(ProcessPool *pool, RecvData *msg); + void (*onWorkerExit)(ProcessPool *pool, Worker *worker); void (*onWorkerStop)(ProcessPool *pool, Worker *worker); void (*onWorkerError)(ProcessPool *pool, Worker *worker, const ExitStatus &exit_status); void (*onWorkerMessage)(ProcessPool *pool, EventData *msg); @@ -277,7 +280,6 @@ struct ProcessPool { Worker *workers; std::vector> *pipes; std::unordered_map *map_; - Reactor *reactor; MsgQueue *queue; StreamInfo *stream_info_; Channel *message_box = nullptr; diff --git a/include/swoole_signal.h b/include/swoole_signal.h index f20a056e8b8..5416dedce5d 100644 --- a/include/swoole_signal.h +++ b/include/swoole_signal.h @@ -37,6 +37,7 @@ void swoole_signalfd_init(); #endif SW_API swSignalHandler swoole_signal_set(int signo, swSignalHandler func); +SW_API bool swoole_signal_isset(int signo); SW_API swSignalHandler swoole_signal_set(int signo, swSignalHandler func, int restart, int mask); SW_API swSignalHandler swoole_signal_get_handler(int signo); diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index 2762a979476..77233d2e509 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -265,11 +265,17 @@ int ProcessPool::start() { if (start_check() < 0) { return SW_ERR; } + + if (onStart) { + onStart(this); + } + SW_LOOP_N(worker_num) { if (spawn(&(workers[i])) < 0) { return SW_ERR; } } + return SW_OK; } @@ -417,9 +423,25 @@ bool ProcessPool::reload() { } void ProcessPool::stop(Worker *worker) { - if (async && worker->pipe_worker) { + worker->shutdown(); + + if (!swoole_event_is_available()) { + return; + } + + auto reactor = sw_reactor(); + if (worker->pipe_worker) { swoole_event_del(worker->pipe_worker); } + + if (onWorkerExit) { + reactor->set_end_callback(Reactor::PRIORITY_TRY_EXIT, [this, worker](Reactor *reactor) { + onWorkerExit(this, worker); + if (reactor->if_exit()) { + reactor->running = false; + } + }); + } } void ProcessPool::shutdown() { @@ -428,6 +450,10 @@ void ProcessPool::shutdown() { Worker *worker; running = 0; + if (onShutdown) { + onShutdown(this); + } + // concurrent kill for (i = 0; i < worker_num; i++) { worker = &workers[i]; diff --git a/src/os/signal.cc b/src/os/signal.cc index 86d1b26da5a..cab4a16e1fc 100644 --- a/src/os/signal.cc +++ b/src/os/signal.cc @@ -122,6 +122,10 @@ SignalHandler swoole_signal_set(int signo, SignalHandler func, int restart, int return oact.sa_handler; } +SW_API bool swoole_signal_isset(int signo) { + return signals[signo].handler && signals[signo].activated; +} + /** * set new signal handler and return origin signal handler */ diff --git a/tests/include/config.php b/tests/include/config.php index d54a78d3848..cc00a7d3bff 100644 --- a/tests/include/config.php +++ b/tests/include/config.php @@ -98,7 +98,7 @@ define('REDIS_SERVER_PWD', getenv('REDIS_SERVER_PWD') ?: 'root'); define('REDIS_SERVER_DB', (int)(getenv('REDIS_SERVER_DB') ?: 0)); -if (!getenv('SWOOLE_TEST_NO_DOCKER')) { +if (getenv('SWOOLE_TEST_IN_DOCKER')) { if (!empty($info = `docker ps 2>&1 | grep httpbin 2>&1`) && preg_match('/\s+?[^:]+:(\d+)->\d+\/tcp\s+/', $info, $matches) && is_numeric($matches[1])) { diff --git a/tests/swoole_process_pool/master_callback.phpt b/tests/swoole_process_pool/master_callback.phpt new file mode 100644 index 00000000000..0bfb29f95ca --- /dev/null +++ b/tests/swoole_process_pool/master_callback.phpt @@ -0,0 +1,47 @@ +--TEST-- +swoole_process_pool: master callback +--SKIPIF-- + +--FILE-- +on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) { + echo "worker start\n"; + Assert::true($pool->workerRunning); + Assert::eq($pool->workerId, 0); + Assert::eq($pool->workerPid, posix_getpid()); + pcntl_signal(SIGTERM, function (){ + + }); + $pool->shutdown(); + sleep(20); + echo "worker exit\n"; +}); + +$pool->on('workerStop', function (Swoole\Process\Pool $pool, int $workerId) { + Assert::false($pool->workerRunning); + echo "worker stop\n"; +}); + +$pool->on('start', function (Swoole\Process\Pool $pool) { + Assert::true($pool->running); + echo "start\n"; +}); + +$pool->on('shutdown', function (Swoole\Process\Pool $pool) { + Assert::false($pool->running); + echo "shutdown\n"; +}); + +$pool->start(); +?> +--EXPECT-- +start +worker start +shutdown +worker exit +worker stop diff --git a/tests/swoole_process_pool/worker_exit_1.phpt b/tests/swoole_process_pool/worker_exit_1.phpt new file mode 100644 index 00000000000..2b9462eee2a --- /dev/null +++ b/tests/swoole_process_pool/worker_exit_1.phpt @@ -0,0 +1,56 @@ +--TEST-- +swoole_process_pool: worker exit +--SKIPIF-- + +--FILE-- +on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) { + echo "worker start\n"; + Assert::eq($pool->workerId, $workerId); + + $count = 0; + while ($GLOBALS['running']) { + Co::sleep(0.03); + echo "sleep\n"; + if (++$count === 3) { + $pool->shutdown(); + } + } +}); + +$pool->on('workerStop', function ($pool, $data) { + echo "worker stop\n"; +}); + +$pool->on('workerExit', function ($pool, $data) { + $GLOBALS['count']++; + if ($GLOBALS['count'] == 3) { + $GLOBALS['running'] = false; + } + echo ('worker exit') . PHP_EOL; +}); + +$pool->start(); +?> +--EXPECT-- +worker start +sleep +sleep +sleep +worker exit +sleep +worker exit +sleep +worker exit +worker stop