Skip to content

Commit

Permalink
Add rpc function to subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
azarovh committed Sep 2, 2024
1 parent eb492ae commit e519c76
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 1 deletion.
1 change: 1 addition & 0 deletions mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod event;
mod interface;
mod pool;
pub mod rpc;
pub mod rpc_event;
pub mod tx_accumulator;

pub use {config::MempoolConfig, pool::feerate_points::find_interpolated_value, pool::FeeRate};
Expand Down
16 changes: 15 additions & 1 deletion mempool/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, Tx
use serialization::hex_encoded::HexEncoded;
use utils::tap_log::TapLog;

use crate::{FeeRate, MempoolMaxSize, TxStatus};
use crate::{rpc_event::RpcEvent, FeeRate, MempoolMaxSize, TxStatus};

use rpc::RpcResult;

Expand Down Expand Up @@ -102,6 +102,12 @@ trait MempoolRpc {
/// Get the curve data points that represent the fee rate as a function of transaction size.
#[method(name = "get_fee_rate_points")]
async fn get_fee_rate_points(&self) -> RpcResult<Vec<(usize, FeeRate)>>;

/// Subscribe to mempool events, such as tx processed.
///
/// After a successful subscription, the node will message the subscriber with a message on every event.
#[subscription(name = "subscribe_events", item = RpcEvent)]
async fn subscribe_events(&self) -> rpc::subscription::Reply;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -182,4 +188,12 @@ impl MempoolRpcServer for super::MempoolHandle {
const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9);
rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await)
}

async fn subscribe_events(
&self,
pending: rpc::subscription::Pending,
) -> rpc::subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
}
97 changes: 97 additions & 0 deletions mempool/src/rpc_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2024 RBB S.r.l
// [email protected]
// SPDX-License-Identifier: MIT
// Licensed under the MIT License;
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common::{
chain::{Block, Transaction},
primitives::{BlockHeight, Id},
};
use mempool_types::{tx_options::TxRelayPolicy, tx_origin::LocalTxOrigin};
use p2p_types::PeerId;

use crate::event::MempoolEvent;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)]
#[serde(tag = "type", content = "content")]
pub enum RpcEvent {
NewTip {
id: Id<Block>,
height: BlockHeight,
},
TransactionProcessed {
tx_id: Id<Transaction>,
origin: RpcTxOrigin,
relay: RpcTxRelayPolicy,
successful: bool,
},
}

impl RpcEvent {
pub fn from_event(event: MempoolEvent) -> Self {
match event {
MempoolEvent::NewTip(e) => RpcEvent::NewTip {
id: *e.block_id(),
height: e.block_height(),
},
MempoolEvent::TransactionProcessed(e) => RpcEvent::TransactionProcessed {
tx_id: *e.tx_id(),
origin: match e.origin() {
mempool_types::tx_origin::TxOrigin::Local(local_origin) => RpcTxOrigin::Local {
origin: match local_origin {
LocalTxOrigin::Mempool => RpcLocalTxOrigin::Mempool,
LocalTxOrigin::P2p => RpcLocalTxOrigin::P2p,
LocalTxOrigin::PastBlock => RpcLocalTxOrigin::PastBlock,
},
},
mempool_types::tx_origin::TxOrigin::Remote(r) => RpcTxOrigin::Remote {
peer_id: r.peer_id(),
},
},
relay: match e.relay_policy() {
TxRelayPolicy::DoRelay => RpcTxRelayPolicy::DoRelay,
TxRelayPolicy::DontRelay => RpcTxRelayPolicy::DontRelay,
},
successful: e.result().is_ok(),
},
}
}
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcTxOrigin {
Local { origin: RpcLocalTxOrigin },
Remote { peer_id: PeerId },
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcLocalTxOrigin {
Mempool,
P2p,
PastBlock,
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcTxRelayPolicy {
DoRelay,
DontRelay,
}
50 changes: 50 additions & 0 deletions node-daemon/docs/RPC.md
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,56 @@ Returns:
], .. ]
```

### Subscription `mempool_subscribe_events`

Subscribe to mempool events, such as tx processed.

After a successful subscription, the node will message the subscriber with a message on every event.


Parameters:
```
{}
```

Produces:
```
EITHER OF
1) {
"type": "NewTip",
"content": {
"id": hex string,
"height": number,
},
}
2) {
"type": "TransactionProcessed",
"content": {
"tx_id": hex string,
"origin": EITHER OF
1) {
"type": "Local",
"content": { "origin": EITHER OF
1) { "type": "Mempool" }
2) { "type": "P2p" }
3) { "type": "PastBlock" } },
}
2) {
"type": "Remote",
"content": { "peer_id": number },
},
"relay": EITHER OF
1) { "type": "DoRelay" }
2) { "type": "DontRelay" },
"successful": bool,
},
}
```

Unsubscribe using `mempool_unsubscribe_events`.

Note: Subscriptions only work over WebSockets.

## Module `p2p`

### Method `p2p_enable_networking`
Expand Down

0 comments on commit e519c76

Please sign in to comment.