Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
iljakuklic committed Jan 26, 2024
1 parent 3d1ac03 commit 59a612f
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion blockprod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ mod tests {
let tip_sx = utils::sync::Mutex::new(Some(tip_sx));
mempool
.call_mut(move |m| {
m.subscribe_to_events(Arc::new({
m.subscribe_to_subsystem_events(Arc::new({
move |evt| match evt {
mempool::event::MempoolEvent::NewTip(tip) => {
if let Some(tip_sx) = tip_sx.lock().unwrap().take() {
Expand Down
1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serialization = { path = '../serialization' }
subsystem = { path = '../subsystem' }
tokens-accounting = { path = '../tokens-accounting' }
utils = { path = '../utils' }
utils-tokio = { path = '../utils/tokio' }
utxo = { path = '../utxo' }

anyhow.workspace = true
Expand Down
27 changes: 25 additions & 2 deletions mempool/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ impl TransactionProcessed {
pub fn relay_policy(&self) -> TxRelayPolicy {
self.relay_policy
}

pub fn to_rpc_event(&self) -> Option<RpcMempoolEvent> {
todo!()
}
}

/// Event triggered when mempool has synced up to given tip
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct NewTip {
block_id: Id<Block>,
height: BlockHeight,
Expand All @@ -102,13 +106,22 @@ impl NewTip {
}
}

/// Events emitted by mempool
/// Events emitted by mempool destined for other subsystems
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MempoolEvent {
NewTip(NewTip),
TransactionProcessed(TransactionProcessed),
}

impl MempoolEvent {
pub fn to_rpc_event(&self) -> Option<RpcMempoolEvent> {
match self {
MempoolEvent::NewTip(new_tip) => Some(RpcMempoolEvent::NewTip(new_tip.clone())),
MempoolEvent::TransactionProcessed(tx_processed) => tx_processed.to_rpc_event(),
}
}
}

impl From<TransactionProcessed> for MempoolEvent {
fn from(event: TransactionProcessed) -> Self {
Self::TransactionProcessed(event)
Expand All @@ -120,3 +133,13 @@ impl From<NewTip> for MempoolEvent {
Self::NewTip(event)
}
}

/// Events emitted by mempool destined for RPC subscribers
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum RpcMempoolEvent {
NewTip(NewTip),
TransactionProcessed {
tx_id: Id<Transaction>,
origin: TxOrigin,
},
}
8 changes: 6 additions & 2 deletions mempool/src/interface/mempool_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use crate::{
error::{BlockConstructionError, Error},
event::MempoolEvent,
event::{MempoolEvent, RpcMempoolEvent},
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_origin::{LocalTxOrigin, RemoteTxOrigin},
FeeRate, MempoolMaxSize, TxOptions, TxStatus,
Expand All @@ -25,6 +25,7 @@ use common::{
primitives::Id,
};
use std::{num::NonZeroUsize, sync::Arc};
use utils_tokio::broadcaster;

pub trait MempoolInterface: Send + Sync {
/// Add a transaction from remote peer to mempool
Expand Down Expand Up @@ -73,7 +74,10 @@ pub trait MempoolInterface: Send + Sync {
) -> Result<Option<Box<dyn TransactionAccumulator>>, BlockConstructionError>;

/// Subscribe to events emitted by mempool
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);

/// Subscribe to RPC events emitted by mempool
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<RpcMempoolEvent>;

/// Get current memory usage
fn memory_usage(&self) -> usize;
Expand Down
10 changes: 7 additions & 3 deletions mempool/src/interface/mempool_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use crate::{
config::MempoolConfig,
error::{BlockConstructionError, Error},
event::MempoolEvent,
event::{MempoolEvent, RpcMempoolEvent},
pool::memory_usage_estimator::StoreMemoryUsageEstimator,
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_origin::{LocalTxOrigin, RemoteTxOrigin},
Expand Down Expand Up @@ -190,8 +190,12 @@ impl MempoolInterface for MempoolImpl {
self.mempool.collect_txs(tx_accumulator, transaction_ids, packing_strategy)
}

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.mempool.subscribe_to_events(handler);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.mempool.subscribe_to_subsystem_events(handler);
}

fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver<RpcMempoolEvent> {
self.mempool.subscribe_to_rpc_events()
}

fn memory_usage(&self) -> usize {
Expand Down
34 changes: 26 additions & 8 deletions mempool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common::{
use logging::log;
use serialization::Encode;
use utils::{ensure, eventhandler::EventsController, shallow_clone::ShallowClone};
use utils_tokio::broadcaster;

pub use self::feerate::FeeRate;
pub use self::memory_usage_estimator::MemoryUsageEstimator;
Expand All @@ -59,7 +60,7 @@ use crate::{
BlockConstructionError, Error, MempoolConflictError, MempoolPolicyError, OrphanPoolError,
TxValidationError,
},
event::{self, MempoolEvent},
event::{self, MempoolEvent, RpcMempoolEvent},
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_options::{TxOptions, TxTrustPolicy},
tx_origin::{RemoteTxOrigin, TxOrigin},
Expand Down Expand Up @@ -94,7 +95,8 @@ pub struct Mempool<M> {
chainstate_handle: chainstate::ChainstateHandle,
clock: TimeGetter,
memory_usage_estimator: M,
events_controller: EventsController<MempoolEvent>,
subsystem_events: EventsController<MempoolEvent>,
rpc_events: broadcaster::Broadcaster<RpcMempoolEvent>,
tx_verifier: tx_verifier::TransactionVerifier,
orphans: TxOrphanPool,
}
Expand Down Expand Up @@ -130,7 +132,8 @@ impl<M> Mempool<M> {
rolling_fee_rate: RwLock::new(RollingFeeRate::new(clock.get_time())),
clock,
memory_usage_estimator,
events_controller: Default::default(),
subsystem_events: Default::default(),
rpc_events: broadcaster::Broadcaster::new(),
tx_verifier,
orphans: TxOrphanPool::new(),
}
Expand Down Expand Up @@ -171,6 +174,14 @@ impl<M> Mempool<M> {
pub fn max_size(&self) -> MempoolMaxSize {
self.max_size
}

fn broadcast(&mut self, event: MempoolEvent) {
if let Some(rpc_event) = event.to_rpc_event() {
self.rpc_events.broadcast(&rpc_event);
}

self.subsystem_events.broadcast(event);
}
}

// Rolling-fee-related methods
Expand Down Expand Up @@ -965,7 +976,7 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
self.store.assert_valid();

let event = event::TransactionProcessed::accepted(tx_id, relay_policy, origin);
self.events_controller.broadcast(event.into());
self.broadcast(event.into());

Ok(TxStatus::InMempool)
}
Expand All @@ -989,7 +1000,7 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
}

let event = event::TransactionProcessed::rejected(tx_id, err.clone(), origin);
self.events_controller.broadcast(event.into());
self.broadcast(event.into());
Err(err)
}
}
Expand All @@ -1012,8 +1023,15 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
collect_txs::collect_txs(self, tx_accumulator, transaction_ids, packing_strategy)
}

pub fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.events_controller.subscribe_to_events(handler)
pub fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>,
) {
self.subsystem_events.subscribe_to_events(handler)
}

pub fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<RpcMempoolEvent> {
self.rpc_events.subscribe()
}

pub fn process_chainstate_event(
Expand All @@ -1039,7 +1057,7 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
log::info!("new tip: block {block_id:?} height {block_height:?}");
reorg::handle_new_tip(self, block_id, work_queue)?;
let event = event::NewTip::new(block_id, block_height);
self.events_controller.broadcast(event.into());
self.broadcast(event.into());
Ok(())
}

Expand Down
11 changes: 9 additions & 2 deletions mempool/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use common::{
primitives::Id,
};
use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, TxOptions};
use rpc::{subscription, RpcResult};
use serialization::hex_encoded::HexEncoded;
use utils::tap_error_log::LogError;

use crate::{FeeRate, MempoolMaxSize, TxStatus};

use rpc::RpcResult;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct GetTxResponse {
id: Id<Transaction>,
Expand Down Expand Up @@ -77,6 +76,9 @@ trait MempoolRpc {

#[method(name = "get_fee_rate_points")]
async fn get_fee_rate_points(&self) -> RpcResult<Vec<(usize, FeeRate)>>;

#[subscription(name = "subscribe_events", item = crate::event::RpcMempoolEvent)]
async fn subscribe_events(&self) -> subscription::Reply;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -158,4 +160,9 @@ impl MempoolRpcServer for super::MempoolHandle {
const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9);
rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await)
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
let events = self.call_mut(|m| m.subscribe_to_rpc_events()).await?;
subscription::connect_broadcast(events, pending).await
}
}
13 changes: 10 additions & 3 deletions mempool/types/src/tx_origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
use p2p_types::peer_id::PeerId;

/// Tracks where a transaction originates
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
#[serde(untagged)]
pub enum TxOrigin {
/// Transaction originates locally
Local(LocalTxOrigin),
Expand All @@ -38,7 +41,9 @@ impl From<RemoteTxOrigin> for TxOrigin {
}

/// Signifies transaction originates in our local node
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub enum LocalTxOrigin {
/// Transaction was submitted to local node's mempool. It should not be propagated further.
Mempool,
Expand All @@ -54,7 +59,9 @@ pub enum LocalTxOrigin {
///
/// If it eventually turns out to be valid, it should be propagated further to other peers.
/// If it's not valid, the original peer should be penalized as appropriate.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct RemoteTxOrigin(PeerId);

impl RemoteTxOrigin {
Expand Down
5 changes: 3 additions & 2 deletions mocks/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common::{
};
use mempool::{
error::{BlockConstructionError, Error},
event::MempoolEvent,
event::{MempoolEvent, RpcMempoolEvent},
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_origin::{LocalTxOrigin, RemoteTxOrigin},
FeeRate, MempoolInterface, MempoolMaxSize, TxOptions, TxStatus,
Expand Down Expand Up @@ -61,7 +61,8 @@ mockall::mock! {
packing_strategy: PackingStrategy,
) -> Result<Option<Box<dyn TransactionAccumulator>>, BlockConstructionError>;

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver<RpcMempoolEvent>;
fn memory_usage(&self) -> usize;
fn get_max_size(&self) -> MempoolMaxSize;
fn set_max_size(&mut self, max_size: MempoolMaxSize) -> Result<(), Error>;
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ pub async fn subscribe_to_tx_processed(
let subscribe_func = Arc::new(subscribe_func);

mempool_handle
.call_mut(|this| this.subscribe_to_events(subscribe_func))
.call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func))
.await
.map_err(|_| P2pError::SubsystemFailure)?;

Expand Down

0 comments on commit 59a612f

Please sign in to comment.