From 21cb268177ebf721f4d96a6d312e95919214b735 Mon Sep 17 00:00:00 2001 From: max funk Date: Sat, 16 Nov 2024 17:35:39 -0700 Subject: [PATCH] throttle reads from measure service allows graphql to handle close message from client --- services/graphql/Cargo.toml | 1 - services/graphql/src/main.rs | 27 ++++++++++++++++----------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/services/graphql/Cargo.toml b/services/graphql/Cargo.toml index ce0631cf..9526d6fa 100644 --- a/services/graphql/Cargo.toml +++ b/services/graphql/Cargo.toml @@ -24,7 +24,6 @@ futures-util = "0.3.30" wsclient = { path = "../../crates/wsclient" } tungstenite = { version = "0.24.0", default-features = false } async-stream = "0.3.5" -log = "0.4.22" [target.x86_64-unknown-linux-musl.dependencies] # https://github.com/cross-rs/cross/wiki/Recipes#vendored diff --git a/services/graphql/src/main.rs b/services/graphql/src/main.rs index a7958797..0ac9eb4d 100644 --- a/services/graphql/src/main.rs +++ b/services/graphql/src/main.rs @@ -18,7 +18,6 @@ use axum::{ }; use futures_util::stream::Stream; use httpclient::HttpClient as Client; -use log::debug; use serde_json::json; use shutdown::shutdown_signal; use std::{env, net::ToSocketAddrs, result::Result}; @@ -202,42 +201,43 @@ impl Subscription { let resource = env::var("MEASURE_RESOURCE").unwrap(); let uri = format!("{}/{}", base_uri, resource); let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality); - stream! { let mut measure_socket = match ws_client.connect() { Ok(ws) => { - debug!("measure websocket connection created"); + tracing::info!("graphql websocket connection created with measure"); ws } Err(_e) => { - debug!("measure webSocket connection failure: {:?}", _e); + tracing::info!("graphql failed to create webSocket with measure: {:?}", _e); return; } }; - loop { match measure_socket.read() { Ok(msg) => { match msg { tungstenite::Message::Text(text) => { let gdp: f64 = serde_json::from_str(&text).unwrap(); + tracing::info!("sending gdp from measure: {}", gdp); yield gdp; } _ => { - debug!("received non-text message: {:?}", msg); + tracing::info!("received non-text message from measure: {:?}", msg); } } } Err(WsError::ConnectionClosed) => { measure_socket.close(None).unwrap(); - debug!("measure websocket closed"); + tracing::info!("graphql received closed message from measure"); break; } Err(e) => { - debug!("measure message receipt failure: {:?}", e); + tracing::info!("graphql message receipt failure from measure: {:?}", e); break; } } + // throttle reads from measure service to avoid blocking close messages from client + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } } } @@ -305,17 +305,22 @@ async fn graphql_subscription( ) -> impl IntoResponse { ws.protocols(http::ALL_WEBSOCKET_PROTOCOLS) .on_upgrade(move |socket| async move { - // println!("connection opened"); + tracing::info!("graphql subscription started"); GraphQLWebSocket::new(socket, schema, protocol) + .keepalive_timeout(None) .serve() .await; - // println!("connection closed"); + tracing::info!("graphql subscription closed"); }) } #[tokio::main] async fn main() { - tracing_subscriber::fmt().with_ansi(false).init(); + if let Ok(level) = env::var("RUST_LOG") { + tracing_subscriber::fmt().with_env_filter(level).init(); + } else { + tracing_subscriber::fmt().init(); + } let readiness_check_path = env::var(READINESS_CHECK_PATH) .unwrap_or_else(|_| panic!("{READINESS_CHECK_PATH} variable assignment"));