Skip to content

Commit

Permalink
use weak spawner to spawn wc related task/event loops
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Nov 27, 2024
1 parent 1ff17cd commit 5e89b9f
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 83 deletions.
4 changes: 2 additions & 2 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use common::executor::{abortable_queue::AbortableQueue, AbortOnDropHandle, Abort
AbortedError, SpawnAbortable, Timer};
use common::log::{debug, error, info, warn};
use common::number_type_casting::SafeTypeCastingNumbers;
use common::{now_ms, wait_until_sec};
use common::wait_until_sec;
use common::{now_sec, small_rng, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::privkey::key_pair_from_secret;
use crypto::{Bip44Chain, CryptoCtx, CryptoCtxError, GlobalHDAccountArc, KeyPairPolicy};
Expand Down Expand Up @@ -5539,7 +5539,7 @@ impl EthCoin {
check_every: f64,
) -> Web3RpcResult<Option<SignedEthTx>> {
let wait_until = wait_until_sec(wait_rpc_timeout_s);
while now_ms() < wait_until {
while now_sec() < wait_until {
let maybe_tx = self.transaction(TransactionId::Hash(tx_hash)).await?;
if let Some(tx) = maybe_tx {
let signed_tx = signed_tx_from_web3_tx(tx).map_to_mm(Web3RpcError::InvalidResponse)?;
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/eth/wallet_connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use web3::signing::hash_message;
use super::EthCoin;

// Wait for 30 seconds for the transaction to appear on the RPC node.
const WAIT_RPC_TIMEOUT_SECS: u64 = 30;
const WAIT_RPC_TIMEOUT_SECS: u64 = 60;

#[derive(Display, Debug, EnumFromStringify)]
pub enum EthWalletConnectError {
Expand Down
19 changes: 12 additions & 7 deletions mm2src/kdf_walletconnect/src/connection_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::storage::WalletConnectStorageOps;
use crate::WalletConnectCtx;
use crate::WalletConnectCtxImpl;

use common::executor::Timer;
use common::log::{debug, error, info};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use relay_client::error::ClientError;
use relay_client::websocket::{CloseFrame, ConnectionHandler, PublishedMessage};
Expand Down Expand Up @@ -75,7 +75,10 @@ impl ConnectionHandler for Handler {
/// Establishes initial connection to WalletConnect relay server with linear retry mechanism.
/// Uses increasing delay between retry attempts starting from INITIAL_RETRY_SECS.
/// After successful connection, attempts to restore previous session state from storage.
pub(crate) async fn initialize_connection(wc: Arc<WalletConnectCtx>) {
pub(crate) async fn spawn_connection_initialization(
wc: Arc<WalletConnectCtxImpl>,
connection_live_rx: UnboundedReceiver<Option<String>>,
) {
info!("Initializing WalletConnect connection");
let mut retry_count = 0;
let mut retry_secs = INITIAL_RETRY_SECS;
Expand All @@ -101,17 +104,19 @@ pub(crate) async fn initialize_connection(wc: Arc<WalletConnectCtx>) {
};

// Spawn session disconnection watcher.
handle_disconnections(&wc).await;
handle_disconnections(&wc, connection_live_rx).await;
}

/// Handles unexpected disconnections from WalletConnect relay server.
/// Implements exponential backoff retry mechanism for reconnection attempts.
/// After successful reconnection, resubscribes to previous topics to restore full functionality.
pub(crate) async fn handle_disconnections(this: &WalletConnectCtx) {
let mut recv = this.connection_live_rx.lock().await;
pub(crate) async fn handle_disconnections(
this: &WalletConnectCtxImpl,
mut connection_live_rx: UnboundedReceiver<Option<String>>,
) {
let mut backoff = 1;

while let Some(msg) = recv.next().await {
while let Some(msg) = connection_live_rx.next().await {
info!("WalletConnect disconnected with message: {msg:?}. Attempting to reconnect...");

loop {
Expand Down
6 changes: 3 additions & 3 deletions mm2src/kdf_walletconnect/src/inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{error::WalletConnectError,
propose::{process_session_propose_response, reply_session_proposal_request},
settle::reply_session_settle_request,
update::reply_session_update_request},
WalletConnectCtx};
WalletConnectCtxImpl};

use common::log::{info, LogOnError};
use futures::sink::SinkExt;
Expand All @@ -24,7 +24,7 @@ pub struct SessionMessage {
}

pub(crate) async fn process_inbound_request(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
request: Request,
topic: &Topic,
) -> MmResult<(), WalletConnectError> {
Expand Down Expand Up @@ -54,7 +54,7 @@ pub(crate) async fn process_inbound_request(
Ok(())
}

pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtx, response: Response, topic: &Topic) {
pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtxImpl, response: Response, topic: &Topic) {
let message_id = response.id();
let result = match response {
Response::Success(value) => match serde_json::from_value::<ResponseParamsSuccess>(value.result) {
Expand Down
78 changes: 41 additions & 37 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use crate::session::rpc::propose::send_proposal_request;
use chain::{WcChainId, WcRequestMethods, SUPPORTED_PROTOCOL};
use chrono::Utc;
use common::custom_futures::timeout::FutureTimerExt;
use common::executor::{spawn_abortable, AbortOnDropHandle, Timer};
use common::executor::abortable_queue::AbortableQueue;
use common::executor::{AbortableSystem, Timer};
use common::log::{debug, info};
use common::{executor::SpawnFuture, log::error};
use connection_handler::{initialize_connection, Handler};
use connection_handler::{spawn_connection_initialization, Handler};
use error::WalletConnectError;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::lock::Mutex;
Expand All @@ -39,6 +40,7 @@ use session::rpc::delete::send_session_delete_request;
use session::Session;
use session::{key::SymKeyPair, SessionManager};
use std::collections::BTreeSet;
use std::ops::Deref;
use std::{sync::Arc, time::Duration};
use storage::SessionStorageDb;
use storage::WalletConnectStorageOps;
Expand Down Expand Up @@ -69,49 +71,69 @@ pub trait WalletConnectOps {
) -> Result<Self::SendTxData, Self::Error>;
}

pub struct WalletConnectCtx {
pub struct WalletConnectCtxImpl {
pub(crate) client: Client,
pub(crate) pairing: PairingClient,
pub session_manager: SessionManager,
pub(crate) key_pair: SymKeyPair,
relay: Relay,
metadata: Metadata,
inbound_message_rx: Arc<Mutex<UnboundedReceiver<PublishedMessage>>>,
connection_live_rx: Arc<Mutex<UnboundedReceiver<Option<String>>>>,
message_tx: UnboundedSender<SessionMessageType>,
message_rx: Arc<Mutex<UnboundedReceiver<SessionMessageType>>>,
_abort_handle: AbortOnDropHandle,
pub abortable_system: AbortableQueue,
}

pub struct WalletConnectCtx(pub Arc<WalletConnectCtxImpl>);
impl Deref for WalletConnectCtx {
type Target = WalletConnectCtxImpl;
fn deref(&self) -> &Self::Target { &self.0 }
}

impl WalletConnectCtx {
pub fn try_init(ctx: &MmArc) -> MmResult<Self, WalletConnectError> {
let abortable_system = ctx.abortable_system.create_subsystem::<AbortableQueue>().unwrap();
let storage = SessionStorageDb::new(ctx)?;
let pairing = PairingClient::new();
let relay = Relay {
protocol: SUPPORTED_PROTOCOL.to_string(),
data: None,
};
let (inbound_message_tx, inbound_message_rx) = unbounded();
let (inbound_message_tx, mut inbound_message_rx) = unbounded();
let (conn_live_sender, conn_live_receiver) = unbounded();
let (message_tx, session_request_receiver) = unbounded();
let (client, _abort_handle) = Client::new_with_callback(
let (client, _) = Client::new_with_callback(
Handler::new("Komodefi", inbound_message_tx, conn_live_sender),
|r, h| spawn_abortable(client_event_loop(r, h)),
|r, h| abortable_system.weak_spawner().spawn(client_event_loop(r, h)),
);

Ok(Self {
let inner = Arc::new(WalletConnectCtxImpl {
client,
pairing,
relay,
metadata: generate_metadata(),
key_pair: SymKeyPair::new(),
session_manager: SessionManager::new(storage),
inbound_message_rx: Arc::new(inbound_message_rx.into()),
connection_live_rx: Arc::new(conn_live_receiver.into()),
message_rx: Arc::new(session_request_receiver.into()),
message_tx,
_abort_handle,
})
abortable_system,
});

// Connect to relayer client and spawn a watcher loop for disconnection.
inner
.abortable_system
.weak_spawner()
.spawn(spawn_connection_initialization(inner.clone(), conn_live_receiver));
// spawn message handler event loop
let inner_clone = inner.clone();
inner_clone.abortable_system.weak_spawner().spawn(async move {
while let Some(msg) = inbound_message_rx.next().await {
if let Err(e) = inner_clone.handle_published_message(msg).await {
debug!("Error processing message: {:?}", e);
}
}
});

Ok(Self(inner))
}

pub fn from_ctx(ctx: &MmArc) -> MmResult<Arc<WalletConnectCtx>, WalletConnectError> {
Expand All @@ -120,7 +142,9 @@ impl WalletConnectCtx {
})
.map_to_mm(WalletConnectError::InternalError)
}
}

impl WalletConnectCtxImpl {
pub async fn connect_client(&self) -> MmResult<(), WalletConnectError> {
let auth = {
let key = SigningKey::generate(&mut rand::thread_rng());
Expand All @@ -146,7 +170,9 @@ impl WalletConnectCtx {
.flat_map(|s| vec![s.topic, s.pairing_topic])
.collect::<Vec<_>>();

self.client.batch_subscribe(sessions).await?;
if !sessions.is_empty() {
self.client.batch_subscribe(sessions).await?;
}

Ok(())
}
Expand Down Expand Up @@ -519,28 +545,6 @@ impl WalletConnectCtx {
}
}

/// This function spwans related WalletConnect related tasks and needed initialization before
/// WalletConnect can be usable in KDF.
pub async fn initialize_walletconnect(ctx: &MmArc) -> MmResult<(), WalletConnectError> {
// Initialized WalletConnectCtx
let wallet_connect = WalletConnectCtx::from_ctx(ctx)?;
// WalletConnectCtx is initialized, now we can connect to relayer client and spawn a watcher
// loop for disconnection.
ctx.spawner().spawn(initialize_connection(wallet_connect.clone()));

// spawn message handler event loop
ctx.spawner().spawn(async move {
let mut recv = wallet_connect.inbound_message_rx.lock().await;
while let Some(msg) = recv.next().await {
if let Err(e) = wallet_connect.handle_published_message(msg).await {
debug!("Error processing message: {:?}", e);
}
}
});

Ok(())
}

fn find_account_in_namespace<'a>(accounts: &'a BTreeSet<String>, chain_id: &'a str) -> Option<String> {
accounts.iter().find_map(move |account_name| {
let parts: Vec<&str> = account_name.split(':').collect();
Expand Down
8 changes: 4 additions & 4 deletions mm2src/kdf_walletconnect/src/pairing.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::session::{WcRequestResponseResult, THIRTY_DAYS};
use crate::{error::WalletConnectError, WalletConnectCtx};
use crate::{error::WalletConnectError, WalletConnectCtxImpl};

use chrono::Utc;
use mm2_err_handle::prelude::MmResult;
Expand All @@ -11,7 +11,7 @@ use relay_rpc::{domain::Topic,
ResponseParamsSuccess}};

pub(crate) async fn reply_pairing_ping_response(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
) -> MmResult<(), WalletConnectError> {
Expand All @@ -22,7 +22,7 @@ pub(crate) async fn reply_pairing_ping_response(
}

pub(crate) async fn reply_pairing_extend_response(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
extend: PairingExtendRequest,
Expand All @@ -41,7 +41,7 @@ pub(crate) async fn reply_pairing_extend_response(
}

pub(crate) async fn reply_pairing_delete_response(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
_delete: PairingDeleteRequest,
Expand Down
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod rpc;

use crate::chain::WcChainId;
use crate::storage::SessionStorageDb;
use crate::{error::WalletConnectError, WalletConnectCtx};
use crate::{error::WalletConnectError, WalletConnectCtxImpl};

use chrono::Utc;
use common::log::info;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub struct Session {

impl Session {
pub fn new(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
session_topic: Topic,
subscription_id: SubscriptionId,
session_key: SessionKey,
Expand Down
8 changes: 4 additions & 4 deletions mm2src/kdf_walletconnect/src/session/rpc/delete.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{error::{WalletConnectError, USER_REQUESTED},
storage::WalletConnectStorageOps,
WalletConnectCtx};
WalletConnectCtxImpl};

use common::log::debug;
use mm2_err_handle::prelude::{MapMmError, MmResult};
use relay_rpc::domain::{MessageId, Topic};
use relay_rpc::rpc::params::{session_delete::SessionDeleteRequest, RequestParams, ResponseParamsSuccess};

pub(crate) async fn reply_session_delete_request(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
_delete_params: SessionDeleteRequest,
Expand All @@ -20,7 +20,7 @@ pub(crate) async fn reply_session_delete_request(
}

pub(crate) async fn send_session_delete_request(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
session_topic: &Topic,
) -> MmResult<(), WalletConnectError> {
let delete_request = SessionDeleteRequest {
Expand All @@ -34,7 +34,7 @@ pub(crate) async fn send_session_delete_request(
session_delete_cleanup(ctx, session_topic).await
}

async fn session_delete_cleanup(ctx: &WalletConnectCtx, topic: &Topic) -> MmResult<(), WalletConnectError> {
async fn session_delete_cleanup(ctx: &WalletConnectCtxImpl, topic: &Topic) -> MmResult<(), WalletConnectError> {
{
ctx.client.unsubscribe(topic.clone()).await?;
};
Expand Down
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/session/rpc/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{chain::{WcChain, WcChainId},
error::{WalletConnectError, UNSUPPORTED_CHAINS},
WalletConnectCtx};
WalletConnectCtxImpl};

use common::log::{error, info};
use mm2_err_handle::prelude::*;
Expand All @@ -9,7 +9,7 @@ use relay_rpc::{domain::{MessageId, Topic},
ErrorData}};

pub async fn handle_session_event(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
event: SessionEventRequest,
Expand Down
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/session/rpc/extend.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{error::WalletConnectError, WalletConnectCtx};
use crate::{error::WalletConnectError, WalletConnectCtxImpl};

use mm2_err_handle::prelude::MmResult;
use relay_rpc::{domain::{MessageId, Topic},
rpc::params::{session_extend::SessionExtendRequest, ResponseParamsSuccess}};

/// Process session extend request.
pub(crate) async fn reply_session_extend_request(
ctx: &WalletConnectCtx,
ctx: &WalletConnectCtxImpl,
topic: &Topic,
message_id: &MessageId,
extend: SessionExtendRequest,
Expand Down
Loading

0 comments on commit 5e89b9f

Please sign in to comment.