Skip to content

Commit

Permalink
mempool: remove polling for state updates
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Jun 24, 2024
1 parent 4586b25 commit a222cd8
Showing 1 changed file with 19 additions and 38 deletions.
57 changes: 19 additions & 38 deletions crates/core/app/src/server/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::Result;

use cnidarium::{Snapshot, Storage};
use cnidarium::Storage;

use tendermint::v0_37::abci::{
request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp,
MempoolRequest as Request, MempoolResponse as Response,
};
use tokio::sync::{mpsc, watch};
use tokio::sync::mpsc;
use tower_actor::Message;
use tracing::Instrument;

Expand All @@ -15,23 +15,15 @@ use crate::{app::App, metrics};
/// A mempool service that applies transaction checks against an isolated application fork.
pub struct Mempool {
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
snapshot: Snapshot,
rx_snapshot: watch::Receiver<Snapshot>,
storage: Storage,
}

impl Mempool {
pub fn new(
storage: Storage,
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
) -> Self {
let snapshot = storage.latest_snapshot();
let snapshot_rx = storage.subscribe();

Self {
queue,
snapshot,
rx_snapshot: snapshot_rx,
}
Self { queue, storage }
}

pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
Expand All @@ -45,7 +37,7 @@ impl Mempool {
CheckTxKind::Recheck => "recheck",
};

let mut app = App::new(self.snapshot.clone());
let mut app = App::new(self.storage.latest_snapshot());

match app.deliver_tx_bytes(tx_bytes.as_ref()).await {
Ok(events) => {
Expand All @@ -72,31 +64,20 @@ impl Mempool {
}

pub async fn run(mut self) -> Result<(), tower::BoxError> {
loop {
tokio::select! {
// Use a biased select to poll for height changes *before* polling for messages.
biased;
// Check whether the height has changed, which requires us to throw away our
// ephemeral mempool state, and create a new one based on the new state.
change = self.rx_snapshot.changed() => {
if let Ok(()) = change {
let snapshot = self.rx_snapshot.borrow().clone();
tracing::debug!(height = ?snapshot.version(), "mempool has rewired to use the latest snapshot");
self.snapshot = snapshot;
} else {
tracing::info!("state notification channel closed, shutting down");
return Ok(());
}
}
message = self.queue.recv() => {
if let Some(Message {req, rsp_sender, span }) = message {
let _ = rsp_sender.send(self.check_tx(req).instrument(span).await);
} else {
// The queue is closed, so we're done.
return Ok(());
}
}
}
tracing::info!("mempool service started");
while let Some(Message {
req,
rsp_sender,
span,
// We could perform `CheckTx` asynchronously, and poll many
// entries from the queue:
// See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
}) = self.queue.recv().await
{
let result = self.check_tx(req).instrument(span).await;
let _ = rsp_sender.send(result);
}
tracing::info!("mempool service stopped");
Ok(())
}
}

0 comments on commit a222cd8

Please sign in to comment.