Skip to content

Commit

Permalink
Merge pull request #3158 from autonomys/obj-piece-list
Browse files Browse the repository at this point in the history
Fetch multiple pieces during object reconstruction
  • Loading branch information
teor2345 authored Nov 27, 2024
2 parents 49d9638 + 0755013 commit 239d791
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 21 additions & 9 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::ops::{Deref, DerefMut};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_data_retrieval::object_fetcher::{self, ObjectFetcher};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use tracing::debug;

const SUBSPACE_ERROR: i32 = 9000;
Expand Down Expand Up @@ -99,33 +100,44 @@ pub trait SubspaceGatewayRpcApi {
#[method(name = "subspace_fetchObject")]
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error>;
}

/// Subspace Gateway RPC configuration
pub struct SubspaceGatewayRpcConfig {
pub struct SubspaceGatewayRpcConfig<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
pub object_fetcher: ObjectFetcher,
pub object_fetcher: ObjectFetcher<PG>,
}

/// Implements the [`SubspaceGatewayRpcApiServer`] trait for interacting with the Subspace Gateway.
pub struct SubspaceGatewayRpc {
pub struct SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// DSN object fetcher instance.
object_fetcher: ObjectFetcher,
object_fetcher: ObjectFetcher<PG>,
}

/// [`SubspaceGatewayRpc`] is used to fetch objects from the DSN.
impl SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
/// Creates a new instance of the `SubspaceGatewayRpc` handler.
pub fn new(config: SubspaceGatewayRpcConfig) -> Self {
pub fn new(config: SubspaceGatewayRpcConfig<PG>) -> Self {
Self {
object_fetcher: config.object_fetcher,
}
}
}

#[async_trait]
impl SubspaceGatewayRpcApiServer for SubspaceGatewayRpc {
impl<PG> SubspaceGatewayRpcApiServer for SubspaceGatewayRpc<PG>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
async fn fetch_object(&self, mappings: GlobalObjectMapping) -> Result<Vec<HexData>, Error> {
// TODO: deny unsafe RPC calls

let count = mappings.objects().len();
if count > MAX_OBJECTS_PER_REQUEST {
debug!(%count, %MAX_OBJECTS_PER_REQUEST, "Too many mappings in request");
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER),
);
let piece_getter = DsnPieceGetter::new(piece_provider);
let object_fetcher = ObjectFetcher::new(piece_getter, erasure_coding, Some(max_size));
let object_fetcher = ObjectFetcher::new(piece_getter.into(), erasure_coding, Some(max_size));

let rpc_api = SubspaceGatewayRpc::new(SubspaceGatewayRpcConfig { object_fetcher });
let rpc_handle = launch_rpc_server(rpc_api, rpc_options).await?;
Expand Down
12 changes: 8 additions & 4 deletions crates/subspace-gateway/src/commands/run/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use clap::Parser;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcApiServer};
use tracing::info;

Expand All @@ -27,10 +28,13 @@ pub(crate) struct RpcOptions<const DEFAULT_PORT: u16> {
// - add an argument for a custom tokio runtime
// - move this RPC code into a new library part of this crate
// - make a RPC config that is independent of clap
pub async fn launch_rpc_server<const P: u16>(
rpc_api: SubspaceGatewayRpc,
rpc_options: RpcOptions<P>,
) -> anyhow::Result<ServerHandle> {
pub async fn launch_rpc_server<PG, const DEFAULT_PORT: u16>(
rpc_api: SubspaceGatewayRpc<PG>,
rpc_options: RpcOptions<DEFAULT_PORT>,
) -> anyhow::Result<ServerHandle>
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
let server = ServerBuilder::default()
.build(rpc_options.rpc_listen_on)
.await?;
Expand Down
62 changes: 48 additions & 14 deletions crates/subspace-gateway/src/piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::{FutureExt, Stream};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::{BoxError, ObjectPieceGetter};
use subspace_data_retrieval::piece_getter::ObjectPieceGetter;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};

/// The maximum number of peer-to-peer walking rounds for L1 archival storage.
const MAX_RANDOM_WALK_ROUNDS: usize = 15;

/// Wrapper type for PieceProvider, so it can implement ObjectPieceGetter.
pub struct DsnPieceGetter<PV: PieceValidator>(pub PieceProvider<PV>);
pub struct DsnPieceGetter<PV: PieceValidator>(pub Arc<PieceProvider<PV>>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
Expand All @@ -25,35 +27,35 @@ where
}
}

impl<PV> Deref for DsnPieceGetter<PV>
impl<PV> Clone for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<PV> DerefMut for DsnPieceGetter<PV>
impl<PV> Deref for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
type Target = PieceProvider<PV>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

// TODO:
// - change ObjectPieceGetter trait to take a list of piece indexes
// - reconstruct segment if piece is missing
// - move this piece getter impl into a new library part of this crate
#[async_trait]
impl<PV> ObjectPieceGetter for DsnPieceGetter<PV>
where
PV: PieceValidator,
{
async fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, BoxError> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
if let Some((got_piece_index, maybe_piece)) =
self.get_from_cache([piece_index]).await.next().await
{
Expand All @@ -68,6 +70,38 @@ where
.get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
.await)
}

async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
let piece_getter = (*self).clone();

let stream = self
.get_from_cache(piece_indices)
.await
.then(move |(index, maybe_piece)| {
let piece_getter = piece_getter.clone();
let fut = async move {
if let Some(piece) = maybe_piece {
return (index, Ok(Some(piece)));
}

piece_getter
.get_piece_from_archival_storage(index, MAX_RANDOM_WALK_ROUNDS)
.map(|piece| (index, Ok(piece)))
.await
};
Box::pin(fut)
});

Ok(Box::new(stream))
}
}

impl<PV> DsnPieceGetter<PV>
Expand All @@ -76,6 +110,6 @@ where
{
/// Creates new DSN piece getter.
pub fn new(piece_provider: PieceProvider<PV>) -> Self {
Self(piece_provider)
Self(Arc::new(piece_provider))
}
}
1 change: 1 addition & 0 deletions shared/subspace-data-retrieval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ include = [
]

[dependencies]
anyhow = "1.0.89"
async-lock = "3.4.0"
async-trait = "0.1.83"
futures = "0.3.31"
Expand Down
27 changes: 15 additions & 12 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! Fetching objects stored in the archived history of Subspace Network.
use crate::piece_fetcher::download_pieces;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use crate::piece_getter::ObjectPieceGetter;
use crate::segment_fetcher::{download_segment, SegmentGetterError};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use std::sync::Arc;
Expand Down Expand Up @@ -123,7 +123,7 @@ pub enum Error {
#[error("Getting piece caused an error: {source:?}")]
PieceGetterError {
#[from]
source: BoxError,
source: anyhow::Error,
},

/// Piece getter couldn't find the piece
Expand All @@ -132,9 +132,12 @@ pub enum Error {
}

/// Object fetcher for the Subspace DSN.
pub struct ObjectFetcher {
pub struct ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// The piece getter used to fetch pieces.
piece_getter: Arc<dyn ObjectPieceGetter + Send + Sync + 'static>,
piece_getter: Arc<PG>,

/// The erasure coding configuration of those pieces.
erasure_coding: ErasureCoding,
Expand All @@ -143,21 +146,21 @@ pub struct ObjectFetcher {
max_object_len: usize,
}

impl ObjectFetcher {
impl<PG> ObjectFetcher<PG>
where
PG: ObjectPieceGetter + Send + Sync,
{
/// Create a new object fetcher with the given configuration.
///
/// `max_object_len` is the amount of data bytes we'll read for a single object before giving
/// up and returning an error, or `None` for no limit (`usize::MAX`).
pub fn new<PG>(
piece_getter: PG,
pub fn new(
piece_getter: Arc<PG>,
erasure_coding: ErasureCoding,
max_object_len: Option<usize>,
) -> Self
where
PG: ObjectPieceGetter + Send + Sync + 'static,
{
) -> Self {
Self {
piece_getter: Arc::new(piece_getter),
piece_getter,
erasure_coding,
max_object_len: max_object_len.unwrap_or(usize::MAX),
}
Expand Down
56 changes: 20 additions & 36 deletions shared/subspace-data-retrieval/src/piece_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
//! Fetching pieces of the archived history of Subspace Network.
use crate::object_fetcher::Error;
use crate::piece_getter::{BoxError, ObjectPieceGetter};
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use crate::piece_getter::ObjectPieceGetter;
use futures::StreamExt;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tracing::{debug, trace};

Expand All @@ -31,7 +30,7 @@ use tracing::{debug, trace};
pub async fn download_pieces<PG>(
piece_indexes: &[PieceIndex],
piece_getter: &PG,
) -> Result<Vec<Piece>, BoxError>
) -> anyhow::Result<Vec<Piece>>
where
PG: ObjectPieceGetter,
{
Expand All @@ -42,46 +41,31 @@ where
);

// TODO:
// - consider using a semaphore to limit the number of concurrent requests, like
// download_segment_pieces()
// - if we're close to the number of pieces in a segment, use segment downloading and piece
// - if we're close to the number of pieces in a segment, or we can't find a piece, use segment downloading and piece
// reconstruction instead
// Currently most objects are limited to 4 pieces, so this isn't needed yet.
let received_pieces = piece_indexes
.iter()
.map(|piece_index| async move {
match piece_getter.get_piece(*piece_index).await {
Ok(Some(piece)) => {
trace!(?piece_index, "Piece request succeeded",);
Ok(piece)
}
Ok(None) => {
trace!(?piece_index, "Piece not found");
Err(Error::PieceNotFound {
piece_index: *piece_index,
}
.into())
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request caused an error",
);
Err(error)
}
}
})
.collect::<FuturesOrdered<_>>();
let mut received_pieces = piece_getter
.get_pieces(piece_indexes.iter().copied())
.await?;

// We want exact pieces, so any errors are fatal.
let received_pieces: Vec<Piece> = received_pieces.try_collect().await?;
let mut pieces = Vec::new();
pieces.resize(piece_indexes.len(), Piece::default());

while let Some((piece_index, maybe_piece)) = received_pieces.next().await {
// We want exact pieces, so any errors are fatal.
let piece = maybe_piece?.ok_or(Error::PieceNotFound { piece_index })?;
let index_position = piece_indexes
.iter()
.position(|i| *i == piece_index)
.expect("get_pieces only returns indexes it was supplied; qed");
pieces[index_position] = piece;
}

trace!(
count = piece_indexes.len(),
?piece_indexes,
"Successfully retrieved exact pieces"
);

Ok(received_pieces)
Ok(pieces)
}
Loading

0 comments on commit 239d791

Please sign in to comment.