Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix tx-pool returning the same transaction multiple times (#6535)
Browse files Browse the repository at this point in the history
* Fix tx-pool returning the same transaction multiple times

This fixes a bug that lead to returning the same transaction multiple
times when iterating the `ready` iterator. Internally the transaction
was kept in the `best` list and could be duplicated in that list be
re-inserting it again. This `best` list is using a `TransactionRef`
which internally uses a `insertion_id`. This `insertion_id` could lead
to the same transaction being inserted multiple times into the `best`
list.

* Update client/transaction-pool/src/testing/pool.rs

Co-authored-by: Nikolay Volf <[email protected]>

Co-authored-by: Nikolay Volf <[email protected]>
  • Loading branch information
bkchr and NikVolf authored Jun 30, 2020
1 parent 0d0a84d commit 4eaea34
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
24 changes: 11 additions & 13 deletions client/transaction-pool/graph/src/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut ready = self.ready.write();
loop {
let hash = match to_remove.pop() {
Some(hash) => hash,
None => return removed,
};

while let Some(hash) = to_remove.pop() {
if let Some(mut tx) = ready.remove(&hash) {
let invalidated = tx.transaction.transaction.provides
.iter()
Expand Down Expand Up @@ -319,6 +314,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
removed.push(tx.transaction.transaction);
}
}

removed
}

/// Removes transactions that provide given tag.
Expand All @@ -330,17 +327,16 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
let mut removed = vec![];
let mut to_remove = vec![tag];

loop {
let tag = match to_remove.pop() {
Some(tag) => tag,
None => return removed,
};

while let Some(tag) = to_remove.pop() {
let res = self.provided_tags.remove(&tag)
.and_then(|hash| self.ready.write().remove(&hash));
.and_then(|hash| self.ready.write().remove(&hash));

if let Some(tx) = res {
let unlocks = tx.unlocks;

// Make sure we remove it from best txs
self.best.remove(&tx.transaction);

let tx = tx.transaction.transaction;

// prune previous transactions as well
Expand Down Expand Up @@ -403,6 +399,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
removed.push(tx);
}
}

removed
}

/// Checks if the transaction is providing the same tags as other transactions.
Expand Down
8 changes: 4 additions & 4 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
// which they got into the pool
while left > 0 {
let first_block = match self.block_ordered.keys().next().cloned() {
Some(bn) => bn,
None => break,
Some(bn) => bn,
None => break,
};
let mut block_drained = false;
if let Some(extrinsics) = self.block_ordered.get_mut(&first_block) {
let to_queue = extrinsics.iter().take(left).cloned().collect::<Vec<_>>();
if to_queue.len() == extrinsics.len() {
block_drained = true;
block_drained = true;
} else {
for xt in &to_queue {
extrinsics.remove(xt);
Expand All @@ -159,7 +159,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
}

if block_drained {
self.block_ordered.remove(&first_block);
self.block_ordered.remove(&first_block);
}
}

Expand Down
25 changes: 25 additions & 0 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,3 +1066,28 @@ fn import_notification_to_pool_maintain_works() {
block_on(pool.maintain(evt.into()));
assert_eq!(pool.status().ready, 0);
}

// When we prune transactions, we need to make sure that we remove
#[test]
fn pruning_a_transaction_should_remove_it_from_best_transaction() {
let (pool, _guard, _notifier) = maintained_pool();

let xt1 = Extrinsic::IncludeData(Vec::new());

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
let header = pool.api.push_block(1, vec![xt1.clone()]);

// This will prune `xt1`.
block_on(pool.maintain(block_event(header)));

// Submit the tx again.
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("2. Imported");

let mut iterator = block_on(pool.ready_at(1));

assert_eq!(iterator.next().unwrap().data, xt1.clone());

// If the tx was not removed from the best txs, the tx would be
// returned a second time by the iterator.
assert!(iterator.next().is_none());
}
14 changes: 12 additions & 2 deletions test-utils/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,20 @@ impl sp_runtime::traits::Dispatchable for Extrinsic {
}

impl Extrinsic {
/// Convert `&self` into `&Transfer`.
///
/// Panics if this is no `Transfer` extrinsic.
pub fn transfer(&self) -> &Transfer {
self.try_transfer().expect("cannot convert to transfer ref")
}

/// Try to convert `&self` into `&Transfer`.
///
/// Returns `None` if this is no `Transfer` extrinsic.
pub fn try_transfer(&self) -> Option<&Transfer> {
match self {
Extrinsic::Transfer { ref transfer, .. } => transfer,
_ => panic!("cannot convert to transfer ref"),
Extrinsic::Transfer { ref transfer, .. } => Some(transfer),
_ => None,
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions test-utils/runtime/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,19 @@ impl sc_transaction_graph::ChainApi for TestApi {
) -> Self::ValidationFuture {
self.validation_requests.write().push(uxt.clone());

let chain_nonce = self.chain.read().nonces.get(&uxt.transfer().from).cloned().unwrap_or(0);
let requires = if chain_nonce == uxt.transfer().nonce {
vec![]
let (requires, provides) = if let Some(transfer) = uxt.try_transfer() {
let chain_nonce = self.chain.read().nonces.get(&transfer.from).cloned().unwrap_or(0);
let requires = if chain_nonce == transfer.nonce {
vec![]
} else {
vec![vec![chain_nonce as u8]]
};
let provides = vec![vec![transfer.nonce as u8]];

(requires, provides)
} else {
vec![vec![chain_nonce as u8]]
(Vec::new(), vec![uxt.encode()])
};
let provides = vec![vec![uxt.transfer().nonce as u8]];

if self.chain.read().invalid_hashes.contains(&self.hash_and_length(&uxt).0) {
return futures::future::ready(Ok(
Expand Down

0 comments on commit 4eaea34

Please sign in to comment.