Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

frank/usermap #14

Merged
merged 11 commits into from
Mar 18, 2024
Merged
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ thiserror = "1.0.38"
tokio = { version = "1.34.0", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
regex = "1.10.2"
dashmap = "5.5.3"

[dev-dependencies]
pyth = { git = "https://github.com/drift-labs/protocol-v2.git", tag = "v2.67.0", features = [
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub mod dlob;
pub mod event_subscriber;
pub mod slot_subscriber;

pub mod usermap;

use types::*;

/// Provides solana Account fetching API
Expand Down
212 changes: 212 additions & 0 deletions src/usermap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use crate::event_emitter::EventEmitter;
use crate::memcmp::{get_non_idle_user_filter, get_user_filter};
use crate::utils::{decode, get_ws_url};
use crate::websocket_program_account_subscriber::{
ProgramAccountUpdate, WebsocketProgramAccountOptions, WebsocketProgramAccountSubscriber,
};
use crate::SdkResult;
use anchor_lang::AccountDeserialize;
use dashmap::DashMap;
use drift::state::user::User;
use serde_json::json;
use solana_account_decoder::UiAccountEncoding;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_request::RpcRequest;
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;

pub struct Usermap {
subscribed: bool,
subscription: WebsocketProgramAccountSubscriber,
usermap: Arc<DashMap<String, User>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is String helpful here?

Suggested change
usermap: Arc<DashMap<String, User>>,
usermap: Arc<DashMap<Pubkey, User>>,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the pubkeys come out of the events as strings, i think keeping it string makes sense

sync_lock: Option<Mutex<()>>,
latest_slot: Arc<AtomicU64>,
commitment: CommitmentConfig,
rpc: RpcClient,
}

impl Usermap {
pub fn new(commitment: CommitmentConfig, endpoint: String, sync: bool) -> Self {
let filters = vec![get_user_filter(), get_non_idle_user_filter()];
let options = WebsocketProgramAccountOptions {
filters,
commitment,
encoding: UiAccountEncoding::Base64,
};
let event_emitter = EventEmitter::new();

let url = get_ws_url(&endpoint.clone()).unwrap();
soundsonacid marked this conversation as resolved.
Show resolved Hide resolved

let subscription =
WebsocketProgramAccountSubscriber::new("usermap", url, options, event_emitter);

let usermap = Arc::new(DashMap::new());

let rpc = RpcClient::new_with_commitment(endpoint.clone(), commitment);

let sync_lock = if sync { Some(Mutex::new(())) } else { None };

Self {
subscribed: false,
subscription,
usermap,
sync_lock,
latest_slot: Arc::new(AtomicU64::new(0)),
commitment,
rpc,
}
}

pub async fn subscribe(&mut self) -> SdkResult<()> {
if let Some(_) = self.sync_lock {
self.sync().await?;
}

if !self.subscribed {
self.subscription.subscribe::<User>().await?;
self.subscribed = true;
}

let usermap = self.usermap.clone();
let latest_slot = self.latest_slot.clone();

self.subscription
.event_emitter
.subscribe("usermap", move |event| {
if let Some(update) = event.as_any().downcast_ref::<ProgramAccountUpdate<User>>() {
let user_data_and_slot = update.data_and_slot.clone();
let user_pubkey = update.pubkey.to_string();
if update.data_and_slot.slot > latest_slot.load(Ordering::Relaxed) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe >= ? so updates from different user in the same slot should both go through

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the data updates will still go through, it just won't go to the effort of updating the latest slot if it's the same slot

latest_slot.store(update.data_and_slot.slot, Ordering::Relaxed);
}
usermap.insert(user_pubkey, user_data_and_slot.data);
}
});

Ok(())
}

pub async fn unsubscribe(&mut self) -> SdkResult<()> {
if self.subscribed {
self.subscription.unsubscribe().await?;
self.subscribed = false;
self.usermap.clear();
self.latest_slot.store(0, Ordering::Relaxed);
}
Ok(())
}

pub fn size(&self) -> usize {
self.usermap.len()
}

pub fn contains(&self, pubkey: &str) -> bool {
self.usermap.contains_key(pubkey)
}

pub fn get(&self, pubkey: &str) -> Option<User> {
self.usermap.get(pubkey).map(|user| user.value().clone())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to return &User? the caller can clone if they need

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without using Cow i don't think so and i'd really rather not use Cow, cloning 4.3kb shouldn't be that expensive

}

pub async fn must_get(&self, pubkey: &str) -> SdkResult<User> {
if let Some(user) = self.get(pubkey) {
Ok(user)
} else {
let user_data = self
.rpc
.get_account_data(&Pubkey::from_str(pubkey).unwrap())
.await?;
let user = User::try_deserialize(&mut user_data.as_slice()).unwrap();
self.usermap.insert(pubkey.to_string(), user.clone());
Ok(self.get(pubkey).unwrap())
}
}

async fn sync(&mut self) -> SdkResult<()> {
let sync_lock = self.sync_lock.as_ref().expect("expected sync lock");

let lock = match sync_lock.try_lock() {
Ok(lock) => lock,
Err(_) => return Ok(()),
};

let account_config = RpcAccountInfoConfig {
commitment: Some(self.commitment),
encoding: Some(self.subscription.options.encoding),
..RpcAccountInfoConfig::default()
};

let gpa_config = RpcProgramAccountsConfig {
filters: Some(self.subscription.options.filters.clone()),
account_config,
with_context: Some(true),
};

let response = self
.rpc
.send::<OptionalContext<Vec<RpcKeyedAccount>>>(
RpcRequest::GetProgramAccounts,
json!([drift::id().to_string(), gpa_config]),
)
.await?;

if let OptionalContext::Context(accounts) = response {
for account in accounts.value {
let pubkey = account.pubkey;
let user_data = account.account.data;
let data = decode::<User>(user_data)?;
self.usermap.insert(pubkey, data);
}

self.latest_slot
.store(accounts.context.slot, Ordering::Relaxed);
}

drop(lock);
Ok(())
}

pub fn get_latest_slot(&self) -> u64 {
self.latest_slot.load(Ordering::Relaxed)
}
}

#[cfg(test)]
mod tests {

#[tokio::test]
#[cfg(rpc_tests)]
async fn test_usermap() {
use crate::usermap::Usermap;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::CommitmentLevel;

let endpoint = "rpc_url".to_string();
let commitment = CommitmentConfig {
commitment: CommitmentLevel::Processed,
};

let mut usermap = Usermap::new(commitment, endpoint, true);
usermap.subscribe().await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

dbg!(usermap.size());
assert!(usermap.size() > 50000);

dbg!(usermap.get_latest_slot());

usermap.unsubscribe().await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

assert_eq!(usermap.size(), 0);
assert_eq!(usermap.subscribed, false);
}
}
12 changes: 12 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ pub fn http_to_ws(url: &str) -> Result<String, &'static str> {
Ok(format!("{}/ws", base_url.trim_end_matches('/')))
}

pub fn get_ws_url(url: &str) -> Result<String, &'static str> {
let base_url = if url.starts_with("http://") {
url.replacen("http://", "ws://", 1)
} else if url.starts_with("https://") {
url.replacen("https://", "wss://", 1)
} else {
return Err("Invalid URL scheme");
};

Ok(base_url)
}

pub fn dlob_subscribe_ws_json(market: &str) -> String {
json!({
"type": "subscribe",
Expand Down
2 changes: 1 addition & 1 deletion src/websocket_program_account_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct WebsocketProgramAccountOptions {
pub struct WebsocketProgramAccountSubscriber {
subscription_name: &'static str,
url: String,
options: WebsocketProgramAccountOptions,
pub(crate) options: WebsocketProgramAccountOptions,
pub subscribed: bool,
pub event_emitter: EventEmitter,
unsubscriber: Option<tokio::sync::mpsc::Sender<()>>,
Expand Down
Loading