diff --git a/crates/nostr-connect/src/client.rs b/crates/nostr-connect/src/client.rs index 110b99e3b..97f4928a2 100644 --- a/crates/nostr-connect/src/client.rs +++ b/crates/nostr-connect/src/client.rs @@ -119,7 +119,7 @@ impl NostrConnect { } // Connect to relays - self.pool.connect(None).await; + self.pool.connect().await; // Subscribe let notifications = self.subscribe().await?; diff --git a/crates/nostr-connect/src/signer.rs b/crates/nostr-connect/src/signer.rs index 1ffdf9e84..7a7b1ec23 100644 --- a/crates/nostr-connect/src/signer.rs +++ b/crates/nostr-connect/src/signer.rs @@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; use nostr::nips::nip46::{Message, Request, ResponseResult}; use nostr_relay_pool::prelude::*; @@ -129,7 +128,7 @@ impl NostrConnectRemoteSigner { } // Connect - self.pool.connect(Some(Duration::from_secs(10))).await; + self.pool.connect().await; let filter = Filter::new() .pubkey(self.keys.signer.public_key()) diff --git a/crates/nostr-relay-pool/examples/pool.rs b/crates/nostr-relay-pool/examples/pool.rs index c8dfdf4f6..0abc9108d 100644 --- a/crates/nostr-relay-pool/examples/pool.rs +++ b/crates/nostr-relay-pool/examples/pool.rs @@ -19,7 +19,7 @@ async fn main() -> Result<()> { pool.add_relay("wss://relay.damus.io", RelayOptions::default()) .await?; - pool.connect(None).await; + pool.connect().await; let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap(); pool.send_event(event).await?; diff --git a/crates/nostr-relay-pool/src/pool/inner.rs b/crates/nostr-relay-pool/src/pool/inner.rs index b75cf80cb..76e083cf9 100644 --- a/crates/nostr-relay-pool/src/pool/inner.rs +++ b/crates/nostr-relay-pool/src/pool/inner.rs @@ -922,7 +922,17 @@ impl InnerRelayPool { Ok(ReceiverStream::new(rx)) } - pub async fn connect(&self, connection_timeout: Option) { + pub async fn connect(&self) { + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Connect + for relay in relays.values() { + relay.connect() + } + } + + pub async fn try_connect(&self, timeout: Duration) { // Lock with read shared access let relays = self.atomic.relays.read().await; @@ -930,7 +940,7 @@ impl InnerRelayPool { // Filter only relays that can connect and compose futures for relay in relays.values().filter(|r| r.status().can_connect()) { - futures.push(relay.connect(connection_timeout)); + futures.push(relay.try_connect(timeout)); } // Check number of futures @@ -964,11 +974,7 @@ impl InnerRelayPool { Ok(()) } - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, @@ -983,7 +989,27 @@ impl InnerRelayPool { let relay: &Relay = self.internal_relay(&relays, &url)?; // Connect - relay.connect(connection_timeout).await; + relay.connect(); + + Ok(()) + } + + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + // Convert url + let url: RelayUrl = url.try_into_url()?; + + // Lock with read shared access + let relays = self.atomic.relays.read().await; + + // Get relay + let relay: &Relay = self.internal_relay(&relays, &url)?; + + // Try to connect + relay.try_connect(timeout).await?; Ok(()) } diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index f3df7a2df..9410b559d 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -251,10 +251,16 @@ impl RelayPool { self.inner.remove_all_relays(true).await } - /// Connect to all added relays and keep connection alive + /// Connect to all added relays #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn connect(&self) { + self.inner.connect().await + } + + /// Connect to all added relays + #[inline] + pub async fn try_connect(&self, timeout: Duration) { + self.inner.try_connect(timeout).await } /// Disconnect from all relays @@ -263,18 +269,30 @@ impl RelayPool { self.inner.disconnect().await } - /// Connect to relay + /// Connect to a previously added relay + /// + /// This method doesn't provide any information on if the connection was successful or not. + /// + /// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool. #[inline] - pub async fn connect_relay( - &self, - url: U, - connection_timeout: Option, - ) -> Result<(), Error> + pub async fn connect_relay(&self, url: U) -> Result<(), Error> + where + U: TryIntoUrl, + Error: From<::Err>, + { + self.inner.connect_relay(url).await + } + + /// Try to connect to a previously added relay + /// + /// This method returns an error if the connection fails. + #[inline] + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> where U: TryIntoUrl, Error: From<::Err>, { - self.inner.connect_relay(url, connection_timeout).await + self.inner.try_connect_relay(url, timeout).await } /// Disconnect relay @@ -633,7 +651,7 @@ mod tests { pool.add_relay(&url, RelayOptions::default()).await.unwrap(); - pool.connect(None).await; + pool.connect().await; assert!(!pool.inner.is_shutdown()); diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index 6f030c460..377a4ff93 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -59,6 +59,8 @@ pub enum Error { NotReady, /// Relay not connected NotConnected, + /// Connection failed + ConnectionFailed, /// Received shutdown ReceivedShutdown, /// Relay message @@ -145,6 +147,7 @@ impl fmt::Display for Error { } Self::NotReady => write!(f, "relay is initialized but not ready"), Self::NotConnected => write!(f, "relay not connected"), + Self::ConnectionFailed => write!(f, "connection failed"), Self::ReceivedShutdown => write!(f, "received shutdown"), Self::RelayMessage(message) => write!(f, "{message}"), Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"), diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 514a46e7c..680111038 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -432,42 +432,56 @@ impl InnerRelay { } } - pub async fn connect(&self, connection_timeout: Option) { - // Return if relay can't connect - if !self.status().can_connect() { - return; - } - + fn _connect(&self, timeout: Duration) { // Update status // Change it to pending to avoid issues with the health check (initialized check) self.set_status(RelayStatus::Pending, false); - // If connection timeout is `Some`, try to connect waiting for connection - match connection_timeout { - Some(timeout) => { - let mut notifications = self.internal_notification_sender.subscribe(); + // Spawn connection task + self.spawn_and_try_connect(timeout); + } + + pub fn connect(&self) { + if self.status().can_connect() { + self._connect(DEFAULT_CONNECTION_TIMEOUT); + } + } - // Spawn and try connect - self.spawn_and_try_connect(timeout); + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + // Check if relay can't connect + if !self.status().can_connect() { + // TODO: should return `Error::AlreadyConnected`? + return Ok(()); + } - // Wait for status change (connected or disconnected) - tracing::debug!(url = %self.url, "Waiting for status change before continue"); - while let Ok(notification) = notifications.recv().await { - if let RelayNotification::RelayStatus { - status: RelayStatus::Connected | RelayStatus::Disconnected, - } = notification - { - break; + // Subscribe to notifications + let mut notifications = self.internal_notification_sender.subscribe(); + + // Connect + self._connect(timeout); + + // Wait for status change + while let Ok(notification) = notifications.recv().await { + if let RelayNotification::RelayStatus { status } = notification { + match status { + // This status shouldn't happen, break the loop + RelayStatus::Initialized => break, + // Waiting for connection, stay in the loop + RelayStatus::Pending | RelayStatus::Connecting => continue, + // Success + RelayStatus::Connected => return Ok(()), + // Failed + RelayStatus::Disconnected | RelayStatus::Terminated => { + return Err(Error::ConnectionFailed) } } } - None => { - self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT); - } } + + Err(Error::PrematureExit) } - fn spawn_and_try_connect(&self, connection_timeout: Duration) { + fn spawn_and_try_connect(&self, timeout: Duration) { if self.is_running() { tracing::warn!(url = %self.url, "Connection task is already running."); return; @@ -482,7 +496,7 @@ impl InnerRelay { let mut rx_service = relay.atomic.channels.rx_service().await; // Last websocket error - // Store it to avoid to print every time the same connection error + // Store it to avoid printing every time the same connection error let mut last_ws_error = None; // Auto-connect loop @@ -493,7 +507,7 @@ impl InnerRelay { tokio::select! { // Connect and run message handler - _ = relay.connect_and_run(connection_timeout, &mut last_ws_error) => {}, + _ = relay.connect_and_run(timeout, &mut last_ws_error) => {}, // Handle "terminate" message _ = relay.handle_terminate(&mut rx_service) => { // Update status @@ -557,7 +571,7 @@ impl InnerRelay { /// Depending on attempts and success, use default or incremental retry interval fn calculate_retry_interval(&self) -> Duration { - // Check if incremental interval is enabled + // Check if the incremental interval is enabled if self.opts.adjust_retry_interval { // Calculate the difference between attempts and success let diff: u32 = self.stats.attempts().saturating_sub(self.stats.success()) as u32; @@ -609,11 +623,7 @@ impl InnerRelay { } /// Connect and run message handler - async fn connect_and_run( - &self, - connection_timeout: Duration, - last_ws_error: &mut Option, - ) { + async fn connect_and_run(&self, timeout: Duration, last_ws_error: &mut Option) { // Update status self.set_status(RelayStatus::Connecting, true); @@ -626,7 +636,7 @@ impl InnerRelay { DEFAULT_CONNECTION_TIMEOUT } else { // First attempt, use external timeout - connection_timeout + timeout }; // Connect diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 28cd72dd5..886c671c8 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -254,10 +254,22 @@ impl Relay { self.inner.internal_notification_sender.subscribe() } - /// Connect to relay and keep alive connection + /// Connect to relay + /// + /// This method returns immediately and doesn't provide any information on if the connection was successful or not. + #[inline] + pub fn connect(&self) { + self.inner.connect() + } + + /// Try to connect to relay + /// + /// This method returns an error if the connection fails. + /// If the connection fails, + /// a task will continue to retry in the background (unless configured differently in [`RelayOptions`]. #[inline] - pub async fn connect(&self, connection_timeout: Option) { - self.inner.connect(connection_timeout).await + pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> { + self.inner.try_connect(timeout).await } /// Disconnect from relay and set status to 'Terminated' @@ -443,7 +455,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); let keys = Keys::generate(); let event = EventBuilder::text_note("Test") @@ -462,7 +474,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -485,7 +497,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -508,7 +520,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + relay.try_connect(Duration::from_millis(100)).await.unwrap(); assert_eq!(relay.status(), RelayStatus::Connected); @@ -532,7 +544,8 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(Some(Duration::from_millis(100))).await; + let res = relay.try_connect(Duration::from_millis(100)).await; + assert!(matches!(res.unwrap_err(), Error::ConnectionFailed)); assert!(relay.inner.is_running()); @@ -562,7 +575,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -594,7 +607,7 @@ mod tests { assert_eq!(relay.status(), RelayStatus::Initialized); - relay.connect(None).await; + relay.connect(); time::sleep(Duration::from_secs(1)).await; @@ -623,7 +636,7 @@ mod tests { relay.inner.state.automatic_authentication(true); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); @@ -664,7 +677,7 @@ mod tests { let relay = Relay::new(url); - relay.connect(Some(Duration::from_millis(100))).await; + relay.connect(); // Signer let keys = Keys::generate(); diff --git a/crates/nostr-relay-pool/src/shared.rs b/crates/nostr-relay-pool/src/shared.rs index 587a8b587..85f73683f 100644 --- a/crates/nostr-relay-pool/src/shared.rs +++ b/crates/nostr-relay-pool/src/shared.rs @@ -36,6 +36,7 @@ pub struct SharedState { nip42_auto_authentication: Arc, min_pow_difficulty: Arc, pub(crate) filtering: RelayFiltering, + // TODO: add a semaphore to limit number of concurrent websocket connections attempts? } impl Default for SharedState { diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 2b42f7221..cb9747e9e 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -493,13 +493,27 @@ impl Client { } /// Connect to a previously added relay + /// + /// Check [`RelayPool::connect_relay`] docs to learn more. #[inline] pub async fn connect_relay(&self, url: U) -> Result<(), Error> where U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.connect_relay(url, None).await?) + Ok(self.pool.connect_relay(url).await?) + } + + /// Try to connect to a previously added relay + /// + /// Check [`RelayPool::try_connect_relay`] docs to learn more. + #[inline] + pub async fn try_connect_relay(&self, url: U, timeout: Duration) -> Result<(), Error> + where + U: TryIntoUrl, + pool::Error: From<::Err>, + { + Ok(self.pool.try_connect_relay(url, timeout).await?) } /// Disconnect relay @@ -515,16 +529,22 @@ impl Client { /// Connect to all added relays #[inline] pub async fn connect(&self) { - self.pool.connect(None).await; + self.pool.connect().await; + } + + /// Connect to all added relays + #[inline] + pub async fn try_connect(&self, timeout: Duration) { + self.pool.try_connect(timeout).await; } /// Connect to all added relays /// /// Try to connect to the relays and wait for them to be connected at most for the specified `timeout`. /// The code continues if the `timeout` is reached or if all relays connect. - #[inline] + #[deprecated(since = "0.38.0", note = "Use `try_connect` instead")] pub async fn connect_with_timeout(&self, timeout: Duration) { - self.pool.connect(Some(timeout)).await + self.pool.try_connect(timeout).await; } /// Disconnect from all relays diff --git a/crates/nwc/src/lib.rs b/crates/nwc/src/lib.rs index 6b6830059..fecdb6d23 100644 --- a/crates/nwc/src/lib.rs +++ b/crates/nwc/src/lib.rs @@ -73,7 +73,7 @@ impl NWC { } // Connect - self.relay.connect(None).await; + self.relay.connect(); let filter = Filter::new() .author(self.uri.public_key)