diff --git a/Cargo.lock b/Cargo.lock index cbcc732ddf2148..d3625756742d54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6011,6 +6011,7 @@ name = "solana-geyser-plugin-interface" version = "1.17.5" dependencies = [ "log", + "solana-gossip", "solana-sdk", "solana-transaction-status", "thiserror", @@ -6029,8 +6030,10 @@ dependencies = [ "log", "serde_json", "solana-accounts-db", + "solana-client", "solana-entry", "solana-geyser-plugin-interface", + "solana-gossip", "solana-ledger", "solana-measure", "solana-metrics", diff --git a/core/src/validator.rs b/core/src/validator.rs index fac592aedb8620..5e8d48e3341d63 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -644,13 +644,19 @@ impl Validator { .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier()); + let cluster_info_notifier = geyser_plugin_service + .as_ref() + .and_then(|geyser_plugin_service| geyser_plugin_service.get_cluster_info_notifier()); + info!( "Geyser plugin: accounts_update_notifier: {}, \ transaction_notifier: {}, \ - entry_notifier: {}", + entry_notifier: {} \ + cluster_info_notifier: {}", accounts_update_notifier.is_some(), transaction_notifier.is_some(), - entry_notifier.is_some() + entry_notifier.is_some(), + cluster_info_notifier.is_some(), ); let system_monitor_service = Some(SystemMonitorService::new( @@ -728,6 +734,9 @@ impl Validator { identity_keypair.clone(), socket_addr_space, ); + + //register Geyzer notifier. + cluster_info.set_clusterinfo_notifier(cluster_info_notifier); cluster_info.set_contact_debug_interval(config.contact_debug_interval); cluster_info.set_entrypoints(cluster_entrypoints); cluster_info.restore_contact_info(ledger_path, config.contact_save_interval); diff --git a/geyser-plugin-interface/Cargo.toml b/geyser-plugin-interface/Cargo.toml index af99758b47d630..3c8532f00ae917 100644 --- a/geyser-plugin-interface/Cargo.toml +++ b/geyser-plugin-interface/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] log = { workspace = true } +solana-gossip = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } thiserror = { workspace = true } diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index b2bbb5a4953aed..1688aab9490c9f 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -2,6 +2,8 @@ /// the GeyserPlugin trait to work with the runtime. /// In addition, the dynamic library must export a "C" function _create_plugin which /// creates the implementation of the plugin. +use solana_sdk::pubkey::Pubkey; +use std::net::SocketAddr; use { solana_sdk::{ clock::{Slot, UnixTimestamp}, @@ -13,6 +15,36 @@ use { thiserror::Error, }; +#[derive(Debug, Clone, PartialEq, Eq)] +/// Information about a node in the cluster. +pub struct ReplicaClusterInfoNode { + pub id: Pubkey, + /// gossip address + pub gossip: Option, + /// address to connect to for replication + pub tvu: Option, + /// TVU over QUIC protocol. + pub tvu_quic: Option, + /// repair service over QUIC protocol. + pub serve_repair_quic: Option, + /// transactions address + pub tpu: Option, + /// address to forward unprocessed transactions to + pub tpu_forwards: Option, + /// address to which to send bank state requests + pub tpu_vote: Option, + /// address to which to send JSON-RPC requests + pub rpc: Option, + /// websocket for JSON-RPC push notifications + pub rpc_pubsub: Option, + /// address to send repair requests to + pub serve_repair: Option, + /// latest wallclock picked + pub wallclock: u64, + /// node shred version + pub shred_version: u16, +} + #[derive(Debug, Clone, PartialEq, Eq)] /// Information about an account being updated pub struct ReplicaAccountInfo<'a> { @@ -317,6 +349,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { Ok(()) } + /// Called when a cluster info is updated on gossip network. + #[allow(unused_variables)] + fn update_cluster_info(&self, cluster_info: &ReplicaClusterInfoNode) -> Result<()> { + Ok(()) + } + + /// Called when a cluster info is removed on gossip network. + #[allow(unused_variables)] + fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) -> Result<()> { + Ok(()) + } + /// Called when all accounts are notified of during startup. fn notify_end_of_startup(&self) -> Result<()> { Ok(()) @@ -375,4 +419,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { fn entry_notifications_enabled(&self) -> bool { false } + + /// Check if the plugin is interested in cluster info data + /// Default is false -- if the plugin is interested in + /// cluster info data, return true. + fn clusterinfo_notifications_enabled(&self) -> bool { + false + } } diff --git a/geyser-plugin-manager/Cargo.toml b/geyser-plugin-manager/Cargo.toml index 9b4468eddaea9b..2c029a2095e762 100644 --- a/geyser-plugin-manager/Cargo.toml +++ b/geyser-plugin-manager/Cargo.toml @@ -20,8 +20,10 @@ libloading = { workspace = true } log = { workspace = true } serde_json = { workspace = true } solana-accounts-db = { workspace = true } +solana-client = { workspace = true } solana-entry = { workspace = true } solana-geyser-plugin-interface = { workspace = true } +solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } diff --git a/geyser-plugin-manager/src/cluster_info_notifier.rs b/geyser-plugin-manager/src/cluster_info_notifier.rs new file mode 100644 index 00000000000000..c200c70c07ed0d --- /dev/null +++ b/geyser-plugin-manager/src/cluster_info_notifier.rs @@ -0,0 +1,153 @@ +/// Module responsible for notifying plugins of transactions +use solana_gossip::legacy_contact_info::LegacyContactInfo; +use solana_sdk::pubkey::Pubkey; +use { + crate::geyser_plugin_manager::GeyserPluginManager, + log::*, + solana_client::connection_cache::Protocol, + solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaClusterInfoNode, + solana_gossip::cluster_info_notifier_interface::ClusterInfoNotifierInterface, + solana_measure::measure::Measure, + solana_metrics::*, + solana_rpc::transaction_notifier_interface::TransactionNotifier, + solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction}, + solana_transaction_status::TransactionStatusMeta, + std::sync::{Arc, RwLock}, +}; + +/// This implementation of ClusterInfoNotifierImpl is passed to the rpc's TransactionStatusService +/// at the validator startup. TransactionStatusService invokes the notify_transaction method +/// for new transactions. The implementation in turn invokes the notify_transaction of each +/// plugin enabled with transaction notification managed by the GeyserPluginManager. +#[derive(Debug)] +pub(crate) struct ClusterInfoNotifierImpl { + plugin_manager: Arc>, +} + +impl ClusterInfoNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + ClusterInfoNotifierImpl { plugin_manager } + } + + fn clusterinfo_from_legacy_contact_info( + legacy_info: &LegacyContactInfo, + ) -> ReplicaClusterInfoNode { + ReplicaClusterInfoNode { + id: *legacy_info.pubkey(), + /// gossip address + gossip: legacy_info.gossip().ok(), + /// address to connect to for replication + tvu: legacy_info.tvu(Protocol::UDP).ok(), + /// TVU over QUIC protocol. + tvu_quic: legacy_info.tvu(Protocol::QUIC).ok(), + /// repair service over QUIC protocol. + serve_repair_quic: legacy_info.serve_repair(Protocol::QUIC).ok(), + /// transactions address + tpu: legacy_info.tpu(Protocol::UDP).ok(), + /// address to forward unprocessed transactions to + tpu_forwards: legacy_info.tpu_forwards(Protocol::UDP).ok(), + /// address to which to send bank state requests + tpu_vote: legacy_info.tpu_vote().ok(), + /// address to which to send JSON-RPC requests + rpc: legacy_info.rpc().ok(), + /// websocket for JSON-RPC push notifications + rpc_pubsub: legacy_info.rpc_pubsub().ok(), + /// address to send repair requests to + serve_repair: legacy_info.serve_repair(Protocol::UDP).ok(), + /// latest wallclock picked + wallclock: legacy_info.wallclock(), + /// node shred version + shred_version: legacy_info.shred_version(), + } + } +} + +impl ClusterInfoNotifierInterface for ClusterInfoNotifierImpl { + fn notify_clusterinfo_update(&self, contact_info: &LegacyContactInfo) { + let cluster_info = + ClusterInfoNotifierImpl::clusterinfo_from_legacy_contact_info(contact_info); + let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update"); + let plugin_manager = self.plugin_manager.read().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter() { + let mut measure = Measure::start("geyser-plugin-update-cluster_info"); + match plugin.update_cluster_info(&cluster_info) { + Err(err) => { + error!( + "Failed to update cluster_info {}, error: {} to plugin {}", + bs58::encode(cluster_info.id).into_string(), + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully updated cluster_info {} to plugin {}", + bs58::encode(cluster_info.id).into_string(), + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-update-cluster_info-us", + measure.as_us() as usize, + 100000, + 100000 + ); + } + measure2.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_cluster_info_update-us", + measure2.as_us() as usize, + 100000, + 100000 + ); + } + + fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) { + let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update"); + let plugin_manager = self.plugin_manager.read().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter() { + let mut measure = Measure::start("geyser-plugin-remove-cluster_info"); + match plugin.notify_clusterinfo_remove(pubkey) { + Err(err) => { + error!( + "Failed to remove cluster_info {}, error: {} to plugin {}", + bs58::encode(pubkey).into_string(), + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully remove cluster_info {} to plugin {}", + bs58::encode(pubkey).into_string(), + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-remove-cluster_info-us", + measure.as_us() as usize, + 100000, + 100000 + ); + } + measure2.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_cluster_info_remove-us", + measure2.as_us() as usize, + 100000, + 100000 + ); + } +} diff --git a/geyser-plugin-manager/src/geyser_plugin_manager.rs b/geyser-plugin-manager/src/geyser_plugin_manager.rs index 0698cf1a656363..22e6e2624d9550 100644 --- a/geyser-plugin-manager/src/geyser_plugin_manager.rs +++ b/geyser-plugin-manager/src/geyser_plugin_manager.rs @@ -64,6 +64,15 @@ impl GeyserPluginManager { false } + /// Check if the plugin is interested in cluster info data + pub fn clusterinfo_notifications_enabled(&self) -> bool { + for plugin in &self.plugins { + if plugin.entry_notifications_enabled() { + return true; + } + } + false + } /// Admin RPC request handler pub(crate) fn list_plugins(&self) -> JsonRpcResult> { Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect()) diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index b8f9db49102dc7..51fde8ab322528 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -1,3 +1,5 @@ +use crate::cluster_info_notifier::ClusterInfoNotifierImpl; +use solana_gossip::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock; use { crate::{ accounts_update_notifier::AccountsUpdateNotifierImpl, @@ -37,6 +39,7 @@ pub struct GeyserPluginService { transaction_notifier: Option, entry_notifier: Option, block_metadata_notifier: Option, + cluster_info_notifier: Option, } impl GeyserPluginService { @@ -81,8 +84,17 @@ impl GeyserPluginService { plugin_manager.account_data_notifications_enabled(); let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled(); let entry_notifications_enabled = plugin_manager.entry_notifications_enabled(); + let cluster_info_notifications_enabled = plugin_manager.clusterinfo_notifications_enabled(); let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + let cluster_info_notifier: Option = + if cluster_info_notifications_enabled { + let cluster_info_notifier = ClusterInfoNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(cluster_info_notifier))) + } else { + None + }; + let accounts_update_notifier: Option = if account_data_notifications_enabled { let accounts_update_notifier = @@ -143,6 +155,7 @@ impl GeyserPluginService { transaction_notifier, entry_notifier, block_metadata_notifier, + cluster_info_notifier, }) } @@ -172,6 +185,10 @@ impl GeyserPluginService { self.block_metadata_notifier.clone() } + pub fn get_cluster_info_notifier(&self) -> Option { + self.cluster_info_notifier.clone() + } + pub fn join(self) -> thread::Result<()> { if let Some(mut slot_status_observer) = self.slot_status_observer { slot_status_observer.join()?; diff --git a/geyser-plugin-manager/src/lib.rs b/geyser-plugin-manager/src/lib.rs index 2dd1a8f5d2616b..f120b9a7b19bdd 100644 --- a/geyser-plugin-manager/src/lib.rs +++ b/geyser-plugin-manager/src/lib.rs @@ -1,6 +1,7 @@ pub mod accounts_update_notifier; pub mod block_metadata_notifier; pub mod block_metadata_notifier_interface; +pub mod cluster_info_notifier; pub mod entry_notifier; pub mod geyser_plugin_manager; pub mod geyser_plugin_service; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 07fbbf93334e12..f74c67eb687621 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -18,6 +18,7 @@ note = "Please use `solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}` instead" )] #[allow(deprecated)] +use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock; pub use solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}; use { crate::{ @@ -439,6 +440,13 @@ impl ClusterInfo { me } + pub fn set_clusterinfo_notifier( + &self, + cluster_info_notifier: Option, + ) { + self.gossip.set_clusterinfo_notifier(cluster_info_notifier); + } + pub fn set_contact_debug_interval(&mut self, new: u64) { self.contact_debug_interval = new; } diff --git a/gossip/src/cluster_info_notifier_interface.rs b/gossip/src/cluster_info_notifier_interface.rs new file mode 100644 index 00000000000000..94a4dc7f1ee633 --- /dev/null +++ b/gossip/src/cluster_info_notifier_interface.rs @@ -0,0 +1,16 @@ +use crate::legacy_contact_info::LegacyContactInfo; +use { + solana_sdk::pubkey::Pubkey, + std::sync::{Arc, RwLock}, +}; + +pub trait ClusterInfoNotifierInterface: std::fmt::Debug { + /// Notified when an cluster node is updated (added or changed). + fn notify_clusterinfo_update(&self, cluster_info: &LegacyContactInfo); + + /// Notified when a node is removed from the cluster + fn notify_clusterinfo_remove(&self, pubkey: &Pubkey); +} + +pub type ClusterInfoUpdateNotifierLock = + Arc>; diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index d8ab6e45b3d593..99e8e181d1285e 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -26,6 +26,7 @@ use { crate::{ + cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock, crds_entry::CrdsEntry, crds_gossip_pull::CrdsTimeouts, crds_shards::CrdsShards, @@ -54,6 +55,8 @@ use { }, }; +mod geyser_plugin_utils; + const CRDS_SHARDS_BITS: u32 = 12; // Number of vote slots to track in an lru-cache for metrics. const VOTE_SLOTS_METRICS_CAP: usize = 100; @@ -86,6 +89,8 @@ pub struct Crds { // Mapping from nodes' pubkeys to their respective shred-version. shred_versions: HashMap, stats: Mutex, + /// GeyserPlugin cluster info update notifier + clusterinfo_update_notifier: Option, } #[derive(PartialEq, Eq, Debug)] @@ -174,6 +179,7 @@ impl Default for Crds { purged: VecDeque::default(), shred_versions: HashMap::default(), stats: Mutex::::default(), + clusterinfo_update_notifier: None, } } } @@ -204,6 +210,13 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { } impl Crds { + pub fn set_clusterinfo_notifier( + &mut self, + cluster_info_notifier: Option, + ) { + self.clusterinfo_update_notifier = cluster_info_notifier; + } + /// Returns true if the given value updates an existing one in the table. /// The value is outdated and fails to insert, if it already exists in the /// table with a more recent wallclock. @@ -221,9 +234,10 @@ impl Crds { route: GossipRoute, ) -> Result<(), CrdsError> { let label = value.label(); + let gossip_label = label.clone(); let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, now); - match self.table.entry(label) { + let ret = match self.table.entry(label) { Entry::Vacant(entry) => { self.stats.lock().unwrap().record_insert(&value, route); let entry_index = entry.index(); @@ -309,7 +323,10 @@ impl Crds { Err(CrdsError::InsertFailed) } } - } + }; + //notify geyzer interface + self.notify_clusterinfo_update(self.table.get(&gossip_label)); + ret } pub fn get<'a, 'b, V>(&'a self, key: V::Key) -> Option @@ -533,6 +550,8 @@ impl Crds { self.shards.remove(index, &value); match value.value.data { CrdsData::LegacyContactInfo(_) => { + //notify geyzer interface + self.notify_clusterinfo_remove(&value.value.pubkey()); self.nodes.swap_remove(&index); } CrdsData::Vote(_, _) => { diff --git a/gossip/src/crds/geyser_plugin_utils.rs b/gossip/src/crds/geyser_plugin_utils.rs new file mode 100644 index 00000000000000..4aff2e6cc5f1ec --- /dev/null +++ b/gossip/src/crds/geyser_plugin_utils.rs @@ -0,0 +1,27 @@ +use crate::crds::Crds; +use crate::crds::Pubkey; +use crate::crds::VersionedCrdsValue; +use crate::crds_value::CrdsData; + +impl Crds { + /// Notified when an account is updated at runtime, due to transaction activities + pub fn notify_clusterinfo_update(&self, crd_value: Option<&VersionedCrdsValue>) { + if let Some(clusterinfo_update_notifier) = &self.clusterinfo_update_notifier { + if let Some(value) = crd_value { + if let CrdsData::LegacyContactInfo(ref cluster_info) = value.value.data { + let notifier = &clusterinfo_update_notifier.read().unwrap(); + notifier.notify_clusterinfo_update(cluster_info); + } + } + } + } + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + pub fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) { + if let Some(clusterinfo_update_notifier) = &self.clusterinfo_update_notifier { + let notifier = &clusterinfo_update_notifier.read().unwrap(); + notifier.notify_clusterinfo_remove(pubkey); + } + } +} diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 015deed1d2a472..8ba8c42ec2b180 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -4,6 +4,7 @@ //! designed to run with a simulator or over a UDP network connection with messages up to a //! packet::PACKET_DATA_SIZE size. +use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock; use { crate::{ cluster_info::Ping, @@ -45,6 +46,14 @@ pub struct CrdsGossip { } impl CrdsGossip { + pub fn set_clusterinfo_notifier( + &self, + cluster_info_notifier: Option, + ) { + let mut crds = self.crds.write().unwrap(); + crds.set_clusterinfo_notifier(cluster_info_notifier); + } + /// Process a push message to the network. /// /// Returns unique origins' pubkeys of upserted values. diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 11b609f3a37f52..4ca08e1ea6f8b4 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -3,6 +3,7 @@ pub mod cluster_info; pub mod cluster_info_metrics; +pub mod cluster_info_notifier_interface; pub mod contact_info; pub mod crds; pub mod crds_entry;