Skip to content

Commit

Permalink
fix tls:any lookup issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Jul 31, 2024
1 parent 36722f4 commit 0ad47cf
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kaspa-resolver"
description = "Kaspa wRPC endpoint resolver and monitor"
version = "0.3.0"
description = "Kaspa RPC endpoint resolver"
version = "0.4.0"
edition = "2021"
# authors.workspace = true
# include.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct Args {
pub verbose: bool,
/// Tracing mode
pub trace: bool,
/// Debug mode
pub debug: bool,
/// Auto-update
pub auto_update: bool,
/// Custom config file
Expand All @@ -48,6 +50,7 @@ impl Args {
.arg(arg!(--version "Display software version"))
.arg(arg!(--verbose "Enable verbose logging"))
.arg(arg!(--trace "Enable trace log level"))
.arg(arg!(--debug "Enable additional debug output"))
// .arg(arg!(--auto-update "Poll configuration updates"))
// .arg(arg!(--election "Show node data on each election"))
// .arg(arg!(--status "Enable `/status` endpoint"))
Expand Down Expand Up @@ -93,6 +96,7 @@ impl Args {

let trace = matches.get_one::<bool>("trace").cloned().unwrap_or(false);
let verbose = matches.get_one::<bool>("verbose").cloned().unwrap_or(false);
let debug = matches.get_one::<bool>("debug").cloned().unwrap_or(false);
let auto_update = matches
.get_one::<bool>("auto-update")
.cloned()
Expand Down Expand Up @@ -166,6 +170,7 @@ impl Args {
Args {
trace,
verbose,
debug,
auto_update,
user_config,
// election,
Expand Down
61 changes: 44 additions & 17 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ impl fmt::Display for Connection {
#[derive(Debug)]
pub struct Connection {
args: Arc<Args>,
// caps: Arc<RwLock<Option<Caps>>>,
caps: ArcSwapOption<Caps>,
is_synced: AtomicBool,
clients: AtomicU64,
Expand Down Expand Up @@ -79,57 +78,72 @@ impl Connection {
self.args.verbose
}

/// Represents the connection score, which is currently
/// the number of sockets (clients + peers) the node has.
#[inline]
pub fn score(&self) -> u64 {
self.clients.load(Ordering::Relaxed)
pub fn score(self: &Arc<Self>) -> u64 {
self.delegate().sockets()
}

/// Connection availability state.
#[inline]
pub fn is_available(&self) -> bool {
self.is_delegate()
&& self.online()
&& self.caps.load().as_ref().as_ref().is_some_and(|caps| {
let clients = self.clients();
let peers = self.peers();
pub fn is_available(self: &Arc<Self>) -> bool {
let delegate = self.delegate();

self.is_connected()
&& delegate.is_online()
&& delegate.caps.load().as_ref().as_ref().is_some_and(|caps| {
let clients = delegate.clients();
let peers = delegate.peers();
clients < caps.clients_limit && clients + peers < caps.fd_limit
})
}

/// Indicates if the connection RPC is connected.
#[inline]
pub fn connected(&self) -> bool {
pub fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::Relaxed)
}

/// Indicates if the connection is available as a general
/// concept: no errors have occurred during RPC calls
/// and the node is in synced synced.
#[inline]
pub fn online(&self) -> bool {
pub fn is_online(&self) -> bool {
self.is_online.load(Ordering::Relaxed)
}

/// Indicates if the node is in synced state.
#[inline]
pub fn is_synced(&self) -> bool {
self.is_synced.load(Ordering::Relaxed)
}

/// Number of RPC clients connected to the node.
#[inline]
pub fn clients(&self) -> u64 {
self.clients.load(Ordering::Relaxed)
}

/// Number of p2p peers connected to the node.
#[inline]
pub fn peers(&self) -> u64 {
self.peers.load(Ordering::Relaxed)
}

/// Total number of TCP sockets connected to the node.
#[inline]
pub fn sockets(&self) -> u64 {
self.clients() + self.peers()
}

/// Node capabilities (partial system spec, see [`Caps`])
#[inline]
pub fn caps(&self) -> Option<Arc<Caps>> {
self.caps.load().clone()
}

/// Unique system (machine) identifier of the node.
#[inline]
pub fn system_id(&self) -> u64 {
self.caps
Expand All @@ -139,26 +153,37 @@ impl Connection {
.unwrap_or_default()
}

/// Connection address (URL).
#[inline]
pub fn address(&self) -> &str {
self.node.address.as_str()
}

/// Node configuration parameters used to create this connection.
#[inline]
pub fn node(&self) -> &Arc<Node> {
&self.node
}

/// Connection parameters used to create this connection.
#[inline]
pub fn params(&self) -> PathParams {
self.params
}

/// Network id of the node.
#[inline]
pub fn network_id(&self) -> NetworkId {
self.node.network
}

/// Indicates if the connection is a delegate.
#[inline]
pub fn is_delegate(&self) -> bool {
self.delegate.load().is_none()
}

/// Get the delegate of this connection.
#[inline]
pub fn delegate(self: &Arc<Self>) -> Arc<Connection> {
match (**self.delegate.load()).clone() {
Expand All @@ -167,12 +192,17 @@ impl Connection {
}
}

/// Associate a delegate to this connection. A delegate is a primary
/// connection to the node that does actual performance monitoring
/// while non-delegate connections remain idle in a keep-alive state.
#[inline]
pub fn bind_delegate(&self, delegate: Option<Arc<Connection>>) {
self.delegate.store(Arc::new(delegate));
}

pub fn resolve_delegates(self: &Arc<Self>) -> Vec<Arc<Connection>> {
/// Creates a list of delegators for this connection, where the last
/// entry is the delegate.
pub fn resolve_delegators(self: &Arc<Self>) -> Vec<Arc<Connection>> {
let mut delegates = Vec::new();
let mut delegate = (*self).clone();
while let Some(next) = (**delegate.delegate.load()).clone() {
Expand All @@ -183,7 +213,7 @@ impl Connection {
}

pub fn status(&self) -> &'static str {
if self.connected() {
if self.is_connected() {
if !self.is_delegate() {
"delegator"
} else if self.is_synced() {
Expand All @@ -207,14 +237,11 @@ impl Connection {
let shutdown_ctl_receiver = self.shutdown_ctl.request.receiver.clone();
let shutdown_ctl_sender = self.shutdown_ctl.response.sender.clone();

// let mut ttl = sleep(TtlSettings::period());
let mut ttl = TtlSettings::ttl();
// TODO - delegate state changes inside `update_state()`!
let mut poll = if self.is_delegate() {
// workflow_core::task::
interval(SyncSettings::poll())
} else {
// workflow_core::task::
interval(SyncSettings::ping())
};

Expand Down Expand Up @@ -474,7 +501,7 @@ impl<'a> From<&'a Arc<Connection>> for Status<'a> {
.unwrap_or_else(|| ("n/a".to_string(), 0, 0, 0, 0));

let delegates = connection
.resolve_delegates()
.resolve_delegators()
.iter()
.map(|connection| format!("[{:016x}] {}", connection.system_id(), connection.address()))
.collect::<Vec<String>>();
Expand Down
3 changes: 1 addition & 2 deletions src/imports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ pub use arc_swap::{ArcSwap, ArcSwapOption};
pub use cliclack::{log, outro};
pub use enum_dispatch::enum_dispatch;
pub use futures::{select, FutureExt, StreamExt};
pub use kaspa_consensus_core::network::NetworkId;
// pub use kaspa_rpc_core::api::rpc::RpcApi;
pub use kaspa_consensus_core::network::{NetworkId, NetworkType};
pub use kaspa_utils::hex::*;
pub use rand::Rng;
pub use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand Down
95 changes: 78 additions & 17 deletions src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl fmt::Debug for Monitor {

impl Monitor {
pub fn new(args: &Arc<Args>, service: Service) -> Self {
let sorts = PathParams::iter()
let sorts = PathParams::iter_tls_any()
.map(|params| (params, AtomicBool::new(false)))
.collect();

Expand Down Expand Up @@ -52,12 +52,9 @@ impl Monitor {
}

pub fn to_vec(&self) -> Vec<Arc<Connection>> {
self.connections
.read()
.unwrap()
.values()
PathParams::iter_tls_strict()
.filter_map(|params| self.connections.read().unwrap().get(&params).cloned())
.flatten()
.cloned()
.collect()
}

Expand All @@ -78,7 +75,10 @@ impl Monitor {

let mut connections = self.connections();

for params in PathParams::iter() {
let mut tls_any_created = Vec::new();
let mut tls_any_removed = Vec::new();

for params in PathParams::iter_tls_strict() {
let nodes = nodes
.iter()
.filter(|node| node.params() == &params)
Expand All @@ -105,22 +105,45 @@ impl Monitor {
&self.args,
)?);
created.start()?;
list.push(created);
list.push(created.clone());
tls_any_created.push(created);
}

for removed in remove {
removed.stop().await?;
list.retain(|c| c.node() != removed.node());
tls_any_removed.push(removed);
}
}

let targets = AHashMap::group_from(connections.values().flatten().map(|c| {
(
c.node().network_node_uid(),
c.node().transport_kind(),
c.clone(),
)
}));
// remove connections from TlsAny list
tls_any_removed.into_iter().for_each(|connection| {
let params = connection.params().to_tls(TlsKind::Any);
let list = connections.entry(params).or_default();
list.retain(|connection| connection.node() != connection.node());
});

// create connections in TlsAny list
tls_any_created.into_iter().for_each(|connection| {
let params = connection.params().to_tls(TlsKind::Any);
let list = connections.entry(params).or_default();
list.push(connection);
});

// collect all strict Tls connections and group them by network_uid (fqdn+network+tls)
let targets = AHashMap::group_from(
connections
.iter()
.filter_map(|(params, list)| params.is_tls_strict().then_some(list))
.flatten()
.map(|connection| {
(
connection.node().network_node_uid(),
connection.node().transport_kind(),
connection.clone(),
)
}),
);

for (_network_uid, transport_map) in targets.iter() {
if let Some(wrpc_borsh) = transport_map.get(&TransportKind::WrpcBorsh) {
Expand All @@ -132,6 +155,23 @@ impl Monitor {
}
}

if self.args.debug {
for params in PathParams::iter_tls_any() {
println!("{}:{}", self.service, params);
if let Some(connections) = connections.get(&params) {
if connections.is_empty() {
println!("\t- None (0)");
} else {
for connection in connections {
println!("\t- {}", connection);
}
}
} else {
println!("\t- N/A");
}
}
}

*self.connections.write().unwrap() = connections;

Ok(())
Expand Down Expand Up @@ -191,16 +231,37 @@ impl Monitor {
Ok(())
}

pub fn schedule_sort(&self, params: &PathParams) {
pub fn schedule_sort(&self, params_tls_kind: &PathParams) {
self.sorts
.get(params)
.get(params_tls_kind)
.unwrap()
.store(true, Ordering::Relaxed);

let params_tls_any = params_tls_kind.to_tls(TlsKind::Any);
self.sorts
.get(&params_tls_any)
.unwrap()
.store(true, Ordering::Relaxed);
}

// /// Get JSON string representing node information (id, url, provider, link)
pub fn election(&self, params: &PathParams) -> Option<String> {
if self.verbose() {
println!("election for: {}", params);
}

let connections = self.connections.read().unwrap();

if self.verbose() {
if let Some(connections) = connections.get(params) {
for connection in connections {
println!("\t- {}", connection);
}
} else {
println!("\t- N/A");
}
}

let connections = connections
.get(params)?
.iter()
Expand Down
Loading

0 comments on commit 0ad47cf

Please sign in to comment.