Merge pull request #3259 from autonomys/improve-piece-downloading
Improve piece downloading
nazar-pc authored Nov 26, 2024
2 parents f4b5a3e + 34e728d commit 49d9638
Showing 23 changed files with 507 additions and 507 deletions.
43 changes: 15 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -3,7 +3,7 @@ use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
 use crate::commands::shared::derive_libp2p_keypair;
 use crate::commands::shared::network::{configure_network, NetworkArgs};
 use anyhow::anyhow;
-use async_lock::RwLock as AsyncRwLock;
+use async_lock::{RwLock as AsyncRwLock, Semaphore};
 use backoff::ExponentialBackoff;
 use clap::{Parser, ValueHint};
 use futures::stream::FuturesUnordered;
@@ -38,6 +38,8 @@ const PIECE_GETTER_MAX_RETRIES: u16 = 7;
 const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
 /// Defines max duration between get_piece calls.
 const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
+/// Multiplier on top of outgoing connections number for piece downloading purposes
+const PIECE_PROVIDER_MULTIPLIER: usize = 10;
 
 /// Arguments for controller
 #[derive(Debug, Parser)]
@@ -137,6 +139,7 @@ pub(super) async fn controller(
         .await
         .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
 
+    let out_connections = network_args.out_connections;
     let (node, mut node_runner) = {
         if network_args.bootstrap_nodes.is_empty() {
             network_args
@@ -161,6 +164,7 @@
     let piece_provider = PieceProvider::new(
         node.clone(),
         SegmentCommitmentPieceValidator::new(node.clone(), node_client.clone(), kzg.clone()),
+        Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER),
     );
 
     let piece_getter = FarmerPieceGetter::new(
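The change above threads a download-concurrency limit into `PieceProvider`: a `Semaphore` sized as `out_connections * PIECE_PROVIDER_MULTIPLIER` bounds how many piece requests can be in flight at once. Below is a minimal sketch of that pattern, not the farmer's actual code; `fetch_piece` and `download_pieces` are hypothetical stand-ins for the real networking calls.

```rust
use std::sync::Arc;

use async_lock::Semaphore;
use futures::stream::{FuturesUnordered, StreamExt};

const PIECE_PROVIDER_MULTIPLIER: usize = 10;

async fn fetch_piece(piece_index: u64) -> Option<Vec<u8>> {
    // Placeholder for the real piece retrieval over the network.
    let _ = piece_index;
    None
}

async fn download_pieces(piece_indices: Vec<u64>, out_connections: u32) {
    // One semaphore shared by all downloads, sized as a multiple of the
    // number of outgoing connections, mirroring the constant added above.
    let semaphore = Arc::new(Semaphore::new(
        out_connections as usize * PIECE_PROVIDER_MULTIPLIER,
    ));

    let mut downloads = piece_indices
        .into_iter()
        .map(|piece_index| {
            let semaphore = Arc::clone(&semaphore);
            async move {
                // Wait for a free permit before issuing the request; the guard
                // releases the permit when it is dropped at the end of scope.
                let _permit = semaphore.acquire_arc().await;
                fetch_piece(piece_index).await
            }
        })
        .collect::<FuturesUnordered<_>>();

    while let Some(_maybe_piece) = downloads.next().await {
        // Handle the downloaded piece (omitted).
    }
}
```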
@@ -2,7 +2,7 @@
 
 use crate::commands::shared::DiskFarm;
 use anyhow::anyhow;
-use async_lock::Mutex as AsyncMutex;
+use async_lock::{Mutex as AsyncMutex, Semaphore};
 use backoff::ExponentialBackoff;
 use bytesize::ByteSize;
 use clap::Parser;
@@ -36,7 +36,7 @@ use subspace_farmer::utils::{
 use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
 use subspace_kzg::Kzg;
 use subspace_proof_of_space::Table;
-use tokio::sync::{Barrier, Semaphore};
+use tokio::sync::Barrier;
 use tracing::{error, info, info_span, warn, Instrument};
 
 const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
@@ -1,6 +1,6 @@
 use crate::commands::shared::PlottingThreadPriority;
 use anyhow::anyhow;
-use async_lock::Mutex as AsyncMutex;
+use async_lock::{Mutex as AsyncMutex, Semaphore};
 use clap::Parser;
 use prometheus_client::registry::Registry;
 use std::future::Future;
@@ -28,7 +28,6 @@ use subspace_farmer::utils::{
 use subspace_farmer_components::PieceGetter;
 use subspace_kzg::Kzg;
 use subspace_proof_of_space::Table;
-use tokio::sync::Semaphore;
 use tracing::info;
 
 const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);
@@ -2,7 +2,7 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
 use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
 use crate::utils::shutdown_signal;
 use anyhow::anyhow;
-use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
+use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
 use backoff::ExponentialBackoff;
 use bytesize::ByteSize;
 use clap::{Parser, ValueHint};
@@ -54,7 +54,7 @@ use subspace_kzg::Kzg;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_networking::utils::piece_provider::PieceProvider;
 use subspace_proof_of_space::Table;
-use tokio::sync::{Barrier, Semaphore};
+use tokio::sync::Barrier;
 use tracing::{error, info, info_span, warn, Instrument};
 
 /// Get piece retry attempts number.
@@ -68,6 +68,8 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
 const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024 * 1024;
 const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
 const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);
+/// Multiplier on top of outgoing connections number for piece downloading purposes
+const PIECE_PROVIDER_MULTIPLIER: usize = 10;
 
 type FarmIndex = u8;
 
@@ -431,6 +433,7 @@ where
         .await
         .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
 
+    let out_connections = network_args.out_connections;
     let (node, mut node_runner) = {
         if network_args.bootstrap_nodes.is_empty() {
             network_args
@@ -460,6 +463,7 @@ where
     let piece_provider = PieceProvider::new(
         node.clone(),
         SegmentCommitmentPieceValidator::new(node.clone(), node_client.clone(), kzg.clone()),
+        Semaphore::new(out_connections as usize * PIECE_PROVIDER_MULTIPLIER),
     );
 
     let piece_getter = FarmerPieceGetter::new(
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
@@ -654,7 +654,7 @@ where
         downloading_pieces_stream
             // This allows to schedule new batch while previous batches partially completed, but
             // avoids excessive memory usage like when all futures are created upfront
-            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)
+            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
             // Simply drain everything
             .for_each(|()| async {})
             .await;
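For context, the widened window relies on how `buffer_unordered` schedules work: futures are drawn lazily from the stream and only the buffered ones exist at any time, so raising the factor from 2 to 10 increases download parallelism without creating every future upfront. A standalone sketch of the pattern follows; `CONCURRENT_BATCHES`, `process_batch`, and `drain_batches` are illustrative names, not the farmer cache's own.

```rust
use futures::stream::{self, StreamExt};

const CONCURRENT_BATCHES: usize = 4;

async fn process_batch(batch: u32) {
    // Placeholder for downloading and storing one batch of pieces.
    let _ = batch;
}

async fn drain_batches(batches: Vec<u32>) {
    stream::iter(batches)
        // Futures are only constructed as the buffer has room for them,
        // so memory use stays proportional to the concurrency window.
        .map(process_batch)
        .buffer_unordered(CONCURRENT_BATCHES * 10)
        // Simply drain everything.
        .for_each(|()| async {})
        .await;
}
```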
40 changes: 6 additions & 34 deletions crates/subspace-farmer/src/plotter/cpu.rs
@@ -6,7 +6,7 @@ use crate::plotter::cpu::metrics::CpuPlotterMetrics;
 use crate::plotter::{Plotter, SectorPlottingProgress};
 use crate::thread_pool_manager::PlottingThreadPoolManager;
 use crate::utils::AsyncJoinOnDrop;
-use async_lock::Mutex as AsyncMutex;
+use async_lock::{Mutex as AsyncMutex, Semaphore, SemaphoreGuardArc};
 use async_trait::async_trait;
 use bytes::Bytes;
 use event_listener_primitives::{Bag, HandlerId};
@@ -35,7 +35,6 @@ use subspace_farmer_components::plotting::{
 use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
 use subspace_kzg::Kzg;
 use subspace_proof_of_space::Table;
-use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio::task::yield_now;
 use tracing::{warn, Instrument};
 
@@ -87,7 +86,7 @@ where
     PosTable: Table,
 {
     async fn has_free_capacity(&self) -> Result<bool, String> {
-        Ok(self.downloading_semaphore.available_permits() > 0)
+        Ok(self.downloading_semaphore.try_acquire().is_some())
     }
 
     async fn plot_sector(
@@ -97,39 +96,13 @@
         farmer_protocol_info: FarmerProtocolInfo,
         pieces_in_sector: u16,
         replotting: bool,
-        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
+        progress_sender: mpsc::Sender<SectorPlottingProgress>,
     ) {
         let start = Instant::now();
 
         // Done outside the future below as a backpressure, ensuring that it is not possible to
         // schedule unbounded number of plotting tasks
-        let downloading_permit = match Arc::clone(&self.downloading_semaphore)
-            .acquire_owned()
-            .await
-        {
-            Ok(downloading_permit) => downloading_permit,
-            Err(error) => {
-                warn!(%error, "Failed to acquire downloading permit");
-
-                let progress_updater = ProgressUpdater {
-                    public_key,
-                    sector_index,
-                    handlers: Arc::clone(&self.handlers),
-                    metrics: self.metrics.clone(),
-                };
-
-                progress_updater
-                    .update_progress_and_events(
-                        &mut progress_sender,
-                        SectorPlottingProgress::Error {
-                            error: format!("Failed to acquire downloading permit: {error}"),
-                        },
-                    )
-                    .await;
-
-                return;
-            }
-        };
+        let downloading_permit = self.downloading_semaphore.acquire_arc().await;
 
         self.plot_sector_internal(
             start,
@@ -155,8 +128,7 @@
     ) -> bool {
         let start = Instant::now();
 
-        let Ok(downloading_permit) = Arc::clone(&self.downloading_semaphore).try_acquire_owned()
-        else {
+        let Some(downloading_permit) = self.downloading_semaphore.try_acquire_arc() else {
             return false;
         };
 
@@ -259,7 +231,7 @@ where
     async fn plot_sector_internal<PS>(
         &self,
         start: Instant,
-        downloading_permit: OwnedSemaphorePermit,
+        downloading_permit: SemaphoreGuardArc,
         public_key: PublicKey,
         sector_index: SectorIndex,
         farmer_protocol_info: FarmerProtocolInfo,
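The switch from `tokio::sync::Semaphore` to `async_lock::Semaphore` is what allows the error branch above to be deleted: tokio's `acquire_owned()` can fail once the semaphore is closed, while `async_lock`'s `acquire_arc()` always succeeds and simply waits for a permit. Below is a minimal sketch of the resulting backpressure pattern, with hypothetical names (`plot_sectors`, `max_concurrent_downloads`) standing in for the plotter's internals.

```rust
use std::sync::Arc;

use async_lock::Semaphore;

async fn plot_sectors(sector_indices: Vec<u16>, max_concurrent_downloads: usize) {
    let downloading_semaphore = Arc::new(Semaphore::new(max_concurrent_downloads));
    let mut tasks = Vec::new();

    for sector_index in sector_indices {
        // Unlike tokio's `acquire_owned()`, `acquire_arc()` cannot fail, which is
        // why the error-handling block disappeared from `plot_sector` above.
        let downloading_permit = downloading_semaphore.acquire_arc().await;

        tasks.push(tokio::spawn(async move {
            // Downloading and encoding for one sector would happen here.
            let _ = sector_index;
            // The permit is released only when this task completes, so no new
            // sector can be scheduled past the configured limit.
            drop(downloading_permit);
        }));
    }

    for task in tasks {
        let _ = task.await;
    }
}
```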
(Diffs for the remaining changed files were not loaded.)
