diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/.DS_Store differ diff --git a/Cargo.lock b/Cargo.lock index 34d714c..6239c51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,12 +313,6 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "arrayvec" version = "0.7.4" @@ -490,17 +484,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "blake2b_simd" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587" -dependencies = [ - "arrayref", - "arrayvec 0.5.2", - "constant_time_eq 0.1.5", -] - [[package]] name = "blake3" version = "1.5.0" @@ -508,10 +491,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" dependencies = [ "arrayref", - "arrayvec 0.7.4", + "arrayvec", "cc", "cfg-if", - "constant_time_eq 0.3.0", + "constant_time_eq", "digest 0.10.7", ] @@ -781,15 +764,6 @@ dependencies = [ "os_str_bytes", ] -[[package]] -name = "clippy" -version = "0.0.302" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d911ee15579a3f50880d8c1d59ef6e79f9533127a3bd342462f5d584f5e8c294" -dependencies = [ - "term", -] - [[package]] name = "console" version = "0.15.8" @@ -829,12 +803,6 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" -[[package]] -name = "constant_time_eq" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" - [[package]] name = "constant_time_eq" version = "0.3.0" @@ -1047,17 +1015,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" -dependencies = [ - "libc", - "redox_users 0.3.5", - "winapi", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -1075,7 +1032,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" dependencies = [ "libc", - "redox_users 0.4.4", + "redox_users", "winapi", ] @@ -1159,7 +1116,6 @@ dependencies = [ "borsh 1.3.1", "bytemuck", "bytes", - "clippy", "dashmap", "drift", "env_logger 0.10.2", @@ -2006,7 +1962,7 @@ checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" dependencies = [ "bitflags 2.4.2", "libc", - "redox_syscall 0.4.1", + "redox_syscall", ] [[package]] @@ -2441,7 +2397,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.4.1", + "redox_syscall", "smallvec", "windows-targets 0.48.5", ] @@ -2806,12 +2762,6 @@ dependencies = [ "yasna", ] -[[package]] -name = "redox_syscall" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" - [[package]] name = "redox_syscall" version = "0.4.1" @@ -2821,17 +2771,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_users" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de0737333e7a9502c789a36d7c7fa6092a49895d4faa31ca5df163857ded2e9d" -dependencies = [ - "getrandom 0.1.16", - "redox_syscall 0.1.57", - "rust-argon2", -] - [[package]] name = "redox_users" version = "0.4.4" @@ -2960,18 +2899,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "rust-argon2" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b18820d944b33caa75a71378964ac46f58517c92b6ae5f762636247c09e78fb" -dependencies = [ - "base64 0.13.1", - "blake2b_simd", - "constant_time_eq 0.1.5", - "crossbeam-utils", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -4217,17 +4144,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "term" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edd106a334b7657c10b7c540a0106114feadeb4dc314513e97df481d5d966f42" -dependencies = [ - "byteorder", - "dirs", - "winapi", -] - [[package]] name = "termcolor" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 355a6f0..952ade7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } regex = "1.10.2" dashmap = "5.5.3" rayon = "1.9.0" -clippy = "0.0.302" [dev-dependencies] pyth = { git = "https://github.com/drift-labs/protocol-v2.git", tag = "v2.67.0", features = [ diff --git a/src/types.rs b/src/types.rs index b6ad4c7..f410ea2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -249,6 +249,8 @@ pub enum SdkError { BorrowError(#[from] BorrowError), #[error("{0}")] Generic(String), + #[error("max connection attempts reached")] + MaxReconnectionAttemptsReached, } impl SdkError { diff --git a/src/websocket_account_subscriber.rs b/src/websocket_account_subscriber.rs index 1bf5db6..c054d32 100644 --- a/src/websocket_account_subscriber.rs +++ b/src/websocket_account_subscriber.rs @@ -76,49 +76,69 @@ impl WebsocketAccountSubscriber { let (unsub_tx, mut unsub_rx) = tokio::sync::mpsc::channel::<()>(1); self.unsubscriber = Some(unsub_tx); + let mut attempt = 0; + let max_reconnection_attempts = 20; + let base_delay = tokio::time::Duration::from_secs(2); + tokio::spawn({ let event_emitter = self.event_emitter.clone(); let mut latest_slot = 0; - let susbscription_name = self.subscription_name; + let subscription_name = self.subscription_name; let pubkey = self.pubkey.clone(); async move { - let (mut account_updates, account_unsubscribe) = pubsub - .account_subscribe(&pubkey, Some(account_config)) - .await - .unwrap(); - loop { - tokio::select! { - message = account_updates.next() => { - match message { - Some(message) => { - let slot = message.context.slot; - if slot >= latest_slot { - latest_slot = slot; - let account_update = AccountUpdate { - pubkey: pubkey.to_string(), - data: message.value, - slot, - }; - event_emitter.emit(susbscription_name, Box::new(account_update)); + let (mut account_updates, account_unsubscribe) = pubsub + .account_subscribe(&pubkey, Some(account_config.clone())) + .await + .unwrap(); + + loop { + tokio::select! { + message = account_updates.next() => { + match message { + Some(message) => { + let slot = message.context.slot; + if slot >= latest_slot { + latest_slot = slot; + let account_update = AccountUpdate { + pubkey: pubkey.to_string(), + data: message.value, + slot, + }; + event_emitter.emit(subscription_name, Box::new(account_update)); + } + } + None => { + log::warn!("{}: Account stream interrupted", subscription_name); + account_unsubscribe().await; + break; } } - None => { - log::warn!("{}: Account stream interrupted", susbscription_name); + } + unsub = unsub_rx.recv() => { + if let Some(_) = unsub { + log::debug!("{}: Unsubscribing from account stream", subscription_name); account_unsubscribe().await; - break; + return Ok(()); + } } } - unsub = unsub_rx.recv() => { - if let Some(_) = unsub { - log::debug!("{}: Unsubscribing from account stream", susbscription_name); - account_unsubscribe().await; - break; + } - } - } + if attempt >= max_reconnection_attempts { + log::error!("{}: Max reconnection attempts reached", subscription_name); + return Err(crate::SdkError::MaxReconnectionAttemptsReached); } + + let delay_duration = base_delay * 2_u32.pow(attempt); + log::debug!( + "{}: Reconnecting in {:?}", + subscription_name, + delay_duration + ); + tokio::time::sleep(delay_duration).await; + attempt += 1; } } }); diff --git a/src/websocket_program_account_subscriber.rs b/src/websocket_program_account_subscriber.rs index ccddebf..ac511fa 100644 --- a/src/websocket_program_account_subscriber.rs +++ b/src/websocket_program_account_subscriber.rs @@ -107,49 +107,68 @@ impl WebsocketProgramAccountSubscriber { let (unsub_tx, mut unsub_rx) = tokio::sync::mpsc::channel::<()>(1); self.unsubscriber = Some(unsub_tx); + let mut attempt = 0; + let max_reconnection_attempts = 20; + let base_delay = tokio::time::Duration::from_secs(5); + tokio::spawn({ let event_emitter = self.event_emitter.clone(); let mut latest_slot = 0; let subscription_name = self.subscription_name; async move { - let (mut accounts, unsubscriber) = pubsub - .program_subscribe(&drift::ID, Some(config)) - .await - .unwrap(); loop { - tokio::select! { - message = accounts.next() => { - match message { - Some(message) => { - let slot = message.context.slot; - if slot >= latest_slot { - latest_slot = slot; - let pubkey = message.value.pubkey; - let account_data = message.value.account.data; - match decode(account_data) { - Ok(data) => { - let data_and_slot = DataAndSlot:: { slot, data }; - event_emitter.emit(subscription_name, Box::new(ProgramAccountUpdate::new(pubkey, data_and_slot))); - }, - Err(e) => { - error!("Error decoding account data {e}"); + let (mut accounts, unsubscriber) = pubsub + .program_subscribe(&drift::ID, Some(config.clone())) + .await + .unwrap(); + loop { + tokio::select! { + message = accounts.next() => { + match message { + Some(message) => { + let slot = message.context.slot; + if slot >= latest_slot { + latest_slot = slot; + let pubkey = message.value.pubkey; + let account_data = message.value.account.data; + match decode(account_data) { + Ok(data) => { + let data_and_slot = DataAndSlot:: { slot, data }; + event_emitter.emit(subscription_name, Box::new(ProgramAccountUpdate::new(pubkey, data_and_slot))); + }, + Err(e) => { + error!("Error decoding account data {e}"); + } } } } - } - None => { - warn!("{} stream ended", subscription_name); - unsubscriber().await; - break; + None => { + warn!("{} stream ended", subscription_name); + unsubscriber().await; + break; + } } } + _ = unsub_rx.recv() => { + debug!("Unsubscribing."); + unsubscriber().await; + return Ok(()); + } } - _ = unsub_rx.recv() => { - debug!("Unsubscribing."); - unsubscriber().await; - break; - } } + + if attempt >= max_reconnection_attempts { + error!("Max reconnection attempts reached."); + return Err(SdkError::MaxReconnectionAttemptsReached); + } + + let delay_duration = base_delay * 2_u32.pow(attempt); + debug!( + "{}: Reconnecting in {:?}", + subscription_name, delay_duration + ); + tokio::time::sleep(delay_duration).await; + attempt += 1; } } });