Skip to content

Commit

Permalink
Merge pull request #969 from subspace/improve-piece-getting-mechanism
Browse files Browse the repository at this point in the history
Improve piece getting mechanism on farmer
  • Loading branch information
nazar-pc authored Nov 30, 2022
2 parents 98e87df + d218e0d commit 25a7318
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 18 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ include = [
[dependencies]
anyhow = "1.0.66"
async-trait = "0.1.58"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
base58 = "0.2.0"
blake2 = "0.10.5"
bytesize = "1.1.0"
Expand Down
6 changes: 4 additions & 2 deletions crates/subspace-farmer/src/single_disk_plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,8 @@ impl SingleDiskPlot {
// Some sectors may already be plotted, skip them
metadata_header.lock().sector_count as usize,
)
.map(|(sector_index, (sector, metadata))| {
(sector_index as u64 + first_sector_index, sector, metadata)
.map(|(sector_offset, (sector, metadata))| {
(sector_offset as u64 + first_sector_index, sector, metadata)
});

// TODO: Concurrency
Expand All @@ -677,6 +677,8 @@ impl SingleDiskPlot {
return;
}

debug!(%sector_index, "Plotting sector");

let farmer_app_info = handle
.block_on(rpc_client.farmer_app_info())
.map_err(|error| PlottingError::FailedToGetFarmerInfo { error })?;
Expand Down
72 changes: 56 additions & 16 deletions crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
use async_trait::async_trait;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::Decode;
use std::collections::BTreeSet;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash};
use subspace_farmer_components::plotting::PieceReceiver;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::MultihashCode;
use subspace_networking::{Node, PieceByHashRequest, PieceKey, ToMultihash};
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tracing::{debug, error, info, trace, warn};

/// Defines a duration between get_piece calls.
const GET_PIECE_WAITING_DURATION_IN_SECS: u64 = 1;
/// Defines initial duration between get_piece calls.
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
/// Defines max duration between get_piece calls.
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
/// Delay for getting piece from cache before resorting to archival storage
const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(1);
/// Max time allocated for getting piece from DSN before attempt is considered to fail
const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5);

// Temporary struct serving pieces from different providers using configuration arguments.
pub(crate) struct MultiChannelPieceReceiver<'a> {
Expand Down Expand Up @@ -181,21 +193,49 @@ impl<'a> PieceReceiver for MultiChannelPieceReceiver<'a> {
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
trace!(%piece_index, "Piece request.");

// until we get a valid piece
loop {
self.check_cancellation()?;

if let Some(piece) = self.get_piece_from_cache(piece_index).await {
return Ok(Some(piece));
}

if let Some(piece) = self.get_piece_from_archival_storage(piece_index).await {
return Ok(Some(piece));
let backoff = ExponentialBackoff {
initial_interval: GET_PIECE_INITIAL_INTERVAL,
max_interval: GET_PIECE_MAX_INTERVAL,
// Try until we get a valid piece
max_elapsed_time: None,
..ExponentialBackoff::default()
};

retry(backoff, || async {
self.check_cancellation()
.map_err(backoff::Error::Permanent)?;

// Try to pull pieces in two ways, whichever is faster
let mut piece_attempts = [
timeout(
GET_PIECE_TIMEOUT,
Box::pin(self.get_piece_from_cache(piece_index))
as Pin<Box<dyn Future<Output = _> + Send>>,
),
timeout(
GET_PIECE_TIMEOUT,
Box::pin(async {
// Prefer cache if it can return quickly, otherwise fall back to archival storage
sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await;
self.get_piece_from_archival_storage(piece_index).await
}) as Pin<Box<dyn Future<Output = _> + Send>>,
),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

while let Some(maybe_piece) = piece_attempts.next().await {
if let Ok(Some(piece)) = maybe_piece {
return Ok(Some(piece));
}
}

warn!(%piece_index, "Couldn't get a piece from DSN. Starting a new attempt...");
warn!(%piece_index, "Couldn't get a piece from DSN. Retrying...");

sleep(Duration::from_secs(GET_PIECE_WAITING_DURATION_IN_SECS)).await;
}
Err(backoff::Error::transient(
"Couldn't get piece from DSN".into(),
))
})
.await
}
}

0 comments on commit 25a7318

Please sign in to comment.