From f4f409bedc58392ac24a0a8b7c6a739a6f57d5fb Mon Sep 17 00:00:00 2001 From: tediou5 Date: Sat, 7 Sep 2024 23:28:56 +0800 Subject: [PATCH 1/9] chore: moving FarmIndex into cluster::farmer --- .../src/bin/subspace-farmer/commands/cluster/controller.rs | 3 ++- .../subspace-farmer/commands/cluster/controller/farms.rs | 6 +++--- crates/subspace-farmer/src/cluster/farmer.rs | 2 ++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 6f40a5be03..751fbdef6c 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -2,7 +2,7 @@ mod caches; mod farms; use crate::commands::cluster::controller::caches::maintain_caches; -use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex}; +use crate::commands::cluster::controller::farms::maintain_farms; use crate::commands::shared::derive_libp2p_keypair; use crate::commands::shared::network::{configure_network, NetworkArgs}; use anyhow::anyhow; @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_farmer::cluster::controller::controller_service; +use subspace_farmer::cluster::farmer::FarmIndex; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farmer_cache::FarmerCache; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index e8efec6895..1450463c37 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -21,7 +21,9 @@ use std::sync::Arc; use std::time::Instant; use subspace_core_primitives::{Blake3Hash, SectorIndex}; use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; -use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use subspace_farmer::cluster::farmer::{ + ClusterFarm, ClusterFarmerIdentifyFarmBroadcast, FarmIndex, +}; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; @@ -32,8 +34,6 @@ use tracing::{error, info, trace, warn}; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; -pub(super) type FarmIndex = u16; - #[derive(Debug)] struct KnownFarm { farm_id: FarmId, diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 483f12413e..6aca7d0cc1 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -37,6 +37,8 @@ use tracing::{debug, error, trace, warn}; const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); +/// Type alias for farm index used by cluster. +pub type FarmIndex = u16; type Handler = Bag, A>; /// Broadcast with identification details by farmers From c1d4a22ea9b9df92eb832720a015c513918d3ba6 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Sun, 8 Sep 2024 00:01:02 +0800 Subject: [PATCH 2/9] chore: tiny refactoring for cluster farm --- .../commands/cluster/controller/farms.rs | 13 +++-- crates/subspace-farmer/src/cluster/farmer.rs | 57 ++++++++++++------- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index 1450463c37..bd6c22f27b 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -115,8 +115,9 @@ impl KnownFarms { } fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_farms.extract_if(|_farm_index, known_farm| { - known_farm.last_identification.elapsed() > FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2 + let elapsed = FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_farms.extract_if(move |_farm_index, known_farm| { + known_farm.last_identification.elapsed() > elapsed }) } @@ -142,7 +143,7 @@ pub(super) async fn maintain_farms( Box::pin(pending()) as Pin)>>> ]); - let farmer_identify_subscription = pin!(nats_client + let farm_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::(None, None) .await .map_err(|error| anyhow!( @@ -157,7 +158,7 @@ pub(super) async fn maintain_farms( warn!(%error, "Failed to send farmer identification broadcast"); } - let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); + let mut farm_identify_subscription = farm_identify_subscription.fuse(); let mut farm_pruning_interval = tokio::time::interval_at( (Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2, @@ -200,9 +201,9 @@ pub(super) async fn maintain_farms( } } } - maybe_identify_message = farmer_identify_subscription.next() => { + maybe_identify_message = farm_identify_subscription.next() => { let Some(identify_message) = maybe_identify_message else { - return Err(anyhow!("Farmer identify stream ended")); + return Err(anyhow!("Farm identify stream ended")); }; process_farm_identify_message( diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 6aca7d0cc1..e434806278 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -363,8 +363,8 @@ where // that can be used to respond to incoming requests let farms_details = farms .iter() - .map(|farm| { - let farm_id = *farm.id(); + .map(|farm| (farm, *farm.id())) + .map(|(farm, farm_id)| { let nats_client = nats_client.clone(); let background_tasks = if primary_instance { @@ -480,7 +480,11 @@ where async move { if primary_instance { select! { - result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + &farms_details, + identification_broadcast_interval, + ).fuse() => { result }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { @@ -540,14 +544,20 @@ async fn identify_responder( } last_identification = Instant::now(); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast( + nats_client, + farms_details, + ).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Farmer self-identification"); - send_identify_broadcast(nats_client, farms_details).await; + send_identify_broadcast( + nats_client, + farms_details, + ).await; } } } @@ -555,29 +565,27 @@ async fn identify_responder( Ok(()) } -async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) { +async fn send_identify_broadcast( + nats_client: &NatsClient, + farms_details: &[FarmDetails], +) { + if farms_details.is_empty() { + warn!("No farm, skip sending farmer identify notification"); + return; + } + farms_details .iter() .map(|farm_details| async move { if let Err(error) = nats_client .broadcast( - &ClusterFarmerIdentifyFarmBroadcast { - farm_id: farm_details.farm_id, - total_sectors_count: farm_details.total_sectors_count, - fingerprint: blake3_hash_list(&[ - &farm_details.farm_id.encode(), - &farm_details.total_sectors_count.to_le_bytes(), - ]), - }, + &new_identify_message(farm_details), &farm_details.farm_id_string, ) .await { - warn!( - farm_id = %farm_details.farm_id, - %error, - "Failed to send farmer identify notification" - ); + let farmer_id = farm_details.farm_id; + warn!(%farmer_id, %error, "Failed to send farmer identify notification"); } }) .collect::>() @@ -585,6 +593,17 @@ async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[Farm .await; } +fn new_identify_message(farm_details: &FarmDetails) -> ClusterFarmerIdentifyFarmBroadcast { + ClusterFarmerIdentifyFarmBroadcast { + farm_id: farm_details.farm_id, + total_sectors_count: farm_details.total_sectors_count, + fingerprint: blake3_hash_list(&[ + &farm_details.farm_id.encode(), + &farm_details.total_sectors_count.to_le_bytes(), + ]), + } +} + async fn plotted_sectors_responder( nats_client: &NatsClient, farms_details: &[FarmDetails], From 201756839b974bb971c631e520c82f79ea1d5f6e Mon Sep 17 00:00:00 2001 From: tediou5 Date: Sun, 8 Sep 2024 00:35:24 +0800 Subject: [PATCH 3/9] feat: derive sub farm_ids from a farm_id --- crates/subspace-farmer/src/farm.rs | 15 +++++++++++++++ crates/subspace-farmer/src/farm/tests.rs | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 crates/subspace-farmer/src/farm/tests.rs diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index c755c2d343..dc35d8a80a 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -28,6 +28,8 @@ use thiserror::Error; use ulid::Ulid; pub mod plotted_pieces; +#[cfg(test)] +mod tests; /// Erased error type pub type FarmError = Box; @@ -499,6 +501,19 @@ impl FarmId { pub fn new() -> Self { Self::Ulid(Ulid::new()) } + + /// Derive sub IDs + #[inline] + pub fn derive_sub_ids(&self, n: usize) -> Vec { + match self { + FarmId::Ulid(ulid) => { + let ulid = ulid.0; + (0..n as u128) + .map(|i| FarmId::Ulid(Ulid(ulid + i))) + .collect() + } + } + } } /// Abstract farm implementation diff --git a/crates/subspace-farmer/src/farm/tests.rs b/crates/subspace-farmer/src/farm/tests.rs new file mode 100644 index 0000000000..606fe6fe6b --- /dev/null +++ b/crates/subspace-farmer/src/farm/tests.rs @@ -0,0 +1,19 @@ +use crate::farm::FarmId; + +#[test] +fn derive_sub_farm_ids_test() { + let id = FarmId::new(); + let sub_ids = id.derive_sub_ids(128); + assert_eq!(sub_ids.len(), 128); + + match id { + FarmId::Ulid(id) => { + let id: u128 = id.into(); + sub_ids.into_iter().zip(0..128u128).for_each(|(sub_id, i)| { + let FarmId::Ulid(sub_id) = sub_id; + let sub_id: u128 = sub_id.into(); + assert_eq!(sub_id, id + i); + }); + } + }; +} From 1c70a899b5ad33aba3d3359801c3368fc33dd2b9 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Mon, 26 Aug 2024 15:46:38 +0800 Subject: [PATCH 4/9] opt: optimize farm identification --- .../commands/cluster/controller/farms.rs | 117 +++++++++++- crates/subspace-farmer/src/cluster/farmer.rs | 170 +++++++++++++++--- 2 files changed, 259 insertions(+), 28 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index bd6c22f27b..fea375d2c1 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -9,7 +9,7 @@ use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::stream::FuturesUnordered; +use futures::stream::{self, FuturesUnordered}; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::hash_map::Entry; @@ -22,18 +22,26 @@ use std::time::Instant; use subspace_core_primitives::{Blake3Hash, SectorIndex}; use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; use subspace_farmer::cluster::farmer::{ - ClusterFarm, ClusterFarmerIdentifyFarmBroadcast, FarmIndex, + ClusterFarm, ClusterFarmerFarmDetailsRequest, ClusterFarmerIdentifyBroadcast, + ClusterFarmerIdentifyFarmBroadcast, FarmIndex, }; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; use tokio::task; use tokio::time::MissedTickBehavior; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; +#[derive(Debug)] +struct KnownFarmer { + farmer_id: FarmId, + fingerprint: Blake3Hash, + last_identification: Instant, +} + #[derive(Debug)] struct KnownFarm { farm_id: FarmId, @@ -56,10 +64,38 @@ enum KnownFarmInsertResult { #[derive(Debug, Default)] struct KnownFarms { + known_farmers: Vec, known_farms: HashMap, } impl KnownFarms { + fn update_farmer(&mut self, farmer_id: FarmId, fingerprint: Blake3Hash) -> bool { + let last_identification = Instant::now(); + if self.known_farmers.iter_mut().any(|known_farmer| { + if known_farmer.farmer_id == farmer_id && known_farmer.fingerprint == fingerprint { + known_farmer.last_identification = last_identification; + self.known_farms.iter_mut().for_each(|(_, known_farm)| { + // All farms in farmer use the same fingerprint + if known_farm.fingerprint == fingerprint { + known_farm.last_identification = last_identification; + } + }); + true + } else { + false + } + }) { + return false; + } + + self.known_farmers.push(KnownFarmer { + farmer_id, + fingerprint, + last_identification, + }); + true + } + fn insert_or_update( &mut self, farm_id: FarmId, @@ -116,6 +152,9 @@ impl KnownFarms { fn remove_expired(&mut self) -> impl Iterator + '_ { let elapsed = FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_farmers + .retain(move |known_farmer| known_farmer.last_identification.elapsed() <= elapsed); + self.known_farms.extract_if(move |_farm_index, known_farm| { known_farm.last_identification.elapsed() > elapsed }) @@ -143,6 +182,11 @@ pub(super) async fn maintain_farms( Box::pin(pending()) as Pin)>>> ]); + let farmer_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::(None, None) + .await + .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?); + let farm_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::(None, None) .await @@ -159,6 +203,7 @@ pub(super) async fn maintain_farms( } let mut farm_identify_subscription = farm_identify_subscription.fuse(); + let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); let mut farm_pruning_interval = tokio::time::interval_at( (Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2, @@ -214,6 +259,19 @@ pub(super) async fn maintain_farms( plotted_pieces, ); } + maybe_identify_message = farmer_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Farmer identify stream ended")); + }; + + process_farmer_identify_message( + identify_message, + nats_client, + &mut known_farms, + &mut farms_to_add_remove, + plotted_pieces, + ).await; + } _ = farm_pruning_interval.tick().fuse() => { for (farm_index, removed_farm) in known_farms.remove_expired() { let farm_id = removed_farm.farm_id; @@ -270,6 +328,59 @@ pub(super) async fn maintain_farms( } } +async fn process_farmer_identify_message<'a>( + identify_message: ClusterFarmerIdentifyBroadcast, + nats_client: &'a NatsClient, + known_farms: &mut KnownFarms, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: &'a Arc>>, +) { + let ClusterFarmerIdentifyBroadcast { + farmer_id, + farms_count, + fingerprint, + } = identify_message; + + if !known_farms.update_farmer(farmer_id, fingerprint) { + // Farmer already known, nothing to do + debug!(%farmer_id, "Farmer already known, nothing to do"); + return; + }; + + let farm_ids = farmer_id.derive_sub_ids(farms_count.into()); + match nats_client + .stream_request( + ClusterFarmerFarmDetailsRequest, + Some(&farmer_id.to_string()), + ) + .await + { + Ok(farms_details) => { + let mut farms_details = farms_details.zip(stream::iter(farm_ids)); + while let Some((farm_details, farm_id)) = farms_details.next().await { + let farm_identify_message = ClusterFarmerIdentifyFarmBroadcast { + farm_id, + total_sectors_count: farm_details.total_sectors_count, + fingerprint, + }; + + process_farm_identify_message( + farm_identify_message, + nats_client, + known_farms, + farms_to_add_remove, + plotted_pieces, + ) + } + } + Err(error) => warn!( + %error, + %farmer_id, + "Failed to request farmer farm details" + ), + } +} + fn process_farm_identify_message<'a>( identify_message: ClusterFarmerIdentifyFarmBroadcast, nats_client: &'a NatsClient, diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index e434806278..879d794cab 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -41,6 +41,37 @@ const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); pub type FarmIndex = u16; type Handler = Bag, A>; +/// Broadcast with farmer id for identification +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerIdentifyBroadcast { + /// Farmer ID + pub farmer_id: FarmId, + /// Number of farms + pub farms_count: FarmIndex, + /// Farmer fingerprint changes when something about internal farm changes (like allocated space) + pub fingerprint: Blake3Hash, +} + +impl GenericBroadcast for ClusterFarmerIdentifyBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.farmer-identify"; +} + +/// Request farm details from farmer +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetailsRequest; + +impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.details"; + type Response = ClusterFarmerFarmDetails; +} + +/// Farm details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmDetails { + /// Total number of sectors in the farm + pub total_sectors_count: SectorIndex, +} + /// Broadcast with identification details by farmers #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterFarmerIdentifyFarmBroadcast { @@ -359,11 +390,15 @@ pub fn farmer_service( where F: Farm, { + let farmer_id = FarmId::new(); + let farmer_id_string = farmer_id.to_string(); + let farm_ids = farmer_id.derive_sub_ids(farms.len()); + // For each farm start forwarding notifications as broadcast messages and create farm details // that can be used to respond to incoming requests let farms_details = farms .iter() - .map(|farm| (farm, *farm.id())) + .zip(farm_ids) .map(|(farm, farm_id)| { let nats_client = nats_client.clone(); @@ -482,11 +517,21 @@ where select! { result = identify_responder( &nats_client, + farmer_id, + &farmer_id_string, &farms_details, identification_broadcast_interval, ).fuse() => { result }, + result = farms_details_responder( + &nats_client, + farmer_id, + &farmer_id_string, + &farms_details, + ).fuse() => { + result + }, result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { result }, @@ -511,6 +556,8 @@ where /// broadcast in response, also send periodic notifications reminding that farm exists async fn identify_responder( nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, farms_details: &[FarmDetails], identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { @@ -546,6 +593,8 @@ async fn identify_responder( last_identification = Instant::now(); send_identify_broadcast( nats_client, + farmer_id, + farmer_id_string, farms_details, ).await; interval.reset(); @@ -556,6 +605,8 @@ async fn identify_responder( send_identify_broadcast( nats_client, + farmer_id, + farmer_id_string, farms_details, ).await; } @@ -567,6 +618,8 @@ async fn identify_responder( async fn send_identify_broadcast( nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, farms_details: &[FarmDetails], ) { if farms_details.is_empty() { @@ -574,34 +627,101 @@ async fn send_identify_broadcast( return; } - farms_details + if let Err(error) = nats_client + .broadcast( + &new_identify_message(farmer_id, farms_details), + farmer_id_string, + ) + .await + { + warn!(%farmer_id, %error, "Failed to send farmer identify notification"); + } +} + +fn new_identify_message( + farmer_id: FarmId, + farms_details: &[FarmDetails], +) -> ClusterFarmerIdentifyBroadcast { + let farmer_id_bytes = farmer_id.encode(); + let farms_sectors_counts = farms_details .iter() - .map(|farm_details| async move { - if let Err(error) = nats_client - .broadcast( - &new_identify_message(farm_details), - &farm_details.farm_id_string, - ) - .await - { - let farmer_id = farm_details.farm_id; - warn!(%farmer_id, %error, "Failed to send farmer identify notification"); - } - }) - .collect::>() - .collect::>() - .await; + .map(|farm_details| farm_details.total_sectors_count.to_le_bytes()) + .collect::>(); + let mut farms_sectors_counts = farms_sectors_counts + .iter() + .map(AsRef::as_ref) + .collect::>(); + farms_sectors_counts.push(farmer_id_bytes.as_slice()); + let fingerprint = blake3_hash_list(farms_sectors_counts.as_slice()); + + ClusterFarmerIdentifyBroadcast { + farmer_id, + farms_count: farms_details.len() as u16, + fingerprint, + } } -fn new_identify_message(farm_details: &FarmDetails) -> ClusterFarmerIdentifyFarmBroadcast { - ClusterFarmerIdentifyFarmBroadcast { - farm_id: farm_details.farm_id, - total_sectors_count: farm_details.total_sectors_count, - fingerprint: blake3_hash_list(&[ - &farm_details.farm_id.encode(), - &farm_details.total_sectors_count.to_le_bytes(), - ]), +async fn farms_details_responder( + nats_client: &NatsClient, + farmer_id: FarmId, + farmer_id_string: &str, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests(Some(farmer_id_string), Some(farmer_id_string.to_string())) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to farms details {}: {}", + farmer_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_farms_deatils_request( + nats_client, + farms_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } } + + Ok(()) +} + +async fn process_farms_deatils_request( + nats_client: &NatsClient, + farms_details: &[FarmDetails], + request: StreamRequest, +) { + trace!(?request, "Farms details request"); + + let stream = Box::new(stream::iter(farms_details.iter().map(|farm_details| { + ClusterFarmerFarmDetails { + total_sectors_count: farm_details.total_sectors_count, + } + }))); + + nats_client + .stream_response::(request.response_subject, stream) + .await; } async fn plotted_sectors_responder( From 3f285f4bb8535cc918e899c71a3ce84ec005db33 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Mon, 16 Sep 2024 04:10:16 +0800 Subject: [PATCH 5/9] feat: derive sub cache_ids from a cache_id --- crates/subspace-farmer/src/farm.rs | 13 +++++++++++++ crates/subspace-farmer/src/farm/tests.rs | 20 +++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index dc35d8a80a..58a81afb15 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -102,6 +102,19 @@ impl PieceCacheId { pub fn new() -> Self { Self::Ulid(Ulid::new()) } + + /// Derive sub IDs + #[inline] + pub fn derive_sub_ids(&self, n: usize) -> Vec { + match self { + PieceCacheId::Ulid(ulid) => { + let ulid = ulid.0; + (0..n as u128) + .map(|i| PieceCacheId::Ulid(Ulid(ulid + i))) + .collect() + } + } + } } /// Offset wrapper for pieces in [`PieceCache`] diff --git a/crates/subspace-farmer/src/farm/tests.rs b/crates/subspace-farmer/src/farm/tests.rs index 606fe6fe6b..0a7d645e02 100644 --- a/crates/subspace-farmer/src/farm/tests.rs +++ b/crates/subspace-farmer/src/farm/tests.rs @@ -1,4 +1,4 @@ -use crate::farm::FarmId; +use crate::farm::{FarmId, PieceCacheId}; #[test] fn derive_sub_farm_ids_test() { @@ -17,3 +17,21 @@ fn derive_sub_farm_ids_test() { } }; } + +#[test] +fn derive_sub_cache_ids_test() { + let id = PieceCacheId::new(); + let sub_ids = id.derive_sub_ids(128); + assert_eq!(sub_ids.len(), 128); + + match id { + PieceCacheId::Ulid(id) => { + let id: u128 = id.into(); + sub_ids.into_iter().zip(0..128u128).for_each(|(sub_id, i)| { + let PieceCacheId::Ulid(sub_id) = sub_id; + let sub_id: u128 = sub_id.into(); + assert_eq!(sub_id, id + i); + }); + } + }; +} From 8087b8787279f08737e764966f703a354fd83090 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Tue, 17 Sep 2024 01:40:02 +0800 Subject: [PATCH 6/9] chore: tiny refactoring for cluster cache --- .../commands/cluster/controller/caches.rs | 59 +++++++++++-------- crates/subspace-farmer/src/cluster/cache.rs | 30 +++++++--- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs index a57a71e6e6..de6bcc7aab 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -16,7 +16,7 @@ use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_farmer::cluster::cache::{ - ClusterCacheIdentifyBroadcast, ClusterCacheIndex, ClusterPieceCache, + ClusterCacheIdentifySignalCacheBroadcast, ClusterCacheIndex, ClusterPieceCache, }; use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use subspace_farmer::cluster::nats_client::NatsClient; @@ -28,60 +28,66 @@ use tracing::{info, trace, warn}; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); #[derive(Debug)] -struct KnownCache { - cache_id: PieceCacheId, +struct KnownSingleCache { + single_cache_id: PieceCacheId, last_identification: Instant, piece_cache: Arc, } #[derive(Debug, Default)] struct KnownCaches { - known_caches: Vec, + known_single_caches: Vec, } impl KnownCaches { fn get_all(&self) -> Vec> { - self.known_caches + self.known_single_caches .iter() .map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>) .collect() } /// Return `true` if farmer cache reinitialization is required - fn update( + fn update_single( &mut self, - cache_id: PieceCacheId, + single_cache_id: PieceCacheId, max_num_elements: u32, nats_client: &NatsClient, ) -> bool { - if self.known_caches.iter_mut().any(|known_cache| { - if known_cache.cache_id == cache_id { - known_cache.last_identification = Instant::now(); - true - } else { - false - } - }) { + if self + .known_single_caches + .iter_mut() + .any(|known_single_cache| { + if known_single_cache.single_cache_id == single_cache_id { + known_single_cache.last_identification = Instant::now(); + true + } else { + false + } + }) + { return false; } let piece_cache = Arc::new(ClusterPieceCache::new( - cache_id, + single_cache_id, max_num_elements, nats_client.clone(), )); - self.known_caches.push(KnownCache { - cache_id, + self.known_single_caches.push(KnownSingleCache { + single_cache_id, last_identification: Instant::now(), piece_cache, }); true } - fn remove_expired(&mut self) -> impl Iterator + '_ { - self.known_caches.extract_if(|known_cache| { - known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2 - }) + fn remove_expired(&mut self) -> impl Iterator + '_ { + let elapsed = CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_single_caches + .extract_if(move |known_single_cache| { + known_single_cache.last_identification.elapsed() > elapsed + }) } } @@ -98,7 +104,10 @@ pub(super) async fn maintain_caches( (Box::pin(ready(())) as Pin>>).fuse(); let cache_identify_subscription = pin!(nats_client - .subscribe_to_broadcasts::(Some(cache_group), None) + .subscribe_to_broadcasts::( + Some(cache_group), + None + ) .await .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); @@ -157,11 +166,11 @@ pub(super) async fn maintain_caches( return Err(anyhow!("Cache identify stream ended")); }; - let ClusterCacheIdentifyBroadcast { + let ClusterCacheIdentifySignalCacheBroadcast { cache_id, max_num_elements, } = identify_message; - if known_caches.update(cache_id, max_num_elements, nats_client) { + if known_caches.update_single(cache_id, max_num_elements, nats_client) { info!( %cache_id, "New cache discovered, scheduling reinitialization" diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index edd624de5a..ef19fad4f2 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -31,14 +31,14 @@ pub type ClusterCacheIndex = u16; /// Broadcast with identification details by caches #[derive(Debug, Clone, Encode, Decode)] -pub struct ClusterCacheIdentifyBroadcast { +pub struct ClusterCacheIdentifySignalCacheBroadcast { /// Cache ID pub cache_id: PieceCacheId, /// Max number of elements in this cache pub max_num_elements: u32, } -impl GenericBroadcast for ClusterCacheIdentifyBroadcast { +impl GenericBroadcast for ClusterCacheIdentifySignalCacheBroadcast { /// `*` here stands for cache group const SUBJECT: &'static str = "subspace.cache.*.identify"; } @@ -211,9 +211,8 @@ where { let caches_details = caches .iter() - .map(|cache| { - let cache_id = *cache.id(); - + .map(|cache| (cache, *cache.id())) + .map(|(cache, cache_id)| { if primary_instance { info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); } @@ -228,7 +227,12 @@ where if primary_instance { select! { - result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result = identify_responder( + &nats_client, + &caches_details, + cache_group, + identification_broadcast_interval + ).fuse() => { result }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { @@ -306,14 +310,22 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast( + nats_client, + caches_details, + cache_group + ).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, caches_details, cache_group).await; + send_identify_broadcast( + nats_client, + caches_details, + cache_group + ).await; } } } @@ -333,7 +345,7 @@ async fn send_identify_broadcast( .map(|cache| async move { if let Err(error) = nats_client .broadcast( - &ClusterCacheIdentifyBroadcast { + &ClusterCacheIdentifySignalCacheBroadcast { cache_id: cache.cache_id, max_num_elements: cache.cache.max_num_elements(), }, From db67397aa426cb780a69bf35060b59a995df20a0 Mon Sep 17 00:00:00 2001 From: tediou5 Date: Tue, 17 Sep 2024 02:13:02 +0800 Subject: [PATCH 7/9] opt: optimize cache identification --- .../commands/cluster/controller/caches.rs | 140 +++++++++++++--- crates/subspace-farmer/src/cluster/cache.rs | 153 +++++++++++++++--- 2 files changed, 253 insertions(+), 40 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs index de6bcc7aab..8f5c93ca66 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -9,24 +9,32 @@ use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, stream, FutureExt, StreamExt}; use parking_lot::Mutex; use std::future::{ready, Future}; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_farmer::cluster::cache::{ + ClusterCacheDetailsRequest, ClusterCacheIdentifyBroadcast, ClusterCacheIdentifySignalCacheBroadcast, ClusterCacheIndex, ClusterPieceCache, + ClusterSingleCacheDetails, }; use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::{PieceCache, PieceCacheId}; use subspace_farmer::farmer_cache::FarmerCache; use tokio::time::MissedTickBehavior; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); +#[derive(Debug)] +struct KnownCache { + cache_id: PieceCacheId, + last_identification: Instant, +} + #[derive(Debug)] struct KnownSingleCache { single_cache_id: PieceCacheId, @@ -36,6 +44,7 @@ struct KnownSingleCache { #[derive(Debug, Default)] struct KnownCaches { + known_caches: Vec, known_single_caches: Vec, } @@ -47,6 +56,33 @@ impl KnownCaches { .collect() } + /// Return `true` if farmer cache reinitialization is required + fn update(&mut self, cache_id: PieceCacheId) -> bool { + let last_identification = Instant::now(); + if self.known_caches.iter_mut().any(|known_cache| { + if known_cache.cache_id == cache_id { + known_cache.last_identification = last_identification; + self.known_single_caches + .iter_mut() + .for_each(|known_single_cache| { + known_single_cache.last_identification = last_identification; + }); + true + } else { + false + } + }) { + return false; + } + + self.known_caches.push(KnownCache { + cache_id, + last_identification, + }); + + true + } + /// Return `true` if farmer cache reinitialization is required fn update_single( &mut self, @@ -84,6 +120,9 @@ impl KnownCaches { fn remove_expired(&mut self) -> impl Iterator + '_ { let elapsed = CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2; + self.known_caches + .retain(move |known_cache| known_cache.last_identification.elapsed() <= elapsed); + self.known_single_caches .extract_if(move |known_single_cache| { known_single_cache.last_identification.elapsed() > elapsed @@ -104,12 +143,19 @@ pub(super) async fn maintain_caches( (Box::pin(ready(())) as Pin>>).fuse(); let cache_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::(None, None) + .await + .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); + + let single_cache_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::( Some(cache_group), None ) .await - .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); + .map_err(|error| anyhow!( + "Failed to subscribe to single cache identify broadcast: {error}" + ))?); // Request cache to identify themselves if let Err(error) = nats_client @@ -120,6 +166,7 @@ pub(super) async fn maintain_caches( } let mut cache_identify_subscription = cache_identify_subscription.fuse(); + let mut single_cache_identify_subscription = single_cache_identify_subscription.fuse(); let mut cache_pruning_interval = tokio::time::interval_at( (Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2, @@ -161,7 +208,7 @@ pub(super) async fn maintain_caches( } select! { - maybe_identify_message = cache_identify_subscription.next() => { + maybe_identify_message = single_cache_identify_subscription.next() => { let Some(identify_message) = maybe_identify_message else { return Err(anyhow!("Cache identify stream ended")); }; @@ -170,20 +217,55 @@ pub(super) async fn maintain_caches( cache_id, max_num_elements, } = identify_message; - if known_caches.update_single(cache_id, max_num_elements, nats_client) { - info!( - %cache_id, - "New cache discovered, scheduling reinitialization" - ); - scheduled_reinitialization_for.replace( - Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, - ); - } else { - trace!( - %cache_id, - "Received identification for already known cache" - ); + process_cache_identify_message( + nats_client, + cache_id, + max_num_elements, + &mut known_caches, + &mut scheduled_reinitialization_for, + ) + } + maybe_identify_message = cache_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Cache identify stream ended")); + }; + + let ClusterCacheIdentifyBroadcast { cache_id, cache_count } = identify_message; + + if !known_caches.update(cache_id) { + // Cache already known, nothing to do + debug!(%cache_id, "Cache already known, nothing to do"); + continue + } + + let cache_ids = cache_id.derive_sub_ids(cache_count.into()); + match nats_client + .stream_request( + ClusterCacheDetailsRequest, + Some(&cache_id.to_string()), + ) + .await + { + Ok(caches_details) => { + let mut caches_details = caches_details.zip(stream::iter(cache_ids)); + while let Some((cache_details, single_cache_id)) = caches_details.next().await { + let ClusterSingleCacheDetails { max_num_elements } = cache_details; + + process_cache_identify_message( + nats_client, + single_cache_id, + max_num_elements, + &mut known_caches, + &mut scheduled_reinitialization_for, + ) + } } + Err(error) => warn!( + %error, + %cache_id, + "Failed to request farmer farm details" + ), + } } _ = cache_pruning_interval.tick().fuse() => { let mut reinit = false; @@ -191,7 +273,7 @@ pub(super) async fn maintain_caches( reinit = true; warn!( - cache_id = %removed_cache.cache_id, + cache_id = %removed_cache.single_cache_id, "Cache expired and removed, scheduling reinitialization" ); } @@ -207,4 +289,26 @@ pub(super) async fn maintain_caches( } } } + + fn process_cache_identify_message( + nats_client: &NatsClient, + cache_id: PieceCacheId, + max_num_elements: u32, + known_caches: &mut KnownCaches, + scheduled_reinitialization_for: &mut Option, + ) { + if known_caches.update_single(cache_id, max_num_elements, nats_client) { + info!( + %cache_id, + "New cache discovered, scheduling reinitialization" + ); + scheduled_reinitialization_for + .replace(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY); + } else { + trace!( + %cache_id, + "Received identification for already known cache" + ); + } + } } diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index ef19fad4f2..3c57aae5fe 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -29,6 +29,36 @@ const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); /// Type alias for cache index used by cluster. pub type ClusterCacheIndex = u16; +/// Request cache details from cache +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheDetailsRequest; + +impl GenericStreamRequest for ClusterCacheDetailsRequest { + const SUBJECT: &'static str = "subspace.cache.*.details"; + type Response = ClusterSingleCacheDetails; +} + +/// Cache details +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterSingleCacheDetails { + /// Max number of elements in this cache + pub max_num_elements: u32, +} + +/// Broadcast with identification details by caches +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheIdentifyBroadcast { + /// Cache ID + pub cache_id: PieceCacheId, + /// Number of caches + pub cache_count: ClusterCacheIndex, +} + +impl GenericBroadcast for ClusterCacheIdentifyBroadcast { + /// `*` here stands for cache group + const SUBJECT: &'static str = "subspace.cache.*.cache-identify"; +} + /// Broadcast with identification details by caches #[derive(Debug, Clone, Encode, Decode)] pub struct ClusterCacheIdentifySignalCacheBroadcast { @@ -209,9 +239,13 @@ pub async fn cache_service( where C: PieceCache, { + let cache_id = PieceCacheId::new(); + let cache_id_string = cache_id.to_string(); + let cache_ids = cache_id.derive_sub_ids(caches.len()); + let caches_details = caches .iter() - .map(|cache| (cache, *cache.id())) + .zip(cache_ids) .map(|(cache, cache_id)| { if primary_instance { info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache"); @@ -229,12 +263,21 @@ where select! { result = identify_responder( &nats_client, + cache_id, &caches_details, cache_group, identification_broadcast_interval ).fuse() => { result }, + result = caches_details_responder( + &nats_client, + cache_id, + &cache_id_string, + &caches_details, + ).fuse() => { + result + }, result = write_piece_responder(&nats_client, &caches_details).fuse() => { result }, @@ -273,6 +316,7 @@ where /// per controller instance in order to parallelize more work across threads if needed. async fn identify_responder( nats_client: &NatsClient, + cache_id: PieceCacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, identification_broadcast_interval: Duration, @@ -312,6 +356,7 @@ where last_identification = Instant::now(); send_identify_broadcast( nats_client, + cache_id, caches_details, cache_group ).await; @@ -323,6 +368,7 @@ where send_identify_broadcast( nats_client, + cache_id, caches_details, cache_group ).await; @@ -335,33 +381,96 @@ where async fn send_identify_broadcast( nats_client: &NatsClient, + cache_id: PieceCacheId, caches_details: &[CacheDetails<'_, C>], cache_group: &str, ) where C: PieceCache, { - caches_details - .iter() - .map(|cache| async move { - if let Err(error) = nats_client - .broadcast( - &ClusterCacheIdentifySignalCacheBroadcast { - cache_id: cache.cache_id, - max_num_elements: cache.cache.max_num_elements(), - }, - cache_group, - ) - .await - { - warn!( - cache_id = %cache.cache_id, - %error, - "Failed to send cache identify notification" - ); + if caches_details.is_empty() { + warn!("No cache, skip sending cache identify notification"); + return; + } + + if let Err(error) = nats_client + .broadcast( + &ClusterCacheIdentifyBroadcast { + cache_id, + cache_count: caches_details.len() as ClusterCacheIndex, + }, + cache_group, + ) + .await + { + warn!(%cache_id, %error, "Failed to send farmer identify notification"); + } +} + +async fn caches_details_responder( + nats_client: &NatsClient, + cache_id: PieceCacheId, + cache_id_string: &str, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests(Some(cache_id_string), Some(cache_id_string.to_string())) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to farms details {}: {}", + cache_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_caches_deatils_request( + nats_client, + caches_details, + message, + ))); } - }) - .collect::>() - .collect::>() + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_caches_deatils_request( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], + request: StreamRequest, +) where + C: PieceCache, +{ + trace!(?request, "Caches details request"); + + let stream = Box::new(stream::iter(caches_details.iter().map(|cache_details| { + ClusterSingleCacheDetails { + max_num_elements: cache_details.cache.max_num_elements(), + } + }))); + + nats_client + .stream_response::(request.response_subject, stream) .await; } From 1fa5cdd7888200900209b558db0b6b155f82ea0d Mon Sep 17 00:00:00 2001 From: tedio5 Date: Wed, 18 Sep 2024 18:16:39 +0800 Subject: [PATCH 8/9] chore: add debug log when farm or cache updating last identification --- .../bin/subspace-farmer/commands/cluster/controller/caches.rs | 1 + .../src/bin/subspace-farmer/commands/cluster/controller/farms.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs index 8f5c93ca66..a94a2971c7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -65,6 +65,7 @@ impl KnownCaches { self.known_single_caches .iter_mut() .for_each(|known_single_cache| { + debug!(single_cache_id = %known_single_cache.single_cache_id, "Updating last identification for single cache"); known_single_cache.last_identification = last_identification; }); true diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index fea375d2c1..85c9b0719a 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -77,6 +77,7 @@ impl KnownFarms { self.known_farms.iter_mut().for_each(|(_, known_farm)| { // All farms in farmer use the same fingerprint if known_farm.fingerprint == fingerprint { + debug!(farm_id = %known_farm.farm_id, "Updating last identification for farm"); known_farm.last_identification = last_identification; } }); From fdad7e57696331f88b21c8c9d2a90b8255fb9349 Mon Sep 17 00:00:00 2001 From: tedio5 Date: Thu, 19 Sep 2024 09:11:14 +0800 Subject: [PATCH 9/9] fix typo --- crates/subspace-farmer/src/cluster/cache.rs | 4 ++-- crates/subspace-farmer/src/cluster/farmer.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 3c57aae5fe..0a5d66326b 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -439,7 +439,7 @@ where }; // Create background task for concurrent processing - processing.push(Box::pin(process_caches_deatils_request( + processing.push(Box::pin(process_caches_details_request( nats_client, caches_details, message, @@ -454,7 +454,7 @@ where Ok(()) } -async fn process_caches_deatils_request( +async fn process_caches_details_request( nats_client: &NatsClient, caches_details: &[CacheDetails<'_, C>], request: StreamRequest, diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 879d794cab..a2cfdd8b39 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -691,7 +691,7 @@ async fn farms_details_responder( }; // Create background task for concurrent processing - processing.push(Box::pin(process_farms_deatils_request( + processing.push(Box::pin(process_farms_details_request( nats_client, farms_details, message, @@ -706,7 +706,7 @@ async fn farms_details_responder( Ok(()) } -async fn process_farms_deatils_request( +async fn process_farms_details_request( nats_client: &NatsClient, farms_details: &[FarmDetails], request: StreamRequest,