Skip to content

Commit

Permalink
wip: Make connections also to the tpu_forwards port (#385)
Browse files Browse the repository at this point in the history
* wip: Make connections also to the tpu_forwards port

* Making using of tpu forwards customable

---------

Co-authored-by: Christian Kamm <[email protected]>
  • Loading branch information
godmodegalactus and ckamm authored Apr 3, 2024
1 parent b84e880 commit 9d776ed
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 31 deletions.
1 change: 1 addition & 0 deletions examples/custom-tpu-send-transactions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
enable_tpu_forwarding: None,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};
Expand Down
4 changes: 4 additions & 0 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
);

quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
.map(|value| Some(value.parse::<bool>().unwrap()))
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);

Some(quic_connection_parameters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
};

#[test]
Expand Down Expand Up @@ -475,27 +476,27 @@ async fn start_literpc_client_direct_mode(
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down Expand Up @@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down
2 changes: 2 additions & 0 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
pub enable_tpu_forwarding: Option<bool>,
}

impl Default for QuicConnectionParameters {
Expand All @@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
&self,
broadcast_receiver: Receiver<SentTransactionInfo>,
// for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
connection_parameters: QuicConnectionParameters,
) {
debug!(
Expand Down
21 changes: 11 additions & 10 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
Notify,
Expand Down Expand Up @@ -242,7 +242,7 @@ impl ActiveConnection {

pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
}

impl TpuConnectionManager {
Expand All @@ -256,21 +256,22 @@ impl TpuConnectionManager {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
identity_to_active_connection: Arc::new(DashMap::new()),
active_connections: Arc::new(DashMap::new()),
}
}

pub async fn update_connections(
&self,
broadcast_sender: Arc<Sender<SentTransactionInfo>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
identity_stakes: IdentityStakesData,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
let connection_key = (*identity, *socket_addr);
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new(
self.endpoints.clone(),
Expand All @@ -282,15 +283,15 @@ impl TpuConnectionManager {
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection
.insert(*identity, active_connection);
self.active_connections
.insert((*identity, *socket_addr), active_connection);
}
}

// remove connections which are no longer needed
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
self.active_connections.retain(|key, value| {
if !connections_to_keep.contains(key) {
trace!("removing a connection for {} {}", key.0, key.1);
// ignore error for exit channel
let _ = value.exit_notifier.send(());
false
Expand Down
40 changes: 33 additions & 7 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_streamer::nonblocking::quic::ConnectionPeerType;

use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
Expand All @@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
Expand Down Expand Up @@ -128,8 +129,21 @@ impl TpuService {
.leader_schedule
.get_slot_leaders(current_slot, last_slot)
.await?;

let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;

let enable_tpu_forwards = {
match identity_stakes.peer_type {
ConnectionPeerType::Unstaked => false,
ConnectionPeerType::Staked => self
.config
.quic_connection_params
.enable_tpu_forwarding
.unwrap_or_default(),
}
};
// get next leader with its tpu port
let connections_to_keep: HashMap<_, _> = next_leaders
let connections_to_keep: HashSet<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);
Expand All @@ -140,11 +154,23 @@ impl TpuService {
(x.pubkey, tpu_port)
})
.filter(|x| x.1.is_some())
.map(|x| {
let mut addr = x.1.unwrap();
.flat_map(|x| {
let mut addresses = vec![];
let mut tpu_addr = x.1.unwrap();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(x.0, addr)
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);

addresses.push((x.0, tpu_addr));

if enable_tpu_forwards {
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr;
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_forwards_addr));
}

addresses
})
.collect();

Expand All @@ -156,7 +182,7 @@ impl TpuService {
.update_connections(
self.broadcast_sender.clone(),
connections_to_keep,
self.data_cache.identity_stakes.get_stakes().await,
identity_stakes,
self.data_cache.clone(),
self.config.quic_connection_params,
)
Expand Down

0 comments on commit 9d776ed

Please sign in to comment.