diff --git a/interface/common/resdb_state_accessor.cpp b/interface/common/resdb_state_accessor.cpp index 34db7eab1..ec7033ea6 100644 --- a/interface/common/resdb_state_accessor.cpp +++ b/interface/common/resdb_state_accessor.cpp @@ -35,8 +35,7 @@ std::unique_ptr ResDBStateAccessor::GetNetChannel( } // Obtain ReplicaState of each replica. -absl::StatusOr -ResDBStateAccessor::GetReplicaState() { +absl::StatusOr ResDBStateAccessor::GetReplicaState() { const auto& client_info = config_.GetReplicaInfos()[0]; Request request; diff --git a/interface/common/resdb_state_accessor_test.cpp b/interface/common/resdb_state_accessor_test.cpp index fad1e4604..ad27ca5f8 100644 --- a/interface/common/resdb_state_accessor_test.cpp +++ b/interface/common/resdb_state_accessor_test.cpp @@ -28,8 +28,8 @@ namespace resdb { namespace { -using ::resdb::testing::EqualsProto; using ::google::protobuf::util::MessageDifferencer; +using ::resdb::testing::EqualsProto; using ::testing::ElementsAre; using ::testing::Invoke; using ::testing::Test; diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp index 55ef23b43..c9c2049c4 100644 --- a/interface/common/resdb_txn_accessor.cpp +++ b/interface/common/resdb_txn_accessor.cpp @@ -147,4 +147,44 @@ absl::StatusOr> ResDBTxnAccessor::GetRequestFromReplica( return txn_resp; } +absl::StatusOr ResDBTxnAccessor::GetBlockNumbers() { + QueryRequest request; + request.set_min_seq(0); + request.set_max_seq(0); + + std::vector> clients; + std::vector ths; + std::string final_str; + std::mutex mtx; + std::condition_variable resp_cv; + + std::unique_ptr client = + GetNetChannel(replicas_[0].ip(), replicas_[0].port()); + + LOG(ERROR) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port(); + + std::string response_str; + int ret = 0; + for (int i = 0; i < 5; ++i) { + ret = client->SendRequest(request, Request::TYPE_QUERY); + if (ret) { + continue; + } + client->SetRecvTimeout(100000); + ret = client->RecvRawMessageStr(&response_str); + LOG(ERROR) << "receive str:" << ret << " len:" << response_str.size(); + if (ret != 0) { + continue; + } + break; + } + + QueryResponse resp; + if (response_str.empty() || !resp.ParseFromString(response_str)) { + LOG(ERROR) << "parse fail len:" << final_str.size(); + return absl::InternalError("recv data fail."); + } + return resp.max_seq(); +} + } // namespace resdb diff --git a/interface/common/resdb_txn_accessor.h b/interface/common/resdb_txn_accessor.h index 9fab8d222..db5dc042a 100644 --- a/interface/common/resdb_txn_accessor.h +++ b/interface/common/resdb_txn_accessor.h @@ -40,6 +40,8 @@ class ResDBTxnAccessor { virtual absl::StatusOr> GetRequestFromReplica( uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica); + virtual absl::StatusOr GetBlockNumbers(); + protected: virtual std::unique_ptr GetNetChannel(const std::string& ip, int port); diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp index 0e8667e28..00b708109 100644 --- a/platform/networkstrate/consensus_manager.cpp +++ b/platform/networkstrate/consensus_manager.cpp @@ -210,9 +210,9 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr context, } LOG(ERROR) << "receive public size:" << hb_info.public_keys().size() - << " primary:" << hb_info.primary() - << " version:" << hb_info.version() - << " from region:" << request->region_info().region_id(); + << " primary:" << hb_info.primary() + << " version:" << hb_info.version() + << " from region:" << request->region_info().region_id(); if (request->region_info().region_id() == config_.GetConfigData().self_region_id()) { diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto index b17b12404..14ad976a7 100644 --- a/platform/proto/resdb.proto +++ b/platform/proto/resdb.proto @@ -168,6 +168,7 @@ message QueryRequest { message QueryResponse { repeated Request transactions = 1; + uint64 max_seq = 2; } message CustomQueryResponse {