Skip to content

Commit

Permalink
retry logic for dropped ws
Browse files Browse the repository at this point in the history
  • Loading branch information
soundsonacid committed Mar 22, 2024
1 parent 058cd51 commit d2582c1
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 149 deletions.
Binary file added .DS_Store
Binary file not shown.
94 changes: 5 additions & 89 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
regex = "1.10.2"
dashmap = "5.5.3"
rayon = "1.9.0"
clippy = "0.0.302"

[dev-dependencies]
pyth = { git = "https://github.com/drift-labs/protocol-v2.git", tag = "v2.67.0", features = [
Expand Down
2 changes: 2 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ pub enum SdkError {
BorrowError(#[from] BorrowError),
#[error("{0}")]
Generic(String),
#[error("max connection attempts reached")]
MaxReconnectionAttemptsReached,
}

impl SdkError {
Expand Down
78 changes: 49 additions & 29 deletions src/websocket_account_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,49 +76,69 @@ impl WebsocketAccountSubscriber {
let (unsub_tx, mut unsub_rx) = tokio::sync::mpsc::channel::<()>(1);
self.unsubscriber = Some(unsub_tx);

let mut attempt = 0;
let max_reconnection_attempts = 20;
let base_delay = tokio::time::Duration::from_secs(2);

tokio::spawn({
let event_emitter = self.event_emitter.clone();
let mut latest_slot = 0;
let susbscription_name = self.subscription_name;
let subscription_name = self.subscription_name;
let pubkey = self.pubkey.clone();
async move {
let (mut account_updates, account_unsubscribe) = pubsub
.account_subscribe(&pubkey, Some(account_config))
.await
.unwrap();

loop {
tokio::select! {
message = account_updates.next() => {
match message {
Some(message) => {
let slot = message.context.slot;
if slot >= latest_slot {
latest_slot = slot;
let account_update = AccountUpdate {
pubkey: pubkey.to_string(),
data: message.value,
slot,
};
event_emitter.emit(susbscription_name, Box::new(account_update));
let (mut account_updates, account_unsubscribe) = pubsub
.account_subscribe(&pubkey, Some(account_config.clone()))
.await
.unwrap();

loop {
tokio::select! {
message = account_updates.next() => {
match message {
Some(message) => {
let slot = message.context.slot;
if slot >= latest_slot {
latest_slot = slot;
let account_update = AccountUpdate {
pubkey: pubkey.to_string(),
data: message.value,
slot,
};
event_emitter.emit(subscription_name, Box::new(account_update));
}
}
None => {
log::warn!("{}: Account stream interrupted", subscription_name);
account_unsubscribe().await;
break;
}
}
None => {
log::warn!("{}: Account stream interrupted", susbscription_name);
}
unsub = unsub_rx.recv() => {
if let Some(_) = unsub {
log::debug!("{}: Unsubscribing from account stream", subscription_name);
account_unsubscribe().await;
break;
return Ok(());

}
}
}
unsub = unsub_rx.recv() => {
if let Some(_) = unsub {
log::debug!("{}: Unsubscribing from account stream", susbscription_name);
account_unsubscribe().await;
break;
}

}
}
if attempt >= max_reconnection_attempts {
log::error!("{}: Max reconnection attempts reached", subscription_name);
return Err(crate::SdkError::MaxReconnectionAttemptsReached);
}

let delay_duration = base_delay * 2_u32.pow(attempt);
log::debug!(
"{}: Reconnecting in {:?}",
subscription_name,
delay_duration
);
tokio::time::sleep(delay_duration).await;
attempt += 1;
}
}
});
Expand Down
Loading

0 comments on commit d2582c1

Please sign in to comment.