Skip to content

Commit

Permalink
Merge pull request #92 from raakella1/metrics_name
Browse files Browse the repository at this point in the history
correct metrics name
  • Loading branch information
raakella1 authored Jul 24, 2024
2 parents 5868c0b + e71c93a commit 7ed58b9
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "3.5.4"
version = "3.5.5"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down Expand Up @@ -127,4 +127,4 @@ def package_info(self):
self.cpp_info.set_property("cmake_file_name", "NuraftMesg")
self.cpp_info.set_property("cmake_target_name", "NuraftMesg::NuraftMesg")
self.cpp_info.names["cmake_find_package"] = "NuraftMesg"
self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg"
self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg"
7 changes: 4 additions & 3 deletions src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class group_metrics : public sisl::MetricsGroupWrapper {
public:
explicit group_metrics(group_id_t const& group_id) :
sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) {
REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"});
REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"});
REGISTER_HISTOGRAM(group_step_latency, "Latency for processing raft step", "raft_group", {"op", "step"});
REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group_cnt", {"op", "step_count"});
REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group_cnt", {"op", "send_count"});
REGISTER_HISTOGRAM(append_entries_latency_us, "Latency for processing raft step", "raft_group_latency",
{"op", "append_latency"});
register_me_to_farm();
}

Expand Down
4 changes: 2 additions & 2 deletions src/proto/proto_mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
[weak_this = std::weak_ptr< messaging_client >(shared_from_this())](auto&& response) -> NullResult {
if (response.hasError()) {
auto mc = weak_this.lock();
std::string addr = mc ? mc->_addr : "unknown";
log_every_nth(addr, response.error(), "unidirectional");
LOGD("Failed to send unidirectional data_service_request to {}, error: {}",
mc ? mc->_addr : "unknown", response.error().error_message());
return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error()));
}
return folly::unit;
Expand Down
17 changes: 10 additions & 7 deletions src/proto/proto_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) {
}

class proto_service : public msg_service {
::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply);
::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply,
std::shared_ptr< group_metrics > metrics);

public:
template < typename... Args >
Expand Down Expand Up @@ -81,12 +82,17 @@ void proto_service::bind(::sisl::GrpcServer* server) {
}
}

::grpc::Status proto_service::step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply) {
::grpc::Status proto_service::step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply,
std::shared_ptr< group_metrics > metrics) {
LOGT("Stepping [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())),
request.base().src(), request.base().dest());
auto rcreq = toRequest(request);
auto const time_start = std::chrono::steady_clock::now();
auto resp = nuraft::raft_server_handler::process_req(&server, *rcreq);
if (!resp) { return ::grpc::Status(::grpc::StatusCode::CANCELLED, "Server rejected request"); }
if (metrics && rcreq->get_type() == nuraft::msg_type::append_entries_request) {
HISTOGRAM_OBSERVE(*metrics, append_entries_latency_us, get_elapsed_time_us(time_start));
}
assert(resp);
reply.set_allocated_base(fromBaseRequest(*resp));
reply.set_allocated_rc_response(fromRCResponse(*resp));
Expand Down Expand Up @@ -145,11 +151,8 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs
if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) {
if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1);
try {
auto const time_start = std::chrono::steady_clock::now();
rpc_data->set_status(
step(*it->second.m_server->raft_server(), request.msg(), *response.mutable_msg()));
if (it->second.m_metrics)
HISTOGRAM_OBSERVE(*it->second.m_metrics, group_step_latency, get_elapsed_time_ms(time_start));
rpc_data->set_status(step(*it->second.m_server->raft_server(), request.msg(),
*response.mutable_msg(), it->second.m_metrics));
} catch (std::runtime_error& rte) {
LOGE("Caught exception during step(): {}", rte.what());
rpc_data->set_status(
Expand Down

0 comments on commit 7ed58b9

Please sign in to comment.