diff --git a/Cargo.lock b/Cargo.lock index a49604e548..ffab90640d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3645,6 +3645,7 @@ dependencies = [ "tokens-accounting", "tokio", "utils", + "utils-tokio", "utxo", ] diff --git a/blockprod/src/lib.rs b/blockprod/src/lib.rs index 6f1cb2de99..c6d743abc2 100644 --- a/blockprod/src/lib.rs +++ b/blockprod/src/lib.rs @@ -168,7 +168,7 @@ mod tests { let tip_sx = utils::sync::Mutex::new(Some(tip_sx)); mempool .call_mut(move |m| { - m.subscribe_to_events(Arc::new({ + m.subscribe_to_subsystem_events(Arc::new({ move |evt| match evt { mempool::event::MempoolEvent::NewTip(tip) => { if let Some(tip_sx) = tip_sx.lock().unwrap().take() { diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index d1416f5f76..ce2eea0a81 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -22,6 +22,7 @@ serialization = { path = '../serialization' } subsystem = { path = '../subsystem' } tokens-accounting = { path = '../tokens-accounting' } utils = { path = '../utils' } +utils-tokio = { path = '../utils/tokio' } utxo = { path = '../utxo' } anyhow.workspace = true diff --git a/mempool/src/event.rs b/mempool/src/event.rs index 45c849fd2f..df99b0fdfa 100644 --- a/mempool/src/event.rs +++ b/mempool/src/event.rs @@ -17,6 +17,7 @@ use common::{ chain::{Block, Transaction}, primitives::{BlockHeight, Id}, }; +use utils::ensure; use crate::{ error::{Error, MempoolBanScore}, @@ -79,10 +80,27 @@ impl TransactionProcessed { pub fn relay_policy(&self) -> TxRelayPolicy { self.relay_policy } + + pub fn to_rpc_event(&self) -> Option { + let Self { + tx_id, + origin, + relay_policy: _, + result, + } = self; + + // Only emit events for transactions that actually end up in mempool + ensure!(result.is_ok()); + + Some(RpcMempoolEvent::TxProcessed { + tx_id: *tx_id, + origin: *origin, + }) + } } /// Event triggered when mempool has synced up to given tip -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub struct NewTip { block_id: Id, height: BlockHeight, @@ -102,13 +120,22 @@ impl NewTip { } } -/// Events emitted by mempool +/// Events emitted by mempool destined for other subsystems #[derive(Debug, Clone, Eq, PartialEq)] pub enum MempoolEvent { NewTip(NewTip), TransactionProcessed(TransactionProcessed), } +impl MempoolEvent { + pub fn to_rpc_event(&self) -> Option { + match self { + MempoolEvent::NewTip(new_tip) => Some(RpcMempoolEvent::NewTip(new_tip.clone())), + MempoolEvent::TransactionProcessed(tx_processed) => tx_processed.to_rpc_event(), + } + } +} + impl From for MempoolEvent { fn from(event: TransactionProcessed) -> Self { Self::TransactionProcessed(event) @@ -120,3 +147,35 @@ impl From for MempoolEvent { Self::NewTip(event) } } + +/// Events emitted by mempool destined for RPC subscribers +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum RpcMempoolEvent { + NewTip(NewTip), + + TxProcessed { + tx_id: Id, + origin: TxOrigin, + }, + + TxEvicted { + tx_id: Id, + origin: TxOrigin, + }, +} + +impl RpcMempoolEvent { + pub fn tx_processed(tx_id: Id, origin: TxOrigin) -> Self { + Self::TxProcessed { tx_id, origin } + } + + pub fn tx_evicted(tx_id: Id, origin: TxOrigin) -> Self { + Self::TxEvicted { tx_id, origin } + } +} + +impl From for RpcMempoolEvent { + fn from(new_tip: NewTip) -> Self { + Self::NewTip(new_tip) + } +} diff --git a/mempool/src/interface/mempool_interface.rs b/mempool/src/interface/mempool_interface.rs index af17c6f8e4..d0c93c538d 100644 --- a/mempool/src/interface/mempool_interface.rs +++ b/mempool/src/interface/mempool_interface.rs @@ -15,7 +15,7 @@ use crate::{ error::{BlockConstructionError, Error}, - event::MempoolEvent, + event::{MempoolEvent, RpcMempoolEvent}, tx_accumulator::{PackingStrategy, TransactionAccumulator}, tx_origin::{LocalTxOrigin, RemoteTxOrigin}, FeeRate, MempoolMaxSize, TxOptions, TxStatus, @@ -25,6 +25,7 @@ use common::{ primitives::Id, }; use std::{num::NonZeroUsize, sync::Arc}; +use utils_tokio::broadcaster; pub trait MempoolInterface: Send + Sync { /// Add a transaction from remote peer to mempool @@ -73,7 +74,10 @@ pub trait MempoolInterface: Send + Sync { ) -> Result>, BlockConstructionError>; /// Subscribe to events emitted by mempool - fn subscribe_to_events(&mut self, handler: Arc); + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + + /// Subscribe to RPC events emitted by mempool + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver; /// Get current memory usage fn memory_usage(&self) -> usize; diff --git a/mempool/src/interface/mempool_interface_impl.rs b/mempool/src/interface/mempool_interface_impl.rs index 91c4085e56..fd25469b5e 100644 --- a/mempool/src/interface/mempool_interface_impl.rs +++ b/mempool/src/interface/mempool_interface_impl.rs @@ -16,7 +16,7 @@ use crate::{ config::MempoolConfig, error::{BlockConstructionError, Error}, - event::MempoolEvent, + event::{MempoolEvent, RpcMempoolEvent}, pool::memory_usage_estimator::StoreMemoryUsageEstimator, tx_accumulator::{PackingStrategy, TransactionAccumulator}, tx_origin::{LocalTxOrigin, RemoteTxOrigin}, @@ -190,8 +190,12 @@ impl MempoolInterface for MempoolImpl { self.mempool.collect_txs(tx_accumulator, transaction_ids, packing_strategy) } - fn subscribe_to_events(&mut self, handler: Arc) { - self.mempool.subscribe_to_events(handler); + fn subscribe_to_subsystem_events(&mut self, handler: Arc) { + self.mempool.subscribe_to_subsystem_events(handler); + } + + fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver { + self.mempool.subscribe_to_rpc_events() } fn memory_usage(&self) -> usize { diff --git a/mempool/src/pool/mod.rs b/mempool/src/pool/mod.rs index 23361c52d2..401f88d5c2 100644 --- a/mempool/src/pool/mod.rs +++ b/mempool/src/pool/mod.rs @@ -41,6 +41,7 @@ use common::{ use logging::log; use serialization::Encode; use utils::{ensure, eventhandler::EventsController, shallow_clone::ShallowClone}; +use utils_tokio::broadcaster; pub use self::feerate::FeeRate; pub use self::memory_usage_estimator::MemoryUsageEstimator; @@ -59,7 +60,7 @@ use crate::{ BlockConstructionError, Error, MempoolConflictError, MempoolPolicyError, OrphanPoolError, TxValidationError, }, - event::{self, MempoolEvent}, + event::{self, MempoolEvent, RpcMempoolEvent}, tx_accumulator::{PackingStrategy, TransactionAccumulator}, tx_options::{TxOptions, TxTrustPolicy}, tx_origin::{RemoteTxOrigin, TxOrigin}, @@ -94,7 +95,8 @@ pub struct Mempool { chainstate_handle: chainstate::ChainstateHandle, clock: TimeGetter, memory_usage_estimator: M, - events_controller: EventsController, + subsystem_events: EventsController, + rpc_events: broadcaster::Broadcaster, tx_verifier: tx_verifier::TransactionVerifier, orphans: TxOrphanPool, } @@ -130,7 +132,8 @@ impl Mempool { rolling_fee_rate: RwLock::new(RollingFeeRate::new(clock.get_time())), clock, memory_usage_estimator, - events_controller: Default::default(), + subsystem_events: Default::default(), + rpc_events: broadcaster::Broadcaster::new(), tx_verifier, orphans: TxOrphanPool::new(), } @@ -967,7 +970,7 @@ impl Mempool { let origin = tx.origin(); let relay_policy = tx.options().relay_policy(); - match self.validate_transaction(tx) { + let result = match self.validate_transaction(tx) { Ok(ValidationOutcome::Valid { transaction, conflicts, @@ -979,10 +982,6 @@ impl Mempool { tx_verifier::flush_to_storage(&mut self.tx_verifier, delta)?; self.finalize_tx(transaction)?; self.store.assert_valid(); - - let event = event::TransactionProcessed::accepted(tx_id, relay_policy, origin); - self.events_controller.broadcast(event.into()); - Ok(TxStatus::InMempool) } Ok(ValidationOutcome::Orphan { transaction }) => { @@ -991,12 +990,13 @@ impl Mempool { } Err(err) => { log::warn!("Transaction rejected: {}", err); - - let event = event::TransactionProcessed::rejected(tx_id, err.clone(), origin); - self.events_controller.broadcast(event.into()); Err(err) } - } + }; + + self.broadcast_tx_processed(tx_id, origin, relay_policy, result.clone()); + + result } pub fn get_all(&self) -> Vec { @@ -1016,8 +1016,15 @@ impl Mempool { collect_txs::collect_txs(self, tx_accumulator, transaction_ids, packing_strategy) } - pub fn subscribe_to_events(&mut self, handler: Arc) { - self.events_controller.subscribe_to_events(handler) + pub fn subscribe_to_subsystem_events( + &mut self, + handler: Arc, + ) { + self.subsystem_events.subscribe_to_events(handler) + } + + pub fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver { + self.rpc_events.subscribe() } pub fn process_chainstate_event( @@ -1034,6 +1041,31 @@ impl Mempool { Ok(()) } + fn broadcast_tx_processed( + &mut self, + tx_id: Id, + origin: TxOrigin, + relay_policy: crate::tx_options::TxRelayPolicy, + result: Result, /* TODO context */ + ) { + // Dispatch the subsystem event first + match result { + Ok(TxStatus::InMempool) => { + let evt = event::TransactionProcessed::accepted(tx_id, relay_policy, origin); + self.subsystem_events.broadcast(evt.into()); + } + Ok(TxStatus::InOrphanPool) => { + // In orphan pool, the transaction has not been accepted or rejected yet + }, + Err(err) => { + let evt = event::TransactionProcessed::rejected(tx_id, err, origin); + self.subsystem_events.broadcast(evt.into()); + } + } + + // TODO(PR): Dispatch the RPC event + } + pub fn on_new_tip( &mut self, block_id: Id, @@ -1047,7 +1079,8 @@ impl Mempool { ); reorg::handle_new_tip(self, block_id, work_queue)?; let event = event::NewTip::new(block_id, block_height); - self.events_controller.broadcast(event.into()); + self.subsystem_events.broadcast(event.clone().into()); + self.rpc_events.broadcast(&event.into()); Ok(()) } diff --git a/mempool/src/pool/reorg.rs b/mempool/src/pool/reorg.rs index 81026630fa..e97025124d 100644 --- a/mempool/src/pool/reorg.rs +++ b/mempool/src/pool/reorg.rs @@ -201,6 +201,7 @@ fn reorg_mempool_transactions( .map_err(|_| ReorgError::BestBlockForUtxos)? ); + // Try to apply transactions that have been reorged out for tx in txs_to_insert { let tx_id = tx.transaction().get_id(); let origin = LocalTxOrigin::PastBlock.into(); diff --git a/mempool/src/rpc.rs b/mempool/src/rpc.rs index e40962deb6..16cbdcc199 100644 --- a/mempool/src/rpc.rs +++ b/mempool/src/rpc.rs @@ -22,13 +22,12 @@ use common::{ primitives::Id, }; use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, TxOptions}; +use rpc::{subscription, RpcResult}; use serialization::hex_encoded::HexEncoded; use utils::tap_error_log::LogError; use crate::{FeeRate, MempoolMaxSize, TxStatus}; -use rpc::RpcResult; - #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct GetTxResponse { id: Id, @@ -77,6 +76,9 @@ trait MempoolRpc { #[method(name = "get_fee_rate_points")] async fn get_fee_rate_points(&self) -> RpcResult>; + + #[subscription(name = "subscribe_events", item = crate::event::RpcMempoolEvent)] + async fn subscribe_events(&self) -> subscription::Reply; } #[async_trait::async_trait] @@ -158,4 +160,9 @@ impl MempoolRpcServer for super::MempoolHandle { const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9); rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await) } + + async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply { + let events = self.call_mut(|m| m.subscribe_to_rpc_events()).await?; + subscription::connect_broadcast(events, pending).await + } } diff --git a/mempool/types/src/tx_origin.rs b/mempool/types/src/tx_origin.rs index 196208fde7..72b3a44c2e 100644 --- a/mempool/types/src/tx_origin.rs +++ b/mempool/types/src/tx_origin.rs @@ -16,7 +16,10 @@ use p2p_types::peer_id::PeerId; /// Tracks where a transaction originates -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive( + Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize, +)] +#[serde(untagged)] pub enum TxOrigin { /// Transaction originates locally Local(LocalTxOrigin), @@ -38,7 +41,9 @@ impl From for TxOrigin { } /// Signifies transaction originates in our local node -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive( + Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize, +)] pub enum LocalTxOrigin { /// Transaction was submitted to local node's mempool. It should not be propagated further. Mempool, @@ -54,7 +59,9 @@ pub enum LocalTxOrigin { /// /// If it eventually turns out to be valid, it should be propagated further to other peers. /// If it's not valid, the original peer should be penalized as appropriate. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive( + Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize, +)] pub struct RemoteTxOrigin(PeerId); impl RemoteTxOrigin { diff --git a/mocks/src/mempool.rs b/mocks/src/mempool.rs index 128c1179d7..029744111a 100644 --- a/mocks/src/mempool.rs +++ b/mocks/src/mempool.rs @@ -23,7 +23,7 @@ use common::{ }; use mempool::{ error::{BlockConstructionError, Error}, - event::MempoolEvent, + event::{MempoolEvent, RpcMempoolEvent}, tx_accumulator::{PackingStrategy, TransactionAccumulator}, tx_origin::{LocalTxOrigin, RemoteTxOrigin}, FeeRate, MempoolInterface, MempoolMaxSize, TxOptions, TxStatus, @@ -61,7 +61,8 @@ mockall::mock! { packing_strategy: PackingStrategy, ) -> Result>, BlockConstructionError>; - fn subscribe_to_events(&mut self, handler: Arc); + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver; fn memory_usage(&self) -> usize; fn get_max_size(&self) -> MempoolMaxSize; fn set_max_size(&mut self, max_size: MempoolMaxSize) -> Result<(), Error>; diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index 628bfd7951..305de16689 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -400,7 +400,7 @@ pub async fn subscribe_to_tx_processed( let subscribe_func = Arc::new(subscribe_func); mempool_handle - .call_mut(|this| this.subscribe_to_events(subscribe_func)) + .call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func)) .await .map_err(|_| P2pError::SubsystemFailure)?;