From ad98de47c07e3626301b047c30cf92793de19c35 Mon Sep 17 00:00:00 2001 From: NathanFreeman <1056159381@qq.com> Date: Wed, 3 Apr 2024 23:47:34 +0800 Subject: [PATCH 1/4] Optimize create socket --- ext-src/swoole_server.cc | 3 ++ ext-src/swoole_server_port.cc | 18 ++++++++++++ include/swoole_server.h | 1 + src/server/master.cc | 26 +---------------- src/server/port.cc | 48 +++++++++++++++++++++++++++++++ src/server/reactor_process.cc | 54 ++++++++++------------------------- src/server/worker_threads.cc | 27 +++++++----------- 7 files changed, 96 insertions(+), 81 deletions(-) diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index 937492c3f32..ad3440d897f 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -1965,6 +1965,7 @@ static PHP_METHOD(swoole_server, set) { ServerObject *server_object = server_fetch_object(Z_OBJ_P(ZEND_THIS)); Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS); if (serv->is_worker_thread()) { + RETURN_TRUE; return; } if (serv->is_started()) { @@ -2361,6 +2362,7 @@ static PHP_METHOD(swoole_server, set) { zend_long v = zval_get_long(ztmp); serv->message_queue_key = SW_MAX(0, SW_MIN(v, INT64_MAX)); } +#ifdef SW_THREAD // bootstrap if (php_swoole_array_get_value(vht, "bootstrap", ztmp)) { zend::object_set(ZEND_THIS, ZEND_STRL("bootstrap"), ztmp); @@ -2373,6 +2375,7 @@ static PHP_METHOD(swoole_server, set) { } else { ZVAL_NULL(&server_object->init_arguments); } +#endif if (serv->task_enable_coroutine && (serv->task_ipc_mode == Server::TASK_IPC_MSGQUEUE || serv->task_ipc_mode == Server::TASK_IPC_PREEMPTIVE)) { diff --git a/ext-src/swoole_server_port.cc b/ext-src/swoole_server_port.cc index e76a07b5c48..b7a74f7f950 100644 --- a/ext-src/swoole_server_port.cc +++ b/ext-src/swoole_server_port.cc @@ -125,6 +125,22 @@ static zend_object *php_swoole_server_port_create_object(zend_class_entry *ce) { return &server_port->std; } +/** + * Since the socket creation occurs after executing Swoole\\Server::start, the fd here is -1, indicating that + * the connection has not been established yet. + */ +static zval* swoole_server_port_read_property(zend_object *object, zend_string *name, int type, void **cache_slot, zval *rv) { + zval *property = zend_std_read_property(object, name, type, cache_slot, rv); + if (SW_STREQ(ZSTR_VAL(name), ZSTR_LEN(name), "sock")) { + ListenPort *port = php_swoole_server_port_fetch_object(object)->port;; + if (Z_LVAL_P(property) == -1 && port->socket) { + ZVAL_LONG(property, port->get_fd()); + } + } + + return property; +} + SW_EXTERN_C_BEGIN static PHP_METHOD(swoole_server_port, __construct); static PHP_METHOD(swoole_server_port, __destruct); @@ -186,6 +202,8 @@ void php_swoole_server_port_minit(int module_number) { zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("setting"), ZEND_ACC_PUBLIC); zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("connections"), ZEND_ACC_PUBLIC); + + swoole_server_port_handlers.read_property = swoole_server_port_read_property; } /** diff --git a/include/swoole_server.h b/include/swoole_server.h index 97756282da1..853c102d056 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -356,6 +356,7 @@ struct ListenPort { } size_t get_connection_num(); + int create_socket(swoole::Server *server); }; struct ServerGS { diff --git a/src/server/master.cc b/src/server/master.cc index d150de7f067..600ecd76cc8 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -1741,29 +1741,6 @@ int Server::add_systemd_socket() { return count; } -static bool Server_create_socket(ListenPort *ls) { - ls->socket = make_socket( - ls->type, ls->is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK); - if (!ls->socket) { - return false; - } -#if defined(SW_SUPPORT_DTLS) && defined(HAVE_KQUEUE) - if (ls->is_dtls()) { - ls->socket->set_reuse_port(); - } -#endif - - if (ls->socket->bind(ls->host, &ls->port) < 0) { - swoole_set_last_error(errno); - ls->socket->free(); - return false; - } - - ls->socket->info.assign(ls->type, ls->host, ls->port); - - return true; -} - ListenPort *Server::add_port(SocketType type, const char *host, int port) { if (session_list) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_WRONG_OPERATION, "must add port before server is created"); @@ -1821,8 +1798,7 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) { } #endif - if (!Server_create_socket(ls)) { - swoole_set_last_error(errno); + if (ls->create_socket(this) < 0) { return nullptr; } diff --git a/src/server/port.cc b/src/server/port.cc index a302c869102..6cce6ba2837 100644 --- a/src/server/port.cc +++ b/src/server/port.cc @@ -792,4 +792,52 @@ size_t ListenPort::get_connection_num() { } } +int ListenPort::create_socket(Server *server) { + if (socket) { +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (server->enable_reuse_port) { + if (::close(socket->fd) < 0) { + swoole_sys_warning("close(%d) failed", socket->fd); + } + delete socket; + socket = nullptr; + } else +#endif + { + return SW_OK; + } + } + + socket = swoole::make_socket( + type, is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK); + if (socket == nullptr) { + swoole_set_last_error(errno); + return SW_ERR; + } + +#if defined(SW_SUPPORT_DTLS) && defined(HAVE_KQUEUE) + if (>is_dtls()) { + socket->set_reuse_port(); + } +#endif + +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (server->enable_reuse_port) { + if (socket->set_reuse_port() < 0) { + socket->free(); + return SW_ERR; + } + } +#endif + + if (socket->bind(host, &port) < 0) { + swoole_set_last_error(errno); + socket->free(); + return SW_ERR; + } + + socket->info.assign(type, host, port); + return SW_OK; +} + } // namespace swoole diff --git a/src/server/reactor_process.cc b/src/server/reactor_process.cc index 5257135ae60..3c6d21892bc 100644 --- a/src/server/reactor_process.cc +++ b/src/server/reactor_process.cc @@ -24,10 +24,6 @@ static int ReactorProcess_onPipeRead(Reactor *reactor, Event *event); static int ReactorProcess_onClose(Reactor *reactor, Event *event); static void ReactorProcess_onTimeout(Timer *timer, TimerNode *tnode); -#ifdef HAVE_REUSEPORT -static int ReactorProcess_reuse_port(ListenPort *ls); -#endif - static bool Server_is_single(Server *serv) { return (serv->worker_num == 1 && serv->task_worker_num == 0 && serv->max_request == 0 && serv->user_worker_list.empty()); @@ -53,24 +49,17 @@ int Server::start_reactor_processes() { // listen TCP if (have_stream_sock == 1) { for (auto ls : ports) { - if (ls->is_dgram()) { - continue; - } -#ifdef HAVE_REUSEPORT - if (enable_reuse_port) { - if (::close(ls->socket->fd) < 0) { - swoole_sys_warning("close(%d) failed", ls->socket->fd); - } - delete ls->socket; - ls->socket = nullptr; - continue; - } else + if (ls->is_stream()) { +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (!enable_reuse_port) { #endif - { - // listen server socket - if (ls->listen() < 0) { - return SW_ERR; + // listen server socket + if (ls->listen() < 0) { + return SW_ERR; + } +#if defined(__linux__) && defined(HAVE_REUSEPORT) } +#endif } } } @@ -207,12 +196,16 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) { worker_signal_init(); for (auto ls : serv->ports) { -#ifdef HAVE_REUSEPORT +#if defined(__linux__) && defined(HAVE_REUSEPORT) if (ls->is_stream() && serv->enable_reuse_port) { - if (ReactorProcess_reuse_port(ls) < 0) { + if (ls->create_socket(serv) < 0) { swoole_event_free(); return SW_ERR; } + + if (ls->listen() < 0) { + return SW_ERR; + } } #endif if (reactor->add(ls->socket, SW_EVENT_READ) < 0) { @@ -363,21 +356,4 @@ static void ReactorProcess_onTimeout(Timer *timer, TimerNode *tnode) { ReactorProcess_onClose(reactor, ¬ify_ev); }); } - -#ifdef HAVE_REUSEPORT -static int ReactorProcess_reuse_port(ListenPort *ls) { - ls->socket = swoole::make_socket( - ls->type, ls->is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK); - if (ls->socket->set_reuse_port() < 0) { - ls->socket->free(); - return SW_ERR; - } - if (ls->socket->bind(ls->host, &ls->port) < 0) { - ls->socket->free(); - return SW_ERR; - } - return ls->listen(); -} -#endif - } // namespace swoole diff --git a/src/server/worker_threads.cc b/src/server/worker_threads.cc index 799bc9d8438..8f062fb7cc1 100644 --- a/src/server/worker_threads.cc +++ b/src/server/worker_threads.cc @@ -46,7 +46,7 @@ struct WorkerThreads { cv_.notify_one(); } - template + template void create_thread(int i, _Callable fn) { if (threads_[i].joinable()) { threads_[i].join(); @@ -128,24 +128,17 @@ int Server::start_worker_threads() { // listen TCP if (have_stream_sock == 1) { for (auto ls : ports) { - if (ls->is_dgram()) { - continue; - } -#ifdef HAVE_REUSEPORT - if (enable_reuse_port) { - if (::close(ls->socket->fd) < 0) { - swoole_sys_warning("close(%d) failed", ls->socket->fd); - } - delete ls->socket; - ls->socket = nullptr; - continue; - } else + if (ls->is_stream()) { +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (!enable_reuse_port) { #endif - { - // listen server socket - if (ls->listen() < 0) { - return SW_ERR; + // listen server socket + if (ls->listen() < 0) { + return SW_ERR; + } +#if defined(__linux__) && defined(HAVE_REUSEPORT) } +#endif } } } From c37fdd6c4e9b8a01d4f592d1e0abaf1f37e4a7f4 Mon Sep 17 00:00:00 2001 From: NathanFreeman <1056159381@qq.com> Date: Wed, 3 Apr 2024 23:57:58 +0800 Subject: [PATCH 2/4] fix error --- src/server/port.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/port.cc b/src/server/port.cc index 6cce6ba2837..745864a9195 100644 --- a/src/server/port.cc +++ b/src/server/port.cc @@ -816,7 +816,7 @@ int ListenPort::create_socket(Server *server) { } #if defined(SW_SUPPORT_DTLS) && defined(HAVE_KQUEUE) - if (>is_dtls()) { + if (ls->is_dtls()) { socket->set_reuse_port(); } #endif From 052e0fe8542167a27c32eb43cb7f81ae013342e1 Mon Sep 17 00:00:00 2001 From: NathanFreeman <1056159381@qq.com> Date: Thu, 4 Apr 2024 17:06:21 +0800 Subject: [PATCH 3/4] fix error --- include/swoole_server.h | 3 ++- src/server/port.cc | 8 ++++++++ src/server/reactor_process.cc | 2 ++ src/server/worker_threads.cc | 2 ++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/include/swoole_server.h b/include/swoole_server.h index 853c102d056..1193ba43804 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -324,6 +324,8 @@ struct ListenPort { void close(); bool import(int sock); const char *get_protocols(); + int create_socket(swoole::Server *server); + void close_socket_fd(); #ifdef SW_USE_OPENSSL bool ssl_create_context(SSLContext *context); @@ -356,7 +358,6 @@ struct ListenPort { } size_t get_connection_num(); - int create_socket(swoole::Server *server); }; struct ServerGS { diff --git a/src/server/port.cc b/src/server/port.cc index 745864a9195..ab720038249 100644 --- a/src/server/port.cc +++ b/src/server/port.cc @@ -840,4 +840,12 @@ int ListenPort::create_socket(Server *server) { return SW_OK; } +void ListenPort::close_socket_fd() { + if (::close(socket->fd) < 0) { + swoole_sys_warning("close(%d) failed", socket->fd); + } + delete socket; + socket = nullptr; +} + } // namespace swoole diff --git a/src/server/reactor_process.cc b/src/server/reactor_process.cc index 3c6d21892bc..dd272c6e676 100644 --- a/src/server/reactor_process.cc +++ b/src/server/reactor_process.cc @@ -58,6 +58,8 @@ int Server::start_reactor_processes() { return SW_ERR; } #if defined(__linux__) && defined(HAVE_REUSEPORT) + } else { + ls->close_socket_fd(); } #endif } diff --git a/src/server/worker_threads.cc b/src/server/worker_threads.cc index 8f062fb7cc1..d906862ad47 100644 --- a/src/server/worker_threads.cc +++ b/src/server/worker_threads.cc @@ -137,6 +137,8 @@ int Server::start_worker_threads() { return SW_ERR; } #if defined(__linux__) && defined(HAVE_REUSEPORT) + } else { + ls->close_socket_fd(); } #endif } From 871dbb69d79b753b4b8a45aa546e9e8c7d4c53c8 Mon Sep 17 00:00:00 2001 From: NathanFreeman <1056159381@qq.com> Date: Sat, 6 Apr 2024 11:43:27 +0800 Subject: [PATCH 4/4] enable_reuse_port --- ext-src/swoole_server.cc | 70 ++++++++++++++++++++++++++++++++--- include/swoole_server.h | 22 ++++++++++- src/server/master.cc | 54 ++++++++++++++++++++++++++- src/server/port.cc | 30 +++++++++++++-- src/server/reactor_process.cc | 14 ++++++- src/server/worker_threads.cc | 2 - 6 files changed, 175 insertions(+), 17 deletions(-) diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index ad3440d897f..58a6b2094c1 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -139,8 +139,10 @@ static zend_class_entry *swoole_server_task_result_ce; static zend_object_handlers swoole_server_task_result_handlers; static SW_THREAD_LOCAL zval swoole_server_instance; +#ifdef SW_THREAD static SW_THREAD_LOCAL WorkerFn worker_thread_fn; -static SW_THREAD_LOCAL std::vector swoole_server_port_properties; +static SW_THREAD_LOCAL std::unordered_map swoole_server_port_properties; +#endif static sw_inline ServerObject *server_fetch_object(zend_object *obj) { return (ServerObject *) ((char *) obj - swoole_server_handlers.offset); @@ -164,7 +166,19 @@ zval *php_swoole_server_zval_ptr(Server *serv) { ServerPortProperty *php_swoole_server_get_port_property(ListenPort *port) { #ifdef SW_THREAD - return swoole_server_port_properties.at(port->socket->get_fd()); +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (sw_server()->enable_reuse_port && sw_server()->is_worker_thread()) { + /** + * If the ListenPort of other threads is delivered here, we can return the callback function + * based on the number of the ListenPort. This is because ListenPorts with the same order will + * have the same callback function. + */ + return swoole_server_port_properties[port->number]; + } else +#endif + { + return swoole_server_port_properties[port->socket->get_fd()]; + } #else return (ServerPortProperty *) port->ptr; #endif @@ -172,10 +186,19 @@ ServerPortProperty *php_swoole_server_get_port_property(ListenPort *port) { void php_swoole_server_set_port_property(ListenPort *port, ServerPortProperty *property) { #ifdef SW_THREAD - if (swoole_server_port_properties.size() < (size_t) port->socket->get_fd() + 1) { - swoole_server_port_properties.resize((size_t) port->socket->get_fd() + 1); +#if defined(__linux__) && defined(HAVE_REUSEPORT) + if (property->serv->enable_reuse_port && property->serv->is_worker_thread()) { + /** + * If the ListenPort of other threads is delivered here, we can return the callback function + * based on the number of the ListenPort. This is because ListenPorts with the same order will + * have the same callback function. + */ + swoole_server_port_properties[port->number] = property; + } else +#endif + { + swoole_server_port_properties[port->socket->get_fd()] = property; } - swoole_server_port_properties[port->socket->get_fd()] = property; #else port->ptr = property; #endif @@ -916,6 +939,31 @@ void ServerObject::on_before_start() { serv->onReceive = php_swoole_server_onReceive; } +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + /** + * The specific process is as follows: If `enable_reuse_port` is enabled, the main thread will create + * `worker_num` copies of all ListenPorts. When the child threads are generated, these copies + * will be distributed based on the `worker_id`. At this point, the child thread will add these ListenPorts + * to its own `ServerObject::property` in the `ports` section. + */ + if (serv->enable_reuse_port) { + size_t port_count, index; + port_count = index = serv->ports.size(); + for (uint32_t worker_id = 1; worker_id <= serv->worker_num; worker_id++) { + for (size_t i = 0; i < port_count; i++) { + ListenPort *ls = serv->duplicate_port(serv->ports[i], worker_id, index++, i); + if (ls) { + /** + * Add it to `property->ports` so that all the properties of the main thread's `ListenPort` + * can be copied to the newly generated `ListenPort`. + */ + php_swoole_server_add_port(this, ls); + } + } + } + } +#endif + for (size_t i = 1; i < property->ports.size(); i++) { zval *zport = property->ports.at(i); zval *zport_setting = @@ -1862,7 +1910,17 @@ static void server_ctor(zval *zserv, Server *serv) { /* primary port */ for (auto ls : serv->ports) { - php_swoole_server_add_port(server_object, ls); +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + if (serv->enable_reuse_port && serv->is_worker_thread()) { + // The child thread only adds its own ListenPort. + if (ls->worker_id == sw_get_process_id()) { + php_swoole_server_add_port(server_object, ls); + } + } else +#endif + { + php_swoole_server_add_port(server_object, ls); + } } /* iterator */ diff --git a/include/swoole_server.h b/include/swoole_server.h index 1193ba43804..17e970a2332 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -272,6 +272,10 @@ struct ListenPort { */ int kernel_socket_recv_buffer_size = 0; int kernel_socket_send_buffer_size = 0; +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + WorkerId worker_id = 0; + size_t number = 0; +#endif #ifdef SW_USE_OPENSSL SSLContext *ssl_context = nullptr; @@ -324,7 +328,11 @@ struct ListenPort { void close(); bool import(int sock); const char *get_protocols(); +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + int create_socket(swoole::Server *server, bool bind = true); +#else int create_socket(swoole::Server *server); +#endif void close_socket_fd(); #ifdef SW_USE_OPENSSL @@ -735,7 +743,14 @@ class Server { std::vector ports; ListenPort *get_primary_port() { - return ports.front(); +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + if (enable_reuse_port && is_worker_thread()) { + return ports.at((ports.size() / (worker_num + 1)) * (sw_get_process_id() + 1)); + } else +#endif + { + return ports.front(); + } } enum Mode get_mode() const { @@ -938,7 +953,12 @@ class Server { bool shutdown(); int add_worker(Worker *worker); +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + ListenPort *add_port(SocketType type, const char *host, int port, bool bind = true); + ListenPort *duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number); +#else ListenPort *add_port(SocketType type, const char *host, int port); +#endif int add_systemd_socket(); int add_hook(enum HookType type, const Callback &func, int push_back); bool add_command(const std::string &command, int accepted_process_types, const Command::Handler &func); diff --git a/src/server/master.cc b/src/server/master.cc index 600ecd76cc8..92963d89aab 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -784,7 +784,19 @@ int Server::create() { return SW_ERR; } - port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS)); +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + /** + * When enable_reuse_port is enabled, we pre-allocate an array called port_gs_list + * with a size of ports.size() * (worker_num + 1). + */ + if (enable_reuse_port) { + port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size() * (worker_num + 1), sizeof(ServerPortGS)); + } else +#endif + { + port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS)); + } + if (port_gs_list == nullptr) { swoole_error("sw_shm_calloc() for port_connnection_num_array failed"); return SW_ERR; @@ -1741,8 +1753,23 @@ int Server::add_systemd_socket() { return count; } +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) +/** + * When enable_reuse_port is enabled, we need to pre-copy some ListenPort generated during the main thread period + * to allocate them to the sub-threads, but without binding them, otherwise we will encounter an error of address + * already in use. + * + * `bind` is only set to false when `Server::duplicate_port` is executed. + */ +ListenPort *Server::add_port(SocketType type, const char *host, int port, bool bind) { +#else ListenPort *Server::add_port(SocketType type, const char *host, int port) { - if (session_list) { +#endif + if (session_list +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + && !enable_reuse_port && !is_worker_thread() +#endif + ) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_WRONG_OPERATION, "must add port before server is created"); return nullptr; } @@ -1798,7 +1825,11 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) { } #endif +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + if (ls->create_socket(this, bind) < 0) { +#else if (ls->create_socket(this) < 0) { +#endif return nullptr; } @@ -1808,6 +1839,25 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) { return ls; } +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) +/** + * When enable_reuse_port is enabled, we need to duplicate the ListenPort generated by the main thread. + */ +ListenPort *Server::duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number) { + ListenPort *port = add_port(ls->type, ls->host.c_str(), ls->port, false); + if (!port) { + swoole_warning("duplicate listen port failed"); + return nullptr; + } + + port->worker_id = worker_id; + port->number = number; + port->gs = &port_gs_list[index]; + port->gs->connection_nums = (sw_atomic_t *) sw_shm_calloc(worker_num, sizeof(sw_atomic_t)); + return port; +} +#endif + static void Server_signal_handler(int sig) { swoole_trace_log(SW_TRACE_SERVER, "signal[%d] %s triggered in %d", sig, swoole_signal_to_str(sig), getpid()); diff --git a/src/server/port.cc b/src/server/port.cc index ab720038249..26e690da307 100644 --- a/src/server/port.cc +++ b/src/server/port.cc @@ -792,7 +792,22 @@ size_t ListenPort::get_connection_num() { } } +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) +/** + * When `bind` is set to false, it indicates that it is the process of duplicating `ListenPort` + * using `Server::duplicate_port`. + */ +int ListenPort::create_socket(Server *server, bool bind) { + /** + * When `worker_id` is greater than 0, the socket is created through `Server::duplicate_port`, and no additional + * operations are required. Simply execute the bind operation. + */ + if (worker_id > 0) { + goto start_bind; + } +#else int ListenPort::create_socket(Server *server) { +#endif if (socket) { #if defined(__linux__) && defined(HAVE_REUSEPORT) if (server->enable_reuse_port) { @@ -830,11 +845,18 @@ int ListenPort::create_socket(Server *server) { } #endif - if (socket->bind(host, &port) < 0) { - swoole_set_last_error(errno); - socket->free(); - return SW_ERR; +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) +start_bind: + if (bind) { +#endif + if (socket->bind(host, &port) < 0) { + swoole_set_last_error(errno); + socket->free(); + return SW_ERR; + } +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) } +#endif socket->info.assign(type, host, port); return SW_OK; diff --git a/src/server/reactor_process.cc b/src/server/reactor_process.cc index dd272c6e676..ee5350f4341 100644 --- a/src/server/reactor_process.cc +++ b/src/server/reactor_process.cc @@ -179,7 +179,13 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) { } SwooleWG.max_request = serv->max_request; SwooleWG.worker = worker; - SwooleTG.id = 0; +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + if (!serv->enable_reuse_port) { +#endif + SwooleTG.id = 0; +#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD) + } +#endif serv->init_worker(worker); @@ -200,11 +206,15 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) { for (auto ls : serv->ports) { #if defined(__linux__) && defined(HAVE_REUSEPORT) if (ls->is_stream() && serv->enable_reuse_port) { +#ifdef SW_THREAD + if (ls->worker_id != worker->id + 1) { + continue; + } +#endif if (ls->create_socket(serv) < 0) { swoole_event_free(); return SW_ERR; } - if (ls->listen() < 0) { return SW_ERR; } diff --git a/src/server/worker_threads.cc b/src/server/worker_threads.cc index d906862ad47..8f062fb7cc1 100644 --- a/src/server/worker_threads.cc +++ b/src/server/worker_threads.cc @@ -137,8 +137,6 @@ int Server::start_worker_threads() { return SW_ERR; } #if defined(__linux__) && defined(HAVE_REUSEPORT) - } else { - ls->close_socket_fd(); } #endif }