1- use std:: {
2- borrow:: Cow ,
3- collections:: HashMap as StdHashMap ,
4- net:: SocketAddr ,
5- sync:: Arc ,
6- time:: { Duration , Instant } ,
7- } ;
8-
91use anyhow:: * ;
102use bytes:: Bytes ;
113use futures_util:: { SinkExt , StreamExt } ;
@@ -16,17 +8,30 @@ use hyper_util::{client::legacy::Client, rt::TokioExecutor};
168use moka:: future:: Cache ;
179use rand;
1810use rivet_api_builder:: { ErrorResponse , RawErrorResponse } ;
19- use rivet_error:: RivetError ;
11+ use rivet_error:: { INTERNAL_ERROR , RivetError } ;
2012use rivet_metrics:: KeyValue ;
2113use rivet_util:: Id ;
2214use serde_json;
15+ use std:: {
16+ borrow:: Cow ,
17+ collections:: HashMap as StdHashMap ,
18+ net:: SocketAddr ,
19+ sync:: Arc ,
20+ time:: { Duration , Instant } ,
21+ } ;
2322use tokio:: sync:: Mutex ;
2423use tokio:: time:: timeout;
25- use tokio_tungstenite:: tungstenite:: client:: IntoClientRequest ;
24+ use tokio_tungstenite:: tungstenite:: {
25+ client:: IntoClientRequest ,
26+ protocol:: { CloseFrame , frame:: coding:: CloseCode } ,
27+ } ;
2628use tracing:: Instrument ;
2729use url:: Url ;
2830
29- use crate :: { custom_serve:: CustomServeTrait , errors, metrics, request_context:: RequestContext } ;
31+ use crate :: {
32+ WebSocketHandle , custom_serve:: CustomServeTrait , errors, metrics,
33+ request_context:: RequestContext ,
34+ } ;
3035
3136pub const X_FORWARDED_FOR : HeaderName = HeaderName :: from_static ( "x-forwarded-for" ) ;
3237pub const X_RIVET_ERROR : HeaderName = HeaderName :: from_static ( "x-rivet-error" ) ;
@@ -1432,9 +1437,9 @@ impl ProxyService {
14321437
14331438 // Close the WebSocket connection with the response message
14341439 let _ = client_ws. close ( Some ( tokio_tungstenite:: tungstenite:: protocol:: CloseFrame {
1435- code : tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1436- reason : response. message . as_ref ( ) . into ( ) ,
1437- } ) ) . await ;
1440+ code : tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1441+ reason : response. message . as_ref ( ) . into ( ) ,
1442+ } ) ) . await ;
14381443 return ;
14391444 }
14401445 Result :: Ok ( ResolveRouteOutput :: CustomServe ( _) ) => {
@@ -1813,31 +1818,42 @@ impl ProxyService {
18131818 let mut attempts = 0u32 ;
18141819 let mut client_ws = client_websocket;
18151820
1821+ let ws_handle = WebSocketHandle :: new ( client_ws) ;
1822+
18161823 loop {
18171824 match handlers
18181825 . handle_websocket (
1819- client_ws ,
1826+ ws_handle . clone ( ) ,
18201827 & req_headers,
18211828 & req_path,
18221829 & mut request_context,
18231830 )
18241831 . await
18251832 {
1826- Result :: Ok ( ( ) ) => break ,
1827- Result :: Err ( ( returned_client_ws, err) ) => {
1833+ Result :: Ok ( ( ) ) => {
1834+ tracing:: debug!( "websocket closed" ) ;
1835+
1836+ // Send graceful close
1837+ ws_handle. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1838+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1839+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Normal ,
1840+ reason : format ! ( "Closed" ) . into ( ) ,
1841+ } ,
1842+ ) ) ) ;
1843+
1844+ break ;
1845+ }
1846+ Result :: Err ( err) => {
18281847 attempts += 1 ;
18291848 if attempts > max_attempts || !is_retryable_ws_error ( & err) {
1830- // Accept and close the client websocket with an error reason
1831- if let Result :: Ok ( mut ws) = returned_client_ws. await {
1832- let _ = ws
1833- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1834- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1835- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1836- reason : format ! ( "{}" , err) . into ( ) ,
1837- } ,
1838- ) ) )
1839- . await ;
1840- }
1849+ // Close WebSocket with error
1850+ ws_handle
1851+ . accept_and_send (
1852+ hyper_tungstenite:: tungstenite:: Message :: Close (
1853+ Some ( err_to_close_frame ( err) ) ,
1854+ ) ,
1855+ )
1856+ . await ?;
18411857
18421858 break ;
18431859 } else {
@@ -1861,49 +1877,38 @@ impl ProxyService {
18611877 new_handlers,
18621878 ) ) => {
18631879 handlers = new_handlers;
1864- client_ws = returned_client_ws;
18651880 continue ;
18661881 }
18671882 Result :: Ok ( ResolveRouteOutput :: Response ( response) ) => {
1868- if let Result :: Ok ( mut ws) = returned_client_ws. await
1869- {
1870- let _ = ws
1871- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1872- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1873- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1874- reason : response. message . as_ref ( ) . into ( ) ,
1875- } ,
1876- ) ) )
1877- . await ;
1878- }
1879- break ;
1883+ ws_handle
1884+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1885+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1886+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1887+ reason : response. message . as_ref ( ) . into ( ) ,
1888+ } ,
1889+ ) ) )
1890+ . await ;
18801891 }
18811892 Result :: Ok ( ResolveRouteOutput :: Target ( _) ) => {
1882- if let Result :: Ok ( mut ws) = returned_client_ws. await
1883- {
1884- let _ = ws
1885- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1886- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1887- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1888- reason : "Cannot retry WebSocket with non-custom serve route" . into ( ) ,
1889- } ,
1890- ) ) )
1891- . await ;
1892- }
1893+ ws_handle
1894+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1895+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1896+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1897+ reason : "Cannot retry WebSocket with non-custom serve route" . into ( ) ,
1898+ } ,
1899+ ) ) )
1900+ . await ;
18931901 break ;
18941902 }
18951903 Err ( res_err) => {
1896- if let Result :: Ok ( mut ws) = returned_client_ws. await
1897- {
1898- let _ = ws
1899- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1900- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1901- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1902- reason : format ! ( "Routing error: {}" , res_err) . into ( ) ,
1903- } ,
1904- ) ) )
1905- . await ;
1906- }
1904+ ws_handle
1905+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1906+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1907+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1908+ reason : format ! ( "Routing error: {}" , res_err) . into ( ) ,
1909+ } ,
1910+ ) ) )
1911+ . await ;
19071912 break ;
19081913 }
19091914 }
@@ -2242,3 +2247,26 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool {
22422247 false
22432248 }
22442249}
2250+
2251+ pub fn err_to_close_frame ( err : anyhow:: Error ) -> CloseFrame {
2252+ let rivet_err = err
2253+ . chain ( )
2254+ . find_map ( |x| x. downcast_ref :: < RivetError > ( ) )
2255+ . cloned ( )
2256+ . unwrap_or_else ( || RivetError :: from ( & INTERNAL_ERROR ) ) ;
2257+
2258+ let code = match ( rivet_err. group ( ) , rivet_err. code ( ) ) {
2259+ ( "ws" , "connection_closed" ) => CloseCode :: Normal ,
2260+ _ => CloseCode :: Error ,
2261+ } ;
2262+
2263+ // NOTE: reason cannot be more than 123 bytes as per the WS protocol
2264+ let reason = rivet_util:: safe_slice (
2265+ & format ! ( "{}.{}" , rivet_err. group( ) , rivet_err. code( ) ) ,
2266+ 0 ,
2267+ 123 ,
2268+ )
2269+ . into ( ) ;
2270+
2271+ CloseFrame { code, reason }
2272+ }
0 commit comments