Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: optimize cluster identification #3032

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod caches;
mod farms;

use crate::commands::cluster::controller::caches::maintain_caches;
use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::cluster::controller::farms::maintain_farms;
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
Expand All @@ -20,6 +20,7 @@ use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_farmer::cluster::controller::controller_service;
use subspace_farmer::cluster::farmer::FarmIndex;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farmer_cache::FarmerCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,65 @@ use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL;
use anyhow::anyhow;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::{select, FutureExt, StreamExt};
use futures::{select, stream, FutureExt, StreamExt};
use parking_lot::Mutex;
use std::future::{ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_farmer::cluster::cache::{
ClusterCacheIdentifyBroadcast, ClusterCacheIndex, ClusterPieceCache,
ClusterCacheDetailsRequest, ClusterCacheIdentifyBroadcast,
ClusterCacheIdentifySignalCacheBroadcast, ClusterCacheIndex, ClusterPieceCache,
ClusterSingleCacheDetails,
};
use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::{PieceCache, PieceCacheId};
use subspace_farmer::farmer_cache::FarmerCache;
use tokio::time::MissedTickBehavior;
use tracing::{info, trace, warn};
use tracing::{debug, info, trace, warn};

const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3);

/// A cache service instance in the cluster that has identified itself via
/// broadcast; tracked so expired instances can be pruned.
#[derive(Debug)]
struct KnownCache {
    // Unique identifier of the cache instance
    cache_id: PieceCacheId,
    // When the most recent identify broadcast from this instance was observed;
    // compared against CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2 to expire it
    last_identification: Instant,
}

/// An individual piece cache belonging to a known cache instance, together
/// with the NATS-backed client used to talk to it.
#[derive(Debug)]
struct KnownSingleCache {
    // Identifier of this individual piece cache
    single_cache_id: PieceCacheId,
    // When this cache was last identified (refreshed both by its own
    // identification and by its parent cache instance's broadcasts)
    last_identification: Instant,
    // Client handle for this piece cache over NATS
    piece_cache: Arc<ClusterPieceCache>,
}

/// Registry of caches discovered through identify broadcasts.
#[derive(Debug, Default)]
struct KnownCaches {
    // Cache service instances that have identified themselves
    known_caches: Vec<KnownCache>,
    // Individual piece caches across all known cache instances
    known_single_caches: Vec<KnownSingleCache>,
}

impl KnownCaches {
fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
self.known_caches
self.known_single_caches
.iter()
.map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>)
.collect()
}

/// Return `true` if farmer cache reinitialization is required
fn update(
&mut self,
cache_id: PieceCacheId,
max_num_elements: u32,
nats_client: &NatsClient,
) -> bool {
fn update(&mut self, cache_id: PieceCacheId) -> bool {
let last_identification = Instant::now();
if self.known_caches.iter_mut().any(|known_cache| {
if known_cache.cache_id == cache_id {
known_cache.last_identification = Instant::now();
known_cache.last_identification = last_identification;
self.known_single_caches
.iter_mut()
.for_each(|known_single_cache| {
debug!(single_cache_id = %known_single_cache.single_cache_id, "Updating last identification for single cache");
known_single_cache.last_identification = last_identification;
});
true
} else {
false
Expand All @@ -65,23 +76,58 @@ impl KnownCaches {
return false;
}

let piece_cache = Arc::new(ClusterPieceCache::new(
self.known_caches.push(KnownCache {
cache_id,
last_identification,
});

true
}

/// Return `true` if farmer cache reinitialization is required
fn update_single(
&mut self,
single_cache_id: PieceCacheId,
max_num_elements: u32,
nats_client: &NatsClient,
) -> bool {
if self
.known_single_caches
.iter_mut()
.any(|known_single_cache| {
if known_single_cache.single_cache_id == single_cache_id {
known_single_cache.last_identification = Instant::now();
true
} else {
false
}
})
{
return false;
}

let piece_cache = Arc::new(ClusterPieceCache::new(
single_cache_id,
max_num_elements,
nats_client.clone(),
));
self.known_caches.push(KnownCache {
cache_id,
self.known_single_caches.push(KnownSingleCache {
single_cache_id,
last_identification: Instant::now(),
piece_cache,
});
true
}

fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
self.known_caches.extract_if(|known_cache| {
known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2
})
fn remove_expired(&mut self) -> impl Iterator<Item = KnownSingleCache> + '_ {
let elapsed = CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2;
self.known_caches
.retain(move |known_cache| known_cache.last_identification.elapsed() <= elapsed);

self.known_single_caches
.extract_if(move |known_single_cache| {
known_single_cache.last_identification.elapsed() > elapsed
})
}
}

Expand All @@ -98,10 +144,20 @@ pub(super) async fn maintain_caches(
(Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

let cache_identify_subscription = pin!(nats_client
.subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
.subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(None, None)
.await
.map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?);

let single_cache_identify_subscription = pin!(nats_client
.subscribe_to_broadcasts::<ClusterCacheIdentifySignalCacheBroadcast>(
Some(cache_group),
None
)
.await
.map_err(|error| anyhow!(
"Failed to subscribe to single cache identify broadcast: {error}"
))?);

// Request caches to identify themselves
if let Err(error) = nats_client
.broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group)
Expand All @@ -111,6 +167,7 @@ pub(super) async fn maintain_caches(
}

let mut cache_identify_subscription = cache_identify_subscription.fuse();
let mut single_cache_identify_subscription = single_cache_identify_subscription.fuse();
let mut cache_pruning_interval = tokio::time::interval_at(
(Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(),
CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2,
Expand Down Expand Up @@ -152,37 +209,72 @@ pub(super) async fn maintain_caches(
}

select! {
maybe_identify_message = cache_identify_subscription.next() => {
maybe_identify_message = single_cache_identify_subscription.next() => {
let Some(identify_message) = maybe_identify_message else {
return Err(anyhow!("Cache identify stream ended"));
};

let ClusterCacheIdentifyBroadcast {
let ClusterCacheIdentifySignalCacheBroadcast {
cache_id,
max_num_elements,
} = identify_message;
if known_caches.update(cache_id, max_num_elements, nats_client) {
info!(
%cache_id,
"New cache discovered, scheduling reinitialization"
);
scheduled_reinitialization_for.replace(
Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
);
} else {
trace!(
%cache_id,
"Received identification for already known cache"
);
process_cache_identify_message(
nats_client,
cache_id,
max_num_elements,
&mut known_caches,
&mut scheduled_reinitialization_for,
)
}
maybe_identify_message = cache_identify_subscription.next() => {
let Some(identify_message) = maybe_identify_message else {
return Err(anyhow!("Cache identify stream ended"));
};

let ClusterCacheIdentifyBroadcast { cache_id, cache_count } = identify_message;

if !known_caches.update(cache_id) {
// Cache already known, nothing to do
debug!(%cache_id, "Cache already known, nothing to do");
continue
}

let cache_ids = cache_id.derive_sub_ids(cache_count.into());
match nats_client
.stream_request(
ClusterCacheDetailsRequest,
Some(&cache_id.to_string()),
)
.await
{
Ok(caches_details) => {
let mut caches_details = caches_details.zip(stream::iter(cache_ids));
while let Some((cache_details, single_cache_id)) = caches_details.next().await {
let ClusterSingleCacheDetails { max_num_elements } = cache_details;

process_cache_identify_message(
nats_client,
single_cache_id,
max_num_elements,
&mut known_caches,
&mut scheduled_reinitialization_for,
)
}
}
Err(error) => warn!(
%error,
%cache_id,
"Failed to request farmer farm details"
),
}
}
_ = cache_pruning_interval.tick().fuse() => {
let mut reinit = false;
for removed_cache in known_caches.remove_expired() {
reinit = true;

warn!(
cache_id = %removed_cache.cache_id,
cache_id = %removed_cache.single_cache_id,
"Cache expired and removed, scheduling reinitialization"
);
}
Expand All @@ -198,4 +290,26 @@ pub(super) async fn maintain_caches(
}
}
}

/// Handle an identification for a single piece cache.
///
/// Registers the cache in `known_caches` (creating a cluster piece cache
/// client when it is not yet known) and, on first discovery, schedules
/// farmer cache reinitialization after `SCHEDULE_REINITIALIZATION_DELAY`.
fn process_cache_identify_message(
    nats_client: &NatsClient,
    cache_id: PieceCacheId,
    max_num_elements: u32,
    known_caches: &mut KnownCaches,
    scheduled_reinitialization_for: &mut Option<Instant>,
) {
    let is_new_cache = known_caches.update_single(cache_id, max_num_elements, nats_client);

    if !is_new_cache {
        // Already tracked; `update_single` refreshed its last identification
        trace!(
            %cache_id,
            "Received identification for already known cache"
        );
        return;
    }

    info!(
        %cache_id,
        "New cache discovered, scheduling reinitialization"
    );
    // Delay reinitialization slightly so several discoveries coalesce into one
    *scheduled_reinitialization_for = Some(Instant::now() + SCHEDULE_REINITIALIZATION_DELAY);
}
}
Loading
Loading