Skip to content

Commit

Permalink
Fixing accounts on demand (#371)
Browse files Browse the repository at this point in the history
* Fixing accounts on demand

* Adding missing file and renaming gprc to grpc

* Adding missing grpc_accounts_streaming
  • Loading branch information
godmodegalactus authored Mar 26, 2024
1 parent 3c99459 commit 29bd6de
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 376 deletions.
124 changes: 89 additions & 35 deletions accounts-on-demand/src/accounts_on_demand.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use dashmap::DashSet;
use futures::lock::Mutex;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::RpcFilterType,
};
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_accounts::account_store_interface::{
AccountLoadingError, AccountStorageInterface,
};
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
Expand All @@ -19,7 +22,7 @@ use solana_lite_rpc_core::{
},
};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use tokio::sync::{broadcast::Sender, RwLock};
use tokio::sync::{broadcast::Sender, Notify, RwLock};

use crate::subscription_manager::SubscriptionManger;

Expand All @@ -31,12 +34,15 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_number_of_program_filters_on_demand", "Number of program filters on demand")).unwrap();
}

const RETRY_FETCHING_ACCOUNT: usize = 10;

pub struct AccountsOnDemand {
rpc_client: Arc<RpcClient>,
accounts_storage: Arc<dyn AccountStorageInterface>,
accounts_subscribed: Arc<DashSet<Pubkey>>,
program_filters: Arc<RwLock<AccountFilters>>,
subscription_manager: SubscriptionManger,
accounts_is_loading: Arc<Mutex<HashMap<Pubkey, Arc<Notify>>>>,
}

impl AccountsOnDemand {
Expand All @@ -56,6 +62,7 @@ impl AccountsOnDemand {
accounts_storage,
account_notification_sender,
),
accounts_is_loading: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -102,52 +109,99 @@ impl AccountStorageInterface for AccountsOnDemand {
.await
}

async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData> {
async fn get_account(
&self,
account_pk: Pubkey,
commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError> {
match self
.accounts_storage
.get_account(account_pk, commitment)
.await
.await?
{
Some(account_data) => Some(account_data),
Some(account_data) => Ok(Some(account_data)),
None => {
// account does not exist in account store
// first check if we have already subscribed to the required account
// This is to avoid resetting geyser subscription because of accounts that do not exists.
if !self.accounts_subscribed.contains(&account_pk) {
// get account from rpc and create its subscription
self.accounts_subscribed.insert(account_pk);
self.refresh_subscription().await;
let account_response = self
.rpc_client
.get_account_with_commitment(
&account_pk,
commitment.into_commiment_config(),
let mut lk = self.accounts_is_loading.lock().await;
match lk.get(&account_pk).cloned() {
Some(loading_account) => {
drop(lk);
match tokio::time::timeout(
Duration::from_secs(10),
loading_account.notified(),
)
.await;
if let Ok(response) = account_response {
match response.value {
Some(account) => {
// update account in storage and return the account data
let account_data = AccountData {
pubkey: account_pk,
account: Arc::new(account),
updated_slot: response.context.slot,
};
.await
{
Ok(_) => {
self.accounts_storage
.update_account(account_data.clone(), commitment)
.get_account(account_pk, commitment)
.await
}
Err(_timeout) => Err(AccountLoadingError::OperationTimeOut),
}
}
None => {
// account is not loading
if self.accounts_subscribed.contains(&account_pk) {
// account was already tried to be loaded but does not exists
Ok(None)
} else {
// update account loading map
// create a notify for accounts under loading
lk.insert(account_pk, Arc::new(Notify::new()));
self.accounts_subscribed.insert(account_pk);
drop(lk);
self.refresh_subscription().await;
let mut return_value = None;
for _ in 0..RETRY_FETCHING_ACCOUNT {
let account_response = self
.rpc_client
.get_account_with_commitment(
&account_pk,
commitment.into_commiment_config(),
)
.await;
Some(account_data)
match account_response {
Ok(response) => {
if let Some(account) = response.value {
// update account in storage and return the account data
let account_data = AccountData {
pubkey: account_pk,
account: Arc::new(account),
updated_slot: response.context.slot,
};
self.accounts_storage
.update_account(account_data.clone(), commitment)
.await;
return_value = Some(account_data);
break;
} else {
// account does not exist
break;
}
}
Err(e) => {
log::error!(
"Error fetching account {} {e:?}",
account_pk.to_string()
);
}
}
}
// account does not exist
None => None,
// update loading lock
{
let mut write_lock = self.accounts_is_loading.lock().await;
let notify = write_lock.remove(&account_pk);
drop(write_lock);
if let Some(notify) = notify {
notify.notify_waiters();
}
}
Ok(return_value)
}
} else {
// issue getting account, will then be updated by geyser
None
}
} else {
// we have already subscribed to the account and it does not exist
None
}
}
}
Expand Down
Loading

0 comments on commit 29bd6de

Please sign in to comment.