Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
iljakuklic committed Feb 21, 2024
1 parent 1f8d076 commit 9915c78
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion blockprod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ mod tests {
let tip_sx = utils::sync::Mutex::new(Some(tip_sx));
mempool
.call_mut(move |m| {
m.subscribe_to_events(Arc::new({
m.subscribe_to_subsystem_events(Arc::new({
move |evt| match evt {
mempool::event::MempoolEvent::NewTip(tip) => {
if let Some(tip_sx) = tip_sx.lock().unwrap().take() {
Expand Down
1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serialization = { path = '../serialization' }
subsystem = { path = '../subsystem' }
tokens-accounting = { path = '../tokens-accounting' }
utils = { path = '../utils' }
utils-tokio = { path = '../utils/tokio' }
utxo = { path = '../utxo' }

anyhow.workspace = true
Expand Down
63 changes: 61 additions & 2 deletions mempool/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common::{
chain::{Block, Transaction},
primitives::{BlockHeight, Id},
};
use utils::ensure;

use crate::{
error::{Error, MempoolBanScore},
Expand Down Expand Up @@ -79,10 +80,27 @@ impl TransactionProcessed {
pub fn relay_policy(&self) -> TxRelayPolicy {
self.relay_policy
}

pub fn to_rpc_event(&self) -> Option<RpcMempoolEvent> {
let Self {
tx_id,
origin,
relay_policy: _,
result,
} = self;

// Only emit events for transactions that actually end up in mempool
ensure!(result.is_ok());

Some(RpcMempoolEvent::TxProcessed {
tx_id: *tx_id,
origin: *origin,
})
}
}

/// Event triggered when mempool has synced up to given tip
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct NewTip {
block_id: Id<Block>,
height: BlockHeight,
Expand All @@ -102,13 +120,22 @@ impl NewTip {
}
}

/// Events emitted by mempool
/// Events emitted by mempool destined for other subsystems
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MempoolEvent {
NewTip(NewTip),
TransactionProcessed(TransactionProcessed),
}

impl MempoolEvent {
pub fn to_rpc_event(&self) -> Option<RpcMempoolEvent> {
match self {
MempoolEvent::NewTip(new_tip) => Some(RpcMempoolEvent::NewTip(new_tip.clone())),
MempoolEvent::TransactionProcessed(tx_processed) => tx_processed.to_rpc_event(),
}
}
}

impl From<TransactionProcessed> for MempoolEvent {
fn from(event: TransactionProcessed) -> Self {
Self::TransactionProcessed(event)
Expand All @@ -120,3 +147,35 @@ impl From<NewTip> for MempoolEvent {
Self::NewTip(event)
}
}

/// Events emitted by mempool destined for RPC subscribers
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum RpcMempoolEvent {
NewTip(NewTip),

TxProcessed {
tx_id: Id<Transaction>,
origin: TxOrigin,
},

TxEvicted {
tx_id: Id<Transaction>,
origin: TxOrigin,
},
}

impl RpcMempoolEvent {
pub fn tx_processed(tx_id: Id<Transaction>, origin: TxOrigin) -> Self {
Self::TxProcessed { tx_id, origin }
}

pub fn tx_evicted(tx_id: Id<Transaction>, origin: TxOrigin) -> Self {
Self::TxEvicted { tx_id, origin }
}
}

impl From<NewTip> for RpcMempoolEvent {
fn from(new_tip: NewTip) -> Self {
Self::NewTip(new_tip)
}
}
8 changes: 6 additions & 2 deletions mempool/src/interface/mempool_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use crate::{
error::{BlockConstructionError, Error},
event::MempoolEvent,
event::{MempoolEvent, RpcMempoolEvent},
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_origin::{LocalTxOrigin, RemoteTxOrigin},
FeeRate, MempoolMaxSize, TxOptions, TxStatus,
Expand All @@ -25,6 +25,7 @@ use common::{
primitives::Id,
};
use std::{num::NonZeroUsize, sync::Arc};
use utils_tokio::broadcaster;

pub trait MempoolInterface: Send + Sync {
/// Add a transaction from remote peer to mempool
Expand Down Expand Up @@ -73,7 +74,10 @@ pub trait MempoolInterface: Send + Sync {
) -> Result<Option<Box<dyn TransactionAccumulator>>, BlockConstructionError>;

/// Subscribe to events emitted by mempool
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);

/// Subscribe to RPC events emitted by mempool
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<RpcMempoolEvent>;

/// Get current memory usage
fn memory_usage(&self) -> usize;
Expand Down
10 changes: 7 additions & 3 deletions mempool/src/interface/mempool_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use crate::{
config::MempoolConfig,
error::{BlockConstructionError, Error},
event::MempoolEvent,
event::{MempoolEvent, RpcMempoolEvent},
pool::memory_usage_estimator::StoreMemoryUsageEstimator,
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_origin::{LocalTxOrigin, RemoteTxOrigin},
Expand Down Expand Up @@ -190,8 +190,12 @@ impl MempoolInterface for MempoolImpl {
self.mempool.collect_txs(tx_accumulator, transaction_ids, packing_strategy)
}

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.mempool.subscribe_to_events(handler);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.mempool.subscribe_to_subsystem_events(handler);
}

fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver<RpcMempoolEvent> {
self.mempool.subscribe_to_rpc_events()
}

fn memory_usage(&self) -> usize {
Expand Down
63 changes: 48 additions & 15 deletions mempool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common::{
use logging::log;
use serialization::Encode;
use utils::{ensure, eventhandler::EventsController, shallow_clone::ShallowClone};
use utils_tokio::broadcaster;

pub use self::feerate::FeeRate;
pub use self::memory_usage_estimator::MemoryUsageEstimator;
Expand All @@ -59,7 +60,7 @@ use crate::{
BlockConstructionError, Error, MempoolConflictError, MempoolPolicyError, OrphanPoolError,
TxValidationError,
},
event::{self, MempoolEvent},
event::{self, MempoolEvent, RpcMempoolEvent},
tx_accumulator::{PackingStrategy, TransactionAccumulator},
tx_options::{TxOptions, TxTrustPolicy},
tx_origin::{RemoteTxOrigin, TxOrigin},
Expand Down Expand Up @@ -94,7 +95,8 @@ pub struct Mempool<M> {
chainstate_handle: chainstate::ChainstateHandle,
clock: TimeGetter,
memory_usage_estimator: M,
events_controller: EventsController<MempoolEvent>,
subsystem_events: EventsController<MempoolEvent>,
rpc_events: broadcaster::Broadcaster<RpcMempoolEvent>,
tx_verifier: tx_verifier::TransactionVerifier,
orphans: TxOrphanPool,
}
Expand Down Expand Up @@ -130,7 +132,8 @@ impl<M> Mempool<M> {
rolling_fee_rate: RwLock::new(RollingFeeRate::new(clock.get_time())),
clock,
memory_usage_estimator,
events_controller: Default::default(),
subsystem_events: Default::default(),
rpc_events: broadcaster::Broadcaster::new(),
tx_verifier,
orphans: TxOrphanPool::new(),
}
Expand Down Expand Up @@ -967,7 +970,7 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
let origin = tx.origin();
let relay_policy = tx.options().relay_policy();

match self.validate_transaction(tx) {
let result = match self.validate_transaction(tx) {
Ok(ValidationOutcome::Valid {
transaction,
conflicts,
Expand All @@ -979,10 +982,6 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
tx_verifier::flush_to_storage(&mut self.tx_verifier, delta)?;
self.finalize_tx(transaction)?;
self.store.assert_valid();

let event = event::TransactionProcessed::accepted(tx_id, relay_policy, origin);
self.events_controller.broadcast(event.into());

Ok(TxStatus::InMempool)
}
Ok(ValidationOutcome::Orphan { transaction }) => {
Expand All @@ -991,12 +990,13 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
}
Err(err) => {
log::warn!("Transaction rejected: {}", err);

let event = event::TransactionProcessed::rejected(tx_id, err.clone(), origin);
self.events_controller.broadcast(event.into());
Err(err)
}
}
};

self.broadcast_tx_processed(tx_id, origin, relay_policy, result.clone());

result
}

pub fn get_all(&self) -> Vec<SignedTransaction> {
Expand All @@ -1016,8 +1016,15 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
collect_txs::collect_txs(self, tx_accumulator, transaction_ids, packing_strategy)
}

pub fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.events_controller.subscribe_to_events(handler)
pub fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>,
) {
self.subsystem_events.subscribe_to_events(handler)
}

pub fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<RpcMempoolEvent> {
self.rpc_events.subscribe()
}

pub fn process_chainstate_event(
Expand All @@ -1034,6 +1041,31 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
Ok(())
}

fn broadcast_tx_processed(
&mut self,
tx_id: Id<Transaction>,
origin: TxOrigin,
relay_policy: crate::tx_options::TxRelayPolicy,
result: Result<TxStatus, Error>, /* TODO context */
) {
// Dispatch the subsystem event first
match result {
Ok(TxStatus::InMempool) => {
let evt = event::TransactionProcessed::accepted(tx_id, relay_policy, origin);
self.subsystem_events.broadcast(evt.into());
}
Ok(TxStatus::InOrphanPool) => {
// In orphan pool, the transaction has not been accepted or rejected yet
},
Err(err) => {
let evt = event::TransactionProcessed::rejected(tx_id, err, origin);
self.subsystem_events.broadcast(evt.into());
}
}

// TODO(PR): Dispatch the RPC event
}

pub fn on_new_tip(
&mut self,
block_id: Id<Block>,
Expand All @@ -1047,7 +1079,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
);
reorg::handle_new_tip(self, block_id, work_queue)?;
let event = event::NewTip::new(block_id, block_height);
self.events_controller.broadcast(event.into());
self.subsystem_events.broadcast(event.clone().into());
self.rpc_events.broadcast(&event.into());
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions mempool/src/pool/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ fn reorg_mempool_transactions<M: MemoryUsageEstimator>(
.map_err(|_| ReorgError::BestBlockForUtxos)?
);

// Try to apply transactions that have been reorged out
for tx in txs_to_insert {
let tx_id = tx.transaction().get_id();
let origin = LocalTxOrigin::PastBlock.into();
Expand Down
11 changes: 9 additions & 2 deletions mempool/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use common::{
primitives::Id,
};
use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, TxOptions};
use rpc::{subscription, RpcResult};
use serialization::hex_encoded::HexEncoded;
use utils::tap_error_log::LogError;

use crate::{FeeRate, MempoolMaxSize, TxStatus};

use rpc::RpcResult;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct GetTxResponse {
id: Id<Transaction>,
Expand Down Expand Up @@ -77,6 +76,9 @@ trait MempoolRpc {

#[method(name = "get_fee_rate_points")]
async fn get_fee_rate_points(&self) -> RpcResult<Vec<(usize, FeeRate)>>;

#[subscription(name = "subscribe_events", item = crate::event::RpcMempoolEvent)]
async fn subscribe_events(&self) -> subscription::Reply;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -158,4 +160,9 @@ impl MempoolRpcServer for super::MempoolHandle {
const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9);
rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await)
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
let events = self.call_mut(|m| m.subscribe_to_rpc_events()).await?;
subscription::connect_broadcast(events, pending).await
}
}
13 changes: 10 additions & 3 deletions mempool/types/src/tx_origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
use p2p_types::peer_id::PeerId;

/// Tracks where a transaction originates
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
#[serde(untagged)]
pub enum TxOrigin {
/// Transaction originates locally
Local(LocalTxOrigin),
Expand All @@ -38,7 +41,9 @@ impl From<RemoteTxOrigin> for TxOrigin {
}

/// Signifies transaction originates in our local node
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub enum LocalTxOrigin {
/// Transaction was submitted to local node's mempool. It should not be propagated further.
Mempool,
Expand All @@ -54,7 +59,9 @@ pub enum LocalTxOrigin {
///
/// If it eventually turns out to be valid, it should be propagated further to other peers.
/// If it's not valid, the original peer should be penalized as appropriate.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct RemoteTxOrigin(PeerId);

impl RemoteTxOrigin {
Expand Down
Loading

0 comments on commit 9915c78

Please sign in to comment.