From e519c7676b25e385a3cb3e66cd66b2e94e533853 Mon Sep 17 00:00:00 2001 From: Heorhii Azarov Date: Mon, 2 Sep 2024 16:53:04 +0300 Subject: [PATCH] Add rpc function to subscribe --- mempool/src/lib.rs | 1 + mempool/src/rpc.rs | 16 ++++++- mempool/src/rpc_event.rs | 97 ++++++++++++++++++++++++++++++++++++++++ node-daemon/docs/RPC.md | 50 +++++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 mempool/src/rpc_event.rs diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index 4b1545fae5..e80891ec7b 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -25,6 +25,7 @@ pub mod event; mod interface; mod pool; pub mod rpc; +pub mod rpc_event; pub mod tx_accumulator; pub use {config::MempoolConfig, pool::feerate_points::find_interpolated_value, pool::FeeRate}; diff --git a/mempool/src/rpc.rs b/mempool/src/rpc.rs index 41f8eb7b57..4cf58a3bea 100644 --- a/mempool/src/rpc.rs +++ b/mempool/src/rpc.rs @@ -25,7 +25,7 @@ use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, Tx use serialization::hex_encoded::HexEncoded; use utils::tap_log::TapLog; -use crate::{FeeRate, MempoolMaxSize, TxStatus}; +use crate::{rpc_event::RpcEvent, FeeRate, MempoolMaxSize, TxStatus}; use rpc::RpcResult; @@ -102,6 +102,12 @@ trait MempoolRpc { /// Get the curve data points that represent the fee rate as a function of transaction size. #[method(name = "get_fee_rate_points")] async fn get_fee_rate_points(&self) -> RpcResult>; + + /// Subscribe to mempool events, such as tx processed. + /// + /// After a successful subscription, the node will message the subscriber with a message on every event. + #[subscription(name = "subscribe_events", item = RpcEvent)] + async fn subscribe_events(&self) -> rpc::subscription::Reply; } #[async_trait::async_trait] @@ -182,4 +188,12 @@ impl MempoolRpcServer for super::MempoolHandle { const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9); rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await) } + + async fn subscribe_events( + &self, + pending: rpc::subscription::Pending, + ) -> rpc::subscription::Reply { + let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?; + rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await + } } diff --git a/mempool/src/rpc_event.rs b/mempool/src/rpc_event.rs new file mode 100644 index 0000000000..c75e65d51d --- /dev/null +++ b/mempool/src/rpc_event.rs @@ -0,0 +1,97 @@ +// Copyright (c) 2024 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common::{ + chain::{Block, Transaction}, + primitives::{BlockHeight, Id}, +}; +use mempool_types::{tx_options::TxRelayPolicy, tx_origin::LocalTxOrigin}; +use p2p_types::PeerId; + +use crate::event::MempoolEvent; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)] +#[serde(tag = "type", content = "content")] +pub enum RpcEvent { + NewTip { + id: Id, + height: BlockHeight, + }, + TransactionProcessed { + tx_id: Id, + origin: RpcTxOrigin, + relay: RpcTxRelayPolicy, + successful: bool, + }, +} + +impl RpcEvent { + pub fn from_event(event: MempoolEvent) -> Self { + match event { + MempoolEvent::NewTip(e) => RpcEvent::NewTip { + id: *e.block_id(), + height: e.block_height(), + }, + MempoolEvent::TransactionProcessed(e) => RpcEvent::TransactionProcessed { + tx_id: *e.tx_id(), + origin: match e.origin() { + mempool_types::tx_origin::TxOrigin::Local(local_origin) => RpcTxOrigin::Local { + origin: match local_origin { + LocalTxOrigin::Mempool => RpcLocalTxOrigin::Mempool, + LocalTxOrigin::P2p => RpcLocalTxOrigin::P2p, + LocalTxOrigin::PastBlock => RpcLocalTxOrigin::PastBlock, + }, + }, + mempool_types::tx_origin::TxOrigin::Remote(r) => RpcTxOrigin::Remote { + peer_id: r.peer_id(), + }, + }, + relay: match e.relay_policy() { + TxRelayPolicy::DoRelay => RpcTxRelayPolicy::DoRelay, + TxRelayPolicy::DontRelay => RpcTxRelayPolicy::DontRelay, + }, + successful: e.result().is_ok(), + }, + } + } +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcTxOrigin { + Local { origin: RpcLocalTxOrigin }, + Remote { peer_id: PeerId }, +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcLocalTxOrigin { + Mempool, + P2p, + PastBlock, +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcTxRelayPolicy { + DoRelay, + DontRelay, +} diff --git a/node-daemon/docs/RPC.md b/node-daemon/docs/RPC.md index 88623f19ed..c119842d18 100644 --- a/node-daemon/docs/RPC.md +++ b/node-daemon/docs/RPC.md @@ -760,6 +760,56 @@ Returns: ], .. ] ``` +### Subscription `mempool_subscribe_events` + +Subscribe to mempool events, such as tx processed. + +After a successful subscription, the node will message the subscriber with a message on every event. + + +Parameters: +``` +{} +``` + +Produces: +``` +EITHER OF + 1) { + "type": "NewTip", + "content": { + "id": hex string, + "height": number, + }, + } + 2) { + "type": "TransactionProcessed", + "content": { + "tx_id": hex string, + "origin": EITHER OF + 1) { + "type": "Local", + "content": { "origin": EITHER OF + 1) { "type": "Mempool" } + 2) { "type": "P2p" } + 3) { "type": "PastBlock" } }, + } + 2) { + "type": "Remote", + "content": { "peer_id": number }, + }, + "relay": EITHER OF + 1) { "type": "DoRelay" } + 2) { "type": "DontRelay" }, + "successful": bool, + }, + } +``` + +Unsubscribe using `mempool_unsubscribe_events`. + +Note: Subscriptions only work over WebSockets. + ## Module `p2p` ### Method `p2p_enable_networking`