Skip to content

Commit

Permalink
Stub in even subscriber.
Browse files Browse the repository at this point in the history
  • Loading branch information
evoskuil committed Feb 10, 2024
1 parent 560523f commit db18489
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 92 deletions.
35 changes: 10 additions & 25 deletions include/bitcoin/node/chasers/chaser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,34 @@ namespace node {
class full_node;

/// Abstract base chaser.

/// Chasers impose order on blockchain/pool construction as necessary.

/// Each chaser operates on its own strand, implemented here, allowing
/// concurrent chaser operations to the extent that threads are available.

/// Events are passed between chasers using the full_node shared notifier.
/// Notifications are bounced from sink (e.g. chaser) to its strand, and
/// full_node::notify bounces from source (e.g. chaser) to network strand.

class BCN_API chaser
: public network::enable_shared_from_base<chaser>,
public network::reporter
{
public:
typedef uint64_t object_key;
typedef network::desubscriber<object_key> subscriber;
typedef subscriber::handler notifier;
DELETE_COPY_MOVE(chaser);

/// Start/stop.
/// -----------------------------------------------------------------------
void start(network::result_handler&& handler) NOEXCEPT;
void stop() NOEXCEPT;

/// Subscriptions.
/// -----------------------------------------------------------------------
object_key subscribe(notifier&& handler) NOEXCEPT;
bool notify(object_key key) NOEXCEPT;

/// Properties.
/// -----------------------------------------------------------------------
bool stopped() const NOEXCEPT;
bool stranded() const NOEXCEPT;
/// True if the current thread is on the chaser strand.
virtual bool stranded() const NOEXCEPT;

protected:
chaser(full_node& node) NOEXCEPT;
virtual ~chaser() NOEXCEPT;

private:
object_key create_key() NOEXCEPT;
void do_stop() NOEXCEPT;

// These are thread safe (mostly).
full_node& node_;
network::asio::strand strand_;
std::atomic_bool stopped_{ true };

// These are protected by the strand.
object_key keys_{};
subscriber subscriber_;
};

} // namespace node
Expand Down
4 changes: 4 additions & 0 deletions include/bitcoin/node/full_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BCN_API full_node
typedef std::shared_ptr<full_node> ptr;
typedef database::store<database::map> store;
typedef database::query<store> query;
typedef network::subscriber<> event_subscriber;

/// Constructors.
/// -----------------------------------------------------------------------
Expand Down Expand Up @@ -78,6 +79,9 @@ class BCN_API full_node
// These are thread safe.
const configuration& config_;
query& query_;

// This is protected by strand.
event_subscriber event_subscriber_;
};

} // namespace node
Expand Down
65 changes: 0 additions & 65 deletions src/chasers/chaser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,84 +30,19 @@ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
chaser::chaser(full_node& node) NOEXCEPT
: node_(node),
strand_(node.service().get_executor()),
subscriber_(strand_),
reporter(node.log)
{
}

chaser::~chaser() NOEXCEPT
{
BC_ASSERT_MSG(stopped(), "The chaser was not stopped.");
if (!stopped()) { LOGF("~chaser is not stopped."); }
}

void chaser::start(network::result_handler&& handler) NOEXCEPT
{
if (!stopped())
{
handler(network::error::operation_failed);
return;
}

stopped_.store(false);
handler(network::error::success);
}

void chaser::stop() NOEXCEPT
{
stopped_.store(true);

// The chaser can be deleted once threadpool joins after this call.
boost::asio::post(strand_,
std::bind(&chaser::do_stop, this));
}

chaser::object_key chaser::subscribe(notifier&& handler) NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "strand");
const auto key = create_key();
subscriber_.subscribe(std::move(handler), key);
return key;
}

// TODO: closing channel notifies itself to desubscribe.
bool chaser::notify(object_key key) NOEXCEPT
{
return subscriber_.notify_one(key, network::error::success);
}

bool chaser::stopped() const NOEXCEPT
{
return stopped_.load();
}

bool chaser::stranded() const NOEXCEPT
{
return strand_.running_in_this_thread();
}

// private
chaser::object_key chaser::create_key() NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "strand");

if (is_zero(++keys_))
{
BC_ASSERT_MSG(false, "overflow");
LOGF("Chaser object overflow.");
}

return keys_;
}

// private
void chaser::do_stop() NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "strand");

subscriber_.stop(network::error::service_stopped);
}

BC_POP_WARNING()

} // namespace database
Expand Down
5 changes: 3 additions & 2 deletions src/full_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
using namespace network;
using namespace std::placeholders;

// TODO: replace channel_heartbeat.
// p2p::strand() as it is non-virtual (safe to call from constructor).
full_node::full_node(query& query, const configuration& configuration,
const logger& log) NOEXCEPT
: p2p(configuration.network, log),
config_(configuration),
query_(query)
query_(query),
event_subscriber_(strand())
{
}

Expand Down

0 comments on commit db18489

Please sign in to comment.