Skip to content

Commit

Permalink
raft: make replicate_sync truly blocking
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Maurer <[email protected]>
  • Loading branch information
maurermi committed Oct 30, 2024
1 parent 4b3f319 commit 1708a31
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
23 changes: 20 additions & 3 deletions src/util/raft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "node.hpp"

#include <future>

namespace cbdc::raft {
node::node(int node_id,
std::vector<network::endpoint_t> raft_endpoints,
Expand Down Expand Up @@ -99,11 +101,26 @@ namespace cbdc::raft {
auto node::replicate_sync(const nuraft::ptr<nuraft::buffer>& new_log) const
-> std::optional<nuraft::ptr<nuraft::buffer>> {
auto ret = m_raft_instance->append_entries({new_log});
if(!ret->get_accepted()
|| ret->get_result_code() != nuraft::cmd_result_code::OK) {
if(!ret->get_accepted()) {
return std::nullopt;
}
auto result_code = nuraft::cmd_result_code::RESULT_NOT_EXIST_YET;
auto blocking_promise = std::promise<void>();
auto blocking_future = blocking_promise.get_future();
ret->when_ready([&result_code,
&blocking_promise](raft::result_type& r,
nuraft::ptr<std::exception>& err) {
if(err) {
result_code = nuraft::cmd_result_code::FAILED;
} else {
result_code = r.get_result_code();
}
blocking_promise.set_value();
});
blocking_future.wait();
if(result_code != nuraft::cmd_result_code::OK) {
return std::nullopt;
}

return ret->get();
}

Expand Down
13 changes: 5 additions & 8 deletions tests/unit/raft_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class raft_test : public ::testing::Test {
auto new_log
= cbdc::make_buffer<uint64_t, nuraft::ptr<nuraft::buffer>>(1);

auto res = nodes[0]->replicate_sync(new_log);
ASSERT_TRUE(res.has_value());
ASSERT_EQ(nodes[0]->last_log_idx(), 2UL);

cbdc::raft::callback_type result_fn = nullptr;
auto result_done = std::atomic<bool>(false);
if(!blocking) {
Expand All @@ -190,14 +194,7 @@ class raft_test : public ::testing::Test {
while(!result_done) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
ASSERT_EQ(nodes[0]->last_log_idx(), 2UL);

if(blocking) {
// Replicate sync will only return a value in the blocking context
auto res = nodes[0]->replicate_sync(new_log);
ASSERT_TRUE(res.has_value());
ASSERT_EQ(nodes[0]->last_log_idx(), 3UL);
}
ASSERT_EQ(nodes[0]->last_log_idx(), 3UL);

for(size_t i{0}; i < nodes.size(); i++) {
ASSERT_EQ(nodes[i]->get_sm(), sms[i].get());
Expand Down

0 comments on commit 1708a31

Please sign in to comment.