Skip to content

Commit

Permalink
Replacing exit signal with exit notification (#372)
Browse files Browse the repository at this point in the history
* Replacing exit signal with exit notification

* Deprecitating nightly version

* Increase connection size
  • Loading branch information
godmodegalactus authored Mar 27, 2024
1 parent 5957c60 commit 02e25ca
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 90 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/clippy_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
# use toolchain version from rust-toolchain.toml
toolchain: nightly-2024-01-05
toolchain: nightly-2023-10-05
components: rustfmt, clippy
cache: true
# avoid the default "-D warnings" which thrashes cache
Expand All @@ -48,5 +48,5 @@ jobs:

- name: Run fmt+clippy
run: |
cargo +nightly-2024-01-05 fmt --all --check
cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings
cargo +nightly-2023-10-05 fmt --all --check
cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings
3 changes: 0 additions & 3 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
Expand Down Expand Up @@ -140,7 +139,6 @@ pub fn create_grpc_multiplex_blocks_subscription(
let (processed_block_sender, processed_block_reciever) =
async_channel::unbounded::<ProducedBlock>();

let exit_signal = Arc::new(AtomicBool::new(false));
let exit_notify = Arc::new(Notify::new());
let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
Expand Down Expand Up @@ -247,7 +245,6 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
exit_signal.store(true, std::sync::atomic::Ordering::Relaxed);
exit_notify.notify_waiters();
futures::future::join_all(processed_blocks_tasks).await;
}
Expand Down
1 change: 1 addition & 0 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl LiteBridge {

let http_server_handle = ServerBuilder::default()
.http_only()
.max_connections(1_000_000)
.build(http_addr.clone())
.await?
.start(rpc)?;
Expand Down
4 changes: 2 additions & 2 deletions run_clippy_fmt.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cargo +nightly-2024-01-05 fmt --all
cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings
cargo +nightly-2023-10-05 fmt --all
cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings
70 changes: 43 additions & 27 deletions services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

Expand All @@ -40,7 +40,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
Expand All @@ -51,7 +51,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
Expand All @@ -60,7 +60,7 @@ impl QuicConnection {
identity,
socket_address,
connection_params,
exit_signal,
exit_notify,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
Expand All @@ -74,7 +74,7 @@ impl QuicConnection {
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
self.exit_notify.clone(),
)
.await
}
Expand Down Expand Up @@ -127,32 +127,48 @@ impl QuicConnection {
pub async fn send_transaction(&self, tx: Vec<u8>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
if self.exit_signal.load(Ordering::Relaxed) {
// return
return;
}

let mut do_retry = false;
let connection = self.get_connection().await;
let exit_notify = self.exit_notify.clone();

let connection = tokio::select! {
conn = self.get_connection() => {
conn
},
_ = exit_notify.notified() => {
break;
}
};

if let Some(connection) = connection {
TRIED_SEND_TRANSCTION_TRIED.inc();
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
)
.await
{
let open_uni_result = tokio::select! {
res = QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
) => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match open_uni_result {
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
)
.await
{
let write_add_result = tokio::select! {
res = QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
) => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match write_add_result {
Ok(()) => {
SEND_TRANSCTION_SUCESSFUL.inc();
}
Expand Down Expand Up @@ -231,7 +247,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
Expand All @@ -243,7 +259,7 @@ impl QuicConnectionPool {
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_signal.clone(),
exit_notify.clone(),
));
}
Self {
Expand Down
30 changes: 19 additions & 11 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::pubkey::Pubkey;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use tokio::{sync::Notify, time::timeout};

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
Expand Down Expand Up @@ -222,15 +219,29 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_signal: Arc<AtomicBool>,
exit_notified: Arc<Notify>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
NB_QUIC_0RTT_ATTEMPTED.inc();
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
tokio::select! {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
break;
}
}
} else {
NB_QUIC_CONN_ATTEMPTED.inc();
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
tokio::select! {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
break;
}
}
};
match conn {
Ok(conn) => {
Expand All @@ -239,9 +250,6 @@ impl QuicConnectionUtils {
}
Err(e) => {
trace!("Could not connect to {} because of error {}", identity, e);
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
}
}
Expand Down
62 changes: 18 additions & 44 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
Expand Down Expand Up @@ -55,9 +47,9 @@ struct ActiveConnection {
endpoints: RotatingQueue<Endpoint>,
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
}

impl ActiveConnection {
Expand All @@ -72,9 +64,9 @@ impl ActiveConnection {
endpoints,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
}
}

Expand All @@ -99,13 +91,12 @@ impl ActiveConnection {
identity_stakes.stakes,
identity_stakes.total_stakes,
);
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_signal.clone(),
exit_notifier.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
Expand All @@ -116,12 +107,19 @@ impl ActiveConnection {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_signal = exit_signal.clone();
let exit_notifier = exit_notifier.clone();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
while !exit_signal.load(Ordering::Relaxed) {
let tx = transaction_reciever.recv().await;
loop {
let tx = tokio::select! {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
break;
}
};
match tx {
Ok(transaction_sent_info) => {
if data_cache
Expand Down Expand Up @@ -172,20 +170,10 @@ impl ActiveConnection {
};

'main_loop: loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}

tokio::select! {
_ = fill_notify.notified() => {

'process_heap: loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break 'main_loop;
}

let Some(tx) = priorization_heap.pop().await else {
// wait to get notification from fill event
break 'process_heap;
Expand Down Expand Up @@ -248,14 +236,9 @@ impl ActiveConnection {
}
}

struct ActiveConnectionWithExitNotifier {
pub active_connection: ActiveConnection,
pub exit_notifier: Arc<Notify>,
}

pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
}

impl TpuConnectionManager {
Expand Down Expand Up @@ -301,13 +284,8 @@ impl TpuConnectionManager {
exit_notifier.clone(),
identity_stakes,
);
self.identity_to_active_connection.insert(
*identity,
Arc::new(ActiveConnectionWithExitNotifier {
active_connection,
exit_notifier,
}),
);
self.identity_to_active_connection
.insert(*identity, active_connection);
}
}

Expand All @@ -316,11 +294,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value
.active_connection
.exit_signal
.store(true, Ordering::Relaxed);
value.exit_notifier.notify_one();
value.exit_notifier.notify_waiters();
false
} else {
true
Expand Down

0 comments on commit 02e25ca

Please sign in to comment.