Skip to content

Commit

Permalink
Fetch multiple pieces in the piece fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Oct 22, 2024
1 parent 01750d2 commit 170588b
Showing 1 changed file with 8 additions and 31 deletions.
39 changes: 8 additions & 31 deletions shared/subspace-data-retrieval/src/piece_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
use crate::object_fetcher::Error;
use crate::piece_getter::ObjectPieceGetter;
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};

Expand All @@ -42,40 +41,18 @@ where
);

// TODO:
// - consider using a semaphore to limit the number of concurrent requests, like
// download_segment_pieces()
// - if we're close to the number of pieces in a segment, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let received_pieces = piece_indexes
.iter()
.map(|piece_index| async move {
match piece_getter.get_piece(*piece_index).await {
Ok(Some(piece)) => {
trace!(?piece_index, "Piece request succeeded",);
Ok(piece)
}
Ok(None) => {
trace!(?piece_index, "Piece not found");
Err(Error::PieceNotFound {
piece_index: *piece_index,
}
.into())
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request caused an error",
);
Err(error)
}
}
})
.collect::<FuturesOrdered<_>>();
let received_pieces = piece_getter
.get_pieces(piece_indexes.iter().copied())
.await?;

// We want exact pieces, so any errors are fatal.
let received_pieces: Vec<Piece> = received_pieces.try_collect().await?;
let received_pieces: Vec<Piece> = received_pieces
.map(|(piece_index, maybe_piece)| maybe_piece?.ok_or(Error::PieceNotFound { piece_index }))
.try_collect()
.await?;

trace!(
count = piece_indexes.len(),
Expand Down

0 comments on commit 170588b

Please sign in to comment.