Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription context with a network type #458

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bab2f13
Make `Tracker` clonable shallowly
tiram88 Apr 9, 2024
d3be13b
Embed a `Tracker` inside `CounterMap` and `IndexSet`
tiram88 Apr 9, 2024
778c634
Reverse the dependency between `Tracker` and `IndexSet` & `CounterMap…
tiram88 Apr 11, 2024
5335ac9
Merge branch 'master' into current-network-type
tiram88 Apr 11, 2024
ce3e406
Fix `CounterMap` cloning
tiram88 Apr 12, 2024
0a61fc8
Move `Tracker::contains()` to `CounterMap` & `IndexSet`
tiram88 Apr 12, 2024
51cd579
`SubscriptionContext` is no longer needed by `Notification` trait
tiram88 Apr 12, 2024
ee8153b
Take empty counters into account in `CounterMap`
tiram88 Apr 15, 2024
7f187fe
Add unit tests covering `CounterMap` and `IndexSet` tracking
tiram88 Apr 15, 2024
62467fd
Base UTXO set filtering optimization on capacity rather than length
tiram88 Apr 18, 2024
3f63916
Refactor `UtxosChangedScope` as an enum containing either addresses o…
tiram88 Apr 19, 2024
a11334e
Add a prefix to the tracker
tiram88 Apr 21, 2024
4a83a77
Let `SubscriptionContext` be defined with a network type instead of a…
tiram88 Apr 22, 2024
fe73f6d
Clients (gRPC and wRPC) register the server network type
tiram88 Apr 22, 2024
23b5335
Use the tracker prefix when querying addresses from IndexSet and Coun…
tiram88 Apr 22, 2024
e9e42ba
Extent UtxosChangedScope serialization to the Indexes variant
tiram88 Apr 22, 2024
f15bcae
Remove the subscription context wherever no longer needed
tiram88 Apr 22, 2024
ff37b2b
Extend test coverage of tracker reference counting to fns involving o…
tiram88 Apr 23, 2024
97c8479
Merge branch 'master' into current-network-type
tiram88 Apr 23, 2024
c8fdcf3
Assert all indexers do reference the right tracker
tiram88 Apr 23, 2024
4b200ed
Assert rather on tracker inner ptr
tiram88 Apr 23, 2024
c06ee17
Merge branch 'master' into current-network-type
tiram88 Jun 24, 2024
fa371ff
Ensure `KaspaRpcClient` and its `SubscriptionContext` have matching n…
tiram88 Jun 24, 2024
5cf1e0f
Merge branch 'dev' into current-network-type
tiram88 Jul 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 3 additions & 12 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand Down Expand Up @@ -46,18 +45,14 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(
&self,
subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(&self, subscription: &VirtualChainChangedSubscription) -> Option<Self> {
match subscription.active() {
true => {
// If the subscription excludes accepted transaction ids and the notification includes some
Expand All @@ -77,11 +72,7 @@ impl NotificationTrait for Notification {
}
}

fn apply_utxos_changed_subscription(
&self,
_subscription: &UtxosChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
fn apply_utxos_changed_subscription(&self, _subscription: &UtxosChangedSubscription) -> Option<Self> {
// No effort is made here to apply the subscription addresses.
// This will be achieved farther along the notification backbone.
Some(self.clone())
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct TestConsensus {
impl TestConsensus {
/// Creates a test consensus instance based on `config` with the provided `db` and `notification_sender`
pub fn with_db(db: Arc<DB>, config: &Config, notification_sender: Sender<Notification>) -> Self {
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let notification_root = Arc::new(ConsensusNotificationRoot::new(config.net.into(), notification_sender));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
Expand Down Expand Up @@ -90,7 +90,7 @@ impl TestConsensus {
pub fn new(config: &Config) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let notification_root = Arc::new(ConsensusNotificationRoot::new(config.net.into(), dummy_notification_sender));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
Expand Down
46 changes: 16 additions & 30 deletions indexes/core/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand All @@ -24,30 +23,22 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(
&self,
_subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(&self, _subscription: &VirtualChainChangedSubscription) -> Option<Self> {
Some(self.clone())
}

fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
match subscription.active() {
true => {
let Self::UtxosChanged(notification) = self else { return None };
notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged)
notification.apply_utxos_changed_subscription(subscription).map(Self::UtxosChanged)
}
false => None,
}
Expand Down Expand Up @@ -78,16 +69,12 @@ impl UtxosChangedNotification {
Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) }
}

pub(crate) fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
pub(crate) fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
if subscription.to_all() {
Some(self.clone())
} else {
let added = Self::filter_utxo_set(&self.added, subscription, context);
let removed = Self::filter_utxo_set(&self.removed, subscription, context);
let added = Self::filter_utxo_set(&self.added, subscription);
let removed = Self::filter_utxo_set(&self.removed, subscription);
if added.is_empty() && removed.is_empty() {
None
} else {
Expand All @@ -96,27 +83,26 @@ impl UtxosChangedNotification {
}
}

fn filter_utxo_set(
utxo_set: &UtxoSetByScriptPublicKey,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> UtxoSetByScriptPublicKey {
fn filter_utxo_set(utxo_set: &UtxoSetByScriptPublicKey, subscription: &UtxosChangedSubscription) -> UtxoSetByScriptPublicKey {
// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
let mut result = HashMap::default();
let subscription_data = subscription.data();
if utxo_set.len() < subscription_data.len() {

// Compare capacities instead of lengths since iterators do also visit empty buckets,
// so we have O(capacity) time complexity
if utxo_set.capacity() < subscription_data.capacity() {
{
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription_data.contains(script_public_key, context) {
if subscription_data.contains(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
});
}
} else {
let tracker_data = context.address_tracker.data();
subscription_data.iter().for_each(|index| {
if let Some(script_public_key) = tracker_data.get_index(*index) {
let tracker_data = subscription_data.tracker().data();
subscription_data.iter_index().for_each(|index| {
if let Some(script_public_key) = tracker_data.get_index(index) {
if let Some(collection) = utxo_set.get(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
Expand Down
2 changes: 1 addition & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let tick_service = Arc::new(TickService::new());
let (notification_send, notification_recv) = unbounded();
let max_tracked_addresses = if args.utxoindex && args.max_tracked_addresses > 0 { Some(args.max_tracked_addresses) } else { None };
let subscription_context = SubscriptionContext::with_options(max_tracked_addresses);
let subscription_context = SubscriptionContext::with_options(Some(config.net.into()), max_tracked_addresses);
let notification_root = Arc::new(ConsensusNotificationRoot::with_context(notification_send, subscription_context.clone()));
let processing_counters = Arc::new(ProcessingCounters::default());
let mining_counters = Arc::new(MiningCounters::default());
Expand Down
10 changes: 6 additions & 4 deletions notify/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ use kaspa_addresses::{Address, Prefix};
use kaspa_math::Uint256;
use kaspa_notify::{address::tracker::Indexes, subscription::context::SubscriptionContext};

const ADDRESS_PREFIX: Prefix = Prefix::Mainnet;

fn create_addresses(count: usize) -> Vec<Address> {
(0..count)
.map(|i| Address::new(Prefix::Mainnet, kaspa_addresses::Version::PubKey, &Uint256::from_u64(i as u64).to_le_bytes()))
.map(|i| Address::new(ADDRESS_PREFIX, kaspa_addresses::Version::PubKey, &Uint256::from_u64(i as u64).to_le_bytes()))
.collect()
}

fn create_and_fill_context(addresses: Vec<Address>) -> SubscriptionContext {
let mut indexes = Indexes::new(vec![]);
let context = SubscriptionContext::with_options(Some(ADDRESS_COUNT));
let _ = context.address_tracker.register(&mut indexes, addresses);
let context = SubscriptionContext::with_options(Some(ADDRESS_PREFIX.try_into().unwrap()), Some(ADDRESS_COUNT));
let mut indexes = Indexes::new(context.address_tracker.clone());
let _ = indexes.register(addresses);
context
}

Expand Down
9 changes: 8 additions & 1 deletion notify/src/address/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use kaspa_addresses::Address;
use thiserror::Error;

#[derive(Clone, Debug, Error)]
pub enum Error {
#[error("the address store reached the maximum capacity")]
#[error("the address tracker reached the maximum capacity")]
MaxCapacityReached,

#[error("no prefix was attributed to the address tracker")]
NoPrefix,

#[error("address {0} does not match the address tracker prefix")]
PrefixMismatch(Address),
}

pub type Result<T> = std::result::Result<T, Error>;
2 changes: 2 additions & 0 deletions notify/src/address/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ pub mod tracker;
pub mod test_helpers {
use kaspa_addresses::Address;
use kaspa_addresses::{Prefix, Version};
use kaspa_consensus_core::network::NetworkType;

pub const NETWORK_TYPE: NetworkType = NetworkType::Mainnet;
pub const ADDRESS_PREFIX: Prefix = Prefix::Mainnet;

pub fn get_3_addresses(sorted: bool) -> Vec<Address> {
Expand Down
Loading
Loading