Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a db management client for later job management #305

Merged
merged 29 commits into from
Oct 24, 2023
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix pr comments
SonglinLyu committed Oct 18, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 317d0862c06cfdebfde611ecf5f9ec5c1ae395a5
4 changes: 3 additions & 1 deletion src/protobuf/tugraph_db_management.proto
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ message CreateJobResponse {
}

message ReadJobRequest {

optional int32 job_id = 1;
}

message ReadJobResponse {
@@ -120,10 +120,12 @@ message JobManagementResponse {

message HeartbeatRequest {
required string request_msg = 1;
required int32 heartbeat_count = 2;
};

message HeartbeatResponse {
required string response_msg = 1;
required int32 heartbeat_count = 2;
};

service JobManagementService {
110 changes: 73 additions & 37 deletions src/server/db_management_client.cpp
Original file line number Diff line number Diff line change
@@ -17,33 +17,41 @@
namespace lgraph {

DBManagementClient::DBManagementClient()
: job_stub_(com::antgroup::tugraph::JobManagementService_Stub(&channel_)),
heartbeat_stub_(com::antgroup::tugraph::HeartbeatService_Stub(&channel_)) {
// Initialize brpc channel to db_management.
: job_stub_(db_management::JobManagementService_Stub(&channel_)),
heartbeat_stub_(db_management::HeartbeatService_Stub(&channel_)) {
}

void DBManagementClient::InitChannel(std::string server) {
// Initialize brpc channel to db_management.
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connection_type = "";
options.timeout_ms = 1000/*milliseconds*/;
options.timeout_ms = 1000;
options.max_retry = 3;
if (this->channel_.Init("localhost:6091", "", &options) != 0) {
if (this->channel_.Init(server.c_str(), "", &options) != 0) {
DEBUG_LOG(ERROR) << "Fail to initialize channel";
throw;
std::runtime_error("failed to initialize channel.");
}
}

void DBManagementClient::SetHeartbeat(bool heartbeat) {
if (this->heartbeat_ == false && heartbeat == true) {
GENERAL_LOG(INFO) << "connected to db management";
} else if (this->heartbeat_ == true && heartbeat == false) {
GENERAL_LOG(INFO) << "lost connection to db management";
}
this->heartbeat_ = heartbeat;
}

bool DBManagementClient::GetHeartbeat() {
return this->heartbeat_;
}

com::antgroup::tugraph::HeartbeatService_Stub& DBManagementClient::GetHeartbeatStub() {
db_management::HeartbeatService_Stub& DBManagementClient::GetHeartbeatStub() {
return this->heartbeat_stub_;
}

com::antgroup::tugraph::JobManagementService_Stub& DBManagementClient::GetJobStub() {
db_management::JobManagementService_Stub& DBManagementClient::GetJobStub() {
return this->job_stub_;
}

@@ -53,11 +61,11 @@ DBManagementClient& DBManagementClient::GetInstance() {
}

void DBManagementClient::DetectHeartbeat() {
com::antgroup::tugraph::HeartbeatService_Stub& stub =
db_management::HeartbeatService_Stub& stub =
DBManagementClient::GetInstance().GetHeartbeatStub();
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

会死循环么?加个max retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

心跳线程初始化之后就detach出去了,所以我理解应该是一直循环的吧?

com::antgroup::tugraph::HeartbeatRequest request;
com::antgroup::tugraph::HeartbeatResponse response;
db_management::HeartbeatRequest request;
db_management::HeartbeatResponse response;
brpc::Controller cntl;
request.set_request_msg("this is a heartbeat request message.");
stub.detectHeartbeat(&cntl, &request, &response, NULL);
@@ -75,16 +83,16 @@ void DBManagementClient::DetectHeartbeat() {
int DBManagementClient::CreateJob(std::string host, std::string port, std::int64_t start_time,
std::string period, std::string name, std::string type,
std::string user, std::int64_t create_time) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_create_job_request(new com::antgroup::tugraph::CreateJobRequest());
request.set_allocated_create_job_request(new db_management::CreateJobRequest());
request.mutable_create_job_request()->set_start_time(start_time);
request.mutable_create_job_request()->set_period(period);
request.mutable_create_job_request()->set_procedure_name(name);
@@ -106,16 +114,16 @@ int DBManagementClient::CreateJob(std::string host, std::string port, std::int64
void DBManagementClient::UpdateJob(std::string host, std::string port, int job_id,
std::string status, std::int64_t runtime,
std::string result) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_update_job_request(new com::antgroup::tugraph::UpdateJobRequest());
request.set_allocated_update_job_request(new db_management::UpdateJobRequest());
request.mutable_update_job_request()->set_job_id(job_id);
request.mutable_update_job_request()->set_status(status);
request.mutable_update_job_request()->set_runtime(runtime);
@@ -130,23 +138,23 @@ void DBManagementClient::UpdateJob(std::string host, std::string port, int job_i
}
}

std::vector<com::antgroup::tugraph::Job> DBManagementClient::ReadJob(std::string host,
std::string port) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
std::vector<db_management::Job> DBManagementClient::ReadJob(std::string host,
std::string port) {
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_read_job_request(new com::antgroup::tugraph::ReadJobRequest());
request.set_allocated_read_job_request(new db_management::ReadJobRequest());

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[READ JOB REQUEST]: " << "success";
std::vector<com::antgroup::tugraph::Job> job_list;
std::vector<db_management::Job> job_list;
for (int i = 0; i < response.read_job_response().job_size(); i++) {
job_list.push_back(response.read_job_response().job(i));
}
@@ -157,50 +165,78 @@ std::vector<com::antgroup::tugraph::Job> DBManagementClient::ReadJob(std::string
}
}

com::antgroup::tugraph::JobResult DBManagementClient::ReadJobResult(std::string host,
db_management::Job DBManagementClient::ReadJob(std::string host,
std::string port,
int job_id) {
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_read_job_request(new db_management::ReadJobRequest());
request.mutable_read_job_request()->set_job_id(job_id);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[READ JOB REQUEST]: " << "success";
std::vector<db_management::Job> job_list;
db_management::Job job;
job = response.read_job_response().job(0);
return job;
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}

db_management::JobResult DBManagementClient::ReadJobResult(std::string host,
std::string port, int job_id) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_read_job_result_request(
new com::antgroup::tugraph::ReadJobResultRequest());
new db_management::ReadJobResultRequest());
request.mutable_read_job_result_request()->set_job_id(job_id);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[READ JOB RESULT REQUEST]: " << "success";
return response.read_job_result_response().job_result();
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
DEBUG_LOG(ERROR) << "[READ JOB RESULT REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}

void DBManagementClient::DeleteJob(std::string host, std::string port, int job_id) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
db_management::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
db_management::JobManagementRequest request;
db_management::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_delete_job_request(new com::antgroup::tugraph::DeleteJobRequest());
request.set_allocated_delete_job_request(new db_management::DeleteJobRequest());
request.mutable_delete_job_request()->set_job_id(job_id);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[DELETE JOB REQUEST]: " << "success";
return;
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
DEBUG_LOG(ERROR) << "[DELETE JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}
17 changes: 11 additions & 6 deletions src/server/db_management_client.h
Original file line number Diff line number Diff line change
@@ -21,26 +21,29 @@
#include "brpc/channel.h"

namespace lgraph {
namespace db_management = com::antgroup::tugraph;
class DBManagementClient {
private:
bool heartbeat_ = false;
static const int detect_freq_ = 5;
brpc::Channel channel_;
com::antgroup::tugraph::JobManagementService_Stub job_stub_;
com::antgroup::tugraph::HeartbeatService_Stub heartbeat_stub_;
db_management::JobManagementService_Stub job_stub_;
db_management::HeartbeatService_Stub heartbeat_stub_;

public:
DBManagementClient();

static DBManagementClient& GetInstance();

void InitChannel(std::string sever);

void SetHeartbeat(bool heartbeat);

bool GetHeartbeat();

com::antgroup::tugraph::HeartbeatService_Stub& GetHeartbeatStub();
db_management::HeartbeatService_Stub& GetHeartbeatStub();

com::antgroup::tugraph::JobManagementService_Stub& GetJobStub();
db_management::JobManagementService_Stub& GetJobStub();

static void DetectHeartbeat();

@@ -50,9 +53,11 @@ class DBManagementClient {
void UpdateJob(std::string host, std::string port, int job_id, std::string status,
std::int64_t runtime, std::string result);

std::vector<com::antgroup::tugraph::Job> ReadJob(std::string host, std::string port);
std::vector<db_management::Job> ReadJob(std::string host, std::string port);

db_management::Job ReadJob(std::string host, std::string port, int job_id);

com::antgroup::tugraph::JobResult ReadJobResult(std::string host, std::string port, int job_id);
db_management::JobResult ReadJobResult(std::string host, std::string port, int job_id);

void DeleteJob(std::string host, std::string port, int job_id);
};
5 changes: 5 additions & 0 deletions src/server/lgraph_server.cpp
Original file line number Diff line number Diff line change
@@ -260,6 +260,11 @@ int LGraphServer::Start() {
GENERAL_LOG(INFO) << "Listening for RPC on port " << config_->rpc_port;
}
// start db management service
try {
DBManagementClient::GetInstance().InitChannel("localhost:6091");
} catch(std::exception& e) {
GENERAL_LOG(ERROR) << "Failed to init db management channel";
}
std::thread heartbeat_detect(&DBManagementClient::DetectHeartbeat);
heartbeat_detect.detach();
#endif
68 changes: 36 additions & 32 deletions test/test_db_management_client.cpp
Original file line number Diff line number Diff line change
@@ -47,6 +47,13 @@ TEST_F(TestDBManagementClient, DBManagementClient) {
std::int64_t runtime = 100;
std::string result = "this is only a test of result";

// init dbmanagement client
try {
DBManagementClient::GetInstance().InitChannel("localhost:6091");
} catch(std::exception& e) {
UT_EXPECT_EQ(1, 0);
}

// test exception handle
try {
DBManagementClient::GetInstance()
@@ -55,24 +62,28 @@ TEST_F(TestDBManagementClient, DBManagementClient) {
} catch(std::exception& e) {
UT_EXPECT_EQ(e.what(), exception_msg);
}

try {
DBManagementClient::GetInstance().UpdateJob(host, port, job_id, status, runtime, result);
UT_EXPECT_EQ(1, 0);
} catch(std::exception& e) {
UT_EXPECT_EQ(e.what(), exception_msg);
}

try {
DBManagementClient::GetInstance().ReadJob(host, port);
UT_EXPECT_EQ(1, 0);
} catch(std::exception& e) {
UT_EXPECT_EQ(e.what(), exception_msg);
}

try {
DBManagementClient::GetInstance().ReadJobResult(host, port, job_id);
UT_EXPECT_EQ(1, 0);
} catch(std::exception& e) {
UT_EXPECT_EQ(e.what(), exception_msg);
}

try {
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
DBManagementClient::GetInstance().DeleteJob(host, port, job_id);
UT_EXPECT_EQ(1, 0);
@@ -85,49 +96,42 @@ TEST_F(TestDBManagementClient, DBManagementClient) {
rt = system(cmd.c_str());
UT_EXPECT_EQ(rt, 0);
// sleep to wait db management start
fma_common::SleepS(600);
fma_common::SleepS(300);

// test crud
try {
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
// create a new job
job_id = DBManagementClient::GetInstance()
.CreateJob(host, port, start_time, period, name, type, user, create_time);
UT_EXPECT_EQ(1, job_id);
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);
}
try {
// test jobid self increment
job_id = DBManagementClient::GetInstance()
.CreateJob(host, port, start_time, period, name, type, user, create_time);
UT_EXPECT_EQ(2, job_id);

// update this job
DBManagementClient::GetInstance().UpdateJob(host, port, job_id, status, runtime, result);
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);
}
try {
std::vector<com::antgroup::tugraph::Job> jobs =

// read all jobs status
std::vector<db_management::Job> jobs =
DBManagementClient::GetInstance().ReadJob(host, port);
UT_EXPECT_EQ(1, jobs.size());
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);
}
try {
com::antgroup::tugraph::JobResult job_result =
UT_EXPECT_EQ(2, jobs.size());
// read job by jobid
db_management::Job job =
DBManagementClient::GetInstance().ReadJob(host, port, job_id);
UT_EXPECT_EQ(2, job.job_id());

// read job result by id
db_management::JobResult job_result =
DBManagementClient::GetInstance().ReadJobResult(host, port, job_id);
UT_EXPECT_EQ(result, job_result.result());
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);
}
try {

// delete job
DBManagementClient::GetInstance().DeleteJob(host, port, job_id);
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);
}
try {
std::vector<com::antgroup::tugraph::Job> jobs =
DBManagementClient::GetInstance().ReadJob(host, port);
UT_EXPECT_EQ(0, jobs.size());

// test if deleted successfully
jobs = DBManagementClient::GetInstance().ReadJob(host, port);
UT_EXPECT_EQ(1, jobs.size());
} catch(std::exception& e) {
DEBUG_LOG(ERROR) << e.what();
UT_EXPECT_EQ(1, 0);