diff --git a/Cargo.lock b/Cargo.lock
index b26117bc90..a4e75240bb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12470,7 +12470,6 @@ name = "subspace-data-retrieval"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "async-lock 3.4.0",
  "async-trait",
  "futures",
  "parity-scale-codec",
diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml
index 55119acd13..73eb748d12 100644
--- a/shared/subspace-data-retrieval/Cargo.toml
+++ b/shared/subspace-data-retrieval/Cargo.toml
@@ -13,7 +13,6 @@ include = [
 
 [dependencies]
 anyhow = "1.0.89"
-async-lock = "3.4.0"
 async-trait = "0.1.83"
 futures = "0.3.31"
 parity-scale-codec = { version = "3.6.12", features = ["derive"] }
diff --git a/shared/subspace-data-retrieval/src/segment_fetcher.rs b/shared/subspace-data-retrieval/src/segment_fetcher.rs
index 686f5e7290..679039977a 100644
--- a/shared/subspace-data-retrieval/src/segment_fetcher.rs
+++ b/shared/subspace-data-retrieval/src/segment_fetcher.rs
@@ -16,9 +16,7 @@
 //! Fetching segments of the archived history of Subspace Network.
 
 use crate::piece_getter::ObjectPieceGetter;
-use async_lock::Semaphore;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
+use futures::{stream, StreamExt};
 use subspace_archiving::archiver::Segment;
 use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
 use subspace_core_primitives::pieces::Piece;
@@ -72,9 +70,6 @@ where
 }
 
 /// Concurrently downloads the pieces for `segment_index`.
-// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
-//
-//
 // TODO: pass a lower concurrency limit into this function, to avoid overwhelming residential routers or slow connections
 pub async fn download_segment_pieces<PG>(
     segment_index: SegmentIndex,
@@ -85,66 +80,87 @@ where
 {
     debug!(%segment_index, "Retrieving pieces of the segment");
 
-    let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS);
-
-    let mut received_segment_pieces = segment_index
-        .segment_piece_indexes_source_first()
-        .into_iter()
-        .map(|piece_index| {
-            // Source pieces will acquire permit here right away
-            let maybe_permit = semaphore.try_acquire();
-
-            async move {
-                let permit = match maybe_permit {
-                    Some(permit) => permit,
-                    None => {
-                        // Other pieces will acquire permit here instead
-                        semaphore.acquire().await
-                    }
-                };
-                let piece = match piece_getter.get_piece(piece_index).await {
-                    Ok(Some(piece)) => piece,
-                    Ok(None) => {
-                        trace!(?piece_index, "Piece not found");
-                        return None;
-                    }
-                    Err(error) => {
-                        trace!(
-                            %error,
-                            ?piece_index,
-                            "Piece request failed",
-                        );
-                        return None;
-                    }
-                };
-
-                trace!(?piece_index, "Piece request succeeded");
-
-                // Piece was received successfully, "remove" this slot from semaphore
-                permit.forget();
-                Some((piece_index, piece))
-            }
-        })
-        .collect::<FuturesUnordered<_>>();
-
+    // We want NUM_RAW_RECORDS pieces to reconstruct the segment, but it doesn't matter exactly which ones.
+    let piece_indexes = segment_index.segment_piece_indexes_source_first();
+    let mut piece_indexes = piece_indexes.as_slice();
     let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
+
+    let mut pieces_pending = 0;
     let mut pieces_received = 0;
+    let mut piece_streams = Vec::new();
+
+    // Loop Invariant:
+    // - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or
+    // - the number of pending pieces gets smaller, eventually triggering another batch.
+    // We also exit early if we have enough pieces to reconstruct a segment.
+    'fetcher: while !piece_indexes.is_empty()
+        && pieces_received < RecordedHistorySegment::NUM_RAW_RECORDS
+    {
+        // Request the number of pieces needed to reconstruct the segment, assuming all pending
+        // pieces are successful.
+        let batch_size = RecordedHistorySegment::NUM_RAW_RECORDS - pieces_received - pieces_pending;
+        if batch_size > 0 {
+            let (batch_indexes, remaining_piece_indexes) = piece_indexes
+                .split_at_checked(batch_size)
+                .unwrap_or((piece_indexes, &[]));
+            // Invariant: the number of remaining piece indexes gets smaller.
+            piece_indexes = remaining_piece_indexes;
+
+            let stream = piece_getter.get_pieces(batch_indexes.iter().cloned()).await;
+            match stream {
+                Ok(stream) => {
+                    piece_streams.push(stream);
+                    pieces_pending += batch_size;
+                }
+                Err(error) => {
+                    // A single batch failure isn't fatal.
+                    debug!(
+                        ?error,
+                        %segment_index,
+                        batch_size,
+                        pieces_pending,
+                        pieces_received,
+                        pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS,
+                        "Segment piece getter batch failed"
+                    );
+                }
+            }
+        }
 
-    while let Some(maybe_piece) = received_segment_pieces.next().await {
-        let Some((piece_index, piece)) = maybe_piece else {
-            continue;
+        // Poll all the batches concurrently, getting all finished pieces.
+        let piece_responses = stream::iter(&mut piece_streams)
+            .flatten_unordered(None)
+            .ready_chunks(RecordedHistorySegment::NUM_RAW_RECORDS)
+            .next()
+            .await;
+
+        let Some(piece_responses) = piece_responses else {
+            // All streams have finished, perhaps abnormally, so reset the number of pending pieces.
+            // Invariant: the number of pending pieces gets smaller.
+            pieces_pending = 0;
+            continue 'fetcher;
         };
 
-        segment_pieces
-            .get_mut(piece_index.position() as usize)
-            .expect("Piece position is by definition within segment; qed")
-            .replace(piece);
+        // Process the piece responses.
+        'processor: for maybe_piece in piece_responses {
+            // Invariant: the number of pending pieces gets smaller.
+            pieces_pending -= 1;
+
+            let (piece_index, Ok(Some(piece))) = maybe_piece else {
+                continue 'processor;
+            };
 
-        pieces_received += 1;
+            segment_pieces
+                .get_mut(piece_index.position() as usize)
+                .expect("Piece position is by definition within segment; qed")
+                .replace(piece);
 
-        if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
-            trace!(%segment_index, "Received half of the segment.");
-            break;
+            pieces_received += 1;
+
+            if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
+                trace!(%segment_index, "Received half of the segment.");
+                break 'fetcher;
+            }
         }
     }
 
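The heart of the new fetcher is the combinator chain stream::iter(&mut piece_streams).flatten_unordered(None).ready_chunks(..), which drains several get_pieces batch streams at once instead of gating one future per piece behind a semaphore. The sketch below is a minimal, self-contained illustration of that pattern only, not the crate's implementation: the u64 indexes, &str pieces, and () errors are stand-ins for PieceIndex, Piece, and the piece getter's error type, and the hand-built stream::iter streams stand in for the streams returned by ObjectPieceGetter::get_pieces. It only needs the futures crate.

use futures::executor::block_on;
use futures::stream::BoxStream;
use futures::{stream, StreamExt};

fn main() {
    block_on(async {
        // Stand-ins for the per-batch streams of (piece index, download result) pairs.
        let mut piece_streams: Vec<BoxStream<'static, (u64, Result<Option<&'static str>, ()>)>> = vec![
            stream::iter(vec![(0, Ok(Some("piece-0"))), (1, Ok(None))]).boxed(),
            stream::iter(vec![(2, Ok(Some("piece-2")))]).boxed(),
        ];

        // Borrow the Vec mutably so more batches could be pushed between polls, drive every
        // batch stream concurrently, and take whatever responses are already available.
        // (The real loop sizes the chunk with RecordedHistorySegment::NUM_RAW_RECORDS.)
        let piece_responses = stream::iter(&mut piece_streams)
            .flatten_unordered(None)
            .ready_chunks(16)
            .next()
            .await;

        // `None` would mean every batch stream has finished; here all three responses are ready.
        for (piece_index, response) in piece_responses.into_iter().flatten() {
            match response {
                Ok(Some(piece)) => println!("piece {piece_index}: got {piece}"),
                Ok(None) => println!("piece {piece_index}: not found"),
                Err(()) => println!("piece {piece_index}: request failed"),
            }
        }
    });
}

Because piece_streams is only mutably borrowed, the same Vec can be polled again on a later pass, which is what lets the real 'fetcher loop keep earlier batches' pending pieces in flight while it tops the request count back up with a fresh batch.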