Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
add cluster notifier registration and test
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev committed Dec 13, 2023
1 parent 33b5edc commit f1a0831
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 2 deletions.
11 changes: 9 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,19 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

let cluster_info_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_cluster_info_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
entry_notifier: {} \
cluster_info_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
entry_notifier.is_some()
entry_notifier.is_some(),
cluster_info_notifier.is_some(),
);

let system_monitor_service = Some(SystemMonitorService::new(
Expand Down Expand Up @@ -728,6 +734,7 @@ impl Validator {
identity_keypair.clone(),
socket_addr_space,
);
cluster_info.set_clusterinfo_notifier(cluster_info_notifier);
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
Expand Down
7 changes: 7 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,4 +413,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}

/// Check if the plugin is interested in cluster info data
/// Default is false -- if the plugin is interested in
/// cluster info data, return true.
fn clusterinfo_notifications_enabled(&self) -> bool {
false
}
}
4 changes: 4 additions & 0 deletions geyser-plugin-manager/src/cluster_info_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub(crate) struct ClusterInfoNotifierImpl {
}

impl ClusterInfoNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
ClusterInfoNotifierImpl { plugin_manager }
}

fn clusterinfo_from_legacy_contact_info(
legacy_info: &LegacyContactInfo,
) -> ReplicaClusterInfoNode {
Expand Down
9 changes: 9 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ impl GeyserPluginManager {
false
}

/// Check if the plugin is interested in cluster info data
pub fn clusterinfo_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.entry_notifications_enabled() {
return true;
}
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
Expand Down
17 changes: 17 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::cluster_info_notifier::ClusterInfoNotifierImpl;
use solana_gossip::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
Expand Down Expand Up @@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierLock>,
entry_notifier: Option<EntryNotifierLock>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
}

impl GeyserPluginService {
Expand Down Expand Up @@ -81,8 +84,17 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let cluster_info_notifications_enabled = plugin_manager.clusterinfo_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));

let cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock> =
if cluster_info_notifications_enabled {
let cluster_info_notifier = ClusterInfoNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(cluster_info_notifier)))
} else {
None
};

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
Expand Down Expand Up @@ -143,6 +155,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
cluster_info_notifier,
})
}

Expand Down Expand Up @@ -172,6 +185,10 @@ impl GeyserPluginService {
self.block_metadata_notifier.clone()
}

pub fn get_cluster_info_notifier(&self) -> Option<ClusterInfoUpdateNotifierLock> {
self.cluster_info_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;
Expand Down
8 changes: 8 additions & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
note = "Please use `solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}` instead"
)]
#[allow(deprecated)]
use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
pub use solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE};
use {
crate::{
Expand Down Expand Up @@ -439,6 +440,13 @@ impl ClusterInfo {
me
}

pub fn set_clusterinfo_notifier(
&self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.gossip.set_clusterinfo_notifier(cluster_info_notifier);
}

pub fn set_contact_debug_interval(&mut self, new: u64) {
self.contact_debug_interval = new;
}
Expand Down
7 changes: 7 additions & 0 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
}

impl Crds {
pub fn set_clusterinfo_notifier(
&mut self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.clusterinfo_update_notifier = cluster_info_notifier;
}

/// Returns true if the given value updates an existing one in the table.
/// The value is outdated and fails to insert, if it already exists in the
/// table with a more recent wallclock.
Expand Down
9 changes: 9 additions & 0 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! designed to run with a simulator or over a UDP network connection with messages up to a
//! packet::PACKET_DATA_SIZE size.
use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
use {
crate::{
cluster_info::Ping,
Expand Down Expand Up @@ -45,6 +46,14 @@ pub struct CrdsGossip {
}

impl CrdsGossip {
pub fn set_clusterinfo_notifier(
&self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
let mut crds = self.crds.write().unwrap();
crds.set_clusterinfo_notifier(cluster_info_notifier);
}

/// Process a push message to the network.
///
/// Returns unique origins' pubkeys of upserted values.
Expand Down

0 comments on commit f1a0831

Please sign in to comment.