Skip to content

Commit

Permalink
throttle reads from measure service
Browse files Browse the repository at this point in the history
allows graphql to handle close message from client
  • Loading branch information
mxfactorial committed Nov 17, 2024
1 parent 26763b8 commit 21cb268
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
1 change: 0 additions & 1 deletion services/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures-util = "0.3.30"
wsclient = { path = "../../crates/wsclient" }
tungstenite = { version = "0.24.0", default-features = false }
async-stream = "0.3.5"
log = "0.4.22"

[target.x86_64-unknown-linux-musl.dependencies]
# https://github.com/cross-rs/cross/wiki/Recipes#vendored
Expand Down
27 changes: 16 additions & 11 deletions services/graphql/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use axum::{
};
use futures_util::stream::Stream;
use httpclient::HttpClient as Client;
use log::debug;
use serde_json::json;
use shutdown::shutdown_signal;
use std::{env, net::ToSocketAddrs, result::Result};
Expand Down Expand Up @@ -202,42 +201,43 @@ impl Subscription {
let resource = env::var("MEASURE_RESOURCE").unwrap();
let uri = format!("{}/{}", base_uri, resource);
let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality);

stream! {
let mut measure_socket = match ws_client.connect() {
Ok(ws) => {
debug!("measure websocket connection created");
tracing::info!("graphql websocket connection created with measure");
ws
}
Err(_e) => {
debug!("measure webSocket connection failure: {:?}", _e);
tracing::info!("graphql failed to create webSocket with measure: {:?}", _e);
return;
}
};

loop {
match measure_socket.read() {
Ok(msg) => {
match msg {
tungstenite::Message::Text(text) => {
let gdp: f64 = serde_json::from_str(&text).unwrap();
tracing::info!("sending gdp from measure: {}", gdp);
yield gdp;
}
_ => {
debug!("received non-text message: {:?}", msg);
tracing::info!("received non-text message from measure: {:?}", msg);
}
}
}
Err(WsError::ConnectionClosed) => {
measure_socket.close(None).unwrap();
debug!("measure websocket closed");
tracing::info!("graphql received closed message from measure");
break;
}
Err(e) => {
debug!("measure message receipt failure: {:?}", e);
tracing::info!("graphql message receipt failure from measure: {:?}", e);
break;
}
}
// throttle reads from measure service to avoid blocking close messages from client
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
}
Expand Down Expand Up @@ -305,17 +305,22 @@ async fn graphql_subscription(
) -> impl IntoResponse {
ws.protocols(http::ALL_WEBSOCKET_PROTOCOLS)
.on_upgrade(move |socket| async move {
// println!("connection opened");
tracing::info!("graphql subscription started");
GraphQLWebSocket::new(socket, schema, protocol)
.keepalive_timeout(None)
.serve()
.await;
// println!("connection closed");
tracing::info!("graphql subscription closed");
})
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_ansi(false).init();
if let Ok(level) = env::var("RUST_LOG") {
tracing_subscriber::fmt().with_env_filter(level).init();
} else {
tracing_subscriber::fmt().init();
}

let readiness_check_path = env::var(READINESS_CHECK_PATH)
.unwrap_or_else(|_| panic!("{READINESS_CHECK_PATH} variable assignment"));
Expand Down

0 comments on commit 21cb268

Please sign in to comment.