Skip to content

Commit

Permalink
The thread mode server supports manager timer
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Oct 17, 2024
1 parent c71b7d4 commit 82382bc
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 10 deletions.
2 changes: 2 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ struct DataHead;
typedef int (*ReactorHandler)(Reactor *reactor, Event *event);
typedef std::function<void(void *)> Callback;
typedef std::function<void(Timer *, TimerNode *)> TimerCallback;
typedef std::function<int(Timer *, long)> TimerScheduler;
} // namespace swoole

typedef swoole::Reactor swReactor;
Expand Down Expand Up @@ -700,6 +701,7 @@ struct ThreadGlobal {
String *buffer_stack;
Reactor *reactor;
Timer *timer;
TimerScheduler *timer_scheduler;
MessageBus *message_bus;
AsyncThreads *async_threads;
#ifdef SW_USE_IOURING
Expand Down
1 change: 1 addition & 0 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ class ThreadFactory : public BaseFactory {
std::mutex lock_;
std::condition_variable cv_;
std::queue<Worker *> queue_;
long cv_timeout_ms_;
bool reload_all_workers;
bool reloading;
Worker manager;
Expand Down
9 changes: 5 additions & 4 deletions include/swoole_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ class Timer {
/*---------------event timer--------------*/
struct timeval base_time;
/*----------------------------------------*/
int (*set)(Timer *timer, long exec_msec) = nullptr;
void (*close)(Timer *timer) = nullptr;
std::function<int(Timer *timer, long exec_msec)> set;
std::function<void(Timer *timer)> close;

bool init_reactor(Reactor *reactor);
bool init_system_timer();
bool init_with_reactor(Reactor *reactor);
bool init_with_user_scheduler(TimerScheduler *scheduler);
bool init_with_system_timer();

public:
long next_msec_;
Expand Down
18 changes: 14 additions & 4 deletions src/core/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,23 @@ bool Timer::init() {
return false;
}
if (SwooleTG.reactor) {
return init_reactor(SwooleTG.reactor);
return init_with_reactor(SwooleTG.reactor);
} else if (SwooleTG.timer_scheduler) {
return init_with_user_scheduler(SwooleTG.timer_scheduler);
} else {
return init_system_timer();
return init_with_system_timer();
}
}

bool Timer::init_reactor(Reactor *reactor) {
bool Timer::init_with_user_scheduler(TimerScheduler *scheduler) {
set = [scheduler](Timer *timer, long exec_msec) -> int {
return (*scheduler)(timer, exec_msec);
};
close = [scheduler](Timer *timer) { (*scheduler)(timer, -1); };
return true;
}

bool Timer::init_with_reactor(Reactor *reactor) {
reactor_ = reactor;
set = [](Timer *timer, long exec_msec) -> int {
timer->reactor_->timeout_msec = exec_msec;
Expand All @@ -89,7 +99,7 @@ bool Timer::init_reactor(Reactor *reactor) {
}

void Timer::reinit(Reactor *reactor) {
init_reactor(reactor);
init_with_reactor(reactor);
reactor->timeout_msec = next_msec_;
}

Expand Down
2 changes: 1 addition & 1 deletion src/os/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace swoole {

static int SystemTimer_set(Timer *timer, long next_msec);

bool Timer::init_system_timer() {
bool Timer::init_with_system_timer() {
set = SystemTimer_set;
close = [](Timer *timer) { SystemTimer_set(timer, -1); };
swoole_signal_set(SIGALRM, [](int sig) { SwooleG.signal_alarm = true; });
Expand Down
19 changes: 18 additions & 1 deletion src/server/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ void ThreadFactory::spawn_manager_thread(WorkerId i) {
swoole_set_thread_id(i);
manager.id = i;
manager.type = SW_PROCESS_MANAGER;

TimerScheduler timer_scheduler = [this](Timer *timer, long exec_msec) -> int {
cv_timeout_ms_ = exec_msec;
return SW_OK;
};
SwooleTG.timer_scheduler = &timer_scheduler;

server_->worker_thread_start([=]() {
if (server_->onManagerStart) {
server_->onManagerStart(server_);
Expand All @@ -190,9 +197,12 @@ void ThreadFactory::spawn_manager_thread(WorkerId i) {
server_->onManagerStop(server_);
}
});

if (server_->running) {
swoole_warning("Fatal Error: manager thread exits abnormally");
}

SwooleTG.timer_scheduler = nullptr;
});
}

Expand Down Expand Up @@ -235,7 +245,14 @@ void ThreadFactory::wait() {
}
_lock.unlock();
} else {
cv_.wait(_lock);
if (cv_timeout_ms_ > 0) {
cv_.wait_for(_lock, std::chrono::milliseconds(cv_timeout_ms_));
} else {
cv_.wait(_lock);
}
}
if (SwooleTG.timer) {
swoole_timer_select();
}
if (server_->running && reloading) {
reload(reload_all_workers);
Expand Down
73 changes: 73 additions & 0 deletions tests/swoole_thread/server/manager_timer.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
--TEST--
swoole_thread/server: reload
--SKIPIF--
<?php
require __DIR__ . '/../../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../../include/bootstrap.php';

use Swoole\Thread;
use Swoole\Timer;

$port = get_constant_port(__FILE__);

$serv = new Swoole\Http\Server('127.0.0.1', $port, SWOOLE_THREAD);
$serv->set(array(
'worker_num' => 2,
'log_level' => SWOOLE_LOG_ERROR,
'log_file' => '/dev/null',
'init_arguments' => function () {
global $queue, $atomic1, $atomic2;
$queue = new Swoole\Thread\Queue();
$atomic1 = new Swoole\Thread\Atomic(0);
$atomic2 = new Swoole\Thread\Atomic(0);
return [$queue, $atomic1, $atomic2];
}
));
$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) {
[$queue, $atomic1, $atomic2] = Thread::getArguments();
$atomic1->add();
});
$serv->on('WorkerStop', function (Swoole\Server $serv, $workerId) {
[$queue, $atomic1, $atomic2] = Thread::getArguments();
$atomic2->add();
});
$serv->on('Request', function ($req, $resp) use ($serv) {
});
$serv->on('Task', function ($serv, $task_id, $worker_id, $data) {
});
$serv->on('managerStart', function ($serv) {
[$queue, $atomic1, $atomic2] = Thread::getArguments();
$queue->push("begin 1\n", Thread\Queue::NOTIFY_ALL);
$count = 0;
Timer::tick(100, function () use ($queue, &$count) {
$count++;
if ($count == 5) {
$queue->push("begin 2\n", Thread\Queue::NOTIFY_ALL);
}
});
});
$serv->on('shutdown', function () {
global $queue, $atomic1, $atomic2;
echo 'shutdown', PHP_EOL;
Assert::eq($atomic1->get(), 2);
Assert::eq($atomic2->get(), 2);
});
$serv->addProcess(new Swoole\Process(function ($process) use ($serv) {
[$queue, $atomic] = Thread::getArguments();
echo $queue->pop(-1);
echo $queue->pop(-1);
echo "done\n";
$serv->shutdown();
}));
$serv->start();
?>
--EXPECT--
begin 1
begin 2
done
shutdown

0 comments on commit 82382bc

Please sign in to comment.