Skip to content

Commit

Permalink
Multi-threaded support (WorkerThreads support) (#4)
Browse files Browse the repository at this point in the history
* Replaced HolyThread with DispatchThread
* Dividing Ndb objects between worker threads
* FIX: Enabled sending back message to in WorkerThread
* Added ping command
* Add redis-server to dependencies
* Bumped number of connections
  • Loading branch information
olapiv authored Oct 24, 2024
1 parent 4093b87 commit 665b6fc
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 247 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
}
}
},
"postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential libprotobuf-dev protobuf-compiler",
"postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential redis-server libprotobuf-dev protobuf-compiler",
"containerEnv": {
"RONDB_PATH": "/workspaces/pink/rondb-22.10.5-linux-glibc2.28-arm64_v8"
// Do this manually:
Expand Down
4 changes: 2 additions & 2 deletions pink/examples/binlog_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
std::string one_command = "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$2\r\nab\r\n";

std::string binlog_body;
slash::PutFixed16(&binlog_body, 1); // type
// slash::PutFixed16(&binlog_body, 1); // type
slash::PutFixed32(&binlog_body, 0); //exec_time
slash::PutFixed32(&binlog_body, 10); // server_id
slash::PutFixed64(&binlog_body, 0); // logic_id
Expand All @@ -42,7 +42,7 @@ int main(int argc, char* argv[]) {
binlog_body.append(one_command);

std::string header;
slash::PutFixed16(&header, 2);
// slash::PutFixed16(&header, 2);
slash::PutFixed32(&header, binlog_body.size());

std::string command = header + binlog_body;
Expand Down
148 changes: 84 additions & 64 deletions pink/examples/mydispatch_srv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,84 +16,104 @@

using namespace pink;

class MyConn: public PbConn {
public:
MyConn(int fd, const std::string& ip_port, Thread *thread,
void* worker_specific_data);
virtual ~MyConn();
protected:
virtual int DealMessage();

private:
myproto::Ping ping_;
myproto::PingRes ping_res_;
class MyConn : public PbConn
{
public:
MyConn(
int fd,
const std::string &ip_port,
Thread *thread,
void *worker_specific_data);
virtual ~MyConn();

protected:
virtual int DealMessage();

private:
myproto::Ping ping_;
myproto::PingRes ping_res_;
};

MyConn::MyConn(int fd, const std::string& ip_port, Thread *thread,
void* worker_specific_data)
: PbConn(fd, ip_port, thread) {
// Handle worker_specific_data ...
MyConn::MyConn(
int fd,
const std::string &ip_port,
Thread *thread,
void *worker_specific_data)
: PbConn(fd, ip_port, thread)
{
// Handle worker_specific_data ...
}

MyConn::~MyConn() {
MyConn::~MyConn()
{
}

int MyConn::DealMessage() {
printf("In the myconn DealMessage branch\n");
ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_);
ping_res_.Clear();
ping_res_.set_res(11234);
ping_res_.set_mess("heiheidfdfdf");
printf ("DealMessage receive (%s)\n", ping_res_.mess().c_str());
std::string res;
ping_res_.SerializeToString(&res);
WriteResp(res);
return 0;
int MyConn::DealMessage()
{
printf("In the myconn DealMessage branch\n");
ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_);
ping_res_.Clear();
ping_res_.set_res(11234);
ping_res_.set_mess("heiheidfdfdf");
printf("DealMessage receive (%s)\n", ping_res_.mess().c_str());
std::string res;
ping_res_.SerializeToString(&res);
WriteResp(res);
return 0;
}

class MyConnFactory : public ConnFactory {
public:
virtual std::shared_ptr<PinkConn> NewPinkConn(int connfd, const std::string &ip_port,
Thread *thread,
void* worker_specific_data,
PinkEpoll* pink_epoll) const {
return std::make_shared<MyConn>(connfd, ip_port, thread, worker_specific_data);
}
class MyConnFactory : public ConnFactory
{
public:
virtual std::shared_ptr<PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
Thread *thread,
void *worker_specific_data,
PinkEpoll *pink_epoll) const
{
return std::make_shared<MyConn>(connfd, ip_port, thread, worker_specific_data);
}
};

static std::atomic<bool> running(false);

static void IntSigHandle(const int sig) {
printf("Catch Signal %d, cleanup...\n", sig);
running.store(false);
printf("server Exit");
static void IntSigHandle(const int sig)
{
printf("Catch Signal %d, cleanup...\n", sig);
running.store(false);
printf("server Exit");
}

static void SignalSetup() {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, &IntSigHandle);
signal(SIGQUIT, &IntSigHandle);
signal(SIGTERM, &IntSigHandle);
static void SignalSetup()
{
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, &IntSigHandle);
signal(SIGQUIT, &IntSigHandle);
signal(SIGTERM, &IntSigHandle);
}

int main() {
SignalSetup();
ConnFactory *my_conn_factory = new MyConnFactory();
ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000);

if (st->StartThread() != 0) {
printf("StartThread error happened!\n");
exit(-1);
}
running.store(true);
while (running.load()) {
sleep(1);
}
st->StopThread();

delete st;
delete my_conn_factory;

return 0;
int main()
{
SignalSetup();
ConnFactory *my_conn_factory = new MyConnFactory();
ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000);

if (st->StartThread() != 0)
{
printf("StartThread error happened!\n");
exit(-1);
}
running.store(true);
while (running.load())
{
sleep(1);
}
st->StopThread();

delete st;
delete my_conn_factory;

return 0;
}
Loading

0 comments on commit 665b6fc

Please sign in to comment.