Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a db management client for later job management #305

Merged
merged 29 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "deps/tugraph-db-client-java"]
path = deps/tugraph-db-client-java
url = https://github.com/TuGraph-family/tugraph-db-client-java.git
[submodule "deps/tugraph-db-management"]
path = deps/tugraph-db-management
url = https://github.com/TuGraph-family/tugraph-db-management.git
1 change: 1 addition & 0 deletions deps/tugraph-db-management
Submodule tugraph-db-management added at 58312a
7 changes: 5 additions & 2 deletions src/BuildLGraphServer.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ endif ()
include(cmake/GenerateProtobuf.cmake)
GenerateProtobufCpp(${CMAKE_CURRENT_LIST_DIR}/protobuf
PROTO_SRCS PROTO_HEADERS
${CMAKE_CURRENT_LIST_DIR}/protobuf/ha.proto)
${CMAKE_CURRENT_LIST_DIR}/protobuf/ha.proto
${CMAKE_CURRENT_LIST_DIR}/protobuf/tugraph_db_management.proto)

include_directories(${DEPS_INCLUDE_DIR})

Expand All @@ -44,14 +45,16 @@ add_library(${TARGET_SERVER_LIB} STATIC
server/lgraph_server.cpp
server/state_machine.cpp
server/ha_state_machine.cpp
server/db_management_client.cpp
import/import_online.cpp
import/import_v2.cpp
import/import_v3.cpp
restful/server/rest_server.cpp
restful/server/stdafx.cpp
http/http_server.cpp
http/import_manager.cpp
http/import_task.cpp)
http/import_task.cpp
${PROTO_SRCS})
qishipengqsp marked this conversation as resolved.
Show resolved Hide resolved

target_compile_options(${TARGET_SERVER_LIB} PUBLIC
-DGFLAGS_NS=${GFLAGS_NS}
Expand Down
135 changes: 135 additions & 0 deletions src/protobuf/tugraph_db_management.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax="proto2";
package com.antgroup.tugraph;

option cc_generic_services = true;
option java_outer_classname = "TuGraphDBManagement";

enum ResponseCode {
SUCCESS = 0;
FAILED = 1;
}

message Job {
required string db_id = 1;
required int32 job_id = 2;
required int64 start_time = 4;
required string period = 5;
required string procedure_name = 6;
required string procedure_type = 7;
required string status = 8;
required int64 runtime = 3;
required string user = 9;
required int64 create_time = 10;
}

message JobResult {
required int32 job_id = 2;
required string result = 1;
}

message CreateJobRequest {
required int64 start_time = 3;
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
required string period = 4;
required string procedure_name = 5;
required string procedure_type = 6;
required string user = 7;
required int64 create_time = 8;
}

message CreateJobResponse {
required int32 job_id = 2;
required ResponseCode response_code = 1;
}

message ReadJobRequest {

SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
}

message ReadJobResponse {
repeated Job job = 2;
required ResponseCode response_code = 1;
}

message ReadJobResultRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadJobResult 换个名字

required int32 job_id = 1;
}

message ReadJobResultResponse {
required JobResult job_result = 1;
required ResponseCode response_code = 2;
}

message UpdateJobRequest {
required int32 job_id = 1;
required string status = 2;
optional int64 runtime = 3;
optional string result = 4;
}

message UpdateJobResponse {
required ResponseCode response_code = 1;
}

message DeleteJobRequest {
required int32 job_id = 1;
}

message DeleteJobResponse {
required ResponseCode response_code = 1;
}

message JobManagementRequest {
required string db_host = 5;
required string db_port = 6;
oneof Req {
CreateJobRequest create_job_request = 1;
ReadJobRequest read_job_request = 2;
ReadJobResultRequest read_job_result_request = 7;
UpdateJobRequest update_job_request = 3;
DeleteJobRequest delete_job_request = 4;
};
};

message JobManagementResponse {
required ResponseCode response_code = 5;
oneof Resp {
CreateJobResponse create_job_response = 1;
ReadJobResponse read_job_response = 2;
ReadJobResultResponse read_job_result_response = 6;
UpdateJobResponse update_job_response = 3;
DeleteJobResponse delete_job_response = 4;
};
};

message HeartbeatRequest {
required string request_msg = 1;
};

message HeartbeatResponse {
required string response_msg = 1;
};

service JobManagementService {
rpc handleRequest(JobManagementRequest) returns (JobManagementResponse);
};

service HeartbeatService {
rpc detectHeartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}
209 changes: 209 additions & 0 deletions src/server/db_management_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/**
* Copyright 2022 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

#include "server/db_management_client.h"

namespace lgraph {

DBManagementClient::DBManagementClient()
: job_stub_(com::antgroup::tugraph::JobManagementService_Stub(&channel_)),
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
heartbeat_stub_(com::antgroup::tugraph::HeartbeatService_Stub(&channel_)) {
// Initialize brpc channel to db_management.
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connection_type = "";
options.timeout_ms = 1000/*milliseconds*/;
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
options.max_retry = 3;
if (this->channel_.Init("localhost:6091", "", &options) != 0) {
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
DEBUG_LOG(ERROR) << "Fail to initialize channel";
throw;
SonglinLyu marked this conversation as resolved.
Show resolved Hide resolved
}
}

void DBManagementClient::SetHeartbeat(bool heartbeat) {
this->heartbeat_ = heartbeat;
}

bool DBManagementClient::GetHeartbeat() {
return this->heartbeat_;
}

com::antgroup::tugraph::HeartbeatService_Stub& DBManagementClient::GetHeartbeatStub() {
return this->heartbeat_stub_;
}

com::antgroup::tugraph::JobManagementService_Stub& DBManagementClient::GetJobStub() {
return this->job_stub_;
}

DBManagementClient& DBManagementClient::GetInstance() {
static DBManagementClient instance;
return instance;
}

void DBManagementClient::DetectHeartbeat() {
com::antgroup::tugraph::HeartbeatService_Stub& stub =
DBManagementClient::GetInstance().GetHeartbeatStub();
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

会死循环么?加个max retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

心跳线程初始化之后就detach出去了,所以我理解应该是一直循环的吧?

com::antgroup::tugraph::HeartbeatRequest request;
com::antgroup::tugraph::HeartbeatResponse response;
brpc::Controller cntl;
request.set_request_msg("this is a heartbeat request message.");
stub.detectHeartbeat(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DBManagementClient::GetInstance().SetHeartbeat(true);
} else {
DBManagementClient::GetInstance().SetHeartbeat(false);
DEBUG_LOG(ERROR) << cntl.ErrorText();
}

fma_common::SleepS(detect_freq_);
}
}

int DBManagementClient::CreateJob(std::string host, std::string port, std::int64_t start_time,
std::string period, std::string name, std::string type,
std::string user, std::int64_t create_time) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_create_job_request(new com::antgroup::tugraph::CreateJobRequest());
request.mutable_create_job_request()->set_start_time(start_time);
request.mutable_create_job_request()->set_period(period);
request.mutable_create_job_request()->set_procedure_name(name);
request.mutable_create_job_request()->set_procedure_type(type);
request.mutable_create_job_request()->set_user(user);
request.mutable_create_job_request()->set_create_time(create_time);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
int job_id = response.create_job_response().job_id();
DEBUG_LOG(INFO) << "[CREATE JOB REQUEST]: " << "success, JobId is " << job_id;
return job_id;
} else {
DEBUG_LOG(ERROR) << "[CREATE JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}

void DBManagementClient::UpdateJob(std::string host, std::string port, int job_id,
std::string status, std::int64_t runtime,
std::string result) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_update_job_request(new com::antgroup::tugraph::UpdateJobRequest());
request.mutable_update_job_request()->set_job_id(job_id);
request.mutable_update_job_request()->set_status(status);
request.mutable_update_job_request()->set_runtime(runtime);
request.mutable_update_job_request()->set_result(result);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[UPDATE JOB REQUEST]: " << "success";
} else {
DEBUG_LOG(ERROR) << "[UPDATE JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}

std::vector<com::antgroup::tugraph::Job> DBManagementClient::ReadJob(std::string host,
std::string port) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_read_job_request(new com::antgroup::tugraph::ReadJobRequest());

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[READ JOB REQUEST]: " << "success";
std::vector<com::antgroup::tugraph::Job> job_list;
for (int i = 0; i < response.read_job_response().job_size(); i++) {
job_list.push_back(response.read_job_response().job(i));
}
return job_list;
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error最好都包一遍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

都包了

}
}

com::antgroup::tugraph::JobResult DBManagementClient::ReadJobResult(std::string host,
std::string port, int job_id) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_read_job_result_request(
new com::antgroup::tugraph::ReadJobResultRequest());
request.mutable_read_job_result_request()->set_job_id(job_id);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[READ JOB RESULT REQUEST]: " << "success";
return response.read_job_result_response().job_result();
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}

void DBManagementClient::DeleteJob(std::string host, std::string port, int job_id) {
com::antgroup::tugraph::JobManagementService_Stub& stub =
DBManagementClient::GetInstance().GetJobStub();
com::antgroup::tugraph::JobManagementRequest request;
com::antgroup::tugraph::JobManagementResponse response;
brpc::Controller cntl;

// build create_job_request
request.set_db_host(host);
request.set_db_port(port);
request.set_allocated_delete_job_request(new com::antgroup::tugraph::DeleteJobRequest());
request.mutable_delete_job_request()->set_job_id(job_id);

stub.handleRequest(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
DEBUG_LOG(INFO) << "[DELETE JOB REQUEST]: " << "success";
return;
} else {
DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText();
throw std::runtime_error("failed to connect to db management.");
}
}
} // namespace lgraph


Loading
Loading