Skip to content

Commit

Permalink
Adds pending_outputs field on mempool Storage and responds to pendi…
Browse files Browse the repository at this point in the history
…ng outputs requests when inserting new transactions into the mempool's verified set
  • Loading branch information
arya2 committed Sep 7, 2024
1 parent 0a72b41 commit 2fc7829
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 22 deletions.
21 changes: 12 additions & 9 deletions zebra-consensus/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,17 +655,20 @@ where
.oneshot(mempool::Request::UnspentOutput(spent_mempool_outpoint));

let mempool::Response::UnspentOutput(output) = query.await? else {
unreachable!("UnspentOutput always responds with Option<Output>")
unreachable!("UnspentOutput always responds with UnspentOutput")
};

let Some(output) = output else {
// TODO: Add an `AwaitOutput` mempool Request to wait for another transaction
// to be added to the mempool that creates the required output and call
// here with a short timeout (everything will be invalidated by a chain
// tip change anyway, and the tx verifier should poll the mempool so it
// checks for new created outputs at the queued pending output outpoint
// almost immediately after a tx is verified).
return Err(TransactionError::TransparentInputNotFound);
let output = if let Some(output) = output {
output
} else {
let query = mempool
.clone()
.oneshot(mempool::Request::AwaitOutput(spent_mempool_outpoint));
if let mempool::Response::UnspentOutput(output) = query.await? {
output.ok_or(TransactionError::TransparentInputNotFound)?
} else {
unreachable!("AwaitOutput always responds with UnspentOutput")
}
};

spent_outputs.push(output.clone());
Expand Down
19 changes: 18 additions & 1 deletion zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,27 @@ pub enum Request {
/// the [`AuthDigest`](zebra_chain::transaction::AuthDigest).
TransactionsByMinedId(HashSet<transaction::Hash>),

/// Looks up a UTXO in the mempool transparent identified by the given [`OutPoint`](transparent::OutPoint),
/// Looks up a [`transparent::Output`] in the mempool identified by the given [`OutPoint`](transparent::OutPoint),
/// returning `None` immediately if it is unknown.
///
/// Does not gaurantee that the output will remain in the mempool or that it is unspent.

Check failure on line 48 in zebra-node-services/src/mempool.rs

View workflow job for this annotation

GitHub Actions / codespell

gaurantee ==> guarantee
UnspentOutput(transparent::OutPoint),

/// Request a [`transparent::Output`] identified by the given [`OutPoint`](transparent::OutPoint),
/// waiting until it becomes available if it is unknown.
///
/// This request is purely informational, and there are no guarantees about
/// whether the UTXO remains unspent or is on the best chain, or any chain.
/// Its purpose is to allow orphaned mempool transaction verification.
///
/// # Correctness
///
/// Output requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely.
///
/// Outdated requests are pruned on a regular basis.
AwaitOutput(transparent::OutPoint),

/// Get all the [`VerifiedUnminedTx`] in the mempool.
///
/// Equivalent to `TransactionsById(TransactionIds)`,
Expand Down
12 changes: 12 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod crawler;
pub mod downloads;
mod error;
pub mod gossip;
mod pending_outputs;
mod queue_checker;
mod storage;

Expand Down Expand Up @@ -741,6 +742,16 @@ impl Service<Request> for Mempool {
async move { Ok(Response::UnspentOutput(res)) }.boxed()
}

Request::AwaitOutput(outpoint) => {
trace!(?req, "got mempool request");

let response_fut = storage.pending_outputs.queue(outpoint);

trace!("answered mempool request");

response_fut.boxed()
}

#[cfg(feature = "getblocktemplate-rpcs")]
Request::FullTransactions => {
trace!(?req, "got mempool request");
Expand Down Expand Up @@ -820,6 +831,7 @@ impl Service<Request> for Mempool {
Request::TransactionsById(_) => Response::Transactions(Default::default()),
Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
Request::UnspentOutput(_) => Response::UnspentOutput(None),
Request::AwaitOutput(_) => Response::UnspentOutput(None),

#[cfg(feature = "getblocktemplate-rpcs")]
Request::FullTransactions => {
Expand Down
64 changes: 64 additions & 0 deletions zebrad/src/components/mempool/pending_outputs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Pending [`transparent::Output`] tracker for [`AwaitOutput` requests](zebra_node_services::Mempool::Request::AwaitOutput).

Check failure on line 1 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Build and Deploy Zebra Internal Docs

unresolved link to `zebra_node_services::Mempool::Request::AwaitOutput`

use std::{collections::HashMap, future::Future};

use tokio::sync::broadcast;

use tower::BoxError;
use zebra_chain::transparent;

use zebra_node_services::mempool::Response;

#[derive(Debug, Default)]
pub struct PendingOutputs(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);

impl PendingOutputs {
/// Returns a future that will resolve to the `transparent::Output` pointed
/// to by the given `transparent::OutPoint` when it is available.
pub fn queue(
&mut self,
outpoint: transparent::OutPoint,
) -> impl Future<Output = Result<Response, BoxError>> {
let mut receiver = self
.0
.entry(outpoint)
.or_insert_with(|| {
let (sender, _) = broadcast::channel(1);
sender
})
.subscribe();

async move {
receiver
.recv()
.await
.map(Some)
.map(Response::UnspentOutput)
.map_err(BoxError::from)
}
}

/// Notify all requests waiting for the [`transparent::Output`] pointed to by
/// the given [`transparent::OutPoint`] that the [`transparent::Output`] has
/// arrived.
#[inline]
pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) {
if let Some(sender) = self.0.remove(outpoint) {
// Adding the outpoint as a field lets us cross-reference
// with the trace of the verification that made the request.
tracing::trace!(?outpoint, "found pending mempool output");
let _ = sender.send(output);
}
}

/// Scan the set of waiting Output requests for channels where all receivers
/// have been dropped and remove the corresponding sender.
pub fn prune(&mut self) {

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Check Cargo.lock is up to date

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Build and Deploy Zebra Internal Docs

methods `prune` and `len` are never used

Check failure on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Clippy (stable) Results

methods `prune` and `len` are never used

error: methods `prune` and `len` are never used --> zebrad/src/components/mempool/pending_outputs.rs:56:12 | 15 | impl PendingOutputs { | ------------------- methods in this implementation ... 56 | pub fn prune(&mut self) { | ^^^^^ ... 61 | pub fn len(&self) -> usize { | ^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`

Check failure on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Clippy (stable) Results

methods `prune` and `len` are never used

error: methods `prune` and `len` are never used --> zebrad/src/components/mempool/pending_outputs.rs:56:12 | 15 | impl PendingOutputs { | ------------------- methods in this implementation ... 56 | pub fn prune(&mut self) { | ^^^^^ ... 61 | pub fn len(&self) -> usize { | ^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`

Check failure on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Clippy (stable) Results

methods `prune` and `len` are never used

error: methods `prune` and `len` are never used --> zebrad/src/components/mempool/pending_outputs.rs:56:12 | 15 | impl PendingOutputs { | ------------------- methods in this implementation ... 56 | pub fn prune(&mut self) { | ^^^^^ ... 61 | pub fn len(&self) -> usize { | ^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`

Check failure on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Build zebrad crate

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Install zebrad from lockfile without cache on ubuntu-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / lightwalletd tip update / Create lwd-update-sync cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Zebra tip update / Create update-to-tip cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Zebra tip update / Create update-to-tip cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Zebra tip update / Create update-to-tip cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Zebra tip update / Create update-to-tip cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Zebra tip update / Create update-to-tip cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Generate checkpoints testnet / Create checkpoints-testnet cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Generate checkpoints testnet / Create checkpoints-testnet cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Generate checkpoints testnet / Create checkpoints-testnet cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Integration tests / Scan starts where left / Create scan-start-where-left cached state image

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test beta on ubuntu-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test stable on ubuntu-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test stable on macos-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test stable on macos-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test stable on windows-latest

methods `prune` and `len` are never used

Check warning on line 56 in zebrad/src/components/mempool/pending_outputs.rs

View workflow job for this annotation

GitHub Actions / Test beta on windows-latest

methods `prune` and `len` are never used
self.0.retain(|_, chan| chan.receiver_count() > 0);
}

/// Returns the number of Outputs that are being waited on.
pub fn len(&self) -> usize {
self.0.len()
}
}
17 changes: 12 additions & 5 deletions zebrad/src/components/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use zebra_chain::{
};

use self::{eviction_list::EvictionList, verified_set::VerifiedSet};
use super::{config, downloads::TransactionDownloadVerifyError, MempoolError};
use super::{
config, downloads::TransactionDownloadVerifyError, pending_outputs::PendingOutputs,
MempoolError,
};

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
Expand Down Expand Up @@ -119,13 +122,13 @@ pub enum RejectionError {
}

/// Hold mempool verified and rejected mempool transactions.
//
// Add a `pending_outputs` field similar to the `pending_utxos` field in the state service
// for queuing outpoint queries.
pub struct Storage {
/// The set of verified transactions in the mempool.
verified: VerifiedSet,

/// The set of outpoints with pending requests for their associated transparent::Output.
pub(super) pending_outputs: PendingOutputs,

/// The set of transactions rejected due to bad authorizations, or for other
/// reasons, and their rejection reasons. These rejections only apply to the
/// current tip.
Expand Down Expand Up @@ -175,6 +178,7 @@ impl Storage {
tx_cost_limit: config.tx_cost_limit,
eviction_memory_time: config.eviction_memory_time,
verified: Default::default(),
pending_outputs: Default::default(),
tip_rejected_exact: Default::default(),
tip_rejected_same_effects: Default::default(),
chain_rejected_same_effects: Default::default(),
Expand Down Expand Up @@ -228,7 +232,10 @@ impl Storage {

// Then, we try to insert into the pool. If this fails the transaction is rejected.
let mut result = Ok(tx_id);
if let Err(rejection_error) = self.verified.insert(tx, spent_mempool_outpoints) {
if let Err(rejection_error) =
self.verified
.insert(tx, spent_mempool_outpoints, &mut self.pending_outputs)
{
tracing::debug!(
?tx_id,
?rejection_error,
Expand Down
26 changes: 19 additions & 7 deletions zebrad/src/components/mempool/storage/verified_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use zebra_chain::{
transparent,
};

use crate::components::mempool::pending_outputs::PendingOutputs;

use super::super::SameEffectsTipRejectionError;

// Imports for doc links
Expand Down Expand Up @@ -184,6 +186,7 @@ impl VerifiedSet {
&mut self,
transaction: VerifiedUnminedTx,
spent_mempool_outpoints: Vec<transparent::OutPoint>,
pending_outputs: &mut PendingOutputs,
) -> Result<(), SameEffectsTipRejectionError> {
if self.has_spend_conflicts(&transaction.transaction) {
return Err(SameEffectsTipRejectionError::SpendConflict);
Expand All @@ -199,12 +202,15 @@ impl VerifiedSet {
}

let tx_id = transaction.transaction.id;

// TODO: Update `transaction_dependencies`
self.transaction_dependencies
.add(tx_id.mined_id(), spent_mempool_outpoints);

self.cache_outputs_from(tx_id.mined_id(), &transaction.transaction.transaction);
self.cache_outputs_and_respond_to_pending_output_requests_from(
tx_id.mined_id(),
&transaction.transaction.transaction,
pending_outputs,
);

self.transactions_serialized_size += transaction.transaction.size;
self.total_cost += transaction.cost();
self.transactions.insert(tx_id, transaction);
Expand Down Expand Up @@ -313,11 +319,17 @@ impl VerifiedSet {
|| Self::has_conflicts(&self.orchard_nullifiers, tx.orchard_nullifiers().copied())
}

/// Inserts the transaction's outputs into the internal caches.
fn cache_outputs_from(&mut self, tx_hash: transaction::Hash, tx: &Transaction) {
/// Inserts the transaction's outputs into the internal caches and responds to pending output requests.
fn cache_outputs_and_respond_to_pending_output_requests_from(
&mut self,
tx_hash: transaction::Hash,
tx: &Transaction,
pending_outputs: &mut PendingOutputs,
) {
for (index, output) in tx.outputs().iter().cloned().enumerate() {
self.created_outputs
.insert(transparent::OutPoint::from_usize(tx_hash, index), output);
let outpoint = transparent::OutPoint::from_usize(tx_hash, index);
self.created_outputs.insert(outpoint, output.clone());
pending_outputs.respond(&outpoint, output)
}

self.spent_outpoints.extend(tx.spent_outpoints());
Expand Down

0 comments on commit 2fc7829

Please sign in to comment.