Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize create socket #5286

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 67 additions & 6 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ static zend_class_entry *swoole_server_task_result_ce;
static zend_object_handlers swoole_server_task_result_handlers;

static SW_THREAD_LOCAL zval swoole_server_instance;
#ifdef SW_THREAD
static SW_THREAD_LOCAL WorkerFn worker_thread_fn;
static SW_THREAD_LOCAL std::vector<ServerPortProperty *> swoole_server_port_properties;
static SW_THREAD_LOCAL std::unordered_map<size_t, ServerPortProperty *> swoole_server_port_properties;
#endif

static sw_inline ServerObject *server_fetch_object(zend_object *obj) {
return (ServerObject *) ((char *) obj - swoole_server_handlers.offset);
Expand All @@ -164,18 +166,39 @@ zval *php_swoole_server_zval_ptr(Server *serv) {

ServerPortProperty *php_swoole_server_get_port_property(ListenPort *port) {
#ifdef SW_THREAD
return swoole_server_port_properties.at(port->socket->get_fd());
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (sw_server()->enable_reuse_port && sw_server()->is_worker_thread()) {
/**
* If the ListenPort of other threads is delivered here, we can return the callback function
* based on the number of the ListenPort. This is because ListenPorts with the same order will
* have the same callback function.
*/
return swoole_server_port_properties[port->number];
} else
#endif
{
return swoole_server_port_properties[port->socket->get_fd()];
}
#else
return (ServerPortProperty *) port->ptr;
#endif
}

void php_swoole_server_set_port_property(ListenPort *port, ServerPortProperty *property) {
#ifdef SW_THREAD
if (swoole_server_port_properties.size() < (size_t) port->socket->get_fd() + 1) {
swoole_server_port_properties.resize((size_t) port->socket->get_fd() + 1);
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (property->serv->enable_reuse_port && property->serv->is_worker_thread()) {
/**
* If the ListenPort of other threads is delivered here, we can return the callback function
* based on the number of the ListenPort. This is because ListenPorts with the same order will
* have the same callback function.
*/
swoole_server_port_properties[port->number] = property;
} else
#endif
{
swoole_server_port_properties[port->socket->get_fd()] = property;
}
swoole_server_port_properties[port->socket->get_fd()] = property;
#else
port->ptr = property;
#endif
Expand Down Expand Up @@ -916,6 +939,31 @@ void ServerObject::on_before_start() {
serv->onReceive = php_swoole_server_onReceive;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* The specific process is as follows: If `enable_reuse_port` is enabled, the main thread will create
* `worker_num` copies of all ListenPorts. When the child threads are generated, these copies
* will be distributed based on the `worker_id`. At this point, the child thread will add these ListenPorts
* to its own `ServerObject::property` in the `ports` section.
*/
if (serv->enable_reuse_port) {
size_t port_count, index;
port_count = index = serv->ports.size();
for (uint32_t worker_id = 1; worker_id <= serv->worker_num; worker_id++) {
for (size_t i = 0; i < port_count; i++) {
ListenPort *ls = serv->duplicate_port(serv->ports[i], worker_id, index++, i);
if (ls) {
/**
* Add it to `property->ports` so that all the properties of the main thread's `ListenPort`
* can be copied to the newly generated `ListenPort`.
*/
php_swoole_server_add_port(this, ls);
}
}
}
}
#endif

for (size_t i = 1; i < property->ports.size(); i++) {
zval *zport = property->ports.at(i);
zval *zport_setting =
Expand Down Expand Up @@ -1862,7 +1910,17 @@ static void server_ctor(zval *zserv, Server *serv) {

/* primary port */
for (auto ls : serv->ports) {
php_swoole_server_add_port(server_object, ls);
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (serv->enable_reuse_port && serv->is_worker_thread()) {
// The child thread only adds its own ListenPort.
if (ls->worker_id == sw_get_process_id()) {
php_swoole_server_add_port(server_object, ls);
}
} else
#endif
{
php_swoole_server_add_port(server_object, ls);
}
}

/* iterator */
Expand Down Expand Up @@ -1965,6 +2023,7 @@ static PHP_METHOD(swoole_server, set) {
ServerObject *server_object = server_fetch_object(Z_OBJ_P(ZEND_THIS));
Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS);
if (serv->is_worker_thread()) {
RETURN_TRUE;
return;
}
if (serv->is_started()) {
Expand Down Expand Up @@ -2361,6 +2420,7 @@ static PHP_METHOD(swoole_server, set) {
zend_long v = zval_get_long(ztmp);
serv->message_queue_key = SW_MAX(0, SW_MIN(v, INT64_MAX));
}
#ifdef SW_THREAD
// bootstrap
if (php_swoole_array_get_value(vht, "bootstrap", ztmp)) {
zend::object_set(ZEND_THIS, ZEND_STRL("bootstrap"), ztmp);
Expand All @@ -2373,6 +2433,7 @@ static PHP_METHOD(swoole_server, set) {
} else {
ZVAL_NULL(&server_object->init_arguments);
}
#endif

if (serv->task_enable_coroutine &&
(serv->task_ipc_mode == Server::TASK_IPC_MSGQUEUE || serv->task_ipc_mode == Server::TASK_IPC_PREEMPTIVE)) {
Expand Down
18 changes: 18 additions & 0 deletions ext-src/swoole_server_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ static zend_object *php_swoole_server_port_create_object(zend_class_entry *ce) {
return &server_port->std;
}

/**
* Since the socket creation occurs after executing Swoole\\Server::start, the fd here is -1, indicating that
* the connection has not been established yet.
*/
static zval* swoole_server_port_read_property(zend_object *object, zend_string *name, int type, void **cache_slot, zval *rv) {
zval *property = zend_std_read_property(object, name, type, cache_slot, rv);
if (SW_STREQ(ZSTR_VAL(name), ZSTR_LEN(name), "sock")) {
ListenPort *port = php_swoole_server_port_fetch_object(object)->port;;
if (Z_LVAL_P(property) == -1 && port->socket) {
ZVAL_LONG(property, port->get_fd());
}
}

return property;
}

SW_EXTERN_C_BEGIN
static PHP_METHOD(swoole_server_port, __construct);
static PHP_METHOD(swoole_server_port, __destruct);
Expand Down Expand Up @@ -186,6 +202,8 @@ void php_swoole_server_port_minit(int module_number) {
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("setting"), ZEND_ACC_PUBLIC);

zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("connections"), ZEND_ACC_PUBLIC);

swoole_server_port_handlers.read_property = swoole_server_port_read_property;
}

/**
Expand Down
24 changes: 23 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ struct ListenPort {
*/
int kernel_socket_recv_buffer_size = 0;
int kernel_socket_send_buffer_size = 0;
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
WorkerId worker_id = 0;
size_t number = 0;
#endif

#ifdef SW_USE_OPENSSL
SSLContext *ssl_context = nullptr;
Expand Down Expand Up @@ -324,6 +328,12 @@ struct ListenPort {
void close();
bool import(int sock);
const char *get_protocols();
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
int create_socket(swoole::Server *server, bool bind = true);
#else
int create_socket(swoole::Server *server);
#endif
void close_socket_fd();

#ifdef SW_USE_OPENSSL
bool ssl_create_context(SSLContext *context);
Expand Down Expand Up @@ -733,7 +743,14 @@ class Server {
std::vector<ListenPort *> ports;

ListenPort *get_primary_port() {
return ports.front();
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (enable_reuse_port && is_worker_thread()) {
return ports.at((ports.size() / (worker_num + 1)) * (sw_get_process_id() + 1));
} else
#endif
{
return ports.front();
}
}

enum Mode get_mode() const {
Expand Down Expand Up @@ -936,7 +953,12 @@ class Server {
bool shutdown();

int add_worker(Worker *worker);
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
ListenPort *add_port(SocketType type, const char *host, int port, bool bind = true);
ListenPort *duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number);
#else
ListenPort *add_port(SocketType type, const char *host, int port);
#endif
int add_systemd_socket();
int add_hook(enum HookType type, const Callback &func, int push_back);
bool add_command(const std::string &command, int accepted_process_types, const Command::Handler &func);
Expand Down
80 changes: 53 additions & 27 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,19 @@ int Server::create() {
return SW_ERR;
}

port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS));
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we pre-allocate an array called port_gs_list
* with a size of ports.size() * (worker_num + 1).
*/
if (enable_reuse_port) {
port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size() * (worker_num + 1), sizeof(ServerPortGS));
} else
#endif
{
port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS));
}

if (port_gs_list == nullptr) {
swoole_error("sw_shm_calloc() for port_connnection_num_array failed");
return SW_ERR;
Expand Down Expand Up @@ -1741,31 +1753,23 @@ int Server::add_systemd_socket() {
return count;
}

static bool Server_create_socket(ListenPort *ls) {
ls->socket = make_socket(
ls->type, ls->is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK);
if (!ls->socket) {
return false;
}
#if defined(SW_SUPPORT_DTLS) && defined(HAVE_KQUEUE)
if (ls->is_dtls()) {
ls->socket->set_reuse_port();
}
#endif

if (ls->socket->bind(ls->host, &ls->port) < 0) {
swoole_set_last_error(errno);
ls->socket->free();
return false;
}

ls->socket->info.assign(ls->type, ls->host, ls->port);

return true;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we need to pre-copy some ListenPort generated during the main thread period
* to allocate them to the sub-threads, but without binding them, otherwise we will encounter an error of address
* already in use.
*
* `bind` is only set to false when `Server::duplicate_port` is executed.
*/
ListenPort *Server::add_port(SocketType type, const char *host, int port, bool bind) {
#else
ListenPort *Server::add_port(SocketType type, const char *host, int port) {
if (session_list) {
#endif
if (session_list
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
&& !enable_reuse_port && !is_worker_thread()
#endif
) {
swoole_error_log(SW_LOG_ERROR, SW_ERROR_WRONG_OPERATION, "must add port before server is created");
return nullptr;
}
Expand Down Expand Up @@ -1821,8 +1825,11 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) {
}
#endif

if (!Server_create_socket(ls)) {
swoole_set_last_error(errno);
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (ls->create_socket(this, bind) < 0) {
#else
if (ls->create_socket(this) < 0) {
#endif
return nullptr;
}

Expand All @@ -1832,6 +1839,25 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) {
return ls;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we need to duplicate the ListenPort generated by the main thread.
*/
ListenPort *Server::duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number) {
ListenPort *port = add_port(ls->type, ls->host.c_str(), ls->port, false);
if (!port) {
swoole_warning("duplicate listen port failed");
return nullptr;
}

port->worker_id = worker_id;
port->number = number;
port->gs = &port_gs_list[index];
port->gs->connection_nums = (sw_atomic_t *) sw_shm_calloc(worker_num, sizeof(sw_atomic_t));
return port;
}
#endif

static void Server_signal_handler(int sig) {
swoole_trace_log(SW_TRACE_SERVER, "signal[%d] %s triggered in %d", sig, swoole_signal_to_str(sig), getpid());

Expand Down
Loading
Loading