Skip to content

Commit

Permalink
Transaction pool -- fix rare cases where byte size tracking was inacc…
Browse files Browse the repository at this point in the history
…urate (kaspanet#535)

* remove crate-level mut access to inner mempool tx and fix bytes size tracking

* another fix to the same problem: only update_revalidated_transaction if validation result is ok, otherwise we remove it anyway so why update

* Apply suggestions from code review

Co-authored-by: Maxim <[email protected]>

* debug log `other`

---------

Co-authored-by: Maxim <[email protected]>
  • Loading branch information
michaelsutton and biryukovmaxim committed Sep 1, 2024
1 parent c839a9d commit 864aaf6
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 75 deletions.
111 changes: 53 additions & 58 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,10 +717,10 @@ impl MiningManager {
let _swo = Stopwatch::<60>::with_threshold("revalidate update_revalidated_transaction op");
for (transaction, validation_result) in chunk {
let transaction_id = transaction.id();
// Only consider transactions still being in the mempool since during the validation some might have been removed.
if mempool.update_revalidated_transaction(transaction) {
match validation_result {
Ok(()) => {
match validation_result {
Ok(()) => {
// Only consider transactions still being in the mempool since during the validation some might have been removed.
if mempool.update_revalidated_transaction(transaction) {
// A following transaction should not remove this one from the pool since we process in a topological order.
// Still, considering the (very unlikely) scenario of two high priority txs sandwiching a low one, where
// in this case topological order is not guaranteed since we only considered chained dependencies of
Expand All @@ -729,66 +729,55 @@ impl MiningManager {
// provided upon request.
valid_ids.push(transaction_id);
valid += 1;
} else {
other += 1;
}
Err(RuleError::RejectMissingOutpoint) => {
let transaction = mempool.get_transaction(&transaction_id, TransactionQuery::TransactionsOnly).unwrap();
let missing_txs = transaction
.entries
.iter()
.zip(transaction.tx.inputs.iter())
.flat_map(
|(entry, input)| {
if entry.is_none() {
Some(input.previous_outpoint.transaction_id)
} else {
None
}
},
)
.collect::<Vec<_>>();

// A transaction may have missing outpoints for legitimate reasons related to concurrency, like a race condition between
// an accepted block having not started yet or unfinished call to handle_new_block_transactions but already processed by
// the consensus and this ongoing call to revalidate.
//
// So we only remove the transaction and keep its redeemers in the mempool because we cannot be sure they are invalid, in
// fact in the race condition case they are valid regarding outpoints.
let extra_info = match missing_txs.len() {
0 => " but no missing tx!".to_string(), // this is never supposed to happen
1 => format!(" missing tx {}", missing_txs[0]),
n => format!(" with {} missing txs {}..{}", n, missing_txs[0], missing_txs.last().unwrap()),
};

// This call cleanly removes the invalid transaction.
let result = mempool.remove_transaction(
}
Err(RuleError::RejectMissingOutpoint) => {
let missing_txs = transaction
.entries
.iter()
.zip(transaction.tx.inputs.iter())
.filter_map(|(entry, input)| entry.is_none().then_some(input.previous_outpoint.transaction_id))
.collect::<Vec<_>>();

// A transaction may have missing outpoints for legitimate reasons related to concurrency, like a race condition between
// an accepted block having not started yet or unfinished call to handle_new_block_transactions but already processed by
// the consensus and this ongoing call to revalidate.
//
// So we only remove the transaction and keep its redeemers in the mempool because we cannot be sure they are invalid, in
// fact in the race condition case they are valid regarding outpoints.
let extra_info = match missing_txs.len() {
0 => " but no missing tx!".to_string(), // this is never supposed to happen
1 => format!(" missing tx {}", missing_txs[0]),
n => format!(" with {} missing txs {}..{}", n, missing_txs[0], missing_txs.last().unwrap()),
};

// This call cleanly removes the invalid transaction.
_ = mempool
.remove_transaction(
&transaction_id,
false,
TxRemovalReason::RevalidationWithMissingOutpoints,
extra_info.as_str(),
);
if let Err(err) = result {
warn!("Failed to remove transaction {} from mempool: {}", transaction_id, err);
}
missing_outpoint += 1;
}
Err(err) => {
// Rust rewrite note:
// The behavior changes here compared to the golang version.
// The failed revalidation is simply logged and the process continues.
warn!(
"Removing high priority transaction {0} and its redeemers, it failed revalidation with {1}",
transaction_id, err
);
// This call cleanly removes the invalid transaction and its redeemers.
let result = mempool.remove_transaction(&transaction_id, true, TxRemovalReason::Muted, "");
if let Err(err) = result {
warn!("Failed to remove transaction {} from mempool: {}", transaction_id, err);
}
invalid += 1;
}
)
.inspect_err(|err| warn!("Failed to remove transaction {} from mempool: {}", transaction_id, err));
missing_outpoint += 1;
}
Err(err) => {
// Rust rewrite note:
// The behavior changes here compared to the golang version.
// The failed revalidation is simply logged and the process continues.
warn!(
"Removing high priority transaction {0} and its redeemers, it failed revalidation with {1}",
transaction_id, err
);
// This call cleanly removes the invalid transaction and its redeemers.
_ = mempool
.remove_transaction(&transaction_id, true, TxRemovalReason::Muted, "")
.inspect_err(|err| warn!("Failed to remove transaction {} from mempool: {}", transaction_id, err));
invalid += 1;
}
} else {
other += 1;
}
}
if !valid_ids.is_empty() {
Expand All @@ -810,6 +799,12 @@ impl MiningManager {
missing_outpoint,
invalid,
);
if other > 0 {
debug!(
"During revalidation of high priority transactions {} txs were removed from the mempool by concurrent flows",
other
)
}
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions mining/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,7 @@ impl Mempool {
}

pub(crate) fn update_revalidated_transaction(&mut self, transaction: MutableTransaction) -> bool {
if let Some(tx) = self.transaction_pool.get_mut(&transaction.id()) {
tx.mtx = transaction;
true
} else {
false
}
self.transaction_pool.update_revalidated_transaction(transaction)
}

pub(crate) fn has_accepted_transaction(&self, transaction_id: &TransactionId) -> bool {
Expand Down
4 changes: 0 additions & 4 deletions mining/src/mempool/model/orphan_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,4 @@ impl Pool for OrphanPool {
fn chained(&self) -> &TransactionsEdges {
&self.chained_orphans
}

fn get_mut(&mut self, transaction_id: &TransactionId) -> Option<&mut MempoolTransaction> {
self.all_orphans.get_mut(transaction_id)
}
}
2 changes: 0 additions & 2 deletions mining/src/mempool/model/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub(crate) trait Pool {
self.all().get(transaction_id)
}

fn get_mut(&mut self, transaction_id: &TransactionId) -> Option<&mut MempoolTransaction>;

/// Returns the number of transactions in the pool
fn len(&self) -> usize {
self.all().len()
Expand Down
30 changes: 25 additions & 5 deletions mining/src/mempool/model/transactions_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,27 @@ pub(crate) struct TransactionsPool {
/// Mempool config
config: Arc<Config>,

/// Store of transactions
/// Store of transactions.
/// Any mutable access to this map should be carefully reviewed for consistency with all other collections
/// and fields of this struct. In particular, `estimated_size` must reflect the exact sum of estimated size
/// for all current transactions in this collection.
all_transactions: MempoolTransactionCollection,

/// Transactions dependencies formed by inputs present in pool - ancestor relations.
parent_transactions: TransactionsEdges,

/// Transactions dependencies formed by outputs present in pool - successor relations.
chained_transactions: TransactionsEdges,

/// Transactions with no parents in the mempool -- ready to be inserted into a block template
ready_transactions: Frontier,

last_expire_scan_daa_score: u64,

/// last expire scan time in milliseconds
last_expire_scan_time: u64,

/// Sum of estimated size for all transactions currently held in `all_transactions`
estimated_size: usize,

/// Store of UTXOs
Expand Down Expand Up @@ -167,9 +175,25 @@ impl TransactionsPool {
self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids);
self.estimated_size -= removed_tx.mtx.mempool_estimated_bytes();

if self.all_transactions.is_empty() {
assert_eq!(0, self.estimated_size, "Sanity test -- if tx pool is empty, estimated byte size should be zero");
}

Ok(removed_tx)
}

pub(crate) fn update_revalidated_transaction(&mut self, transaction: MutableTransaction) -> bool {
if let Some(tx) = self.all_transactions.get_mut(&transaction.id()) {
// Make sure to update the overall estimated size since the updated transaction might have a different size
self.estimated_size -= tx.mtx.mempool_estimated_bytes();
tx.mtx = transaction;
self.estimated_size += tx.mtx.mempool_estimated_bytes();
true
} else {
false
}
}

pub(crate) fn ready_transaction_count(&self) -> usize {
self.ready_transactions.len()
}
Expand Down Expand Up @@ -339,8 +363,4 @@ impl Pool for TransactionsPool {
fn chained(&self) -> &TransactionsEdges {
&self.chained_transactions
}

fn get_mut(&mut self, transaction_id: &TransactionId) -> Option<&mut MempoolTransaction> {
self.all_transactions.get_mut(transaction_id)
}
}

0 comments on commit 864aaf6

Please sign in to comment.