From f604a698a81f06f3c43f1667051ded0110872689 Mon Sep 17 00:00:00 2001 From: D-Stacks <78099568+D-Stacks@users.noreply.github.com> Date: Thu, 4 Jan 2024 23:04:40 +0100 Subject: [PATCH] check_subscription_before_vccn (#375) * check_subscription_before_vccn * simplfy to root notifer only * simplify one last time. no need to be in notify trait at all. --- .../pipeline/virtual_processor/processor.rs | 24 ++++++++++--------- notify/src/root.rs | 11 ++++++++- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 0a32d6478..a0abe76e3 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -74,7 +74,7 @@ use kaspa_core::{debug, info, time::unix_now, trace, warn}; use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExtensions}; use kaspa_hashes::Hash; use kaspa_muhash::MuHash; -use kaspa_notify::notifier::Notify; +use kaspa_notify::{events::EventType, notifier::Notify}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use itertools::Itertools; @@ -314,16 +314,18 @@ impl VirtualStateProcessor { self.notification_root .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score))) .expect("expecting an open unbounded channel"); - // TODO: Fetch acceptance data only if there's a subscriber for the below notification. - let added_chain_blocks_acceptance_data = - chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec(); - self.notification_root - .notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new( - chain_path.added.into(), - chain_path.removed.into(), - Arc::new(added_chain_blocks_acceptance_data), - ))) - .expect("expecting an open unbounded channel"); + if self.notification_root.has_subscription(EventType::VirtualChainChanged) { + // check for subscriptions before the heavy lifting + let added_chain_blocks_acceptance_data = + chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec(); + self.notification_root + .notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new( + chain_path.added.into(), + chain_path.removed.into(), + Arc::new(added_chain_blocks_acceptance_data), + ))) + .expect("expecting an open unbounded channel"); + } } pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash { diff --git a/notify/src/root.rs b/notify/src/root.rs index 992283869..b21e7662a 100644 --- a/notify/src/root.rs +++ b/notify/src/root.rs @@ -1,6 +1,6 @@ use crate::{ error::Result, - events::EventArray, + events::{EventArray, EventType}, listener::ListenerId, notification::Notification, notifier::Notify, @@ -50,6 +50,10 @@ where pub fn is_closed(&self) -> bool { self.inner.sender.is_closed() } + + pub fn has_subscription(&self, event: EventType) -> bool { + self.inner.has_subscription(event) + } } impl Notify for Root @@ -131,6 +135,11 @@ where fn stop_notify(&self, scope: Scope) -> Result<()> { self.execute_subscribe_command(scope, Command::Stop) } + + fn has_subscription(&self, event: EventType) -> bool { + let subscription = &self.subscriptions.read()[event]; + subscription.active() + } } // TODO: tests