diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 24f7197188..e3b5e8e99f 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -94,15 +94,33 @@ where where PieceIndices: IntoIterator + 'a, { + let download_id = random::(); let (tx, mut rx) = mpsc::unbounded(); - let fut = get_from_cache_inner( - piece_indices.into_iter(), - &self.node, - &self.piece_validator, - tx, - &self.piece_downloading_semaphore, - ); - let mut fut = Box::pin(fut.fuse()); + let fut = async move { + // Download from connected peers first + let pieces_to_download = download_cached_pieces( + piece_indices.into_iter(), + &self.node, + &self.piece_validator, + &tx, + &self.piece_downloading_semaphore, + ) + .await; + + if pieces_to_download.is_empty() { + debug!("Done"); + return; + } + + for (piece_index, _closest_peers) in pieces_to_download { + tx.unbounded_send((piece_index, None)) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + debug!("Done #2"); + }; + + let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse()); // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { @@ -488,46 +506,6 @@ impl KademliaWrapper { } } -async fn get_from_cache_inner( - piece_indices: PieceIndices, - node: &Node, - piece_validator: &PV, - results: mpsc::UnboundedSender<(PieceIndex, Option)>, - piece_downloading_semaphore: &Semaphore, -) where - PV: PieceValidator, - PieceIndices: Iterator, -{ - let download_id = random::(); - - let fut = async move { - // Download from connected peers first - let pieces_to_download = download_cached_pieces( - piece_indices, - node, - piece_validator, - &results, - piece_downloading_semaphore, - ) - .await; - - if pieces_to_download.is_empty() { - debug!("Done"); - return; - } - - for (piece_index, _closest_peers) in pieces_to_download { - results - .unbounded_send((piece_index, None)) - .expect("This future isn't polled after receiver is dropped; qed"); - } - - debug!("Done #2"); - }; - - fut.instrument(tracing::info_span!("", %download_id)).await; -} - /// Takes pieces to download as an input, sends results with pieces that were downloaded /// successfully and returns those that were not downloaded from connected peer with addresses of /// potential candidates