Skip to content

Commit

Permalink
frank/usermap (#14)
Browse files Browse the repository at this point in the history
* usermap

* uncomment cfg directive

* remove unnecessary derive clones

* typo

* make syncs make more sense

* remove duplicate filter fetches

* refactor to store users instead of data and slot

* use subscription encoding in sync

* remove unused import, cargo fmt

* remove unnecessary to_string

* couple fixes for pr
  • Loading branch information
soundsonacid authored Mar 18, 2024
1 parent 68c690c commit db9c093
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 1 deletion.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ thiserror = "1.0.38"
tokio = { version = "1.34.0", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
regex = "1.10.2"
dashmap = "5.5.3"

[dev-dependencies]
pyth = { git = "https://github.com/drift-labs/protocol-v2.git", tag = "v2.67.0", features = [
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub mod dlob;
pub mod event_subscriber;
pub mod slot_subscriber;

pub mod usermap;

use types::*;

/// Provides solana Account fetching API
Expand Down
212 changes: 212 additions & 0 deletions src/usermap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use crate::event_emitter::EventEmitter;
use crate::memcmp::{get_non_idle_user_filter, get_user_filter};
use crate::utils::{decode, get_ws_url};
use crate::websocket_program_account_subscriber::{
ProgramAccountUpdate, WebsocketProgramAccountOptions, WebsocketProgramAccountSubscriber,
};
use crate::SdkResult;
use anchor_lang::AccountDeserialize;
use dashmap::DashMap;
use drift::state::user::User;
use serde_json::json;
use solana_account_decoder::UiAccountEncoding;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_request::RpcRequest;
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;

pub struct Usermap {
subscribed: bool,
subscription: WebsocketProgramAccountSubscriber,
usermap: Arc<DashMap<String, User>>,
sync_lock: Option<Mutex<()>>,
latest_slot: Arc<AtomicU64>,
commitment: CommitmentConfig,
rpc: RpcClient,
}

impl Usermap {
pub fn new(commitment: CommitmentConfig, endpoint: String, sync: bool) -> Self {
let filters = vec![get_user_filter(), get_non_idle_user_filter()];
let options = WebsocketProgramAccountOptions {
filters,
commitment,
encoding: UiAccountEncoding::Base64,
};
let event_emitter = EventEmitter::new();

let url = get_ws_url(&endpoint.clone()).unwrap();

let subscription =
WebsocketProgramAccountSubscriber::new("usermap", url, options, event_emitter);

let usermap = Arc::new(DashMap::new());

let rpc = RpcClient::new_with_commitment(endpoint.clone(), commitment);

let sync_lock = if sync { Some(Mutex::new(())) } else { None };

Self {
subscribed: false,
subscription,
usermap,
sync_lock,
latest_slot: Arc::new(AtomicU64::new(0)),
commitment,
rpc,
}
}

pub async fn subscribe(&mut self) -> SdkResult<()> {
if let Some(_) = self.sync_lock {
self.sync().await?;
}

if !self.subscribed {
self.subscription.subscribe::<User>().await?;
self.subscribed = true;
}

let usermap = self.usermap.clone();
let latest_slot = self.latest_slot.clone();

self.subscription
.event_emitter
.subscribe("usermap", move |event| {
if let Some(update) = event.as_any().downcast_ref::<ProgramAccountUpdate<User>>() {
let user_data_and_slot = update.data_and_slot.clone();
let user_pubkey = update.pubkey.to_string();
if update.data_and_slot.slot > latest_slot.load(Ordering::Relaxed) {
latest_slot.store(update.data_and_slot.slot, Ordering::Relaxed);
}
usermap.insert(user_pubkey, user_data_and_slot.data);
}
});

Ok(())
}

pub async fn unsubscribe(&mut self) -> SdkResult<()> {
if self.subscribed {
self.subscription.unsubscribe().await?;
self.subscribed = false;
self.usermap.clear();
self.latest_slot.store(0, Ordering::Relaxed);
}
Ok(())
}

pub fn size(&self) -> usize {
self.usermap.len()
}

pub fn contains(&self, pubkey: &str) -> bool {
self.usermap.contains_key(pubkey)
}

pub fn get(&self, pubkey: &str) -> Option<User> {
self.usermap.get(pubkey).map(|user| user.value().clone())
}

pub async fn must_get(&self, pubkey: &str) -> SdkResult<User> {
if let Some(user) = self.get(pubkey) {
Ok(user)
} else {
let user_data = self
.rpc
.get_account_data(&Pubkey::from_str(pubkey).unwrap())
.await?;
let user = User::try_deserialize(&mut user_data.as_slice()).unwrap();
self.usermap.insert(pubkey.to_string(), user.clone());
Ok(self.get(pubkey).unwrap())
}
}

async fn sync(&mut self) -> SdkResult<()> {
let sync_lock = self.sync_lock.as_ref().expect("expected sync lock");

let lock = match sync_lock.try_lock() {
Ok(lock) => lock,
Err(_) => return Ok(()),
};

let account_config = RpcAccountInfoConfig {
commitment: Some(self.commitment),
encoding: Some(self.subscription.options.encoding),
..RpcAccountInfoConfig::default()
};

let gpa_config = RpcProgramAccountsConfig {
filters: Some(self.subscription.options.filters.clone()),
account_config,
with_context: Some(true),
};

let response = self
.rpc
.send::<OptionalContext<Vec<RpcKeyedAccount>>>(
RpcRequest::GetProgramAccounts,
json!([drift::id().to_string(), gpa_config]),
)
.await?;

if let OptionalContext::Context(accounts) = response {
for account in accounts.value {
let pubkey = account.pubkey;
let user_data = account.account.data;
let data = decode::<User>(user_data)?;
self.usermap.insert(pubkey, data);
}

self.latest_slot
.store(accounts.context.slot, Ordering::Relaxed);
}

drop(lock);
Ok(())
}

pub fn get_latest_slot(&self) -> u64 {
self.latest_slot.load(Ordering::Relaxed)
}
}

#[cfg(test)]
mod tests {

#[tokio::test]
#[cfg(rpc_tests)]
async fn test_usermap() {
use crate::usermap::Usermap;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::CommitmentLevel;

let endpoint = "rpc_url".to_string();
let commitment = CommitmentConfig {
commitment: CommitmentLevel::Processed,
};

let mut usermap = Usermap::new(commitment, endpoint, true);
usermap.subscribe().await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

dbg!(usermap.size());
assert!(usermap.size() > 50000);

dbg!(usermap.get_latest_slot());

usermap.unsubscribe().await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

assert_eq!(usermap.size(), 0);
assert_eq!(usermap.subscribed, false);
}
}
12 changes: 12 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ pub fn http_to_ws(url: &str) -> Result<String, &'static str> {
Ok(format!("{}/ws", base_url.trim_end_matches('/')))
}

pub fn get_ws_url(url: &str) -> Result<String, &'static str> {
let base_url = if url.starts_with("http://") {
url.replacen("http://", "ws://", 1)
} else if url.starts_with("https://") {
url.replacen("https://", "wss://", 1)
} else {
return Err("Invalid URL scheme");
};

Ok(base_url)
}

pub fn dlob_subscribe_ws_json(market: &str) -> String {
json!({
"type": "subscribe",
Expand Down
2 changes: 1 addition & 1 deletion src/websocket_program_account_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct WebsocketProgramAccountOptions {
pub struct WebsocketProgramAccountSubscriber {
subscription_name: &'static str,
url: String,
options: WebsocketProgramAccountOptions,
pub(crate) options: WebsocketProgramAccountOptions,
pub subscribed: bool,
pub event_emitter: EventEmitter,
unsubscriber: Option<tokio::sync::mpsc::Sender<()>>,
Expand Down

0 comments on commit db9c093

Please sign in to comment.