Skip to content

Commit

Permalink
check_subscription_before_vccn (kaspanet#375)
Browse files Browse the repository at this point in the history
* check_subscription_before_vccn

* simplfy to root notifer only

* simplify one last time. no need to be in notify trait at all.
  • Loading branch information
D-Stacks committed Jan 4, 2024
1 parent 697e5d3 commit f604a69
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
24 changes: 13 additions & 11 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use kaspa_core::{debug, info, time::unix_now, trace, warn};
use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_notify::notifier::Notify;
use kaspa_notify::{events::EventType, notifier::Notify};

use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use itertools::Itertools;
Expand Down Expand Up @@ -314,16 +314,18 @@ impl VirtualStateProcessor {
self.notification_root
.notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
.expect("expecting an open unbounded channel");
// TODO: Fetch acceptance data only if there's a subscriber for the below notification.
let added_chain_blocks_acceptance_data =
chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec();
self.notification_root
.notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new(
chain_path.added.into(),
chain_path.removed.into(),
Arc::new(added_chain_blocks_acceptance_data),
)))
.expect("expecting an open unbounded channel");
if self.notification_root.has_subscription(EventType::VirtualChainChanged) {
// check for subscriptions before the heavy lifting
let added_chain_blocks_acceptance_data =
chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec();
self.notification_root
.notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new(
chain_path.added.into(),
chain_path.removed.into(),
Arc::new(added_chain_blocks_acceptance_data),
)))
.expect("expecting an open unbounded channel");
}
}

pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
Expand Down
11 changes: 10 additions & 1 deletion notify/src/root.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
error::Result,
events::EventArray,
events::{EventArray, EventType},
listener::ListenerId,
notification::Notification,
notifier::Notify,
Expand Down Expand Up @@ -50,6 +50,10 @@ where
pub fn is_closed(&self) -> bool {
self.inner.sender.is_closed()
}

pub fn has_subscription(&self, event: EventType) -> bool {
self.inner.has_subscription(event)
}
}

impl<N> Notify<N> for Root<N>
Expand Down Expand Up @@ -131,6 +135,11 @@ where
fn stop_notify(&self, scope: Scope) -> Result<()> {
self.execute_subscribe_command(scope, Command::Stop)
}

fn has_subscription(&self, event: EventType) -> bool {
let subscription = &self.subscriptions.read()[event];
subscription.active()
}
}

// TODO: tests

0 comments on commit f604a69

Please sign in to comment.