Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ParthDesai committed Aug 4, 2023
1 parent 5c64435 commit 8a3c0a4
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 201 deletions.
79 changes: 39 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ sdk-dsn = { path = "dsn" }
sdk-substrate = { path = "substrate" }
sdk-farmer = { path = "farmer" }

subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }

# The only triple tested and confirmed as working in `jemallocator` crate is `x86_64-unknown-linux-gnu`
[target.'cfg(all(target_arch = "x86_64", target_vendor = "unknown", target_os = "linux", target_env = "gnu"))'.dev-dependencies]
Expand All @@ -35,7 +35,7 @@ tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.3"

subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }

# The list of dependencies below (which can be both direct and indirect dependencies) are crates
# that are suspected to be CPU-intensive, and that are unlikely to require debugging (as some of
Expand Down
12 changes: 5 additions & 7 deletions dsn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition = "2021"

[dependencies]
anyhow = "1"
async-trait = "0.1"
derivative = "2.2.0"
derive_builder = "0.12"
derive_more = "0.99"
Expand All @@ -25,9 +24,8 @@ sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/subs
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }

subspace-networking = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-service = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-service = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
39 changes: 38 additions & 1 deletion dsn/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ use futures::prelude::*;
use sdk_utils::{self, DropCollection, Multiaddr, MultiaddrWithPeerId};
use serde::{Deserialize, Serialize};
use subspace_core_primitives::Piece;
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_networking::libp2p::kad::ProviderRecord;
use subspace_networking::{PeerInfoProvider, PieceAnnouncementRequestHandler, PieceAnnouncementResponse, PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage as _, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, KADEMLIA_PROVIDER_TTL_IN_SECS, PeerInfo};
use subspace_networking::{
PeerInfo, PeerInfoProvider, PieceAnnouncementRequestHandler, PieceAnnouncementResponse,
PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage as _,
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
KADEMLIA_PROVIDER_TTL_IN_SECS,
};
use subspace_service::segment_headers::SegmentHeaderCache;
use subspace_service::Error;

Expand Down Expand Up @@ -200,6 +206,8 @@ pub struct DsnShared<C: sc_client_api::AuxStore + Send + Sync + 'static> {
pub farmer_provider_storage: MaybeProviderStorage<FarmerProviderStorage>,
/// Farmer archival storage pieces
pub farmer_archival_storage_pieces: ArchivalStoragePieces,
/// Farmer archival storage info
pub farmer_archival_storage_info: ArchivalStorageInfo,
/// Farmer piece cache
#[derivative(Debug = "ignore")]
pub piece_cache: NodePieceCache<C>,
Expand Down Expand Up @@ -252,6 +260,7 @@ impl Dsn {

let cuckoo_filter_size = farmer_total_space_pledged / Piece::SIZE + 1usize;
let farmer_archival_storage_pieces = ArchivalStoragePieces::new(cuckoo_filter_size);
let farmer_archival_storage_info = ArchivalStorageInfo::default();

tracing::debug!(genesis_hash = protocol_version, "Setting DSN protocol version...");

Expand Down Expand Up @@ -454,6 +463,33 @@ impl Dsn {
}));
drop_collection.push(on_new_listener);

let on_peer_info = node.on_peer_info(Arc::new({
let archival_storage_info = farmer_archival_storage_info.clone();

move |new_peer_info| {
let peer_id = new_peer_info.peer_id;
let peer_info = &new_peer_info.peer_info;

if let PeerInfo::Farmer { cuckoo_filter } = peer_info {
archival_storage_info.update_cuckoo_filter(peer_id, cuckoo_filter.clone());

tracing::debug!(%peer_id, ?peer_info, "Peer info cached",);
}
}
}));
drop_collection.push(on_peer_info);

let on_disconnected_peer = node.on_disconnected_peer(Arc::new({
let archival_storage_info = farmer_archival_storage_info.clone();

move |peer_id| {
if archival_storage_info.remove_peer_filter(peer_id) {
tracing::debug!(%peer_id, "Peer filter removed.",);
}
}
}));
drop_collection.push(on_disconnected_peer);

Ok((
DsnShared {
node,
Expand All @@ -464,6 +500,7 @@ impl Dsn {
piece_cache,
_drop: drop_collection,
farmer_archival_storage_pieces,
farmer_archival_storage_info,
},
runner,
))
Expand Down
38 changes: 0 additions & 38 deletions dsn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ mod provider_storage_utils;

pub use builder::*;
use either::*;
use sc_client_api::AuxStore;
/// Farmer piece cache
pub use subspace_farmer::utils::farmer_piece_cache::FarmerPieceCache;
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::utils::piece_provider::PieceValidator;
use subspace_networking::ParityDbProviderStorage;
use tracing::warn;

Expand All @@ -41,38 +38,3 @@ pub type ProviderStorage<C> = provider_storage_utils::AndProviderStorage<
provider_storage_utils::MaybeProviderStorage<FarmerProviderStorage>,
NodeProviderStorage<C>,
>;

/// Node piece getter (combines DSN and Farmer getters)
pub struct NodePieceGetter<PV, C> {
piece_getter: subspace_farmer::utils::node_piece_getter::NodePieceGetter<PV>,
node_cache: NodePieceCache<C>,
}

impl<PV, C> NodePieceGetter<PV, C> {
/// Constructor
pub fn new(
dsn_piece_getter: subspace_farmer::utils::node_piece_getter::NodePieceGetter<PV>,
node_cache: NodePieceCache<C>,
) -> Self {
Self { piece_getter: dsn_piece_getter, node_cache }
}
}

#[async_trait::async_trait()]
impl<PV: PieceValidator, C: AuxStore + Send + Sync> PieceGetter for NodePieceGetter<PV, C> {
async fn get_piece(
&self,
piece_index: subspace_core_primitives::PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<
Option<subspace_core_primitives::Piece>,
Box<dyn std::error::Error + Send + Sync + 'static>,
> {
let piece = self.node_cache.get_piece(piece_index.hash()).map_err(|x| x.to_string())?;
if piece.is_some() {
return Ok(piece);
}

self.piece_getter.get_piece(piece_index, retry_policy).await
}
}
14 changes: 7 additions & 7 deletions farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ sdk-utils = { path = "../utils" }
sdk-dsn = { path = "../dsn" }
sdk-traits = { path = "../traits" }

subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552", features = ["parallel", "chia"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "561f0e5b2e22e88af18402ca82e784bb9f97c552" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b", features = ["parallel", "chia"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "3640edde1832bd5db94677b542c336c65cef564b" }
91 changes: 12 additions & 79 deletions farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use builder::{Builder, Config};
use derivative::Derivative;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use sdk_dsn::{FarmerPieceCache, FarmerProviderStorage, NodePieceGetter};
use sdk_dsn::{FarmerPieceCache, FarmerProviderStorage};
use sdk_traits::Node;
use sdk_utils::{AsyncDropFutures, ByteSize, DropCollection, PublicKey};
use serde::{Deserialize, Serialize};
Expand All @@ -37,13 +37,13 @@ use subspace_farmer::single_disk_plot::{
};
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::node_piece_getter::NodePieceGetter as DsnPieceGetter;
use subspace_farmer::utils::parity_db_store::ParityDbStore;
use subspace_farmer::utils::piece_cache::PieceCache;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy, PlottedSector};
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::PieceValidator;
use subspace_networking::ParityDbProviderStorage;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use tokio::sync::{oneshot, watch, Mutex};
Expand Down Expand Up @@ -335,15 +335,9 @@ fn create_readers_and_pieces(
fn handler_on_sector_plotted(
plotted_sector: &PlottedSector,
maybe_old_plotted_sector: &Option<PlottedSector>,
plotting_permit: Arc<impl Send + Sync + 'static>,
disk_farm_index: usize,
node: &subspace_networking::Node,
readers_and_pieces: Arc<parking_lot::Mutex<Option<ReadersAndPieces>>>,
mut dropped_receiver: tokio::sync::broadcast::Receiver<()>,
node_name: &str,
) {
let sector_index = plotted_sector.sector_index;

let disk_farm_index = disk_farm_index
.try_into()
.expect("More than 256 plots are not supported, this is checked above already; qed");
Expand All @@ -358,44 +352,6 @@ fn handler_on_sector_plotted(
}
readers_and_pieces.add_sector(disk_farm_index, plotted_sector);
}

let piece_indexes = plotted_sector.piece_indexes.clone();

let node = node.clone();

// TODO: Skip those that were already announced (because they cached)
let publish_fut = async move {
piece_indexes
.into_iter()
.map(|piece_index| {
subspace_networking::utils::piece_announcement::announce_single_piece_index_hash_with_backoff(
piece_index.hash(),
&node,
)
})
.collect::<FuturesUnordered<_>>()
.map(drop)
.collect::<Vec<()>>()
.await;

tracing::info!(sector_index, "Sector publishing was successful.");

// Release only after publishing is finished
drop(plotting_permit);
}
.in_current_span();

drop(sdk_utils::task_spawn(
format!("subspace-sdk-farmer-{node_name}-piece-publishing"),
async move {
use futures::future::{select, Either};

let result = select(Box::pin(publish_fut), Box::pin(dropped_receiver.recv())).await;
if matches!(result, Either::Right(_)) {
tracing::debug!("Piece publishing was cancelled due to shutdown.");
}
},
));
}

impl Config {
Expand All @@ -412,7 +368,7 @@ impl Config {
}

let Self {
max_concurrent_plots,
max_concurrent_plots: _,
piece_cache_size: PieceCacheSize(piece_cache_size),
provided_keys_limit: ProvidedKeysLimit(provided_keys_limit),
max_pieces_in_sector,
Expand All @@ -426,9 +382,6 @@ impl Config {
let mut single_disk_plots = Vec::with_capacity(plots.len());
let mut plot_info = HashMap::with_capacity(plots.len());

let concurrent_plotting_semaphore =
Arc::new(tokio::sync::Semaphore::new(max_concurrent_plots.get()));

let base_path = cache.directory;
let readers_and_pieces = Arc::clone(&node.dsn().farmer_readers_and_pieces);

Expand Down Expand Up @@ -496,11 +449,10 @@ impl Config {
)),
);
let piece_getter = Arc::new(FarmerPieceGetter::new(
NodePieceGetter::new(
DsnPieceGetter::new(piece_provider),
node.dsn().piece_cache.clone(),
),
Arc::clone(&piece_cache),
node.dsn().node.clone(),
piece_provider,
piece_cache.clone(),
node.dsn().farmer_archival_storage_info.clone(),
));

let farmer_app_info = subspace_farmer::NodeClient::farmer_app_info(node.rpc())
Expand Down Expand Up @@ -543,7 +495,6 @@ impl Config {
node,
max_pieces_in_sector,
piece_getter: Arc::clone(&piece_getter),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
description,
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
Expand All @@ -562,30 +513,16 @@ impl Config {
let readers_and_pieces = Arc::clone(&readers_and_pieces);
let span = tracing::info_span!("farm", %disk_farm_index);

// We are not going to send anything here, but dropping of sender on dropping of
// corresponding `SingleDiskPlot` will allow us to stop background tasks.
let (dropped_sender, _dropped_receiver) = tokio::sync::broadcast::channel::<()>(1);
drop_at_exit.defer({
let dropped_sender = dropped_sender.clone();
move || drop(dropped_sender.send(()))
});

let node = node.dsn().node.clone();
let node_name = node_name.clone();
// Collect newly plotted pieces
// TODO: Once we have replotting, this will have to be updated
let handler_id = single_disk_plot.on_sector_plotted(Arc::new(
move |(plotted_sector, maybe_old_plotted_sector, plotting_permit)| {
move |(plotted_sector, maybe_old_plotted_sector)| {
let _span_guard = span.enter();
handler_on_sector_plotted(
plotted_sector,
maybe_old_plotted_sector,
Arc::clone(plotting_permit),
disk_farm_index,
&node,
readers_and_pieces.clone(),
dropped_sender.subscribe(),
&node_name,
)
},
));
Expand Down Expand Up @@ -752,8 +689,7 @@ pub struct InitialPlottingProgress {
}

/// Progress data received from sender, used to monitor plotting progress
pub type ProgressData =
Option<(PlottedSector, Option<PlottedSector>, Arc<tokio::sync::OwnedSemaphorePermit>)>;
pub type ProgressData = Option<(PlottedSector, Option<PlottedSector>)>;

/// Plot structure
#[derive(Debug)]
Expand Down Expand Up @@ -829,7 +765,6 @@ struct PlotOptions<'a, PG, N: sdk_traits::Node> {
pub reward_address: PublicKey,
pub node: &'a N,
pub piece_getter: PG,
pub concurrent_plotting_semaphore: Arc<tokio::sync::Semaphore>,
pub description: &'a PlotDescription,
pub kzg: kzg::Kzg,
pub erasure_coding: ErasureCoding,
Expand All @@ -843,7 +778,6 @@ impl<T: subspace_proof_of_space::Table> Plot<T> {
reward_address,
node,
piece_getter,
concurrent_plotting_semaphore,
description,
kzg,
erasure_coding,
Expand All @@ -869,7 +803,6 @@ impl<T: subspace_proof_of_space::Table> Plot<T> {
kzg,
erasure_coding,
piece_getter,
concurrent_plotting_semaphore,
};
let single_disk_plot = SingleDiskPlot::new::<_, _, T>(description, disk_farm_idx).await?;
let mut drop_at_exit = DropCollection::new();
Expand Down Expand Up @@ -1023,13 +956,13 @@ const POPULATE_PIECE_DELAY: Duration = Duration::from_secs(10);
/// Populates piece cache on startup. It waits for the new segment index and
/// check all pieces from previous segments to see if they are already in the
/// cache. If they are not, they are added from DSN.
async fn populate_pieces_cache<PG, PC>(
async fn populate_pieces_cache<PV, PC>(
dsn_node: subspace_networking::Node,
segment_index: SegmentIndex,
piece_getter: Arc<FarmerPieceGetter<PG, PC>>,
piece_getter: Arc<FarmerPieceGetter<PV, PC>>,
piece_cache: Arc<tokio::sync::Mutex<FarmerPieceCache>>,
) where
PG: PieceGetter + Send + Sync,
PV: PieceValidator + Send + Sync + 'static,
PC: PieceCache + Send + 'static,
{
let _ = dsn_node.wait_for_connected_peers(POPULATE_PIECE_DELAY).await;
Expand Down
Loading

0 comments on commit 8a3c0a4

Please sign in to comment.