diff --git a/.github/workflows/ubuntu-builder.yml b/.github/workflows/ubuntu-builder.yml index 58796eaf8..1df720a2c 100644 --- a/.github/workflows/ubuntu-builder.yml +++ b/.github/workflows/ubuntu-builder.yml @@ -11,9 +11,9 @@ jobs: fail-fast: false matrix: include: - - builder-image: "ghcr.io/${{ github.repository_owner }}/psibase-builder-ubuntu-2004:b2a46a609b12f645aaeb03245afd0e3d2bf75340" + - builder-image: "ghcr.io/${{ github.repository_owner }}/psibase-builder-ubuntu-2004:efa3d81568eb2d4cdc3b3d8a343dd26de0fcba6c" ubuntu-version: "2004" - - builder-image: "ghcr.io/${{ github.repository_owner }}/psibase-builder-ubuntu-2204:7e3c6e5739df95ba33943789d2cdf5472f91111a" + - builder-image: "ghcr.io/${{ github.repository_owner }}/psibase-builder-ubuntu-2204:efa3d81568eb2d4cdc3b3d8a343dd26de0fcba6c" ubuntu-version: "2204" steps: - name: Timestamp diff --git a/.gitignore b/.gitignore index 711ae4d51..be1f9e1a9 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ vite.config.ts.timestamp-*.mjs psibase.127.0.0.1.sslip.io*.pem psinode_db psinode_db_secure +__pycache__ diff --git a/doc/psidk/src/consensus.md b/doc/psidk/src/consensus.md index 05d82bf1e..b6c6c250e 100644 --- a/doc/psidk/src/consensus.md +++ b/doc/psidk/src/consensus.md @@ -3,6 +3,18 @@ - [CFT](#cft) - [BFT](#bft) - [Joint Consensus](#joint-consensus) +- [Special Cases](#special-cases) + +## Overview + +The blockchain forms a replicated state machine. Each block contains a cryptographic hash identifying the previous block and a bundle of transactions that can be applied to the chain state to create a new state. Block execution is strictly deterministic. Applying the same sequence of blocks to the same initial state will always result in the same final state. The process of consensus ensures that nodes agree on the sequence of blocks to apply, and thus agree on the state of the chain. + +New blocks are created by block producers designated in the chain state. After a block producer creates a block, the block is distributed to all other block producers which then agree to make the block irreversible. If there are conflicts, then the consensus algorithm will resolve them. Each consensus algorithm has its own constraints on the behavior of block producers. As long as these constraints are met, the consensus algorithm guarantees safety: +- Only one fork can ever become irreversible +- If two blocks are irreversible, one must be a descendant of the other +- An irreversible block cannot be forked out + +All of these formulations are equivalent. ## CFT @@ -57,24 +69,53 @@ BFT consensus has the following properties: ### Block Production -Blocks are produced by the leader. Leader selection is round-robin. The producer should build off a chain that includes the best block that has been prepared by 2f+1 producers. +Blocks are produced by the leader. Leader selection is round-robin. ### Irreversibility - Blocks are ordered first by term, then by block height. - Blocks go through two rounds of confirmation, prepare and commit. Each producer broadcasts its confirmations to all other producer nodes. -- A producer can only commit a block that has been prepared by 2f+1 producers. -- If a producer commits a block A, it can only prepare a conflicting block B that is better than A if at least 2f+1 nodes have already prepared an ancestor of B that is better than A. - A block is considered irreversible if it has been committed by 2f+1 nodes. +Block producers must obey these restrictions: +1. 
A producer shall only commit a block that has been prepared by 2f+1 producers. +2. A producer shall not prepare or commit conflicting blocks in the same term. +3. A view change for view N names a block from any prior view that has been prepared by 2f+1 producers. This block must not be worse than any block that the producer has committed in any view before N. +4. The first block of a term must be a descendant of a block which is the best block referenced by 2f+1 view changes from different producers, and is referenced by the leader's best view change. The leader MAY issue more than one view change and MUST issue a second view change if its original view change refers to a block that is too old. + ### View Change Each producer maintains a view change timer to detect failure in the leader. The view change timer is restarted whenever irreversibility advances. When the view change timer expires, a producer broadcasts a `view_change_message` to all other producers and increments the current term, moving to the next leader. -- After initiating a view change, a producer will not restart the view change timer until it sees 2f+1 view change messages. -- Consecutive view changes during which the chain does not make progress, cause the view change timeout to increase by a flat amount for each view change after the first. +- After initiating a view change, a producer will not restart the view change timer until it sees that 2f+1 block producers are in the same term or later. +- Consecutive view changes during which the chain does not make progress cause the view change timeout to increase. - A view change will trigger immediately if the leader of the current term or at least f+1 other producers are in a later term. This view change is independent of the view change timer and may skip terms. +View change messages are broadcast network-wide. Every node retains the best view change from each active producer and broadcasts it to its peers whenever it receives a better one. When a node changes its view of active producers, it requests the current list of view change messages from all its peers. + +The ordering of view change messages from a single node is defined as follows: +- A higher term is always better than a lower term +- The ordering of view changes with the same term depends on whether the node that issued the view change is the leader. + - If the source is not the leader, a view change is better if it refers to an older block + - If the source is the leader, a view change is better if it refers to a newer block + +Note: Since the condition for activating a view requires the leader's view change to refer to a block which is not older than other nodes' view changes, this ordering guarantees that as long as a valid set of view changes exists, all connected, honest nodes will converge to the same valid set. + +### Safety + +Let block X and block Y be two conflicting blocks which are both irreversible +- Without loss of generality, let X be better than Y +- Y has been committed by 2f+1 producers, because that is the requirement for it to be considered irreversible +- Let Z be the earliest block that is an ancestor of X (inclusive of X), conflicts with Y, is better than Y, and is prepared by 2f+1 producers. Such a block is guaranteed to exist because X itself must be prepared by 2f+1 producers in order to be committed by any honest producer.
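+
+For concreteness, assume the usual BFT sizing of n = 3f + 1 producers with quorums of size 2f + 1 (this document does not state n explicitly). Any two quorums then intersect in at least 2(2f + 1) - (3f + 1) = f + 1 producers, and since at most f producers are faulty, at least one honest producer is in both. This is the arithmetic behind the "at least f + 1 producers have violated the protocol" steps below.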
+ +The existence of Z implies that at least f+1 producers have violated the protocol +- If Z and Y have the same term then at least f+1 producers have violated rule 2 +- Y does not have a higher term than Z because Z is better than Y +- If Z is in a higher term than Y, then there is a valid view change set for the term of Z + - Let W be the block named by the leader's view change + - If W is worse than Y, then at least f+1 producers have violated rule 3 + - Otherwise, W is in a lower term than Z and is prepared by 2f+1 producers (by the definition of a view change) and conflicts with Y and is better than Y (otherwise X would be a descendant of Y), which contradicts the definition of Z. + ### Messages `BlockMessage` @@ -92,7 +133,7 @@ The leader is selected from any of the old or new producers. Leader elections an ### CFT → BFT -The leader is selected by election from the old producers. Irreversibility requires a majority of the old producers and a quorum of the new producers. Leader election requires a majority of the old producers and a quorum of the new producers. +The leader is selected by election from the old producers. Irreversibility requires a majority of the old producers and a quorum of the new producers. Leader election requires a majority of the old producers and a quorum of the new producers. The new producers cannot advance the current term. ### BFT → CFT @@ -100,4 +141,9 @@ The leader is selected from the old producers. The new producers commit, but do ### BFT → BFT -The leader is selected from the old producers. Committing a block and advancing irreversibility require quorums of both the old and the new producers. Switching forks after a commit is permitted if a quorum of either the old or the new producers have prepared the new fork. +The leader is selected from the old producers. Activating a view, committing a block, and advancing irreversibility all require quorums of both the old and the new producers. View change auto triggers if f+1 of the old producers are ahead of the current term. + +## Special Cases + +- Blocks with a single producer are immediately irreversible. This is a degenerate case for both algorithms and they behave identically. +- The genesis block is immediately irreversible. There is nothing to preserve pre-genesis. diff --git a/libraries/net/include/psibase/bft.hpp b/libraries/net/include/psibase/bft.hpp index c439d9611..49e438ac9 100644 --- a/libraries/net/include/psibase/bft.hpp +++ b/libraries/net/include/psibase/bft.hpp @@ -62,6 +62,14 @@ namespace psibase::net // assuming that the signature remains valid. PSIO_REFLECT(CommitMessage, definitionWillNotChange(), block_id, producer, signer) + struct ProducerConfirm + { + AccountNumber producer; + std::vector signature; + friend bool operator==(const ProducerConfirm&, const ProducerConfirm&) = default; + }; + PSIO_REFLECT(ProducerConfirm, producer, signature) + struct ViewChangeMessage { static constexpr unsigned type = 40; @@ -69,13 +77,36 @@ namespace psibase::net AccountNumber producer; Claim signer; + // This is the best block which is prepared in a previous term. 
This + // is sufficient to guarantee the properties that we require + // - The block must be prepared + // - We must not commit a better block in a previous term + Checksum256 best_prepared; + TermNum best_prepared_term; + std::vector prepares; + std::optional> nextPrepares; + + auto by_best_prepared() const { return std::tie(best_prepared_term, best_prepared); } + friend bool operator==(const ViewChangeMessage&, const ViewChangeMessage&) = default; std::string to_string() const { - return "view change: term=" + std::to_string(term) + " producer=" + producer.str(); + return "view change: term=" + std::to_string(term) + " producer=" + producer.str() + + " prepared=" + loggers::to_string(best_prepared); } }; - PSIO_REFLECT(ViewChangeMessage, term, producer, signer) + PSIO_REFLECT(ViewChangeMessage, + term, + producer, + signer, + best_prepared, + best_prepared_term, + prepares, + nextPrepares) + inline auto by_best_prepared(psio::view message) + { + return std::tuple(message.best_prepared_term().unpack(), message.best_prepared().unpack()); + } struct RequestViewMessage { @@ -85,13 +116,6 @@ namespace psibase::net }; PSIO_REFLECT(RequestViewMessage); - struct ProducerConfirm - { - AccountNumber producer; - std::vector signature; - }; - PSIO_REFLECT(ProducerConfirm, producer, signature) - // TODO: consider using a multiparty signature scheme to save space struct BlockConfirm { @@ -116,6 +140,7 @@ namespace psibase::net using Base::self; using Base::start_leader; using Base::stop_leader; + using Base::validate_message; using Base::validate_producer; using typename Base::producer_state; enum class confirm_type @@ -124,21 +149,23 @@ namespace psibase::net commit }; + template void verifyMsig(const auto& revision, const Checksum256& id, - const auto& commits, + const auto& confirms, const ProducerSet& prods) { AccountNumber prevAccount{}; - check(commits.size() >= prods.threshold(), "Not enough commits"); - for (const auto& [prod, sig] : commits) + check(confirms.size() >= prods.threshold(), "Not enough confirmations"); + for (const ProducerConfirm& confirm : confirms) { + const auto& [prod, sig] = confirm; // mostly to guarantee that the producers are unique - check(prevAccount < prod, "Commits must be ordered by producer"); + check(prevAccount < prod, "Confirmations must be ordered by producer"); auto claim = prods.getClaim(prod); - check(!!claim, "Not a valid producer"); - CommitMessage originalCommit{id, prod, *claim}; - auto msg = network().serialize_unsigned_message(originalCommit); + check(!!claim, "Not a valid producer: " + prod.str()); + M originalMsg{id, prod, *claim}; + auto msg = network().serialize_unsigned_message(originalMsg); chain().verify(revision, {msg.data(), msg.size()}, *claim, sig); } } @@ -147,11 +174,12 @@ namespace psibase::net const BlockConfirm& commits, const BlockHeaderState* state) { - verifyMsig(revision, state->blockId(), commits.commits, *state->producers); + verifyMsig(revision, state->blockId(), commits.commits, *state->producers); if (state->nextProducers) { check(!!commits.nextCommits, "nextCommits required during joint consensus"); - verifyMsig(revision, state->blockId(), *commits.nextCommits, *state->nextProducers); + verifyMsig(revision, state->blockId(), *commits.nextCommits, + *state->nextProducers); } else { @@ -278,14 +306,18 @@ namespace psibase::net std::map confirmations; std::vector producer_views[2]; + // Blocks that are referenced by view change messages + // When such a block is received, it requires the subtree + // to be re-evaluated + 
std::vector pendingBlocks; using BlockOrder = decltype(std::declval().order()); BlockOrder best_prepared = {}; - BlockOrder best_prepare = {}; - BlockOrder alt_prepare = {}; + BlockOrder best_confirm = {}; Timer _new_term_timer; + TermNum _ready_term = -1; std::chrono::milliseconds _timeout = std::chrono::seconds(10); std::chrono::milliseconds _timeout_delta = std::chrono::seconds(5); @@ -297,25 +329,6 @@ namespace psibase::net std::tuple&, const std::shared_ptr&> get_producers(const BlockHeaderState* state) { -#if 0 - std::cout << "producers for block: " << loggers::to_string(state->blockId()); - std::cout << " {"; - for(const auto& [name,key] : state->producers->activeProducers) - { - std::cout << name.str() << ','; - } - std::cout << '}'; - if(state->nextProducers) - { - std::cout << " {"; - for(const auto& [name,key] : state->nextProducers->activeProducers) - { - std::cout << name.str() << ','; - } - std::cout << '}'; - } - std::cout << std::endl; -#endif return {state->producers, state->nextProducers}; } @@ -424,19 +437,36 @@ namespace psibase::net _state = producer_state::follower; } } - bool new_producers = !active_producers[0] || *active_producers[0] != *prods.first; + bool new_producers = !active_producers[0] || *active_producers[0] != *prods.first; + bool need_view_sync = false; if (start_bft || new_producers) { - // TODO: start tracking views of joint producers early. Not necessary for safety, - // but will help reduce unnecessary delays on a producer change. producer_views[0].clear(); producer_views[0].resize(prods.first->size()); - network().multicast(RequestViewMessage{}); + need_view_sync = true; + } + if ((!active_producers[1] && prods.second) || + (prods.second && *active_producers[1] == *prods.second)) + { + producer_views[1].clear(); + producer_views[1].resize(prods.second->size()); + need_view_sync = true; } - if (new_producers) + else if (!prods.second) { - PSIBASE_LOG(logger, info) << "New producers: " << prods.first->size() << ", " - << (prods.second ? prods.second->size() : 0); + producer_views[1].clear(); + } + if (new_producers || active_producers[1] != prods.second) + { + PSIBASE_LOG(logger, info) << "Active producers: " << *prods.first + << print_function( + [&](std::ostream& os) + { + if (prods.second) + { + os << ", " << *prods.second; + } + }); } active_producers[0] = std::move(prods.first); active_producers[1] = std::move(prods.second); @@ -446,15 +476,15 @@ namespace psibase::net { PSIBASE_LOG(logger, info) << "Node is active producer"; _state = producer_state::follower; - start_timer(); + maybe_start_timer(); + // If we became a producer due to renaming ourselves, instead of + // because the producer set changed, this might not have been set + // previously. 
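+            // (Setting the flag again here is harmless if it was already set above.)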
+ need_view_sync = true; } else if (start_bft) { - start_timer(); - } - if (start_bft || new_producers) - { - sync_current_term(); + maybe_start_timer(); } } else @@ -470,7 +500,13 @@ namespace psibase::net _state = producer_state::nonvoting; } } + if (need_view_sync) + { + sync_current_term(); + network().multicast(RequestViewMessage{}); + } assert(producer_views[0].size() == active_producers[0]->size()); + assert(!active_producers[1] || producer_views[1].size() == active_producers[1]->size()); } void start_timer() @@ -484,36 +520,372 @@ namespace psibase::net if (!ec && old_term == current_term) { auto committed = chain().get_block_by_num(chain().commit_index()); - if (committed && committed->block().header().term() < current_term) + if (!committed || committed->block().header().term() < current_term) { // If we have not committed a block in the current term, increase the block timer - _timeout += _timeout_delta; + //_timeout += _timeout_delta; + _timeout = _timeout * 3 / 2; + PSIBASE_LOG(logger, info) + << "Increased consensus timeout to " << _timeout.count() << " ms"; } increase_view(); - start_timer(); } }); } } // returns true if a quorum of producers are believed to be in the current term. - bool is_term_ready() const + bool is_term_ready() { return get_subtree_from_leader() != nullptr; } + + // If the leader has provided a view change that is viable + // as the root of the current view, return it. Otherwise, + // return nullptr. + BlockHeaderState* get_subtree_from_leader() { - auto threshold = producer_views[0].size() * 2 / 3 + 1; - return std::count_if(producer_views[0].begin(), producer_views[0].end(), - [current_term = current_term](const auto& v) - { return v.term == current_term; }) >= threshold; + auto& leader_view = producer_views[0][get_leader_index(current_term)]; + if (leader_view.term != current_term) + { + PSIBASE_LOG(logger, debug) + << "Leader " + << (active_producers[0]->activeProducers.begin() + get_leader_index(current_term)) + ->first.str() + << " not in current term " << current_term << ": " + << print_function([&](std::ostream& os) { print_views(os, 0); }); + return nullptr; + } + if (!leader_view.best_message) + { + PSIBASE_LOG(logger, debug) << "View change missing message"; + return nullptr; + } + Checksum256 id = leader_view.best_message->data->best_prepared(); + BlockHeaderState* state = chain().get_state(id); + if (id == Checksum256{}) + { + state = chain().get_state(chain().get_block_id(chain().commit_index())); + } + else if (state) + { + // check prepares + if (!validate_view_change(state, leader_view)) + { + PSIBASE_LOG(logger, debug) << "View change validation failed"; + return nullptr; + } + } + if (!state) + { + if (chain().get_block_id(getBlockNum(id)) == id) + { + state = chain().get_state(chain().get_block_id(chain().commit_index())); + } + else + { + PSIBASE_LOG(logger, debug) << "leader view change refers to unknown state"; + return nullptr; + } + } + + auto n = std::count_if( + producer_views[0].begin(), producer_views[0].end(), + [current_term = current_term, + limit = by_best_prepared(*leader_view.best_message->data)](const auto& v) + { + return v.term == current_term && v.best_message && + by_best_prepared(*v.best_message->data) <= limit; + }); + if (!is_term_ready(0, *leader_view.best_message)) + { + PSIBASE_LOG(logger, debug) + << "Not enough producers in the current term " << current_term << ": " + << print_function([&](std::ostream& os) + { print_views(os, 0, *leader_view.best_message); }); + return nullptr; + } + if 
(active_producers[1] && !is_term_ready(1, *leader_view.best_message)) + { + PSIBASE_LOG(logger, debug) + << "Not enough joint producers in the current term " << current_term << ": " + << print_function([&](std::ostream& os) + { print_views(os, 1, *leader_view.best_message); }); + return nullptr; + } + PSIBASE_LOG(logger, debug) << "Term " << current_term << " ready"; + return state; + } + + void print_views(std::ostream& os, + int group, + const SignedMessage& leader_msg) + { + os << '['; + bool first = true; + for (std::size_t i = 0; i < producer_views[group].size(); ++i) + { + const auto& view = producer_views[group][i]; + const auto& prod = (active_producers[group]->activeProducers.begin() + i)->first; + if (first) + { + first = false; + } + else + { + os << ' '; + } + os << prod.str() << '=' << view.term; + if (view.term == current_term && + !(view.best_message && + by_best_prepared(*view.best_message->data) <= by_best_prepared(*leader_msg.data))) + { + os << '!'; + } + } + os << ']'; + } + + void print_views(std::ostream& os, int group) + { + os << '['; + bool first = true; + for (std::size_t i = 0; i < producer_views[group].size(); ++i) + { + const auto& view = producer_views[group][i]; + const auto& prod = (active_producers[group]->activeProducers.begin() + i)->first; + if (first) + { + first = false; + } + else + { + os << ' '; + } + os << prod.str() << '=' << view.term; + } + os << ']'; } + bool is_term_ready(int group, const SignedMessage& leader_msg) + { + auto n = std::ranges::count_if(producer_views[group], + [current_term = current_term, + limit = by_best_prepared(*leader_msg.data)](const auto& v) + { + return v.term == current_term && v.best_message && + by_best_prepared(*v.best_message->data) <= limit; + }); + return n >= active_producers[group]->threshold(); + } + + BlockHeaderState* get_subtree_from_view_change(view_info& info) + { + if (!info.best_message) + { + return nullptr; + } + if (info.best_message->data->best_prepared().unpack() == Checksum256{}) + { + return chain().get_state(chain().get_block_id(chain().commit_index())); + } + BlockHeaderState* state = chain().get_state(info.best_message->data->best_prepared()); + if (!state) + { + if (getBlockNum(info.best_message->data->best_prepared()) > chain().commit_index()) + { + pendingBlocks.push_back(info.best_message->data->best_prepared()); + } + return nullptr; + } + if (!validate_view_change(state, info)) + { + return nullptr; + } + return state; + } + + bool validate_view_change(BlockHeaderState* state, view_info& info) + { + if (info.best_message->data->best_prepared_term() != state->info.header.term) + { + PSIBASE_LOG(logger, warning) << "invalid view change: wrong term"; + return false; + } + auto verifyState = chain().get_state(state->info.header.previous); + if (!verifyState) + { + // TODO: This implies that state is LIB. 
The need for this check + // is yet another symptom of the problems with the way we're handling + // the services for signature verification + return true; + } + try + { + verifyMsig(verifyState->authState->revision, state->blockId(), + info.best_message->data->prepares(), *state->producers); + if (state->nextProducers && state->nextProducers->algorithm == ConsensusAlgorithm::bft) + { + if (auto nextPrepares = info.best_message->data->nextPrepares()) + { + verifyMsig(verifyState->authState->revision, state->blockId(), + *nextPrepares, *state->nextProducers); + } + else + { + PSIBASE_LOG(logger, warning) << "invalid view change: missing nextPrepares"; + return false; + } + } + else if (info.best_message->data->nextPrepares()) + { + PSIBASE_LOG(logger, warning) << "invalid view change: unexpected nextPrepares"; + return false; + } + } + catch (std::runtime_error& e) + { + PSIBASE_LOG(logger, warning) << "invalid view change: " << e.what(); + return false; + } + return true; + } + + void load_prepares(const BlockHeaderState* state, + const ProducerSet& prods, + psio::view> prepares) + { + for (const ProducerConfirm& confirm : prepares) + { + const auto& [prod, sig] = confirm; + auto claim = prods.getClaim(prod); + check(!!claim, "Not a valid producer"); + PrepareMessage originalPrepare{state->blockId(), prod, *claim}; + on_prepare(state, prod, SignedMessage{originalPrepare, sig}); + } + } + + // \pre the msg has been validated, including all prepare signatures + void load_prepares(BlockHeaderState* state, const SignedMessage& msg) + { + load_prepares(state, *state->producers, msg.data->prepares()); + if (auto nextPrepares = msg.data->nextPrepares()) + { + load_prepares(state, *state->nextProducers, *nextPrepares); + } + } + + void set_root() + { + // If the current producer's ViewChange for the current term is viable, use it + // Otherwise, use the best ViewChange for the current term + BlockHeaderState* state = get_subtree_from_leader(); + if (!state) + { + pendingBlocks.clear(); + view_info* best_view_change = nullptr; + for (int i = 0; i < 2; ++i) + { + for (auto& view_change : producer_views[i]) + { + if (view_change.term == current_term) + { + BlockHeaderState* check_state = get_subtree_from_view_change(view_change); + if (check_state) + { + if (!best_view_change || + by_best_prepared(*best_view_change->best_message->data) < + by_best_prepared(*view_change.best_message->data)) + { + best_view_change = &view_change; + state = check_state; + } + } + } + } + } + std::ranges::sort(pendingBlocks); + auto [b, e] = std::ranges::unique(pendingBlocks); + pendingBlocks.erase(b, e); + if (state && is_leader()) + { + // TODO: We should not assume that state corresponds to best_message + load_prepares(state, *best_view_change->best_message); + sync_current_term(); + } + } + // Because we accept view changes even if we don't have the referenced block, + // it's possible to trigger a view change without being able to set the subtree. 
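+         // In that case, fall back to the last committed block. Unreceived blocks
+         // named by current-term view changes are recorded in pendingBlocks, and
+         // set_root() is re-run when such a block arrives (see on_accept_block_header).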
+ if (!state) + { + state = chain().get_state(chain().get_block_id(chain().commit_index())); + } + chain().set_subtree(state, "view change from current leader"); + } + + ViewChangeMessage make_view_change(BlockHeaderState* state) + { + ViewChangeMessage result{.term = current_term, .producer = self}; + result.best_prepared = state->blockId(); + result.best_prepared_term = state->info.header.term; + auto pos = confirmations.find(state->blockId()); + assert(pos != confirmations.end()); + if (state->nextProducers && state->nextProducers->algorithm == ConsensusAlgorithm::bft) + { + result.nextPrepares.emplace(); + } + for (const auto& msg : pos->second.prepares) + { + if (auto expected = state->producers->getClaim(msg.data->producer()); + expected && expected == msg.data->signer()) + { + result.prepares.push_back({msg.data->producer(), msg.signature}); + } + if (result.nextPrepares) + { + if (auto expected = state->nextProducers->getClaim(msg.data->producer()); + expected && expected == msg.data->signer()) + { + result.nextPrepares->push_back({msg.data->producer(), msg.signature}); + } + } + } + auto compareProducer = [](const auto& lhs, const auto& rhs) + { return lhs.producer < rhs.producer; }; + std::ranges::sort(result.prepares, compareProducer); + if (result.nextPrepares) + { + std::ranges::sort(*result.nextPrepares, compareProducer); + } + return result; + } + ViewChangeMessage make_view_change() + { + auto id = orderToXid(best_prepared); + if (auto* state = chain().get_state(id.id()); state && state->blockId() != Checksum256{}) + { + return make_view_change(state); + } + else + { + // The absence of a state implies that a better block has been committed (The + // fact that this block has been prepared prevents it from being forked out + // by a worse block) + // Therefore at least 2f+1 producers have seen a better prepared block + // Therefore this view change will not be chosen + // Therefore the list of prepares is not needed + return ViewChangeMessage{ + .term = current_term, .producer = self, .best_prepared = id.id()}; + } + } void sync_current_term() { + auto view_change = make_view_change(); + for_each_key( [&](const auto& k) { - auto msg = network().sign_message( - ViewChangeMessage{.term = current_term, .producer = self, .signer = k}); + view_change.signer = k; + auto msg = network().sign_message(view_change); set_producer_view(msg); - network().multicast_producers(msg); + network().multicast(msg); }); } @@ -555,6 +927,25 @@ namespace psibase::net } } + bool compare_view_change(const ProducerSet& producers, + psio::view lhs, + psio::view rhs) + { + assert(lhs.producer() == rhs.producer()); + if (lhs.term() < rhs.term()) + return true; + if (lhs.term() > rhs.term()) + return false; + if (is_leader(producers, lhs.term(), lhs.producer())) + { + return by_best_prepared(lhs) < by_best_prepared(rhs); + } + else + { + return by_best_prepared(lhs) > by_best_prepared(rhs); + } + } + const SignedMessage* get_newer_view( const SignedMessage& msg) { @@ -574,7 +965,8 @@ namespace psibase::net if (auto idx = active_producers[group]->getIndex(msg.data->producer(), msg.data->signer())) { auto& view = producer_views[group][*idx]; - if (view.best_message && view.best_message->data->term() > msg.data->term()) + if (view.best_message && + compare_view_change(*active_producers[group], *msg.data, *view.best_message->data)) { return &*view.best_message; } @@ -619,7 +1011,9 @@ namespace psibase::net view.term = term; result = true; } - if (msg && (!view.best_message || view.best_message->data->term() < 
msg->data->term())) + if (msg && + (!view.best_message || compare_view_change(*active_producers[group], + *view.best_message->data, *msg->data))) { view.best_message = *msg; result = true; @@ -644,6 +1038,7 @@ namespace psibase::net Base::switch_fork(); } } + maybe_start_timer(); } return result; } @@ -656,6 +1051,21 @@ namespace psibase::net } } + void maybe_start_timer() + { + if (_ready_term != current_term) + { + auto n = std::ranges::count_if(producer_views[0], [current_term = current_term](auto& v) + { return v.term >= current_term; }); + if (n >= active_producers[0]->threshold()) + { + _ready_term = current_term; + PSIBASE_LOG(logger, debug) << "Starting consensus timer for term " << current_term; + start_timer(); + } + } + } + void check_view_change_threshold() { std::vector view_copy; @@ -671,10 +1081,26 @@ namespace psibase::net if (new_term > current_term) { set_view(new_term); - start_timer(); + maybe_start_timer(); } } + bool is_forbidden_fork(const BlockHeaderState* state) + { + // If we have not confirmed anything, there are no restrictions + if (orderToXid(best_confirm).id() == Checksum256{}) + { + return false; + } + // If the block does not fork, then we can confirm it + if (chain().in_best_chain(state->xid()) && chain().in_best_chain(orderToXid(best_confirm))) + { + return false; + } + // Forks are allowed on a new term + return orderToTerm(best_confirm) >= state->info.header.term; + } + // track best committed, best prepared, best prepared on different fork bool can_prepare(const BlockHeaderState* state) { @@ -686,9 +1112,20 @@ namespace psibase::net { return false; } + if (state->info.header.term != current_term || + active_producers[0]->algorithm != ConsensusAlgorithm::bft || !is_term_ready()) + { + return false; + } + // CFT joint producers should not prepare if (state->nextProducers && state->nextProducers->algorithm != ConsensusAlgorithm::bft) { - return state->producers->isProducer(self); + if (!state->producers->isProducer(self)) + return false; + } + if (is_forbidden_fork(state)) + { + return false; } return true; } @@ -702,16 +1139,12 @@ namespace psibase::net { return false; } - // have threshold prepares AND have not prepared a better conflicting block - if (chain().in_best_chain(state->xid()) && chain().in_best_chain(orderToXid(best_prepare))) + if (state->info.header.term != current_term || + active_producers[0]->algorithm != ConsensusAlgorithm::bft || !is_term_ready()) { - return state->order() > alt_prepare; - } - else - { - return state->order() > best_prepare; + return false; } - return false; + return !is_forbidden_fork(state); } void on_prepare(const BlockHeaderState* state, AccountNumber producer, const auto& msg) { @@ -722,7 +1155,7 @@ namespace psibase::net { best_prepared = state->order(); set_view(state->info.header.term); - chain().set_subtree(state, "prepared by a quorum of producers"); + //chain().set_subtree(state, "prepared by a quorum of producers"); // Fork switch handled by caller. 
It cannot be handled // here because we might already by in the process of // switching forks @@ -737,13 +1170,9 @@ namespace psibase::net { const auto& id = state->blockId(); assert(chain().in_best_chain(state->xid())); - if (state->order() > best_prepare) + if (state->order() > best_confirm) { - if (!chain().in_best_chain(orderToXid(best_prepare))) - { - alt_prepare = best_prepare; - } - best_prepare = state->order(); + best_confirm = state->order(); } for_each_key(state, [&](const auto& key) @@ -751,7 +1180,7 @@ namespace psibase::net auto message = network().sign_message( PrepareMessage{.block_id = id, .producer = self, .signer = key}); on_prepare(state, self, message); - network().multicast_producers(message); + network().multicast_producers(id, message); }); } void save_commit_data(const BlockHeaderState* state) @@ -788,17 +1217,16 @@ namespace psibase::net } chain().setBlockData(state->blockId(), psio::convert_to_frac(result)); } - // void verify_commit(const BlockHeaderState* state) { - if (state->order() < best_prepared) + if (state->order() < best_prepared && + !chain().is_ancestor(state->xid(), orderToXid(best_prepared))) { PSIBASE_LOG(logger, critical) << "Consensus failure: committing a block that that is not in the best " "chain. This means that either there is a severe bug in the software, or " "the maximum number of byzantine faults was exceeded."; - // TODO: Now what? Trigger shutdown? Halt block production, but - // leave the server running at the last valid state? + throw consensus_failure{}; } } void on_commit(const BlockHeaderState* state, @@ -829,13 +1257,17 @@ namespace psibase::net void do_commit(const BlockHeaderState* state, AccountNumber producer) { const auto& id = state->blockId(); + if (state->order() > best_confirm) + { + best_confirm = state->order(); + } for_each_key(state, [&](const auto& key) { auto message = network().sign_message( CommitMessage{.block_id = id, .producer = self, .signer = key}); on_commit(state, self, message); - network().multicast_producers(message); + network().multicast_producers(id, message); }); } std::optional> makeBlockData(const BlockHeaderState* state) @@ -899,13 +1331,63 @@ namespace psibase::net { for (const auto& msg : iter->second.prepares) { - network().async_send_block(peer, msg, [](const std::error_code&) {}); + network().async_send(peer, msg, [](const std::error_code&) {}); } for (const auto& msg : iter->second.commits) { - network().async_send_block(peer, msg, [](const std::error_code&) {}); + network().async_send(peer, msg, [](const std::error_code&) {}); + } + } + Base::post_send_block(peer, id); + } + + void validate_aux_consensus_data(const BlockHeaderState* state, + psio::view> aux) + { + auto data = psio::from_frac(aux); + const auto& header = state->info.header; + check(data.blockNum >= header.commitNum && data.blockNum <= header.blockNum, + "blockNum out of range"); + auto committed = state; + while (committed->blockNum() > data.blockNum) + { + committed = chain().get_state(committed->info.header.previous); + if (!committed) + { + // TODO: We can't verify the signatures, but that's okay because + // they're proving somthing that we already know. We just + // need to remove or replace them for outgoing blocks. 
+ return; } } + auto verifyState = chain().get_state(state->info.header.previous); + assert(verifyState && "accept_block_header requires the previous block to be known"); + verifyIrreversibleSignature(verifyState->authState->revision, data, committed); + if (committed->producers->algorithm == ConsensusAlgorithm::bft) + { + chain().setBlockData(committed->blockId(), aux); + } + // Ensure that the committed block is a candidate for the best block + set_view(committed->info.header.term); + if (!chain().in_best_chain(committed->xid())) + { + verify_commit(committed); + //chain().set_subtree(committed, "made irreversible by a subsequent block"); + // fork switch will be handled by the caller + auto [iter, _] = confirmations.try_emplace(committed->blockId(), state->producers, + state->nextProducers); + // Signal to on_accept_block that it should commit this block + // as soon as it is applied. + iter->second.committedByBlock = true; + } + else if (chain().commit(data.blockNum)) + { + start_timer(); + } + if (committed->order() > best_prepared) + { + best_prepared = committed->order(); + } } void on_accept_block_header(const BlockHeaderState* state) @@ -918,50 +1400,7 @@ namespace psibase::net auto block = chain().get(state->blockId()); if (auto aux = block->auxConsensusData()) { - auto data = psio::from_frac(*aux); - auto header = block->block().header(); - check(data.blockNum >= header.commitNum() && data.blockNum <= header.blockNum(), - "blockNum out of range"); - auto committed = state; - while (committed->blockNum() > data.blockNum) - { - committed = chain().get_state(committed->info.header.previous); - if (!committed) - { - // TODO: We can't verify the signatures, but that's okay because - // they're proving somthing that we already know. We just - // need to remove or replace them for outgoing blocks. - return; - } - } - auto verifyState = chain().get_state(state->info.header.previous); - assert(verifyState && "accept_block_header requires the previous block to be known"); - verifyIrreversibleSignature(verifyState->authState->revision, data, committed); - if (committed->producers->algorithm == ConsensusAlgorithm::bft) - { - chain().setBlockData(committed->blockId(), *aux); - } - // Ensure that the committed block is a candidate for the best block - set_view(committed->info.header.term); - if (!chain().in_best_chain(committed->xid())) - { - verify_commit(committed); - chain().set_subtree(committed, "made irreversible by a subsequent block"); - // fork switch will be handled by the caller - auto [iter, _] = confirmations.try_emplace(committed->blockId(), state->producers, - state->nextProducers); - // Signal to on_accept_block that it should commit this block - // as soon as it is applied. 
- iter->second.committedByBlock = true; - } - else if (chain().commit(data.blockNum)) - { - start_timer(); - } - if (committed->order() > best_prepared) - { - best_prepared = committed->order(); - } + validate_aux_consensus_data(state, *aux); } else { @@ -970,6 +1409,10 @@ namespace psibase::net check(state->singleProducer() || !state->needsIrreversibleSignature(), "Missing irreversibility proof"); } + if (std::ranges::binary_search(pendingBlocks, state->blockId())) + { + set_root(); + } } return Base::on_accept_block_header(state); } @@ -977,16 +1420,7 @@ namespace psibase::net void connect(peer_id peer) { Base::connect(peer); - if (active_producers[0]->algorithm == ConsensusAlgorithm::bft) - { - for (const auto& [term, msg] : producer_views[0]) - { - if (msg) - { - network().async_send_block(peer, *msg, [](const std::error_code&) {}); - } - } - } + send_all_view_changes(peer); } void recv(peer_id peer, const SignedMessage& msg) @@ -1003,7 +1437,7 @@ namespace psibase::net validate_producer(state, msg.data->producer(), msg.data->signer()); do_view_change(msg.data->producer(), msg.data->signer(), state->info.header.term); on_prepare(state, msg.data->producer(), msg); - Base::switch_fork(); + //Base::switch_fork(); } void recv(peer_id peer, const SignedMessage& msg) { @@ -1028,23 +1462,32 @@ namespace psibase::net if (set_producer_view(msg)) { check_view_change_threshold(); + set_root(); + Base::switch_fork(); // If this is a new view, notify our peers network().multicast(msg); } - if (const auto* response = get_newer_view(msg)) - { - assert(response->data->producer().unpack() == msg.data->producer().unpack()); - assert(response->data->term() > msg.data->term()); - // If we have a newer view than the sender, reply with our view - network().async_send_block(peer, *response, [](const std::error_code&) {}); - } if (current_term != saved_term) { Base::switch_fork(); } } - void recv(peer_id peer, const RequestViewMessage&) + std::optional validate_message(const PrepareMessage& msg) + { + // TODO: actual validation + return msg.block_id; + } + + std::optional validate_message(const CommitMessage& msg) + { + // TODO: actual validation + return msg.block_id; + } + + void recv(peer_id peer, const RequestViewMessage&) { send_all_view_changes(peer); } + + void send_all_view_changes(peer_id peer) { if (active_producers[0]->algorithm == ConsensusAlgorithm::bft) { @@ -1052,7 +1495,17 @@ namespace psibase::net { if (msg) { - network().async_send_block(peer, *msg, [](const std::error_code&) {}); + network().async_send(peer, *msg, [](const std::error_code&) {}); + } + } + if (active_producers[1]) + { + for (const auto& [term, msg] : producer_views[1]) + { + if (msg) + { + network().async_send(peer, *msg, [](const std::error_code&) {}); + } } } } diff --git a/libraries/net/include/psibase/blocknet.hpp b/libraries/net/include/psibase/blocknet.hpp index 4fbc54bf2..593854448 100644 --- a/libraries/net/include/psibase/blocknet.hpp +++ b/libraries/net/include/psibase/blocknet.hpp @@ -28,6 +28,17 @@ namespace psibase::net return tp - rem; } + template + struct print_function + { + F f; + friend std::ostream& operator<<(std::ostream& os, const print_function& f) + { + f.f(os); + return os; + } + }; + struct HelloRequest { static constexpr unsigned type = 32; @@ -163,6 +174,12 @@ namespace psibase::net connection.hello_sent = false; connection.hello.xid = chain().get_head_state()->xid(); async_send_hello(connection); + if (connection.hello.xid.id() == Checksum256{}) + { + connection.last_received = {Checksum256{}, 
1}; + connection.last_sent = connection.last_received; + connection.ready = true; + } } void async_send_hello(peer_connection& connection) { @@ -188,25 +205,25 @@ namespace psibase::net } } connection.hello_sent = true; - network().async_send_block(connection.id, connection.hello, - [this, &connection](const std::error_code& ec) - { - if (connection.closed) - { - connection.peer_ready = true; - disconnect(connection.id); - return; - } - else if (ec) - { - connection.peer_ready = true; - } - if (!connection.peer_ready) - { - // TODO: rate limit hellos, delay second hello until we have received the first peer hello - async_send_hello(connection); - } - }); + network().async_send(connection.id, connection.hello, + [this, &connection](const std::error_code& ec) + { + if (connection.closed) + { + connection.peer_ready = true; + disconnect(connection.id); + return; + } + else if (ec) + { + connection.peer_ready = true; + } + if (!connection.peer_ready) + { + // TODO: rate limit hellos, delay second hello until we have received the first peer hello + async_send_hello(connection); + } + }); } void recv(peer_id origin, const HelloRequest& request) { @@ -228,6 +245,8 @@ namespace psibase::net // sync from genesis connection.last_received = {Checksum256{}, 1}; connection.last_sent = connection.last_received; + // With no common block, we don't expect to get a HelloResponse + connection.peer_ready = true; } else { @@ -255,9 +274,9 @@ namespace psibase::net //std::cout << "ready: received=" << to_string(connection.last_received.id()) // << " common=" << to_string(connection.last_sent.id()) << std::endl; // FIXME: blocks and hellos need to be sequenced correctly - network().async_send_block(connection.id, HelloResponse{}, - [this, &connection](const std::error_code&) - { async_send_fork(connection); }); + network().async_send(connection.id, HelloResponse{}, + [this, &connection](const std::error_code&) + { async_send_fork(connection); }); } void recv(peer_id origin, const HelloResponse&) { @@ -272,9 +291,17 @@ namespace psibase::net } bool is_sole_producer() const { - return ((active_producers[0]->size() == 0 && self != AccountNumber()) || - (active_producers[0]->size() == 1 && active_producers[0]->isProducer(self))) && - !active_producers[1]; + if (self == AccountNumber()) + { + return false; + } + if (active_producers[1]) + { + if (active_producers[1]->size() != 1 || !active_producers[1]->isProducer(self)) + return false; + } + return active_producers[0]->size() == 0 || + (active_producers[0]->size() == 1 && active_producers[0]->isProducer(self)); } bool is_producer() const { @@ -285,6 +312,13 @@ namespace psibase::net active_producers[0]->isProducer(self) || (active_producers[1] && active_producers[1]->isProducer(self)); } + bool is_producer(AccountNumber account) const + { + return (active_producers[0]->size() == 0 && account != AccountNumber() && + !active_producers[1]) || + active_producers[0]->isProducer(account) || + (active_producers[1] && active_producers[1]->isProducer(account)); + } producer_id producer_name() const { return self; } void set_producer_id(producer_id prod) @@ -301,6 +335,7 @@ namespace psibase::net // I believe that this is safe because the fact that it's // still the same node guarantees that the hand-off is atomic. 
consensus().set_producers({active_producers[0], active_producers[1]}); + network().on_producer_change(); // set_producers may abort the current block, // if we are no longer an active producer switch_fork(); @@ -406,12 +441,29 @@ namespace psibase::net // to rely on the invariant that there is an active block // iff _state == leader. start_leader(); + bool updatedProducers = false; + // If a new consensus was set while building this block, + // our current producers might be out-dated + if (b->info.header.newConsensus) + { + consensus().set_producers({b->producers, b->nextProducers}); + updatedProducers = true; + } // on_produce_block and on_fork_switch should both run // before set_producers, because they should see the // producers of this of this block. consensus().on_produce_block(b); consensus().on_fork_switch(&b->info.header); - consensus().set_producers(chain().getProducers()); + // Set tentative producers for the next block + if (b->endsJointConsensus()) + { + consensus().set_producers({b->nextProducers, nullptr}); + updatedProducers = true; + } + if (updatedProducers) + { + network().on_producer_change(); + } // do_gc needs to run after on_fork_switch, because // on_fork_switch is responsible for cleaning up any // pointers that will become dangling. @@ -469,9 +521,9 @@ namespace psibase::net peer.last_sent = {next_block_id, peer.last_sent.num() + 1}; auto next_block = chain().get(next_block_id); - network().async_send_block(peer.id, BlockMessage{next_block}, - [this, &peer](const std::error_code& e) - { async_send_fork(peer); }); + network().async_send(peer.id, BlockMessage{next_block}, + [this, &peer](const std::error_code& e) + { async_send_fork(peer); }); consensus().post_send_block(peer.id, peer.last_sent.id()); } else @@ -519,6 +571,10 @@ namespace psibase::net async_send_fork(*peer); } } + else + { + PSIBASE_LOG(logger, debug) << "Peer " << peer->id << " not ready"; + } // ------------------------------------------------------------------ } } @@ -530,11 +586,12 @@ namespace psibase::net { peer.last_sent = xid; } + consensus().post_send_block(peer.id, xid.id()); } void recv(peer_id origin, const BlockMessage& request) { - if (auto state = chain().insert(request.block)) + if (auto [state, inserted] = chain().insert(request.block); inserted) { try { @@ -545,12 +602,15 @@ namespace psibase::net chain().erase(state); throw; } - // TODO: update_last_received should run even if the block - // is already known. auto& connection = get_connection(origin); update_last_received(connection, state->xid()); switch_fork(); } + else if (state) + { + auto& connection = get_connection(origin); + update_last_received(connection, state->xid()); + } } // This should be called after any operation that might change the head block. 
@@ -564,7 +624,13 @@ namespace psibase::net PSIBASE_LOG(logger, debug) << "New head block"; } // TODO: only run set_producers when the producers actually changed - consensus().set_producers(chain().getProducers()); + auto producers = chain().getProducers(); + if (producers.first != active_producers[0] || + producers.second != active_producers[1]) + { + consensus().set_producers(std::move(producers)); + network().on_producer_change(); + } if (_state == producer_state::leader && chain().getBlockContext()->current.header.previous != head.blockId) { @@ -582,16 +648,36 @@ namespace psibase::net chain().gc([this](const auto& b) { consensus().on_erase_block(b); }); } + bool peer_has_block(peer_id peer, const Checksum256& id) + { + // If peer_has_block returns true, then the peer's receipt of the block + // happens before its receipt of any message sent after peer_has_block returns. + // + // If peer_has_block returns false, then one of the following will eventually happen: + // - The block is not in the best chain + // - post_send_block(peer, id) + + auto& connection = get_connection(peer); + if (chain().in_best_chain(id) && getBlockNum(id) <= connection.last_sent.num()) + { + return true; + } + return chain().is_ancestor(id, connection.last_received); + } + // Default implementations std::optional> makeBlockData(const BlockHeaderState*) { return {}; } void on_accept_block_header(const BlockHeaderState*) {} void on_produce_block(const BlockHeaderState*) {} void on_accept_block(const BlockHeaderState*) {} - void post_send_block(peer_id, const Checksum256&) {} - void on_erase_block(const Checksum256&) {} - void set_producers(auto prods) + void post_send_block(peer_id peer, const Checksum256& id) + { + network().on_peer_block(peer, id); + } + void on_erase_block(const Checksum256&) {} + void set_producers(auto prods) { - if (prods.first->size() != 0 || prods.second) + if (prods.first->size() != 0) throw std::runtime_error("Consensus algorithm not available"); active_producers[0] = std::move(prods.first); active_producers[1] = std::move(prods.second); @@ -605,5 +691,6 @@ namespace psibase::net } } void cancel() {} + void validate_message() {} }; } // namespace psibase::net diff --git a/libraries/net/include/psibase/cft.hpp b/libraries/net/include/psibase/cft.hpp index b47854353..114622581 100644 --- a/libraries/net/include/psibase/cft.hpp +++ b/libraries/net/include/psibase/cft.hpp @@ -87,6 +87,7 @@ namespace psibase::net using Base::self; using Base::start_leader; using Base::stop_leader; + using Base::validate_message; using Base::validate_producer; using typename Base::producer_state; @@ -172,6 +173,18 @@ namespace psibase::net _state = producer_state::unknown; } } + if (active_producers[0] != prods.first || active_producers[1] != prods.second) + { + PSIBASE_LOG(logger, info) << "Active producers: " << *prods.first + << print_function( + [&](std::ostream& os) + { + if (prods.second) + { + os << ", " << *prods.second; + } + }); + } active_producers[0] = std::move(prods.first); active_producers[1] = std::move(prods.second); if (is_producer()) @@ -360,7 +373,7 @@ namespace psibase::net void randomize_timer() { // Don't bother waiting if we're the only producer - if (active_producers[0]->size() <= 1 && !active_producers[1]) + if (this->is_sole_producer()) { if (_state == producer_state::follower) { @@ -504,6 +517,21 @@ namespace psibase::net check_votes(); } } + bool validate_message(const ConfirmMessage&) + { + // TODO: + return true; + } + bool validate_message(const RequestVoteRequest&) + { + // 
TODO: + return true; + } + bool validate_message(const RequestVoteResponse&) + { + // TODO: + return true; + } }; } // namespace psibase::net diff --git a/libraries/net/include/psibase/direct_routing.hpp b/libraries/net/include/psibase/direct_routing.hpp index 68bd0f2ca..f14ee2b4f 100644 --- a/libraries/net/include/psibase/direct_routing.hpp +++ b/libraries/net/include/psibase/direct_routing.hpp @@ -9,26 +9,15 @@ #include #include #include -#include #include +#include #include #include +#include #include namespace psibase::net { - // message type 0 is reserved to ensure that message signatures - // are disjoint from block signatures. - // message types 1-31 are used for routing messages - // message types 32-63 are used for consensus messages - struct InitMessage - { - static constexpr unsigned type = 1; - std::uint32_t version; - NodeId id; - std::string to_string() const { return "init: version=" + std::to_string(version); } - }; - PSIO_REFLECT(InitMessage, version, id); struct ProducerMessage { static constexpr unsigned type = 2; @@ -37,33 +26,24 @@ namespace psibase::net }; PSIO_REFLECT(ProducerMessage, producer) - template - concept has_block_id = requires(T& t) { t.block_id; }; - // This requires all producers to be peers template - struct direct_routing : message_serializer + struct direct_routing : routing_base { - auto& peers() { return static_cast(this)->peers(); } - auto& chain() { return static_cast(this)->chain(); } - explicit direct_routing(boost::asio::io_context& ctx) - { - std::random_device rng; - nodeId = std::uniform_int_distribution()(rng); - } - static const std::uint32_t protocol_version = 0; - auto get_message_impl() - { - return boost::mp11::mp_push_back< - typename std::remove_cvref_t< - decltype(static_cast(this)->consensus())>::message_type, - InitMessage, ProducerMessage>{}; - } + using base_type = routing_base; + using base_type::async_send; + using base_type::base_type; + using base_type::chain; + using base_type::consensus; + using base_type::peers; + using base_type::recv; + + using message_type = std::variant; + template void async_send_block(peer_id id, const Msg& msg, F&& f) { - PSIBASE_LOG(peers().logger(id), debug) << "Sending message: " << msg.to_string(); - peers().async_send(id, this->serialize_message(msg), std::forward(f)); + async_send(id, msg, f); } // Sends a message to each peer in a list // each peer will receive the message only once even if it is duplicated in the input list. @@ -108,18 +88,17 @@ namespace psibase::net struct connection; void connect(peer_id id) { - async_send_block(id, InitMessage{.version = protocol_version, .id = nodeId}, - [](const std::error_code&) {}); + base_type::connect(id); if (auto producer = static_cast(this)->consensus().producer_name(); producer != AccountNumber()) { async_send_block(id, ProducerMessage{producer}, [](const std::error_code&) {}); } - static_cast(this)->consensus().connect(id); + consensus().connect(id); } void disconnect(peer_id id) { - static_cast(this)->consensus().disconnect(id); + consensus().disconnect(id); for (auto iter = producers.begin(), end = producers.end(); iter != end;) { if (iter->second == id) @@ -132,105 +111,12 @@ namespace psibase::net } } } - template