Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr/189 update #258

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Multi-threaded validation and attestation in sentinel
Signed-off-by: Gert-Jaap Glasbergen <[email protected]>
wadagso-gertjaap authored and HalosGhost committed Apr 24, 2024
commit 1c13e11be58c8dda5c1e41333604957df4c92a8a
164 changes: 127 additions & 37 deletions src/uhs/twophase/sentinel_2pc/controller.cpp
Original file line number Diff line number Diff line change
@@ -88,42 +88,92 @@ namespace cbdc::sentinel_2pc {
return false;
}

auto n_threads = std::thread::hardware_concurrency() / 2;
if(n_threads < 1) {
n_threads = 1;
}
for(size_t i = 0; i < n_threads; i++) {
m_validation_threads.emplace_back([&]() {
validation_worker();
});
}

for(size_t i = 0; i < n_threads; i++) {
m_attestation_threads.emplace_back([&]() {
attestation_worker();
});
}

m_rpc_server = std::make_unique<decltype(m_rpc_server)::element_type>(
this,
std::move(rpc_server));

return true;
}

auto controller::execute_transaction(
transaction::full_tx tx,
execute_result_callback_type result_callback) -> bool {
const auto validation_err = transaction::validation::check_tx(tx);
if(validation_err.has_value()) {
auto tx_id = transaction::tx_id(tx);
m_logger->debug(
"Rejected (",
transaction::validation::to_string(validation_err.value()),
")",
to_string(tx_id));
result_callback(cbdc::sentinel::execute_response{
cbdc::sentinel::tx_status::static_invalid,
validation_err});
return true;
void controller::validation_worker() {
while(m_running) {
auto v = queued_validation();
if(m_validation_queue.pop(v)) {
auto [tx, cb] = v;
cb(std::move(tx), transaction::validation::check_tx(tx));
}
}
}

auto compact_tx = cbdc::transaction::compact_tx(tx);
auto controller::validate_tx(const transaction::full_tx& tx,
validation_callback cb) -> bool {
m_validation_queue.push({std::move(tx), std::move(cb)});
return true;
}

if(m_opts.m_attestation_threshold > 0) {
auto attestation = compact_tx.sign(m_secp.get(), m_privkey);
compact_tx.m_attestations.insert(attestation);
void controller::attestation_worker() {
while(m_running) {
auto v = queued_attestation();
if(m_attestation_queue.pop(v)) {
auto [tx, cb] = v;
auto compact_tx = cbdc::transaction::compact_tx(tx);
cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey));
}
}
}

gather_attestations(tx, std::move(result_callback), compact_tx, {});

auto controller::attest_tx(const transaction::full_tx& tx,
attestation_callback cb) -> bool {
m_attestation_queue.push({std::move(tx), std::move(cb)});
return true;
}

auto controller::execute_transaction(
transaction::full_tx tx,
execute_result_callback_type result_callback) -> bool {
return controller::validate_tx(
std::move(tx),
[&, result_callback](
const transaction::full_tx& tx2,
std::optional<cbdc::transaction::validation::tx_error> err) {
auto tx_id = cbdc::transaction::tx_id(tx2);
if(err.has_value()) {
m_logger->debug(
"Rejected (",
transaction::validation::to_string(err.value()),
")",
to_string(tx_id));
result_callback(cbdc::sentinel::execute_response{
cbdc::sentinel::tx_status::static_invalid,
err});
return;
}

auto compact_tx = cbdc::transaction::compact_tx(tx2);
gather_attestations(std::move(tx2),
std::move(result_callback),
compact_tx,
{});
return;
});
}

void
controller::result_handler(std::optional<bool> res,
const execute_result_callback_type& res_cb) {
@@ -143,15 +193,23 @@ namespace cbdc::sentinel_2pc {
auto controller::validate_transaction(
transaction::full_tx tx,
validate_result_callback_type result_callback) -> bool {
const auto validation_err = transaction::validation::check_tx(tx);
if(validation_err.has_value()) {
result_callback(std::nullopt);
return true;
}
auto compact_tx = cbdc::transaction::compact_tx(tx);
auto attestation = compact_tx.sign(m_secp.get(), m_privkey);
result_callback(std::move(attestation));
return true;
return controller::validate_tx(
std::move(tx),
[&, result_callback](
const transaction::full_tx& tx2,
std::optional<cbdc::transaction::validation::tx_error> err) {
if(err.has_value()) {
result_callback(std::nullopt);
return;
}
controller::attest_tx(
std::move(tx2),
[&, result_callback](
const transaction::full_tx& /* tx3 */,
std::optional<cbdc::sentinel::validate_response> res) {
result_callback(std::move(res));
});
});
}

void controller::validate_result_handler(
@@ -173,14 +231,44 @@ namespace cbdc::sentinel_2pc {
std::move(requested));
}

void controller::stop() {
m_running = false;
for(auto& t : m_validation_threads) {
if(t.joinable()) {
t.join();
}
}

for(auto& t : m_attestation_threads) {
if(t.joinable()) {
t.join();
}
}
}

void controller::gather_attestations(
const transaction::full_tx& tx,
execute_result_callback_type result_callback,
const transaction::compact_tx& ctx,
std::unordered_set<size_t> requested) {
if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) {
if(ctx.m_attestations.size() == 0) {
// Self-attest first
controller::attest_tx(
std::move(tx),
[&, ctx, result_callback](const transaction::full_tx& tx2,
validate_result res) {
validate_result_handler(res,
tx2,
result_callback,
ctx,
{});
});

return;
}
auto success = false;
while(!success) {
while(!success && m_running) {
auto sentinel_id = m_dist(m_rand);
if(requested.find(sentinel_id) != requested.end()) {
continue;
@@ -209,14 +297,16 @@ namespace cbdc::sentinel_2pc {
void
controller::send_compact_tx(const transaction::compact_tx& ctx,
execute_result_callback_type result_callback) {
auto cb =
[&, res_cb = std::move(result_callback)](std::optional<bool> res) {
result_handler(res, res_cb);
};
auto cb = [&, this, ctx, res_cb = std::move(result_callback)](
std::optional<bool> res) {
result_handler(res, res_cb);
};

// TODO: add a "retry" error response to offload sentinels from this
// TODO: add a "retry" error response to offload sentinels from
// this
// infinite retry responsibility.
while(!m_coordinator_client.execute_transaction(ctx, cb)) {
while(!m_coordinator_client.execute_transaction(ctx, cb)
&& m_running) {
// TODO: the network currently doesn't provide a callback for
// reconnection events so we have to sleep here to
// prevent a needless spin. Instead, add such a callback
32 changes: 32 additions & 0 deletions src/uhs/twophase/sentinel_2pc/controller.hpp
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
#include "uhs/sentinel/format.hpp"
#include "uhs/transaction/messages.hpp"
#include "uhs/twophase/coordinator/client.hpp"
#include "util/common/blocking_queue.hpp"
#include "util/common/config.hpp"
#include "util/common/hashmap.hpp"
#include "util/network/connection_manager.hpp"
@@ -66,7 +67,22 @@ namespace cbdc::sentinel_2pc {
validate_result_callback_type result_callback)
-> bool override;

/// Cleanly stop the controller
void stop();

private:
using validation_callback = std::function<void(
const transaction::full_tx&,
std::optional<cbdc::transaction::validation::tx_error>)>;
using queued_validation
= std::pair<transaction::full_tx, validation_callback>;

using attestation_callback = std::function<void(
const transaction::full_tx&,
std::optional<cbdc::sentinel::validate_response>)>;
using queued_attestation
= std::pair<transaction::full_tx, attestation_callback>;

static void result_handler(std::optional<bool> res,
const execute_result_callback_type& res_cb);

@@ -85,10 +101,24 @@ namespace cbdc::sentinel_2pc {
void send_compact_tx(const transaction::compact_tx& ctx,
execute_result_callback_type result_callback);

auto validate_tx(const transaction::full_tx& tx,
validation_callback cb) -> bool;
void validation_worker();

auto attest_tx(const transaction::full_tx& tx, attestation_callback cb)
-> bool;
void attestation_worker();

uint32_t m_sentinel_id;
cbdc::config::options m_opts;
std::shared_ptr<logging::log> m_logger;

blocking_queue<queued_validation> m_validation_queue{};
std::vector<std::thread> m_validation_threads{};

blocking_queue<queued_attestation> m_attestation_queue{};
std::vector<std::thread> m_attestation_threads{};

std::unique_ptr<cbdc::sentinel::rpc::async_server> m_rpc_server;

std::unique_ptr<secp256k1_context,
@@ -106,6 +136,8 @@ namespace cbdc::sentinel_2pc {
std::uniform_int_distribution<size_t> m_dist{};

privkey_t m_privkey{};

std::atomic<bool> m_running{true};
};
}

2 changes: 2 additions & 0 deletions src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp
Original file line number Diff line number Diff line change
@@ -63,6 +63,8 @@ auto main(int argc, char** argv) -> int {

logger->info("Shutting down...");

ctl.stop();

return 0;
}
// LCOV_EXCL_STOP