Skip to content

Commit

Permalink
Squash merge PR nanocurrency#4654: Add is_originator flag to publis…
Browse files Browse the repository at this point in the history
…h messages
  • Loading branch information
gr0vity committed Jun 26, 2024
1 parent 4a125e4 commit 4e2a06b
Show file tree
Hide file tree
Showing 18 changed files with 344 additions and 148 deletions.
91 changes: 89 additions & 2 deletions nano/core_test/ledger.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "nano/lib/numbers.hpp"

#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/active_elections.hpp>
Expand Down Expand Up @@ -5644,3 +5643,91 @@ TEST (ledger_receivable, any_one)
ASSERT_TRUE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), nano::dev::genesis_key.pub));
ASSERT_FALSE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), key.pub));
}

TEST (ledger_transaction, write_refresh)
{
auto ctx = nano::test::context::ledger_empty ();
nano::block_builder builder;
nano::keypair key;
auto send1 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*ctx.pool ().generate (nano::dev::genesis->hash ()))
.build ();
auto send2 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (send1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio)
.link (key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*ctx.pool ().generate (send1->hash ()))
.build ();

auto transaction = ctx.ledger ().tx_begin_write ();
ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send1));
// Force refresh
ASSERT_TRUE (transaction.refresh_if_needed (0ms));
ASSERT_FALSE (transaction.refresh_if_needed ()); // Should not refresh again too soon
// Refreshed transaction should work just fine
ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send2));
}

TEST (ledger_transaction, write_wait_order)
{
nano::test::system system;

auto ctx = nano::test::context::ledger_empty ();

std::atomic<bool> acquired1{ false };
std::atomic<bool> acquired2{ false };
std::atomic<bool> acquired3{ false };

std::latch latch1{ 1 };
std::latch latch2{ 1 };
std::latch latch3{ 1 };

auto fut1 = std::async (std::launch::async, [&] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::generic);
acquired1 = true;
latch1.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

auto fut2 = std::async (std::launch::async, [&ctx, &acquired2, &latch2] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::blockprocessor);
acquired2 = true;
latch2.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

auto fut3 = std::async (std::launch::async, [&ctx, &acquired3, &latch3] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::confirmation_height);
acquired3 = true;
latch3.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

// First transaction should be ready immediately, others should be waiting
ASSERT_TIMELY (5s, acquired1.load ());
ASSERT_NEVER (250ms, acquired2.load ());
ASSERT_NEVER (250ms, acquired3.load ());

// Signal to continue and drop the first transaction
latch1.count_down ();
ASSERT_TIMELY (5s, acquired2.load ());
ASSERT_NEVER (250ms, acquired3.load ());

// Signal to continue and drop the second transaction
latch2.count_down ();
ASSERT_TIMELY (5s, acquired3.load ());

// Signal to continue and drop the third transaction
latch3.count_down ();
}
90 changes: 71 additions & 19 deletions nano/core_test/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,31 @@ std::shared_ptr<nano::block> random_block ()
}
}

TEST (message, header_version)
{
// Simplest message type
nano::keepalive original{ nano::dev::network_params.network };

// Serialize the original keepalive message
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
original.serialize (stream);
}

// Deserialize the byte stream back to a message header
nano::bufferstream stream (bytes.data (), bytes.size ());
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);

// Check header versions
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max);
ASSERT_EQ (nano::message_type::keepalive, header.type);
}

TEST (message, keepalive_serialization)
{
nano::keepalive request1{ nano::dev::network_params.network };
Expand Down Expand Up @@ -62,33 +87,60 @@ TEST (message, keepalive_deserialize)
ASSERT_EQ (message1.peers, message2.peers);
}

TEST (message, publish_serialization)
TEST (message, publish)
{
// Create a random block
auto block = random_block ();
nano::publish publish{ nano::dev::network_params.network, block };
ASSERT_EQ (nano::block_type::send, publish.header.block_type ());
nano::publish original{ nano::dev::network_params.network, block };
ASSERT_FALSE (original.is_originator ());

// Serialize the original publish message
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.header.serialize (stream);
}
ASSERT_EQ (8, bytes.size ());
ASSERT_EQ (0x52, bytes[0]);
ASSERT_EQ (0x41, bytes[1]);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[2]);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[3]);
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, bytes[4]);
ASSERT_EQ (static_cast<uint8_t> (nano::message_type::publish), bytes[5]);
ASSERT_EQ (0x00, bytes[6]); // extensions
ASSERT_EQ (static_cast<uint8_t> (nano::block_type::send), bytes[7]);
original.serialize (stream);
}

// Deserialize the byte stream back to a publish message
nano::bufferstream stream (bytes.data (), bytes.size ());
auto error (false);
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using);
ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max);
ASSERT_EQ (nano::message_type::publish, header.type);
nano::publish deserialized (error, stream, header);
ASSERT_FALSE (error);

// Assert that the original and deserialized messages are equal
ASSERT_EQ (original, deserialized);
ASSERT_EQ (*original.block, *deserialized.block);
ASSERT_EQ (original.is_originator (), deserialized.is_originator ());
}

TEST (message, publish_originator_flag)
{
// Create a random block
auto block = random_block ();
nano::publish original{ nano::dev::network_params.network, block, /* originator */ true };
ASSERT_TRUE (original.is_originator ());

// Serialize the original publish message
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
original.serialize (stream);
}

// Deserialize the byte stream back to a publish message
nano::bufferstream stream (bytes.data (), bytes.size ());
bool error = false;
nano::message_header header (error, stream);
ASSERT_FALSE (error);
nano::publish deserialized (error, stream, header);
ASSERT_FALSE (error);

// Assert that the originator flag is set correctly in both the original and deserialized messages
ASSERT_TRUE (deserialized.is_originator ());
ASSERT_EQ (original, deserialized);
ASSERT_EQ (*original.block, *deserialized.block);
}

TEST (message, confirm_header_flags)
Expand Down
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ enum class detail

// block source
live,
live_originator,
bootstrap,
bootstrap_legacy,
unchecked,
Expand Down
5 changes: 3 additions & 2 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ nano::block_processor::block_processor (nano::node & node_a) :
switch (origin.source)
{
case nano::block_source::live:
case nano::block_source::live_originator:
return config.max_peer_queue;
default:
return config.max_system_queue;
Expand All @@ -64,6 +65,7 @@ nano::block_processor::block_processor (nano::node & node_a) :
switch (origin.source)
{
case nano::block_source::live:
case nano::block_source::live_originator:
return config.priority_live;
case nano::block_source::bootstrap:
case nano::block_source::bootstrap_legacy:
Expand Down Expand Up @@ -295,8 +297,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
processed_batch_t processed;

auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch);
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights });
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);
nano::timer<std::chrono::milliseconds> timer_l;

lock_a.lock ();
Expand Down
15 changes: 8 additions & 7 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum class block_source
{
unknown = 0,
live,
live_originator,
bootstrap,
bootstrap_legacy,
unchecked,
Expand Down Expand Up @@ -67,10 +68,10 @@ class block_processor final
class context
{
public:
context (std::shared_ptr<nano::block> block, block_source source);
context (std::shared_ptr<nano::block> block, nano::block_source source);

std::shared_ptr<nano::block> const block;
block_source const source;
nano::block_source const source;
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };

public:
Expand All @@ -85,18 +86,18 @@ class block_processor final
};

public:
block_processor (nano::node &);
explicit block_processor (nano::node &);
~block_processor ();

void start ();
void stop ();

std::size_t size () const;
std::size_t size (block_source) const;
std::size_t size (nano::block_source) const;
bool full () const;
bool half_full () const;
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
bool add (std::shared_ptr<nano::block> const &, nano::block_source = nano::block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, nano::block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();

Expand Down Expand Up @@ -128,7 +129,7 @@ class block_processor final
nano::node & node;

private:
nano::fair_queue<context, block_source> queue;
nano::fair_queue<context, nano::block_source> queue;

std::chrono::steady_clock::time_point next_log;

Expand Down
8 changes: 4 additions & 4 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
lock.unlock ();

{
// TODO: Properly limiting batch times requires this <guard, transaction> combo to be wrapped in a single object that provides refresh functionality
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height });
auto transaction = ledger.tx_begin_write ({ nano::tables::confirmation_height }, nano::store::writer::confirmation_height);

for (auto const & hash : batch)
{
auto added = ledger.confirm (tx, hash);
transaction.refresh_if_needed ();

auto added = ledger.confirm (transaction, hash);
if (!added.empty ())
{
// Confirming this block may implicitly confirm more
Expand Down
5 changes: 3 additions & 2 deletions nano/node/message_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ void nano::message_processor::run_batch (nano::unique_lock<nano::mutex> & lock)

namespace
{
// TODO: This was moved, so compare with latest develop before merging to avoid merge bugs
class process_visitor : public nano::message_visitor
{
public:
Expand All @@ -184,7 +183,9 @@ class process_visitor : public nano::message_visitor

void publish (nano::publish const & message) override
{
bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
// Put blocks that are being initally broadcasted in a separate queue, so that they won't have to compete with rebroadcasted blocks
// Both queues have the same priority and size, so the potential for exploiting this is limited
bool added = node.block_processor.add (message.block, message.is_originator () ? nano::block_source::live_originator : nano::block_source::live, channel);
if (!added)
{
node.network.publish_filter.clear (message.digest);
Expand Down
10 changes: 9 additions & 1 deletion nano/node/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,12 @@ nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_h
}
}

nano::publish::publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const & block_a) :
nano::publish::publish (nano::network_constants const & constants, std::shared_ptr<nano::block> const & block_a, bool is_originator_a) :
message (constants, nano::message_type::publish),
block (block_a)
{
header.block_type_set (block->type ());
header.flag_set (originator_flag, is_originator_a);
}

void nano::publish::serialize (nano::stream & stream_a) const
Expand Down Expand Up @@ -465,11 +466,17 @@ bool nano::publish::operator== (nano::publish const & other_a) const
return *block == *other_a.block;
}

bool nano::publish::is_originator () const
{
return header.flag_test (originator_flag);
}

void nano::publish::operator() (nano::object_stream & obs) const
{
nano::message::operator() (obs); // Write common data

obs.write ("block", block);
obs.write ("originator", is_originator ());
}

/*
Expand Down Expand Up @@ -682,6 +689,7 @@ void nano::confirm_ack::operator() (nano::object_stream & obs) const
nano::message::operator() (obs); // Write common data

obs.write ("vote", vote);
obs.write ("rebroadcasted", is_rebroadcasted ());
}

/*
Expand Down
Loading

0 comments on commit 4e2a06b

Please sign in to comment.