Skip to content

Commit

Permalink
sdk: fix pong not match if connect method called multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Oct 12, 2023
1 parent 2b273c9 commit 36e39f4
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 96 deletions.
3 changes: 2 additions & 1 deletion bindings/nostr-sdk-ffi/src/nostr_sdk.udl
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,9 @@ interface RelayConnectionStats {

enum RelayStatus {
"Initialized",
"Connected",
"Pending",
"Connecting",
"Connected",
"Disconnected",
"Stopped",
"Terminated",
Expand Down
173 changes: 86 additions & 87 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub enum Error {
pub enum RelayStatus {
/// Relay initialized
Initialized,
/// Pending
Pending,
/// Connecting
Connecting,
/// Relay connected
Expand All @@ -131,6 +133,7 @@ impl fmt::Display for RelayStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Initialized => write!(f, "Initialized"),
Self::Pending => write!(f, "Pending"),
Self::Connecting => write!(f, "Connecting"),
Self::Connected => write!(f, "Connected"),
Self::Disconnected => write!(f, "Disconnected"),
Expand Down Expand Up @@ -454,98 +457,85 @@ impl Relay {
self.schedule_for_stop(false);
self.schedule_for_termination(false);

if self.opts.get_reconnect() {
if wait_for_connection {
if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if self.opts.get_reconnect() {
if wait_for_connection {
self.try_connect().await
}
}

// TODO: temp disable because cause stop-start issues
/* if !self.is_auto_connect_loop_running() {
self.set_auto_connect_loop_running(true); */

tracing::debug!("Auto connect loop started for {}", self.url);

if !wait_for_connection {
self.set_status(RelayStatus::Initialized).await;
}

let relay = self.clone();
thread::abortable(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}
tracing::debug!("Auto connect loop started for {}", self.url);

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
break;
}
if !wait_for_connection {
self.set_status(RelayStatus::Pending).await;
}

// Check status
match relay.status().await {
RelayStatus::Initialized | RelayStatus::Disconnected => {
relay.try_connect().await
let relay = self.clone();
thread::abortable(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
break;
}
_ => (),
};

let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
// Check status
match relay.status().await {
RelayStatus::Initialized
| RelayStatus::Pending
| RelayStatus::Disconnected => relay.try_connect().await,
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
_ => (),
};

let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
} else {
relay.opts().get_retry_sec()
}
} else {
relay.opts().get_retry_sec()
}
} else {
relay.opts().get_retry_sec()
};

tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
};

//relay.set_auto_connect_loop_running(false);
});
/* } else {
tracing::warn!("Auto connect loop for {} is already running!", self.url)
} */
} else if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if wait_for_connection {
tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
});
} else if wait_for_connection {
self.try_connect().await
} else {
let relay = self.clone();
Expand Down Expand Up @@ -610,14 +600,23 @@ impl Relay {
break;
}

let nonce: u64 = nostr::secp256k1::rand::random();
relay.stats.ping.set_last_nonce(nonce);
relay.stats.ping.set_replied(false);
if let Err(e) = relay.send_relay_event(RelayEvent::Ping { nonce }, None)
let nonce: u64 = rand::thread_rng().gen();
if relay.stats.ping.set_last_nonce(nonce)
&& relay.stats.ping.set_replied(false)
{
tracing::error!("Impossible to ping {}: {e}", relay.url);
break;
};
if let Err(e) =
relay.send_relay_event(RelayEvent::Ping { nonce }, None)
{
tracing::error!("Impossible to ping {}: {e}", relay.url);
break;
};
} else {
tracing::warn!(
"`last_nonce` or `replied` not updated for {}!",
relay.url
);
}

thread::sleep(Duration::from_secs(PING_INTERVAL)).await;
}

Expand Down
16 changes: 8 additions & 8 deletions crates/nostr-sdk/src/relay/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ impl PingStats {
*sent_at = Instant::now();
}

pub(crate) fn set_last_nonce(&self, nonce: u64) {
let _ = self
.last_nonce
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce));
pub(crate) fn set_last_nonce(&self, nonce: u64) -> bool {
self.last_nonce
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce))
.is_ok()
}

pub(crate) fn set_replied(&self, replied: bool) {
let _ = self
.replied
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied));
pub(crate) fn set_replied(&self, replied: bool) -> bool {
self.replied
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied))
.is_ok()
}
}

Expand Down

0 comments on commit 36e39f4

Please sign in to comment.