Skip to content

Commit

Permalink
Optimize 14
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Apr 7, 2024
1 parent df62cc3 commit 053a7d4
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 252 deletions.
2 changes: 1 addition & 1 deletion core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ TEST(server, task_worker4) {
serv->gs->task_workers.dispatch(&buf, &_dst_worker_id);
sleep(1);

EventData *task_result = &(serv->task_result[sw_get_process_id()]);
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
sw_memset_zero(task_result, sizeof(*task_result));
memset(&buf.info, 0, sizeof(buf.info));
buf.info.len = strlen(packet);
Expand Down
7 changes: 6 additions & 1 deletion examples/thread/thread_server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
]);

$http->on('Request', function ($req, $resp) use ($http) {
$resp->end("tid=" . \Swoole\Thread::getId() . ', fd=' . $req->fd);
// $resp->end("tid=" . \Swoole\Thread::getId() . ', fd=' . $req->fd);
$resp->end('hello world');
});

$http->addProcess(new \Swoole\Process(function () {
Expand All @@ -27,4 +28,8 @@
var_dump(\Swoole\Thread::getArguments());
});

$http->on('WorkerStop', function ($serv, $wid) {
var_dump('stop: T' . \Swoole\Thread::getId());
});

$http->start();
2 changes: 1 addition & 1 deletion ext-src/swoole_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ int php_swoole_process_start(Worker *process, zval *zobject) {
}

php_swoole_process_clean();
sw_set_process_id(process->id);
swoole_set_process_id(process->id);
SwooleWG.worker = process;

zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("pid"), process->pid);
Expand Down
6 changes: 3 additions & 3 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ static PHP_METHOD(swoole_process_pool, getProcess) {
php_swoole_error(E_WARNING, "invalid worker_id[%ld]", worker_id);
RETURN_FALSE;
} else if (worker_id < 0) {
worker_id = sw_get_process_id();
worker_id = swoole_get_process_id();
}

zval *zworkers =
Expand All @@ -584,11 +584,11 @@ static PHP_METHOD(swoole_process_pool, getProcess) {
*worker = current_pool->workers[worker_id];

object_init_ex(zprocess, swoole_process_ce);
zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(zprocess), ZEND_STRL("id"), sw_get_process_id());
zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(zprocess), ZEND_STRL("id"), swoole_get_process_id());
zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(zprocess), ZEND_STRL("pid"), worker->pid);
if (current_pool->ipc_mode == SW_IPC_UNIXSOCK) {
// current process
if (worker->id == sw_get_process_id()) {
if (worker->id == swoole_get_process_id()) {
worker->pipe_current = worker->pipe_worker;
} else {
worker->pipe_current = worker->pipe_master;
Expand Down
22 changes: 13 additions & 9 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void php_swoole_server_rshutdown() {
Server *serv = sw_server();
serv->drain_worker_pipe();

if (serv->is_started() && !serv->is_user_worker()) {
if (serv->is_started() && !serv->is_user_worker() && !serv->is_worker_thread()) {
if (php_swoole_is_fatal_error()) {
swoole_error_log(SW_LOG_ERROR,
SW_ERROR_PHP_FATAL_ERROR,
Expand Down Expand Up @@ -1900,7 +1900,11 @@ static PHP_METHOD(swoole_server, __construct) {
size_t host_len = 0;
zend_long sock_type = SW_SOCK_TCP;
zend_long serv_port = 0;
#ifdef SW_THREAD
zend_long serv_mode = Server::MODE_THREAD;
#else
zend_long serv_mode = Server::MODE_BASE;
#endif

// only cli env
if (!SWOOLE_G(cli)) {
Expand All @@ -1917,13 +1921,13 @@ static PHP_METHOD(swoole_server, __construct) {
Z_PARAM_LONG(sock_type)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if (serv_mode != Server::MODE_BASE && serv_mode != Server::MODE_PROCESS) {
if (serv_mode != Server::MODE_BASE && serv_mode != Server::MODE_PROCESS && serv_mode != Server::MODE_THREAD) {
zend_throw_error(NULL, "invalid $mode parameters %d", (int) serv_mode);
RETURN_FALSE;
}

#ifdef SW_THREAD
if (sw_server() || sw_server()->is_worker_thread()) {
if (sw_server() && sw_server()->is_worker_thread()) {
server_ctor(ZEND_THIS, sw_server());
return;
}
Expand Down Expand Up @@ -3039,9 +3043,9 @@ static PHP_METHOD(swoole_server, taskwait) {
}

uint64_t notify;
EventData *task_result = &(serv->task_result[sw_get_process_id()]);
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
sw_memset_zero(task_result, sizeof(*task_result));
Pipe *pipe = serv->task_notify_pipes.at(sw_get_process_id()).get();
Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get();
network::Socket *task_notify_socket = pipe->get_socket(false);

// clear history task
Expand Down Expand Up @@ -3121,10 +3125,10 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {
int list_of_id[SW_MAX_CONCURRENT_TASK] = {};

uint64_t notify;
EventData *task_result = &(serv->task_result[sw_get_process_id()]);
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
sw_memset_zero(task_result, sizeof(*task_result));
Pipe *pipe = serv->task_notify_pipes.at(sw_get_process_id()).get();
Worker *worker = serv->get_worker(sw_get_process_id());
Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get();
Worker *worker = serv->get_worker(swoole_get_process_id());

File fp = swoole::make_tmpfile();
if (!fp.ready()) {
Expand Down Expand Up @@ -3442,7 +3446,7 @@ static PHP_METHOD(swoole_server, sendMessage) {
Z_PARAM_LONG(worker_id)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if ((serv->is_worker() || serv->is_task_worker()) && worker_id == sw_get_process_id()) {
if ((serv->is_worker() || serv->is_task_worker()) && worker_id == swoole_get_process_id()) {
php_swoole_fatal_error(E_WARNING, "can't send messages to self");
RETURN_FALSE;
}
Expand Down
81 changes: 45 additions & 36 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ struct RecvData {
struct ThreadGlobal {
uint16_t id;
uint8_t type;
#ifdef SW_THREAD
uint8_t process_type;
uint32_t process_id;
#endif
String *buffer_stack;
Reactor *reactor;
Timer *timer;
Expand Down Expand Up @@ -795,20 +799,56 @@ static inline void swoole_set_last_error(int error) {
SwooleTG.error = error;
}

static inline int swoole_get_last_error() {
static inline int swoole_get_last_error(void) {
return SwooleTG.error;
}

static inline int swoole_get_thread_id() {
static inline int swoole_get_thread_id(void) {
return SwooleTG.id;
}

static inline int swoole_get_process_type() {
return SwooleG.process_type;
static inline int swoole_get_thread_type(void) {
return SwooleTG.type;
}

static inline int swoole_get_process_id() {
static inline void swoole_set_thread_id(uint16_t id) {
SwooleTG.id = id;
}

static inline void swoole_set_thread_type(uint8_t type) {
SwooleTG.type = type;
}

static inline swoole::WorkerId swoole_get_process_id(void) {
#ifdef SW_THREAD
return SwooleTG.process_id;
#else
return SwooleG.process_id;
#endif
}

static inline void swoole_set_process_id(swoole::WorkerId id) {
#ifdef SW_THREAD
SwooleTG.process_id = id;
#else
SwooleG.process_id = id;
#endif
}

static inline void swoole_set_process_type(int type) {
#ifdef SW_THREAD
SwooleTG.process_type = type;
#else
SwooleG.process_type = type;
#endif
}

static inline int swoole_get_process_type(void) {
#ifdef SW_THREAD
return SwooleTG.process_type;
#else
return SwooleG.process_type;
#endif
}

static inline uint32_t swoole_pagesize() {
Expand Down Expand Up @@ -866,34 +906,3 @@ static sw_inline const swoole::Allocator *sw_std_allocator() {
return &SwooleG.std_allocator;
}

static sw_inline swoole::WorkerId sw_get_process_id() {
#ifdef SW_THREAD
return SwooleTG.id;
#else
return SwooleG.process_id;
#endif
}

static sw_inline void sw_set_process_id(swoole::WorkerId id) {
#ifdef SW_THREAD
SwooleTG.id = id;
#else
SwooleG.process_id = id;
#endif
}

static sw_inline void sw_set_process_type(int type) {
#ifdef SW_THREAD
SwooleTG.type = type;
#else
SwooleG.process_type = type;
#endif
}

static sw_inline int sw_get_process_type() {
#ifdef SW_THREAD
return SwooleTG.type;
#else
return SwooleG.process_type;
#endif
}
56 changes: 31 additions & 25 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ class Factory {

class BaseFactory : public Factory {
public:
BaseFactory(Server *server) : Factory(server) {}
BaseFactory(Server *server);
~BaseFactory();
bool start() override;
bool shutdown() override;
Expand All @@ -427,9 +427,6 @@ class BaseFactory : public Factory {
};

class ProcessFactory : public Factory {
private:
std::vector<std::shared_ptr<UnixSocket>> pipes;

public:
ProcessFactory(Server *server);
~ProcessFactory();
Expand All @@ -441,6 +438,14 @@ class ProcessFactory : public Factory {
bool end(SessionId sesion_id, int flags) override;
};

class ThreadFactory : public BaseFactory {
public:
ThreadFactory(Server *server);
~ThreadFactory();
bool start() override;
bool shutdown() override;
};

enum ServerEventType {
// recv data payload
SW_SERVER_EVENT_RECV_DATA,
Expand Down Expand Up @@ -493,6 +498,7 @@ class Server {
enum Mode {
MODE_BASE = 1,
MODE_PROCESS = 2,
MODE_THREAD = 3,
};

enum TaskIpcMode {
Expand Down Expand Up @@ -731,6 +737,7 @@ class Server {
Manager *manager = nullptr;

std::vector<ListenPort *> ports;
std::vector<std::shared_ptr<UnixSocket>> worker_pipes;

ListenPort *get_primary_port() {
return ports.front();
Expand Down Expand Up @@ -931,6 +938,14 @@ class Server {
void add_http_compression_type(const std::string &type);

int create();
Factory *create_base_factory();
Factory *create_thread_factory();
Factory *create_process_factory();
bool create_worker_pipes();
void destroy_base_factory();
void destroy_thread_factory();
void destroy_process_factory();

int start();
bool reload(bool reload_all_workers);
bool shutdown();
Expand Down Expand Up @@ -1012,6 +1027,10 @@ class Server {
return mode_ == MODE_BASE;
}

bool is_thread_mode() {
return mode_ == MODE_THREAD;
}

bool is_enable_coroutine() {
if (is_task_worker()) {
return task_enable_coroutine;
Expand Down Expand Up @@ -1095,40 +1114,31 @@ class Server {
}

bool is_master() {
return SwooleG.process_type == SW_PROCESS_MASTER;
return swoole_get_process_type() == SW_PROCESS_MASTER;
}

bool is_worker() {
return sw_get_process_type() == SW_PROCESS_EVENTWORKER;
return swoole_get_process_type() == SW_PROCESS_EVENTWORKER;
}

bool is_task_worker() {
return sw_get_process_type() == SW_PROCESS_TASKWORKER;
return swoole_get_process_type() == SW_PROCESS_TASKWORKER;
}

bool is_manager() {
return sw_get_process_type() == SW_PROCESS_MANAGER;
return swoole_get_process_type() == SW_PROCESS_MANAGER;
}

bool is_user_worker() {
return sw_get_process_type() == SW_PROCESS_USERWORKER;
return swoole_get_process_type() == SW_PROCESS_USERWORKER;
}

bool is_worker_thread() {
#ifdef SW_THREAD
return sw_get_process_type() == SW_PROCESS_EVENTWORKER || sw_get_process_type() == SW_PROCESS_TASKWORKER ||
sw_get_process_type() == SW_PROCESS_USERWORKER;
#else
return false;
#endif
return is_thread_mode() && is_reactor_thread();
}

bool is_reactor_thread() {
#ifdef SW_THREAD
return false;
#else
return sw_get_process_type() == SW_PROCESS_MASTER && SwooleTG.type == Server::THREAD_REACTOR;
#endif
return swoole_get_thread_type() == Server::THREAD_REACTOR;
}

bool isset_hook(enum HookType type) {
Expand Down Expand Up @@ -1414,14 +1424,10 @@ class Server {
int start_check();
void check_port_type(ListenPort *ls);
void destroy();
void destroy_reactor_threads();
void destroy_reactor_processes();
int create_reactor_processes();
int create_reactor_threads();
int start_reactor_threads();
int start_reactor_processes();
int start_worker_threads();
int start_master_thread();
int start_master_thread(Reactor *reactor);
int start_event_worker(Worker *worker);
void start_heartbeat_thread();
void join_reactor_thread();
Expand Down
Loading

0 comments on commit 053a7d4

Please sign in to comment.