Skip to content

Commit

Permalink
Inline get_from_cache_inner function
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Nov 26, 2024
1 parent d0fce1a commit e5b9786
Showing 1 changed file with 26 additions and 48 deletions.
74 changes: 26 additions & 48 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,33 @@ where
where
PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
{
let download_id = random::<u64>();
let (tx, mut rx) = mpsc::unbounded();
let fut = get_from_cache_inner(
piece_indices.into_iter(),
&self.node,
&self.piece_validator,
tx,
&self.piece_downloading_semaphore,
);
let mut fut = Box::pin(fut.fuse());
let fut = async move {
// Download from connected peers first
let pieces_to_download = download_cached_pieces(
piece_indices.into_iter(),
&self.node,
&self.piece_validator,
&tx,
&self.piece_downloading_semaphore,
)
.await;

if pieces_to_download.is_empty() {
debug!("Done");
return;
}

for (piece_index, _closest_peers) in pieces_to_download {
tx.unbounded_send((piece_index, None))
.expect("This future isn't polled after receiver is dropped; qed");
}

debug!("Done #2");
};

let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());

// Drive above future and stream back any pieces that were downloaded so far
stream::poll_fn(move |cx| {
Expand Down Expand Up @@ -488,46 +506,6 @@ impl KademliaWrapper {
}
}

async fn get_from_cache_inner<PV, PieceIndices>(
piece_indices: PieceIndices,
node: &Node,
piece_validator: &PV,
results: mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
piece_downloading_semaphore: &Semaphore,
) where
PV: PieceValidator,
PieceIndices: Iterator<Item = PieceIndex>,
{
let download_id = random::<u64>();

let fut = async move {
// Download from connected peers first
let pieces_to_download = download_cached_pieces(
piece_indices,
node,
piece_validator,
&results,
piece_downloading_semaphore,
)
.await;

if pieces_to_download.is_empty() {
debug!("Done");
return;
}

for (piece_index, _closest_peers) in pieces_to_download {
results
.unbounded_send((piece_index, None))
.expect("This future isn't polled after receiver is dropped; qed");
}

debug!("Done #2");
};

fut.instrument(tracing::info_span!("", %download_id)).await;
}

/// Takes pieces to download as an input, sends results with pieces that were downloaded
/// successfully and returns those that were not downloaded from connected peer with addresses of
/// potential candidates
Expand Down

0 comments on commit e5b9786

Please sign in to comment.