Skip to content

Commit

Permalink
Bugfix number of active connections never dropping
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 20, 2024
1 parent 1754e92 commit 9fabdab
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 33 deletions.
14 changes: 11 additions & 3 deletions bench/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use itertools::Itertools;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::compute_budget;
use solana_sdk::instruction::AccountMeta;
use solana_sdk::{
commitment_config::CommitmentConfig,
Expand Down Expand Up @@ -111,7 +112,10 @@ impl BenchHelper {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();

let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}

Expand All @@ -120,6 +124,10 @@ impl BenchHelper {

let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();

let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);

let instruction = Instruction::new_with_bytes(
memo,
msg,
Expand All @@ -128,7 +136,7 @@ impl BenchHelper {
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));

let mut signers = vec![payer];
signers.extend(accounts.iter());
Expand Down Expand Up @@ -160,7 +168,7 @@ fn transaction_size_large() {
);

let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 240);
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 232);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);

Expand Down
2 changes: 1 addition & 1 deletion bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn bench(
let map_of_txs = map_of_txs.clone();
let n_chars = match transaction_size {
TransactionSize::Small => 10,
TransactionSize::Large => 240, // 565 is max but we need to lower that to not burn the CUs
TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs
};
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars);

Expand Down
54 changes: 25 additions & 29 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ActiveConnection {
async fn listen(
&self,
mut transaction_reciever: Receiver<SentTransactionInfo>,
mut exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
exit_notifier: Arc<Notify>,
addr: SocketAddr,
identity_stakes: IdentityStakesData,
) {
Expand Down Expand Up @@ -220,8 +220,8 @@ impl ActiveConnection {
});
}
},
_ = exit_oneshot_channel.recv() => {
break;
_ = exit_notifier.notified() => {
break 'main_loop;
}
}
}
Expand All @@ -235,31 +235,26 @@ impl ActiveConnection {
pub fn start_listening(
&self,
transaction_reciever: Receiver<SentTransactionInfo>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
exit_notifier: Arc<Notify>,
identity_stakes: IdentityStakesData,
) {
let addr = self.tpu_address;
let this = self.clone();
tokio::spawn(async move {
this.listen(
transaction_reciever,
exit_oneshot_channel,
addr,
identity_stakes,
)
.await;
this.listen(transaction_reciever, exit_notifier, addr, identity_stakes)
.await;
});
}
}

struct ActiveConnectionWithExitChannel {
struct ActiveConnectionWithExitNotifier {
pub active_connection: ActiveConnection,
pub exit_stream: tokio::sync::mpsc::Sender<()>,
pub exit_notifier: Arc<Notify>,
}

pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitChannel>>>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
}

impl TpuConnectionManager {
Expand Down Expand Up @@ -297,37 +292,38 @@ impl TpuConnectionManager {
connection_parameters,
);
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let (sx, rx) = tokio::sync::mpsc::channel(1);
let exit_notifier = Arc::new(Notify::new());

let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, rx, identity_stakes);
active_connection.start_listening(
broadcast_receiver,
exit_notifier.clone(),
identity_stakes,
);
self.identity_to_active_connection.insert(
*identity,
Arc::new(ActiveConnectionWithExitChannel {
Arc::new(ActiveConnectionWithExitNotifier {
active_connection,
exit_stream: sx,
exit_notifier,
}),
);
}
}

// remove connections which are no longer needed
let collect_current_active_connections = self
.identity_to_active_connection
.iter()
.map(|x| (*x.key(), x.value().clone()))
.collect::<Vec<_>>();
for (identity, value) in collect_current_active_connections.iter() {
if !connections_to_keep.contains_key(identity) {
trace!("removing a connection for {}", identity);
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value
.active_connection
.exit_signal
.store(true, Ordering::Relaxed);
let _ = value.exit_stream.send(()).await;
self.identity_to_active_connection.remove(identity);
value.exit_notifier.notify_one();
false
} else {
true
}
}
});
}
}

0 comments on commit 9fabdab

Please sign in to comment.