Skip to content

Commit

Permalink
pool: add Relay::try_connect method
Browse files Browse the repository at this point in the history
TODO: description

Closes #624

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 23, 2024
1 parent 9f3f9b1 commit d9d2aa5
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 52 deletions.
41 changes: 37 additions & 4 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,15 +912,25 @@ impl InnerRelayPool {
Ok(ReceiverStream::new(rx))
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
pub async fn connect(&self) {
// Lock with read shared access
let relays = self.relays.read().await;

// Connect
for relay in relays.values() {
relay.connect()
}
}

pub async fn try_connect(&self, timeout: Duration) {
// Lock with read shared access
let relays = self.relays.read().await;

let mut futures = Vec::with_capacity(relays.len());

// Filter only relays that can connect and compose futures
for relay in relays.values().filter(|r| r.status().can_connect()) {
futures.push(relay.connect(connection_timeout));
futures.push(relay.try_connect(timeout));
}

// Check number of futures
Expand Down Expand Up @@ -957,7 +967,6 @@ impl InnerRelayPool {
pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
where
U: TryIntoUrl,
Expand All @@ -973,7 +982,31 @@ impl InnerRelayPool {
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Connect
relay.connect(connection_timeout).await;
relay.connect();

Ok(())
}

pub async fn try_connect_relay<U>(
&self,
url: U,
timeout: Duration,
) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Convert url
let url: RelayUrl = url.try_into_url()?;

// Lock with read shared access
let relays = self.relays.read().await;

// Get relay
let relay: &Relay = self.internal_relay(&relays, &url)?;

// Try onnect
relay.try_connect(timeout).await?;

Ok(())
}
Expand Down
37 changes: 31 additions & 6 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,16 @@ impl RelayPool {
self.inner.remove_all_relays(true).await
}

/// Connect to all added relays and keep connection alive
/// Connect to all added relays
#[inline]
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
pub async fn connect(&self) {
self.inner.connect().await
}

/// Connect to all added relays
#[inline]
pub async fn try_connect(&self, timeout: Duration) {
self.inner.try_connect(timeout).await
}

/// Disconnect from all relays
Expand All @@ -257,17 +263,36 @@ impl RelayPool {
}

/// Connect to relay
///
/// This method doesn't provide any information on if the connection was successful or not.
///
/// Return [`Error::RelayNotFound`] if the relay doesn't exist in the pool.
#[inline]
pub async fn connect_relay<U>(
&self,
url: U,
connection_timeout: Option<Duration>,
) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.connect_relay(url, connection_timeout).await
self.inner.connect_relay(url).await
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
#[inline]
pub async fn try_connect_relay<U>(
&self,
url: U,
timeout: Duration,
) -> Result<(), Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
self.inner.try_connect_relay(url, timeout).await
}

/// Disconnect relay
Expand Down Expand Up @@ -626,7 +651,7 @@ mod tests {

pool.add_relay(&url, RelayOptions::default()).await.unwrap();

pool.connect(None).await;
pool.connect().await;

assert!(!pool.inner.is_shutdown());

Expand Down
3 changes: 3 additions & 0 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum Error {
NotReady,
/// Relay not connected
NotConnected,
/// Connection failed
ConnectionFailed,
/// Received shutdown
ReceivedShutdown,
/// Relay message
Expand Down Expand Up @@ -145,6 +147,7 @@ impl fmt::Display for Error {
}
Self::NotReady => write!(f, "relay is initialized but not ready"),
Self::NotConnected => write!(f, "relay not connected"),
Self::ConnectionFailed => write!(f, "connection failed"),
Self::ReceivedShutdown => write!(f, "received shutdown"),
Self::RelayMessage(message) => write!(f, "{message}"),
Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"),
Expand Down
68 changes: 40 additions & 28 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,42 +423,54 @@ impl InnerRelay {
}
}

pub async fn connect(&self, connection_timeout: Option<Duration>) {
// Return if relay can't connect
if !self.status().can_connect() {
return;
}

fn _connect(&self, timeout: Duration) {
// Update status
// Change it to pending to avoid issues with the health check (initialized check)
self.set_status(RelayStatus::Pending, false);

// If connection timeout is `Some`, try to connect waiting for connection
match connection_timeout {
Some(timeout) => {
let mut notifications = self.internal_notification_sender.subscribe();
// Spawn connection task
self.spawn_and_try_connect(timeout);
}

// Spawn and try connect
self.spawn_and_try_connect(timeout);
pub fn connect(&self) {
if self.status().can_connect() {
self._connect(DEFAULT_CONNECTION_TIMEOUT);
}
}

// Wait for status change (connected or disconnected)
tracing::debug!(url = %self.url, "Waiting for status change before continue");
while let Ok(notification) = notifications.recv().await {
if let RelayNotification::RelayStatus {
status: RelayStatus::Connected | RelayStatus::Disconnected,
} = notification
{
break;
}
pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> {
// Check if relay can't connect
if self.status().can_connect() {
// TODO: should return `Error::AlreadyConnected`?
return Ok(());
}

// Subscribe to notifications
let mut notifications = self.internal_notification_sender.subscribe();

// Connect
self._connect(timeout);

// Wait for status change (connected or disconnected)
while let Ok(notification) = notifications.recv().await {
if let RelayNotification::RelayStatus { status } = notification {
match status {
// These statuses shouldn't happen, break the loop
RelayStatus::Initialized | RelayStatus::Pending => break,
// Waiting for connection, stay in the loop
RelayStatus::Connecting => continue,
// Success
RelayStatus::Connected => return Ok(()),
// Failed
RelayStatus::Disconnected | RelayStatus::Terminated => return Err(Error::ConnectionFailed),
}
}
None => {
self.spawn_and_try_connect(DEFAULT_CONNECTION_TIMEOUT);
}
}

Err(Error::PrematureExit)
}

fn spawn_and_try_connect(&self, connection_timeout: Duration) {
fn spawn_and_try_connect(&self, timeout: Duration) {
if self.is_running() {
tracing::warn!(url = %self.url, "Connection task is already running.");
return;
Expand All @@ -480,7 +492,7 @@ impl InnerRelay {

tokio::select! {
// Connect and run message handler
_ = relay.connect_and_run(connection_timeout) => {},
_ = relay.connect_and_run(timeout) => {},
// Handle terminate
_ = relay.handle_terminate(&mut rx_service) => {
// Update status
Expand Down Expand Up @@ -596,7 +608,7 @@ impl InnerRelay {
}

/// Connect and run message handler
async fn connect_and_run(&self, connection_timeout: Duration) {
async fn connect_and_run(&self, timeout: Duration) {
// Update status
self.set_status(RelayStatus::Connecting, true);

Expand All @@ -609,7 +621,7 @@ impl InnerRelay {
DEFAULT_CONNECTION_TIMEOUT
} else {
// First attempt, use external timeout
connection_timeout
timeout
};

// Connect
Expand Down
35 changes: 23 additions & 12 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,20 @@ impl Relay {
self.inner.internal_notification_sender.subscribe()
}

/// Connect to relay and keep alive connection
/// Connect to relay
///
/// This method returns immediately and doesn't provide any information on if the connection was successful or not.
#[inline]
pub async fn connect(&self, connection_timeout: Option<Duration>) {
self.inner.connect(connection_timeout).await
pub fn connect(&self) {
self.inner.connect()
}

/// Try to connect to relay
///
/// This method returns an error if the connection fails.
#[inline]
pub async fn try_connect(&self, timeout: Duration) -> Result<(), Error> {
self.inner.try_connect(timeout).await
}

/// Disconnect from relay and set status to 'Terminated'
Expand Down Expand Up @@ -444,7 +454,7 @@ mod tests {

let relay = Relay::new(url);

relay.connect(Some(Duration::from_millis(100))).await;
relay.try_connect(Duration::from_millis(100)).await.unwrap();

let keys = Keys::generate();
let event = EventBuilder::text_note("Test")
Expand All @@ -463,7 +473,7 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(Some(Duration::from_millis(100))).await;
relay.try_connect(Duration::from_millis(100)).await.unwrap();

assert_eq!(relay.status(), RelayStatus::Connected);

Expand All @@ -486,7 +496,7 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(Some(Duration::from_millis(100))).await;
relay.try_connect(Duration::from_millis(100)).await.unwrap();

assert_eq!(relay.status(), RelayStatus::Connected);

Expand All @@ -509,7 +519,7 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(Some(Duration::from_millis(100))).await;
relay.try_connect(Duration::from_millis(100)).await.unwrap();

assert_eq!(relay.status(), RelayStatus::Connected);

Expand All @@ -533,7 +543,8 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(Some(Duration::from_millis(100))).await;
let res = relay.try_connect(Duration::from_millis(100)).await;
assert!(matches!(res.unwrap_err(), Error::ConnectionFailed));

assert!(relay.inner.is_running());

Expand Down Expand Up @@ -563,7 +574,7 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(None).await;
relay.connect();

time::sleep(Duration::from_secs(1)).await;

Expand Down Expand Up @@ -595,7 +606,7 @@ mod tests {

assert_eq!(relay.status(), RelayStatus::Initialized);

relay.connect(None).await;
relay.connect();

time::sleep(Duration::from_secs(1)).await;

Expand Down Expand Up @@ -624,7 +635,7 @@ mod tests {

relay.inner.state.automatic_authentication(true);

relay.connect(Some(Duration::from_millis(100))).await;
relay.connect();

// Signer
let keys = Keys::generate();
Expand Down Expand Up @@ -665,7 +676,7 @@ mod tests {

let relay = Relay::new(url);

relay.connect(Some(Duration::from_millis(100))).await;
relay.connect();

// Signer
let keys = Keys::generate();
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-relay-pool/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ impl fmt::Display for SharedStateError {
}
}

// TODO: add SharedStateBuilder?

#[derive(Debug, Clone)]
pub struct SharedState {
pub(crate) database: Arc<dyn NostrDatabase>,
signer: Arc<RwLock<Option<Arc<dyn NostrSigner>>>>,
nip42_auto_authentication: Arc<AtomicBool>,
min_pow_difficulty: Arc<AtomicU8>,
pub(crate) filtering: RelayFiltering,
// TODO: add a semaphore to limit number of concurrent websocket connections attempts?
}

impl Default for SharedState {
Expand Down

0 comments on commit d9d2aa5

Please sign in to comment.