Skip to content

Commit

Permalink
pool: add Relay::try_connect method
Browse files Browse the repository at this point in the history
Closes #624

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 28, 2024
1 parent 47200d2 commit a430a64
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 73 deletions.
2 changes: 1 addition & 1 deletion crates/nostr-connect/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl NostrConnect {
}

// Connect to relays
self.pool.connect(None).await;
self.pool.connect().await;

// Subscribe
let notifications = self.subscribe().await?;
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-connect/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use nostr::nips::nip46::{Message, Request, ResponseResult};
use nostr_relay_pool::prelude::*;
Expand Down Expand Up @@ -129,7 +128,7 @@ impl NostrConnectRemoteSigner {
}

// Connect
self.pool.connect(Some(Duration::from_secs(10))).await;
self.pool.connect().await;

let filter = Filter::new()
.pubkey(self.keys.signer.public_key())
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> Result<()> {

pool.add_relay("wss://relay.damus.io", RelayOptions::default())
.await?;
pool.connect(None).await;
pool.connect().await;

let event = Event::from_json(r#"{"content":"","created_at":1698412975,"id":"f55c30722f056e330d8a7a6a9ba1522f7522c0f1ced1c93d78ea833c78a3d6ec","kind":3,"pubkey":"f831caf722214748c72db4829986bd0cbb2bb8b3aeade1c959624a52a9629046","sig":"5092a9ffaecdae7d7794706f085ff5852befdf79df424cc3419bb797bf515ae05d4f19404cb8324b8b4380a4bd497763ac7b0f3b1b63ef4d3baa17e5f5901808","tags":[["p","4ddeb9109a8cd29ba279a637f5ec344f2479ee07df1f4043f3fe26d8948cfef9","",""],["p","bb6fd06e156929649a73e6b278af5e648214a69d88943702f1fb627c02179b95","",""],["p","b8b8210f33888fdbf5cedee9edf13c3e9638612698fe6408aff8609059053420","",""],["p","9dcee4fabcd690dc1da9abdba94afebf82e1e7614f4ea92d61d52ef9cd74e083","",""],["p","3eea9e831fefdaa8df35187a204d82edb589a36b170955ac5ca6b88340befaa0","",""],["p","885238ab4568f271b572bf48b9d6f99fa07644731f288259bd395998ee24754e","",""],["p","568a25c71fba591e39bebe309794d5c15d27dbfa7114cacb9f3586ea1314d126","",""]]}"#).unwrap();
pool.send_event(event).await?;
Expand Down
42 changes: 34 additions & 8 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,15 +922,25 @@ impl InnerRelayPool {
Ok(ReceiverStream::new(rx))
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
pub async fn connect(&self) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Connect
for relay in relays.values() {
relay.connect()
}
}

pub async fn try_connect(&self, timeout: Duration) {
// Lock with read shared access
let relays = self.atomic.relays.read().await;

let mut futures = Vec::with_capacity(relays.len());

// Filter only relays that can connect and compose futures
for relay in relays.values().filter(|r| r.status().can_connect()) {
futures.push(relay.connect(connection_timeout));
futures.push(relay.try_connect(timeout));
}

// Check number of futures
Expand Down Expand Up @@ -964,11 +974,7 @@ impl InnerRelayPool {
Ok(())
}

pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
Expand All @@ -983,7 +989,27 @@ impl InnerRelayPool {
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Connect
relay.connect(connection_timeout).await;
relay.connect();

Ok(())
}

pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Convert url
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.atomic.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Try to connect
relay.try_connect(timeout).await?;

Ok(())
}
Expand Down
40 changes: 29 additions & 11 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,16 @@ impl RelayPool {
self.inner.remove_all_relays(true).await
}

/// Connect to all added relays and keep connection alive
/// Connect to all added relays
#[inline]
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
pub async fn connect(&self) {
self.inner.connect().await
}

/// Connect to all added relays
#[inline]
pub async fn try_connect(&self, timeout: Duration) {
self.inner.try_connect(timeout).await
}

/// Disconnect from all relays
Expand All @@ -263,18 +269,30 @@ impl RelayPool {
self.inner.disconnect().await
}

/// Connect to relay
/// Connect to a previously added relay
///
/// This method doesn't provide any information on if the connection was successful or not.
///
/// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool.
#[inline]
pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
pub async fn connect_relay<U>(&self, url: U) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.connect_relay(url).await
}

/// Try to connect to a previously added relay
///
/// This method returns an error if the connection fails.
#[inline]
pub async fn try_connect_relay<U>(&self, url: U, timeout: Duration) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.connect_relay(url, connection_timeout).await
self.inner.try_connect_relay(url, timeout).await
}

/// Disconnect relay
Expand Down Expand Up @@ -633,7 +651,7 @@ mod tests {

pool.add_relay(&url, RelayOptions::default()).await.unwrap();

pool.connect(None).await;
pool.connect().await;

assert!(!pool.inner.is_shutdown());

Expand Down
3 changes: 3 additions & 0 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum Error {
NotReady,
/// Relay not connected
NotConnected,
/// Connection failed
ConnectionFailed,
/// Received shutdown
ReceivedShutdown,
/// Relay message
Expand Down Expand Up @@ -145,6 +147,7 @@ impl fmt::Display for Error {
}
Self::NotReady => write!(f, "relay is initialized but not ready"),
Self::NotConnected => write!(f, "relay not connected"),
Self::ConnectionFailed => write!(f, "connection failed"),
Self::ReceivedShutdown => write!(f, "received shutdown"),
Self::RelayMessage(message) => write!(f, "{message}"),
Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"),
Expand Down
76 changes: 43 additions & 33 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,42 +432,56 @@ impl InnerRelay {
}
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
// Return if relay can't connect
if !self.status().can_connect() {
return;
}

fn _connect(&self, timeout: Duration) {
// Update status
// Change it to pending to avoid issues with the health check (initialized check)
self.set_status(RelayStatus::Pending, false);

// If connection timeout is `Some`, try to connect waiting for connection
match connection_timeout {
Some(timeout) => {
let mut notifications = self.internal_notification_sender.subscribe();
// Spawn connection task
self.spawn_and_try_connect(timeout);
}

pub fn connect(&self) {
if self.status().can_connect() {
self._connect(DEFAULT_CONNECTION_TIMEOUT);
}
}

// Spawn and try connect
self.spawn_and_try_connect(timeout);
pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> {
// Check if relay can't connect
if !self.status().can_connect() {
// TODO: should return `Error::AlreadyConnected`?
return Ok(());
}

// Wait for status change (connected or disconnected)
tracing::debug!(url = %self.url, "Waiting for status change before continue");
while let Ok(notification) = notifications.recv().await {
if let RelayNotification::RelayStatus {
status: RelayStatus::Connected | RelayStatus::Disconnected,
} = notification
{
break;
// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();

// Connect
self._connect(timeout);

// Wait for status change
while let Ok(notification) = notifications.recv().await {
if let RelayNotification::RelayStatus { status } = notification {
match status {
// This status shouldn't happen, break the loop
RelayStatus::Initialized => break,
// Waiting for connection, stay in the loop
RelayStatus::Pending | RelayStatus::Connecting => continue,
// Success
RelayStatus::Connected => return Ok(()),
// Failed
RelayStatus::Disconnected | RelayStatus::Terminated => {
return Err(Error::ConnectionFailed)
}
}
}
None => {
self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT);
}
}

Err(Error::PrematureExit)
}

fn spawn_and_try_connect(&self, connection_timeout: Duration) {
fn spawn_and_try_connect(&self, timeout: Duration) {
if self.is_running() {
tracing::warn!(url = %self.url, "Connection task is already running.");
return;
Expand All @@ -482,7 +496,7 @@ impl InnerRelay {
let mut rx_service = relay.atomic.channels.rx_service().await;

// Last websocket error
// Store it to avoid to print every time the same connection error
// Store it to avoid printing every time the same connection error
let mut last_ws_error = None;

// Auto-connect loop
Expand All @@ -493,7 +507,7 @@ impl InnerRelay {

tokio::select! {
// Connect and run message handler
_ = relay.connect_and_run(connection_timeout, &mut last_ws_error) => {},
_ = relay.connect_and_run(timeout, &mut last_ws_error) => {},
// Handle "terminate" message
_ = relay.handle_terminate(&mut rx_service) => {
// Update status
Expand Down Expand Up @@ -557,7 +571,7 @@ impl InnerRelay {

/// Depending on attempts and success, use default or incremental retry interval
fn calculate_retry_interval(&self) -> Duration {
// Check if incremental interval is enabled
// Check if the incremental interval is enabled
if self.opts.adjust_retry_interval {
// Calculate the difference between attempts and success
let diff: u32 = self.stats.attempts().saturating_sub(self.stats.success()) as u32;
Expand Down Expand Up @@ -609,11 +623,7 @@ impl InnerRelay {
}

/// Connect and run message handler
async fn connect_and_run(
&self,
connection_timeout: Duration,
last_ws_error: &mut Option<String>,
) {
async fn connect_and_run(&self, timeout: Duration, last_ws_error: &mut Option<String>) {
// Update status
self.set_status(RelayStatus::Connecting, true);

Expand All @@ -626,7 +636,7 @@ impl InnerRelay {
DEFAULT_CONNECTION_TIMEOUT
} else {
// First attempt, use external timeout
connection_timeout
timeout
};

// Connect
Expand Down
Loading

0 comments on commit a430a64

Please sign in to comment.