diff --git a/Cargo.lock b/Cargo.lock index 0fb8a33065..e75272425a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12511,6 +12511,7 @@ dependencies = [ name = "subspace-data-retrieval" version = "0.1.0" dependencies = [ + "anyhow", "async-lock 3.4.0", "async-trait", "futures", diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs index af04fed933..d0756ec0e4 100644 --- a/crates/subspace-gateway-rpc/src/lib.rs +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -8,6 +8,7 @@ use std::ops::{Deref, DerefMut}; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; use subspace_core_primitives::objects::GlobalObjectMapping; use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use tracing::debug; const SUBSPACE_ERROR: i32 = 9000; @@ -99,22 +100,32 @@ pub trait SubspaceGatewayRpcApi { #[method(name = "subspace_fetchObject")] async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error>; } + /// Subspace Gateway RPC configuration -pub struct SubspaceGatewayRpcConfig { +pub struct SubspaceGatewayRpcConfig +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// DSN object fetcher instance. - pub object_fetcher: ObjectFetcher, + pub object_fetcher: ObjectFetcher, } /// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway. -pub struct SubspaceGatewayRpc { +pub struct SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// DSN object fetcher instance. - object_fetcher: ObjectFetcher, + object_fetcher: ObjectFetcher, } /// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN. -impl SubspaceGatewayRpc { +impl SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ /// Creates a new instance of the `SubspaceGatewayRpc` handler. - pub fn new(config: SubspaceGatewayRpcConfig) -> Self { + pub fn new(config: SubspaceGatewayRpcConfig) -> Self { Self { object_fetcher: config.object_fetcher, } @@ -122,10 +133,11 @@ impl SubspaceGatewayRpc { } #[async_trait] -impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc { +impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result, Error> { - // TODO: deny unsafe RPC calls - let count = mappings.objects().len(); if count > MAX_OBJECTS_PER_REQUEST { debug!(%count, %MAX_OBJECTS_PER_REQUEST, "Too many mappings in request"); diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index 208ec56bce..221bf8bb27 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -102,7 +102,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER), ); let piece_getter = DsnPieceGetter::new(piece_provider); - let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size)); + let object_fetcher = ObjectFetcher::new(piece_getter.into(), erasure_coding, Some(max_size)); let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher }); let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?; diff --git a/crates/subspace-gateway/src/commands/run/rpc.rs b/crates/subspace-gateway/src/commands/run/rpc.rs index a7e58bb234..99a504ce59 100644 --- a/crates/subspace-gateway/src/commands/run/rpc.rs +++ b/crates/subspace-gateway/src/commands/run/rpc.rs @@ -3,6 +3,7 @@ use clap::Parser; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer}; use tracing::info; @@ -27,10 +28,13 @@ pub(crate) struct RpcOptions { // - add an argument for a custom tokio runtime // - move this RPC code into a new library part of this crate // - make a RPC config that is independent of clap -pub async fn launch_rpc_server( - rpc_api: SubspaceGatewayRpc, - rpc_options: RpcOptions

, -) -> anyhow::Result { +pub async fn launch_rpc_server( + rpc_api: SubspaceGatewayRpc, + rpc_options: RpcOptions, +) -> anyhow::Result +where + PG: ObjectPieceGetter + Send + Sync + 'static, +{ let server = ServerBuilder::default() .build(rpc_options.rpc_listen_on) .await?; diff --git a/crates/subspace-gateway/src/piece_getter.rs b/crates/subspace-gateway/src/piece_getter.rs index 3d459e1871..47fa6337d2 100644 --- a/crates/subspace-gateway/src/piece_getter.rs +++ b/crates/subspace-gateway/src/piece_getter.rs @@ -2,17 +2,19 @@ use async_trait::async_trait; use futures::stream::StreamExt; +use futures::{FutureExt, Stream}; use std::fmt; -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; +use std::sync::Arc; use subspace_core_primitives::pieces::{Piece, PieceIndex}; -use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter}; +use subspace_data_retrieval::piece_getter::ObjectPieceGetter; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; /// The maximum number of peer-to-peer walking rounds for L1 archival storage. const MAX_RANDOM_WALK_ROUNDS: usize = 15; /// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter. -pub struct DsnPieceGetter(pub PieceProvider); +pub struct DsnPieceGetter(pub Arc>); impl fmt::Debug for DsnPieceGetter where @@ -25,35 +27,35 @@ where } } -impl Deref for DsnPieceGetter +impl Clone for DsnPieceGetter where PV: PieceValidator, { - type Target = PieceProvider; - - fn deref(&self) -> &Self::Target { - &self.0 + fn clone(&self) -> Self { + Self(self.0.clone()) } } -impl DerefMut for DsnPieceGetter +impl Deref for DsnPieceGetter where PV: PieceValidator, { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + type Target = PieceProvider; + + fn deref(&self) -> &Self::Target { + &self.0 } } // TODO: -// - change ObjectPieceGetter trait to take a list of piece indexes +// - reconstruct segment if piece is missing // - move this piece getter impl into a new library part of this crate #[async_trait] impl ObjectPieceGetter for DsnPieceGetter where PV: PieceValidator, { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if let Some((got_piece_index, maybe_piece)) = self.get_from_cache([piece_index]).await.next().await { @@ -68,6 +70,38 @@ where .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS) .await) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + let piece_getter = (*self).clone(); + + let stream = self + .get_from_cache(piece_indices) + .await + .then(move |(index, maybe_piece)| { + let piece_getter = piece_getter.clone(); + let fut = async move { + if let Some(piece) = maybe_piece { + return (index, Ok(Some(piece))); + } + + piece_getter + .get_piece_from_archival_storage(index, MAX_RANDOM_WALK_ROUNDS) + .map(|piece| (index, Ok(piece))) + .await + }; + Box::pin(fut) + }); + + Ok(Box::new(stream)) + } } impl DsnPieceGetter @@ -76,6 +110,6 @@ where { /// Creates new DSN piece getter. pub fn new(piece_provider: PieceProvider) -> Self { - Self(piece_provider) + Self(Arc::new(piece_provider)) } } diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index c7f6fe93d8..ff614eb07f 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -12,6 +12,7 @@ include = [ ] [dependencies] +anyhow = "1.0.89" async-lock = "3.4.0" async-trait = "0.1.83" futures = "0.3.31" diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 59436181ee..7335733fc4 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -16,7 +16,7 @@ //! Fetching objects stored in the archived history of Subspace Network. use crate::piece_fetcher::download_pieces; -use crate::piece_getter::{BoxError, ObjectPieceGetter}; +use crate::piece_getter::ObjectPieceGetter; use crate::segment_fetcher::{download_segment, SegmentGetterError}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use std::sync::Arc; @@ -123,7 +123,7 @@ pub enum Error { #[error("Getting piece caused an error: {source:?}")] PieceGetterError { #[from] - source: BoxError, + source: anyhow::Error, }, /// Piece getter couldn't find the piece @@ -132,9 +132,12 @@ pub enum Error { } /// Object fetcher for the Subspace DSN. -pub struct ObjectFetcher { +pub struct ObjectFetcher +where + PG: ObjectPieceGetter + Send + Sync, +{ /// The piece getter used to fetch pieces. - piece_getter: Arc, + piece_getter: Arc, /// The erasure coding configuration of those pieces. erasure_coding: ErasureCoding, @@ -143,21 +146,21 @@ pub struct ObjectFetcher { max_object_len: usize, } -impl ObjectFetcher { +impl ObjectFetcher +where + PG: ObjectPieceGetter + Send + Sync, +{ /// Create a new object fetcher with the given configuration. /// /// `max_object_len` is the amount of data bytes we'll read for a single object before giving /// up and returning an error, or `None` for no limit (`usize::MAX`). - pub fn new( - piece_getter: PG, + pub fn new( + piece_getter: Arc, erasure_coding: ErasureCoding, max_object_len: Option, - ) -> Self - where - PG: ObjectPieceGetter + Send + Sync + 'static, - { + ) -> Self { Self { - piece_getter: Arc::new(piece_getter), + piece_getter, erasure_coding, max_object_len: max_object_len.unwrap_or(usize::MAX), } diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index 14f6e823ae..deb35162a3 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -16,9 +16,8 @@ //! Fetching pieces of the archived history of Subspace Network. use crate::object_fetcher::Error; -use crate::piece_getter::{BoxError, ObjectPieceGetter}; -use futures::stream::FuturesOrdered; -use futures::TryStreamExt; +use crate::piece_getter::ObjectPieceGetter; +use futures::StreamExt; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use tracing::{debug, trace}; @@ -31,7 +30,7 @@ use tracing::{debug, trace}; pub async fn download_pieces( piece_indexes: &[PieceIndex], piece_getter: &PG, -) -> Result, BoxError> +) -> anyhow::Result> where PG: ObjectPieceGetter, { @@ -42,40 +41,25 @@ where ); // TODO: - // - consider using a semaphore to limit the number of concurrent requests, like - // download_segment_pieces() - // - if we're close to the number of pieces in a segment, use segment downloading and piece + // - if we're close to the number of pieces in a segment, or we can't find a piece, use segment downloading and piece // reconstruction instead // Currently most objects are limited to 4 pieces, so this isn't needed yet. - let received_pieces = piece_indexes - .iter() - .map(|piece_index| async move { - match piece_getter.get_piece(*piece_index).await { - Ok(Some(piece)) => { - trace!(?piece_index, "Piece request succeeded",); - Ok(piece) - } - Ok(None) => { - trace!(?piece_index, "Piece not found"); - Err(Error::PieceNotFound { - piece_index: *piece_index, - } - .into()) - } - Err(error) => { - trace!( - %error, - ?piece_index, - "Piece request caused an error", - ); - Err(error) - } - } - }) - .collect::>(); + let mut received_pieces = piece_getter + .get_pieces(piece_indexes.iter().copied()) + .await?; - // We want exact pieces, so any errors are fatal. - let received_pieces: Vec = received_pieces.try_collect().await?; + let mut pieces = Vec::new(); + pieces.resize(piece_indexes.len(), Piece::default()); + + while let Some((piece_index, maybe_piece)) = received_pieces.next().await { + // We want exact pieces, so any errors are fatal. + let piece = maybe_piece?.ok_or(Error::PieceNotFound { piece_index })?; + let index_position = piece_indexes + .iter() + .position(|i| *i == piece_index) + .expect("get_pieces only returns indexes it was supplied; qed"); + pieces[index_position] = piece; + } trace!( count = piece_indexes.len(), @@ -83,5 +67,5 @@ where "Successfully retrieved exact pieces" ); - Ok(received_pieces) + Ok(pieces) } diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs index 70f470de70..c2d58982ab 100644 --- a/shared/subspace-data-retrieval/src/piece_getter.rs +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -16,23 +16,34 @@ //! Getting object pieces from the Subspace Distributed Storage Network, or various caches. use async_trait::async_trait; +use futures::{stream, Stream, StreamExt}; use std::fmt; +use std::future::Future; use std::sync::Arc; use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::pieces::{Piece, PieceIndex}; -/// A type-erased error -pub type BoxError = Box; - /// Trait representing a way to get pieces from the DSN for object reconstruction -// TODO: make ObjectPieceGetter impls retry before failing, if that is useful #[async_trait] pub trait ObjectPieceGetter: fmt::Debug { /// Get piece by index. /// /// Returns `Ok(None)` if the piece is not found. /// Returns `Err(_)` if trying to get the piece caused an error. - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError>; + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result>; + + /// Get pieces with provided indices. + /// + /// The number of elements in the returned stream is the same as the number of unique + /// `piece_indices`. + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a; } #[async_trait] @@ -40,15 +51,27 @@ impl ObjectPieceGetter for Arc where T: ObjectPieceGetter + Send + Sync + ?Sized, { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { self.as_ref().get_piece(piece_index).await } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + self.as_ref().get_pieces(piece_indices).await + } } // Convenience methods, mainly used in testing #[async_trait] impl ObjectPieceGetter for NewArchivedSegment { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if piece_index.segment_index() == self.segment_header.segment_index() { return Ok(Some( self.pieces @@ -60,15 +83,66 @@ impl ObjectPieceGetter for NewArchivedSegment { Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices) + } } #[async_trait] impl ObjectPieceGetter for (PieceIndex, Piece) { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result> { if self.0 == piece_index { return Ok(Some(self.1.clone())); } Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices) + } +} + +/// A default implementation which gets each piece individually, using the `get_piece` async +/// function. +/// +/// This is mainly used for testing, most production implementations can fetch multiple pieces more +/// efficiently. +#[expect(clippy::type_complexity, reason = "type matches trait signature")] +pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>( + // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result> once it stabilises + // https://github.com/rust-lang/rust/issues/62290 + get_piece: Func, + piece_indices: PieceIndices, +) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, +> +where + PieceIndices: IntoIterator + Send + 'a, + Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a, + Fut: Future>> + Send + Unpin + 'a, +{ + Ok(Box::new(Box::pin(stream::iter(piece_indices).then( + move |piece_index| { + let get_piece = get_piece.clone(); + async move { (piece_index, get_piece(piece_index).await) } + }, + )))) }