Skip to content

Commit

Permalink
minor improvements + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Oct 29, 2024
1 parent c7c21df commit cdadf70
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,13 @@ where
log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_result = self.mempool.extend_unwatched(source, &xts);
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_result)).boxed()
return future::ready(Ok(mempool_results)).boxed()
}

let to_be_submitted = mempool_result
let to_be_submitted = mempool_results
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt))
Expand All @@ -620,17 +620,17 @@ where
let results_map = view_store.submit(source, to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();

Ok(mempool_result
Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|xt_hash| {
let result = submission_results
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
if result.is_err() {
mempool.remove(xt_hash);
};
result
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.map_err(|err| {
mempool.remove(xt_hash);
err
})
})
})
.collect::<Vec<_>>())
Expand Down Expand Up @@ -682,11 +682,10 @@ where
let view_store = self.view_store.clone();
let mempool = self.mempool.clone();
async move {
let result = view_store.submit_and_watch(at, source, xt).await;
if result.is_err() {
view_store.submit_and_watch(at, source, xt).await.map_err(|err| {
mempool.remove(xt_hash);
};
result
err
})
}
.boxed()
}
Expand Down Expand Up @@ -1031,7 +1030,7 @@ where
future::join_all(results).await
}

/// Updates the given view with the transaction from the internal mempol.
/// Updates the given view with the transactions from the internal mempol.
///
/// All transactions from the mempool (excluding those which are either already imported or
/// already included in blocks since recently finalized block) are submitted to the
Expand Down Expand Up @@ -1114,12 +1113,9 @@ where
// out the invalid event, and remove transaction.
if self.view_store.is_empty() {
for result in watched_results {
match result {
Err(tx_hash) => {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
},
Ok(_) => {},
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions substrate/client/transaction-pool/tests/fatp_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,74 @@ fn fatp_limits_ready_count_works() {
assert_ready_iterator!(header02a.hash(), pool, [xt2]);
}

#[test]
fn fatp_limits_ready_count_works_for_submit_at() {
sp_tracing::try_init_simple();

let builder = TestPoolBuilder::new();
let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build();
api.set_nonce(api.genesis_hash(), Bob.into(), 200);
api.set_nonce(api.genesis_hash(), Charlie.into(), 500);

let header01 = api.push_block(1, vec![], true);

let event = new_best_block_event(&pool, None, header01.hash());
block_on(pool.maintain(event));

let xt0 = uxt(Charlie, 500);
let xt1 = uxt(Alice, 200);
let xt2 = uxt(Alice, 201);

let results = block_on(pool.submit_at(
header01.hash(),
SOURCE,
vec![xt0.clone(), xt1.clone(), xt2.clone()],
))
.unwrap();

assert!(matches!(results[0].as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
assert!(results[1].as_ref().is_ok());
assert!(results[2].as_ref().is_ok());
assert_eq!(pool.mempool_len().0, 2);
//charlie was not included into view:
assert_pool_status!(header01.hash(), &pool, 2, 0);
assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
}

#[test]
fn fatp_limits_ready_count_works_for_submit_and_watch() {
sp_tracing::try_init_simple();

let builder = TestPoolBuilder::new();
let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build();
api.set_nonce(api.genesis_hash(), Bob.into(), 300);
api.set_nonce(api.genesis_hash(), Charlie.into(), 500);

let header01 = api.push_block(1, vec![], true);

let event = new_best_block_event(&pool, None, header01.hash());
block_on(pool.maintain(event));

let xt0 = uxt(Charlie, 500);
let xt1 = uxt(Alice, 200);
let xt2 = uxt(Bob, 300);
api.set_priority(&xt0, 2);
api.set_priority(&xt1, 2);
api.set_priority(&xt2, 1);

let result0 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()));
let result1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()));
let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ());

assert!(matches!(result2.unwrap_err().0, TxPoolError::ImmediatelyDropped));
assert!(result0.is_ok());
assert!(result1.is_ok());
assert_eq!(pool.mempool_len().1, 2);
//charlie was not included into view:
assert_pool_status!(header01.hash(), &pool, 2, 0);
assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]);
}

#[test]
fn fatp_limits_future_count_works() {
sp_tracing::try_init_simple();
Expand Down

0 comments on commit cdadf70

Please sign in to comment.