Skip to content

Commit

Permalink
backend: fix ws stream shutdown (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Nov 17, 2024
1 parent f5bc1d3 commit 9b79795
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ The minor version will be incremented upon a breaking change and the patch versi
- geyser: wait all transactions before process block ([#10](https://github.com/solana-stream-solutions/solfees/pull/10))
- frontend: init ([#8](https://github.com/solana-stream-solutions/solfees/pull/8))
- geyser: use process_compute_budget_instructions ([#15](https://github.com/solana-stream-solutions/solfees/pull/15))
- api: improve parallelism ([#16](https://github.com/solana-stream-solutions/solfees/pull/16))
- backend: improve parallelism ([#16](https://github.com/solana-stream-solutions/solfees/pull/16))
- geyser: do not stream outdated data ([#17](https://github.com/solana-stream-solutions/solfees/pull/17))
- api: add metrics of used resources ([#18](https://github.com/solana-stream-solutions/solfees/pull/18))
- backend: add metrics of used resources ([#18](https://github.com/solana-stream-solutions/solfees/pull/18))
- backend: fix ws stream shutdown ([#19](https://github.com/solana-stream-solutions/solfees/pull/19))

### Breaking
6 changes: 4 additions & 2 deletions solfees-be/src/bin/solfees-ws-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ async fn main() -> anyhow::Result<()> {
Some(Ok(Message::Ping(_))) => continue,
Some(Ok(Message::Pong(_))) => continue,
Some(Ok(Message::Frame(_))) => continue,
Some(Ok(Message::Close(_))) => return Ok(()),
Some(Ok(Message::Close(_))) => anyhow::bail!("close message received"),
Some(Err(error)) => anyhow::bail!(error),
None => anyhow::bail!("stream is closed"),
None => anyhow::bail!("stream finished"),
};
info!("new message: {text}");
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
};

tokio::try_join!(req_to_ws, ws_to_stdout).map(|_| ())
Expand Down
20 changes: 12 additions & 8 deletions solfees-be/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
net::TcpListener,
sync::{broadcast, Notify},
},
tracing::{error, info},
tracing::{debug, error, info},
};

pub async fn run_admin(addr: SocketAddr, shutdown: Arc<Notify>) -> anyhow::Result<()> {
Expand Down Expand Up @@ -73,6 +73,9 @@ pub async fn run_solfees(
let listener = TcpListener::bind(addr).await?;
info!(%addr, "Start Solfees RPC server");

let (ws_tx, _ws_rx) = broadcast::channel(1);
let ws_tx = Arc::new(ws_tx);

let http = ServerBuilder::new(TokioExecutor::new());
let graceful = GracefulShutdown::new();
loop {
Expand All @@ -83,13 +86,13 @@ pub async fn run_solfees(

let solana_rpc = solana_rpc.clone();
let config_metrics = Arc::clone(&config_metrics);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let ws_tx = Arc::clone(&ws_tx);
let connection = http.serve_connection_with_upgrades(
TokioIo::new(Box::pin(stream)),
service_fn(move |mut req: Request<BodyIncoming>| {
let solana_rpc = solana_rpc.clone();
let config_metrics = Arc::clone(&config_metrics);
let shutdown_rx = shutdown_rx.resubscribe();
let ws_tx = Arc::clone(&ws_tx);
async move {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReqType {
Expand Down Expand Up @@ -129,7 +132,6 @@ pub async fn run_solfees(
client_id,
solana_rpc_mode,
body.aggregate(),
shutdown_rx,
)
})
.await
Expand Down Expand Up @@ -166,6 +168,7 @@ pub async fn run_solfees(
client_id,
solana_rpc_mode,
websocket,
ws_tx.subscribe(),
));
let (parts, body) = response.into_parts();
Ok(Response::from_parts(parts, body.boxed()))
Expand All @@ -179,15 +182,16 @@ pub async fn run_solfees(
}
}),
);
let fut = graceful.watch(connection.into_owned());

let connection = graceful.watch(connection.into_owned());
tokio::spawn(async move {
let _ = fut.await;
drop(shutdown_tx);
if let Err(error) = connection.await {
debug!(error, "connection error");
}
});
}

drop(listener);
drop(ws_tx);
graceful.shutdown().await;

Ok::<(), anyhow::Error>(())
Expand Down
12 changes: 3 additions & 9 deletions solfees-be/src/rpc_solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ impl SolanaRpc {
client_id: ClientId,
mode: SolanaRpcMode,
body: impl Buf,
mut shutdown_rx: broadcast::Receiver<()>,
) -> anyhow::Result<(RpcRequestsStats, Vec<u8>)> {
let timer = client_id.start_timer_cpu();
let mut stats = RpcRequestsStats::default();
Expand Down Expand Up @@ -534,14 +533,6 @@ impl SolanaRpc {
let response_rx = join_all(rxs);

tokio::select! {
value = shutdown_rx.recv() => match value {
Ok(()) => unreachable!(),
Err(broadcast::error::RecvError::Closed) => {
shutdown.store(true, Ordering::Relaxed);
anyhow::bail!("connection closed");
},
Err(broadcast::error::RecvError::Lagged(_)) => unreachable!(),
},
() = sleep(self.request_timeout) => {
shutdown.store(true, Ordering::Relaxed);
anyhow::bail!("request timeout");
Expand Down Expand Up @@ -590,6 +581,7 @@ impl SolanaRpc {
client_id: ClientId,
mode: SolanaRpcMode,
websocket: HyperWebsocket,
mut shutdown_rx: broadcast::Receiver<()>,
) {
let ws_frontend = match mode {
SolanaRpcMode::Solfees => false,
Expand Down Expand Up @@ -627,6 +619,8 @@ impl SolanaRpc {
};

tokio::select! {
_ = shutdown_rx.recv() => break Some(None),

flush_result = websocket_tx_flush => match flush_result {
Ok(()) => {
flush_required = false;
Expand Down

0 comments on commit 9b79795

Please sign in to comment.