diff --git a/.gitmodules b/.gitmodules index ec7af3f031..178f3e8111 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "deps/tugraph-db-client-java"] path = deps/tugraph-db-client-java url = https://github.com/TuGraph-family/tugraph-db-client-java.git +[submodule "deps/tugraph-db-management"] + path = deps/tugraph-db-management + url = https://github.com/TuGraph-family/tugraph-db-management.git diff --git a/deps/tugraph-db-management b/deps/tugraph-db-management new file mode 160000 index 0000000000..6dcacc0535 --- /dev/null +++ b/deps/tugraph-db-management @@ -0,0 +1 @@ +Subproject commit 6dcacc05352b51fd173fffbc6c95329744b97ab1 diff --git a/src/BuildLGraphServer.cmake b/src/BuildLGraphServer.cmake index 0e7d9f30b4..3e03114a73 100644 --- a/src/BuildLGraphServer.cmake +++ b/src/BuildLGraphServer.cmake @@ -25,7 +25,8 @@ endif () include(cmake/GenerateProtobuf.cmake) GenerateProtobufCpp(${CMAKE_CURRENT_LIST_DIR}/protobuf PROTO_SRCS PROTO_HEADERS - ${CMAKE_CURRENT_LIST_DIR}/protobuf/ha.proto) + ${CMAKE_CURRENT_LIST_DIR}/protobuf/ha.proto + ${CMAKE_CURRENT_LIST_DIR}/protobuf/tugraph_db_management.proto) include_directories(${DEPS_INCLUDE_DIR}) @@ -44,6 +45,7 @@ add_library(${TARGET_SERVER_LIB} STATIC server/lgraph_server.cpp server/state_machine.cpp server/ha_state_machine.cpp + server/db_management_client.cpp import/import_online.cpp import/import_v2.cpp import/import_v3.cpp @@ -51,7 +53,8 @@ add_library(${TARGET_SERVER_LIB} STATIC restful/server/stdafx.cpp http/http_server.cpp http/import_manager.cpp - http/import_task.cpp) + http/import_task.cpp + ${PROTO_SRCS}) target_compile_options(${TARGET_SERVER_LIB} PUBLIC -DGFLAGS_NS=${GFLAGS_NS} diff --git a/src/protobuf/tugraph_db_management.proto b/src/protobuf/tugraph_db_management.proto new file mode 100644 index 0000000000..0b4e5accde --- /dev/null +++ b/src/protobuf/tugraph_db_management.proto @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +package com.antgroup.tugraph; + +option cc_generic_services = true; +option java_outer_classname = "TuGraphDBManagement"; + +enum ResponseCode { + SUCCESS = 0; + FAILED = 1; +} + +message Job { + required string db_id = 1; + required int32 job_id = 2; + required int64 start_time = 3; + required string period = 4; + required string procedure_name = 5; + required string procedure_type = 6; + required string status = 7; + required int64 runtime = 8; + required string user = 9; + required int64 create_time = 10; +} + +message JobResult { + required int32 job_id = 1; + required string result = 2; +} + +message CreateJobRequest { + required int64 start_time = 1; + required string period = 2; + required string procedure_name = 3; + required string procedure_type = 4; + required string user = 5; + required int64 create_time = 6; +} + +message CreateJobResponse { + required int32 job_id = 1; + required ResponseCode response_code = 2; +} + +message ReadJobRequest { + optional int32 job_id = 1; +} + +message ReadJobResponse { + repeated Job job = 2; + required ResponseCode response_code = 1; +} + +message ReadJobResultRequest { + required int32 job_id = 1; +} + +message ReadJobResultResponse { + required JobResult job_result = 1; + required ResponseCode response_code = 2; +} + +message UpdateJobRequest { + required int32 job_id = 1; + required string status = 2; + optional int64 runtime = 3; + optional string result = 4; +} + +message UpdateJobResponse { + required ResponseCode response_code = 1; +} + +message DeleteJobRequest { + required int32 job_id = 1; +} + +message DeleteJobResponse { + required ResponseCode response_code = 1; +} + +message JobManagementRequest { + required string db_host = 1; + required string db_port = 2; + oneof Req { + CreateJobRequest create_job_request = 3; + ReadJobRequest read_job_request = 4; + ReadJobResultRequest read_job_result_request = 5; + UpdateJobRequest update_job_request = 6; + DeleteJobRequest delete_job_request = 7; + }; +}; + +message JobManagementResponse { + required ResponseCode response_code = 1; + oneof Resp { + CreateJobResponse create_job_response = 2; + ReadJobResponse read_job_response = 3; + ReadJobResultResponse read_job_result_response = 4; + UpdateJobResponse update_job_response = 5; + DeleteJobResponse delete_job_response = 6; + }; +}; + +message HeartbeatRequest { + required string request_msg = 1; + required int32 heartbeat_count = 2; +}; + +message HeartbeatResponse { + required string response_msg = 1; + required int32 heartbeat_count = 2; +}; + +service JobManagementService { + rpc handleRequest(JobManagementRequest) returns (JobManagementResponse); +}; + +service HeartbeatService { + rpc detectHeartbeat(HeartbeatRequest) returns (HeartbeatResponse); +} diff --git a/src/server/db_management_client.cpp b/src/server/db_management_client.cpp new file mode 100644 index 0000000000..248f92bc46 --- /dev/null +++ b/src/server/db_management_client.cpp @@ -0,0 +1,261 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#include "server/db_management_client.h" + +namespace lgraph { + +DBManagementClient::DBManagementClient() + : job_stub_(db_management::JobManagementService_Stub(&channel_)), + heartbeat_stub_(db_management::HeartbeatService_Stub(&channel_)) { +} + +void DBManagementClient::InitChannel(std::string server) { + // Initialize brpc channel to db_management. + brpc::ChannelOptions options; + options.protocol = "baidu_std"; + options.connection_type = ""; + options.timeout_ms = 1000; + options.max_retry = 3; + if (this->channel_.Init(server.c_str(), "", &options) != 0) { + DEBUG_LOG(ERROR) << "Fail to initialize channel"; + std::runtime_error("failed to initialize channel."); + } +} + +void DBManagementClient::SetHeartbeat(bool heartbeat) { + if (this->heartbeat_ == false && heartbeat == true) { + GENERAL_LOG(INFO) << "connected to db management"; + } else if (this->heartbeat_ == true && heartbeat == false) { + GENERAL_LOG(INFO) << "lost connection to db management"; + } + this->heartbeat_ = heartbeat; +} + +bool DBManagementClient::GetHeartbeat() { + return this->heartbeat_; +} + +void DBManagementClient::SetHeartbeatCount(int heartbeat_count) { + this->heartbeat_count_ = heartbeat_count; +} + +int DBManagementClient::GetHeartbeatCount() { + return this->heartbeat_count_; +} + +db_management::HeartbeatService_Stub& DBManagementClient::GetHeartbeatStub() { + return this->heartbeat_stub_; +} + +db_management::JobManagementService_Stub& DBManagementClient::GetJobStub() { + return this->job_stub_; +} + +DBManagementClient& DBManagementClient::GetInstance() { + static DBManagementClient instance; + return instance; +} + +void DBManagementClient::DetectHeartbeat() { + db_management::HeartbeatService_Stub& stub = + DBManagementClient::GetInstance().GetHeartbeatStub(); + while (true) { + db_management::HeartbeatRequest request; + db_management::HeartbeatResponse response; + brpc::Controller cntl; + int heartbeat_count = DBManagementClient::GetInstance().GetHeartbeatCount(); + request.set_request_msg("this is a heartbeat request message."); + request.set_heartbeat_count(heartbeat_count); + stub.detectHeartbeat(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + if (response.heartbeat_count() == heartbeat_count + 1) { + DBManagementClient::GetInstance().SetHeartbeat(true); + DBManagementClient::GetInstance().SetHeartbeatCount(heartbeat_count + 1); + } else { + DBManagementClient::GetInstance().SetHeartbeat(false); + DBManagementClient::GetInstance().SetHeartbeatCount(0); + } + } else { + DBManagementClient::GetInstance().SetHeartbeat(false); + DBManagementClient::GetInstance().SetHeartbeatCount(0); + } + + fma_common::SleepS(detect_freq_); + } +} + +int DBManagementClient::CreateJob(std::string host, std::string port, std::int64_t start_time, + std::string period, std::string name, std::string type, + std::string user, std::int64_t create_time) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_create_job_request(new db_management::CreateJobRequest()); + request.mutable_create_job_request()->set_start_time(start_time); + request.mutable_create_job_request()->set_period(period); + request.mutable_create_job_request()->set_procedure_name(name); + request.mutable_create_job_request()->set_procedure_type(type); + request.mutable_create_job_request()->set_user(user); + request.mutable_create_job_request()->set_create_time(create_time); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + int job_id = response.create_job_response().job_id(); + DEBUG_LOG(INFO) << "[CREATE JOB REQUEST]: " << "success, JobId is " << job_id; + return job_id; + } else { + DEBUG_LOG(ERROR) << "[CREATE JOB REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} + +void DBManagementClient::UpdateJob(std::string host, std::string port, int job_id, + std::string status, std::int64_t runtime, + std::string result) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_update_job_request(new db_management::UpdateJobRequest()); + request.mutable_update_job_request()->set_job_id(job_id); + request.mutable_update_job_request()->set_status(status); + request.mutable_update_job_request()->set_runtime(runtime); + request.mutable_update_job_request()->set_result(result); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + DEBUG_LOG(INFO) << "[UPDATE JOB REQUEST]: " << "success"; + } else { + DEBUG_LOG(ERROR) << "[UPDATE JOB REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} + +std::vector DBManagementClient::ReadJob(std::string host, + std::string port) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_read_job_request(new db_management::ReadJobRequest()); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + DEBUG_LOG(INFO) << "[READ JOB REQUEST]: " << "success"; + std::vector job_list; + for (int i = 0; i < response.read_job_response().job_size(); i++) { + job_list.push_back(response.read_job_response().job(i)); + } + return job_list; + } else { + DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} + +db_management::Job DBManagementClient::ReadJob(std::string host, + std::string port, + int job_id) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_read_job_request(new db_management::ReadJobRequest()); + request.mutable_read_job_request()->set_job_id(job_id); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + DEBUG_LOG(INFO) << "[READ JOB REQUEST]: " << "success"; + std::vector job_list; + db_management::Job job; + job = response.read_job_response().job(0); + return job; + } else { + DEBUG_LOG(ERROR) << "[READ JOB REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} + +db_management::JobResult DBManagementClient::ReadJobResult(std::string host, + std::string port, int job_id) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_read_job_result_request( + new db_management::ReadJobResultRequest()); + request.mutable_read_job_result_request()->set_job_id(job_id); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + DEBUG_LOG(INFO) << "[READ JOB RESULT REQUEST]: " << "success"; + return response.read_job_result_response().job_result(); + } else { + DEBUG_LOG(ERROR) << "[READ JOB RESULT REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} + +void DBManagementClient::DeleteJob(std::string host, std::string port, int job_id) { + db_management::JobManagementService_Stub& stub = + DBManagementClient::GetInstance().GetJobStub(); + db_management::JobManagementRequest request; + db_management::JobManagementResponse response; + brpc::Controller cntl; + + // build create_job_request + request.set_db_host(host); + request.set_db_port(port); + request.set_allocated_delete_job_request(new db_management::DeleteJobRequest()); + request.mutable_delete_job_request()->set_job_id(job_id); + + stub.handleRequest(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + DEBUG_LOG(INFO) << "[DELETE JOB REQUEST]: " << "success"; + return; + } else { + DEBUG_LOG(ERROR) << "[DELETE JOB REQUEST]: " << cntl.ErrorText(); + throw std::runtime_error("failed to connect to db management."); + } +} +} // namespace lgraph + + diff --git a/src/server/db_management_client.h b/src/server/db_management_client.h new file mode 100644 index 0000000000..2170e3d2be --- /dev/null +++ b/src/server/db_management_client.h @@ -0,0 +1,187 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include "tools/lgraph_log.h" +#include "fma-common/utils.h" +#include "protobuf/tugraph_db_management.pb.h" +#include "gflags/gflags.h" +#include "brpc/channel.h" + +namespace lgraph { +namespace db_management = com::antgroup::tugraph; +class DBManagementClient { + // TODO(qsp): get host and port from license + // TODO(qsp): change name of ReadJobResult + // TODO(qsp): kill all process after ut + private: + bool heartbeat_ = false; + int heartbeat_count_ = 0; + static const int detect_freq_ = 5; + brpc::Channel channel_; + db_management::JobManagementService_Stub job_stub_; + db_management::HeartbeatService_Stub heartbeat_stub_; + + public: + DBManagementClient(); + + /** + * @brief Get an instance of DBManagementClient. + */ + static DBManagementClient& GetInstance(); + + /** + * @brief Init brpc channel. + * + * @param server The server address of db management, like "localhost:6091". + */ + void InitChannel(std::string sever); + + /** + * @brief set heartbeat of db management. + * + * @param heartbeat true if connected, false if not connected. + */ + void SetHeartbeat(bool heartbeat); + + /** + * @brief Get heartbeat of db management. + * + * @returns true if connected, false if not. + */ + bool GetHeartbeat(); + + /** + * @brief Set heartbeat count of db management. + * + * @param heartbeat_count count of heartbeat. + */ + void SetHeartbeatCount(int heartbeat_count); + + /** + * @brief Get heartbeat count of db management. + * + * @returns heartbeat count. + */ + int GetHeartbeatCount(); + + /** + * @brief Get brpc stub for heart detection. + c + */ + db_management::HeartbeatService_Stub& GetHeartbeatStub(); + + /** + * @brief Get brpc stub for job management. + * + * @returns job management stub. + */ + db_management::JobManagementService_Stub& GetJobStub(); + + /** + * @brief Heartbeat detection function. Runs in a detached thread. + * + */ + static void DetectHeartbeat(); + + /** + * @brief create a job record in db management. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @param start_time start time of a job, echo time in ms, int64. + * + * @param period job period type, PERIODIC, IMMEDIATE, DELAYED. + * + * @param name name of this job. + * + * @param type type of this job. + * + * @param user user of db who create this job. + * + * @param create_time create time of a job, echo time in ms, int64. + * + * @returns unique job_id for created job record. + */ + int CreateJob(std::string host, std::string port, std::int64_t start_time, std::string period, + std::string name, std::string type, std::string user, std::int64_t create_time); + + /** + * @brief update a job record in db management with job_id. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @param status job status, pending, success, failed. + * + * @param runtime total runtime of a job, echo time in ms, int64. + * + * @param result result of job in string. + */ + void UpdateJob(std::string host, std::string port, int job_id, std::string status, + std::int64_t runtime, std::string result); + + /** + * @brief read all job status in db management. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @returns a list for all job status in db management. + */ + std::vector ReadJob(std::string host, std::string port); + + /** + * @brief read job status with given job_id in db management. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @param job_id job_id of the job you want to read. + * + * @returns job. + */ + db_management::Job ReadJob(std::string host, std::string port, int job_id); + + /** + * @brief read job result with given job_id in db management. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @param job_id job_id of the job you want to read. + * + * @returns job result. + */ + db_management::JobResult ReadJobResult(std::string host, std::string port, int job_id); + + /** + * @brief delete a job record with given job_id in db management. + * + * @param host host address of current db. + * + * @param port port of current db. + * + * @param job_id job_id of the job you want to read. + */ + void DeleteJob(std::string host, std::string port, int job_id); +}; +} // namespace lgraph diff --git a/src/server/lgraph_server.cpp b/src/server/lgraph_server.cpp index af1028b2c7..dd7d9fbba8 100644 --- a/src/server/lgraph_server.cpp +++ b/src/server/lgraph_server.cpp @@ -32,6 +32,7 @@ #include "restful/server/rest_server.h" #include "server/state_machine.h" #include "server/ha_state_machine.h" +#include "server/db_management_client.h" #ifndef _WIN32 #include "brpc/server.h" @@ -258,6 +259,14 @@ int LGraphServer::Start() { } GENERAL_LOG(INFO) << "Listening for RPC on port " << config_->rpc_port; } + // start db management service + try { + DBManagementClient::GetInstance().InitChannel("localhost:6091"); + } catch(std::exception& e) { + GENERAL_LOG(ERROR) << "Failed to init db management channel"; + } + std::thread heartbeat_detect(&DBManagementClient::DetectHeartbeat); + heartbeat_detect.detach(); #endif state_machine_->Start(); if (config_->unlimited_token == 1) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d8ad866541..f8e50af700 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -33,6 +33,7 @@ add_executable(unit_test test_cypher.cpp test_cypher_plan.cpp test_data_type.cpp + test_db_management_client.cpp test_dense_string.cpp test_edge_index.cpp test_embed.cpp diff --git a/test/test_db_management_client.cpp b/test/test_db_management_client.cpp new file mode 100644 index 0000000000..0fde10fd4c --- /dev/null +++ b/test/test_db_management_client.cpp @@ -0,0 +1,144 @@ +/** + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#include +#include +#include + +#include "./ut_utils.h" +#include "./test_tools.h" +#include "fma-common/utils.h" +#include "gtest/gtest.h" +#include "server/db_management_client.h" + +class TestDBManagementClient : public TuGraphTest {}; + +TEST_F(TestDBManagementClient, DBManagementClient) { + using namespace lgraph; + + // set up cmd + std::string db_management_folder = "../../deps/tugraph-db-management/"; + std::string cmd; + int rt; + + // set up test veriables + std::string exception_msg = "failed to connect to db management."; + std::string host = "127.0.0.1"; + std::string port = "8888"; + std::int64_t start_time = 169413845845; + std::string period = "IMMEDIATE"; + std::string name = "Khop_test"; + std::string type = "Python"; + std::string user = "user"; + std::int64_t create_time = 1694138458457; + int job_id = 1; + std::string status = "SUCCESS"; + std::int64_t runtime = 100; + std::string result = "this is only a test of result"; + + // init dbmanagement client + try { + DBManagementClient::GetInstance().InitChannel("localhost:5091"); + } catch(std::exception& e) { + UT_EXPECT_EQ(1, 0); + } + + // test exception handle + try { + DBManagementClient::GetInstance() + .CreateJob(host, port, start_time, period, name, type, user, create_time); + UT_EXPECT_EQ(1, 0); + } catch(std::exception& e) { + UT_EXPECT_EQ(e.what(), exception_msg); + } + + try { + DBManagementClient::GetInstance().UpdateJob(host, port, job_id, status, runtime, result); + UT_EXPECT_EQ(1, 0); + } catch(std::exception& e) { + UT_EXPECT_EQ(e.what(), exception_msg); + } + + try { + DBManagementClient::GetInstance().ReadJob(host, port); + UT_EXPECT_EQ(1, 0); + } catch(std::exception& e) { + UT_EXPECT_EQ(e.what(), exception_msg); + } + + try { + DBManagementClient::GetInstance().ReadJobResult(host, port, job_id); + UT_EXPECT_EQ(1, 0); + } catch(std::exception& e) { + UT_EXPECT_EQ(e.what(), exception_msg); + } + + try { + DBManagementClient::GetInstance().DeleteJob(host, port, job_id); + UT_EXPECT_EQ(1, 0); + } catch(std::exception& e) { + UT_EXPECT_EQ(e.what(), exception_msg); + } + + // start db management + cmd = "cd " + db_management_folder + " && " + "bash ut_start.sh"; + rt = system(cmd.c_str()); + UT_EXPECT_EQ(rt, 0); + // sleep and wait db management start + fma_common::SleepS(300); + + // test crud + try { + // create a new job + job_id = DBManagementClient::GetInstance() + .CreateJob(host, port, start_time, period, name, type, user, create_time); + UT_EXPECT_EQ(1, job_id); + // test jobid self increment + job_id = DBManagementClient::GetInstance() + .CreateJob(host, port, start_time, period, name, type, user, create_time); + UT_EXPECT_EQ(2, job_id); + + // update this job + DBManagementClient::GetInstance().UpdateJob(host, port, job_id, status, runtime, result); + + // read all jobs status + std::vector jobs = + DBManagementClient::GetInstance().ReadJob(host, port); + UT_EXPECT_EQ(2, jobs.size()); + // read job by jobid + db_management::Job job = + DBManagementClient::GetInstance().ReadJob(host, port, job_id); + UT_EXPECT_EQ(2, job.job_id()); + + // read job result by id + db_management::JobResult job_result = + DBManagementClient::GetInstance().ReadJobResult(host, port, job_id); + UT_EXPECT_EQ(result, job_result.result()); + + // delete job + DBManagementClient::GetInstance().DeleteJob(host, port, job_id); + + // test if deleted successfully + jobs = DBManagementClient::GetInstance().ReadJob(host, port); + UT_EXPECT_EQ(1, jobs.size()); + } catch(std::exception& e) { + DEBUG_LOG(ERROR) << e.what(); + UT_EXPECT_EQ(1, 0); + } + + // stop db management + cmd = "cd " + db_management_folder + " && " + "bash ut_stop.sh "; + rt = system(cmd.c_str()); + UT_EXPECT_EQ(rt, 0); +}