Skip to content

Commit

Permalink
add txpool_status (#1102)
Browse files Browse the repository at this point in the history
add txpool_status
  • Loading branch information
tcoratger authored May 22, 2024
1 parent ee13a2d commit 8fbda07
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
9 changes: 8 additions & 1 deletion src/eth_rpc/api/txpool_api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_primitives::Address;
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom};
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom, TxpoolStatus};

/// Txpool API
#[rpc(server, namespace = "txpool")]
#[async_trait]
pub trait TxPoolApi {
/// Returns the number of transactions currently pending for inclusion in the next block(s), as
/// well as the ones that are being scheduled for future execution only.
///
/// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status) for more details
#[method(name = "status")]
async fn txpool_status(&self) -> RpcResult<TxpoolStatus>;

/// Retrieves the transactions contained within the txpool, returning pending
/// transactions of this address, grouped by nonce.
///
Expand Down
13 changes: 12 additions & 1 deletion src/eth_rpc/servers/txpool_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::eth_provider::provider::EthereumProvider;
use crate::eth_rpc::api::txpool_api::TxPoolApiServer;
use jsonrpsee::core::{async_trait, RpcResult as Result};
use reth_primitives::Address;
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom};
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom, TxpoolStatus};
use tracing::trace;

/// The RPC module for implementing the Txpool api
Expand All @@ -19,6 +19,17 @@ impl<P: EthereumProvider> TxpoolRpc<P> {

#[async_trait]
impl<P: EthereumProvider + Send + Sync + 'static> TxPoolApiServer for TxpoolRpc<P> {
/// Returns the number of transactions currently pending for inclusion in the next block(s), as
/// well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
///
/// Handler for `txpool_status`
async fn txpool_status(&self) -> Result<TxpoolStatus> {
trace!(target: "rpc::eth", "Serving txpool_status");
let all = self.eth_provider.txpool_content().await?;
Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
}

/// Retrieves the transactions contained within the txpool, returning pending
/// transactions of this address, grouped by nonce.
///
Expand Down
54 changes: 53 additions & 1 deletion tests/tests/txpool_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use kakarot_rpc::test_utils::katana::Katana;
use kakarot_rpc::test_utils::mongo::RANDOM_BYTES_SIZE;
use kakarot_rpc::test_utils::rpc::start_kakarot_rpc_server;
use kakarot_rpc::test_utils::rpc::RawRpcParamsBuilder;
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom};
use reth_rpc_types::txpool::{TxpoolContent, TxpoolContentFrom, TxpoolStatus};
use rstest::*;
use serde_json::Value;

Expand Down Expand Up @@ -147,3 +147,55 @@ async fn test_txpool_content_from(#[future] katana: Katana, _setup: ()) {
// Drop the server handle to shut down the server after the test
drop(server_handle);
}

#[rstest]
#[awt]
#[tokio::test(flavor = "multi_thread")]
async fn test_txpool_status(#[future] katana: Katana, _setup: ()) {
// Start the Kakarot RPC server and retrieve the server address and handle
let (server_addr, server_handle) =
start_kakarot_rpc_server(&katana).await.expect("Error setting up Kakarot RPC server");

// Generate a vector of random bytes
let bytes: Vec<u8> = (0..RANDOM_BYTES_SIZE).map(|_| rand::random()).collect();
let mut unstructured = arbitrary::Unstructured::new(&bytes);

// Generate 10 pending transactions and add them to the database
let mut pending_transactions = Vec::new();
for _ in 0..10 {
pending_transactions
.push(StoredPendingTransaction::arbitrary_with_optional_fields(&mut unstructured).unwrap().tx);
}
katana.add_pending_transactions_to_database(pending_transactions).await;

// Create a reqwest client
let reqwest_client = reqwest::Client::new();

// Send a POST request to the Kakarot RPC server to retrieve the transaction pool status
let res = reqwest_client
.post(format!("http://localhost:{}", server_addr.port()))
.header("Content-Type", "application/json")
.body(RawRpcParamsBuilder::new("txpool_status").build())
.send()
.await
.expect("Failed to call TxPool RPC");

// Extract the response body as text
let response = res.text().await.expect("Failed to get response body");

// Deserialize the response body into JSON
let raw: Value = serde_json::from_str(&response).expect("Failed to deserialize response body");

// Deserialize the 'result' field of the JSON into a TxpoolStatus struct
let tx_pool_status: TxpoolStatus =
serde_json::from_value(raw["result"].clone()).expect("Failed to deserialize result");

// Assert that we recovered the 10 pending transactions
assert_eq!(tx_pool_status.pending, 10);

// Assert that no queued transactions are registered
assert_eq!(tx_pool_status.queued, 0);

// Drop the server handle to shut down the server after the test
drop(server_handle);
}

0 comments on commit 8fbda07

Please sign in to comment.