From 1b922daa11a5e2d77e1e408e64d501009eaeaa37 Mon Sep 17 00:00:00 2001
From: max funk <mxfactorial@users.noreply.github.com>
Date: Thu, 28 Nov 2024 14:00:39 -0700
Subject: [PATCH] use async tungstenite in graphql service

---
 services/graphql/src/main.rs | 34 +++++++++++++++++++++++-----------
 1 file changed, 23 insertions(+), 11 deletions(-)

diff --git a/services/graphql/src/main.rs b/services/graphql/src/main.rs
index aa55e648..916b76a7 100644
--- a/services/graphql/src/main.rs
+++ b/services/graphql/src/main.rs
@@ -16,7 +16,7 @@ use axum::{
     routing::get,
     Router,
 };
-use futures_util::stream::Stream;
+use futures_util::{stream::Stream, StreamExt};
 use httpclient::HttpClient as Client;
 use serde_json::json;
 use shutdown::shutdown_signal;
@@ -205,7 +205,7 @@ impl Subscription {
             .to_string();
         let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality);
         stream! {
-            let mut measure_socket = match ws_client.connect() {
+            let measure_socket = match ws_client.connect().await {
                 Ok(ws) => {
                     tracing::info!("graphql websocket connection created with measure");
                     ws
@@ -215,9 +215,17 @@ impl Subscription {
                     return;
                 }
             };
+
+            // the async-graphql crate limits the scope of the websocket lifecycle to where its upgraded:
+            // https://github.com/async-graphql/async-graphql/issues/1022#issuecomment-1214541591
+            // this means it cant send a close message to the measure service from inside the subscription
+            // todo: add on_close(closed) callback support to graphql subscription context so
+            // a close message can be written/sent to the measure service
+            let (_write, mut read) = measure_socket.split();
+
             loop {
-                match measure_socket.read() {
-                    Ok(msg) => {
+                match read.next().await {
+                    Some(Ok(msg)) => {
                         match msg {
                             tungstenite::Message::Text(text) => {
                                 let gdp: f64 = serde_json::from_str(&text).unwrap();
@@ -229,18 +237,22 @@ impl Subscription {
                             }
                         }
                     }
-                    Err(WsError::ConnectionClosed) => {
-                        measure_socket.close(None).unwrap();
-                        tracing::info!("graphql received closed message from measure");
+                    Some(Err(e)) => {
+                        match e {
+                            WsError::ConnectionClosed => {
+                                tracing::info!("graphql received closed message from measure");
+                            }
+                            _ => {
+                                tracing::info!("graphql message receipt failure from measure: {:?}", e);
+                            }
+                        }
                         break;
                     }
-                    Err(e) => {
-                        tracing::info!("graphql message receipt failure from measure: {:?}", e);
+                    None => {
+                        tracing::info!("graphql received closed message from measure");
                         break;
                     }
                 }
-                // throttle reads from measure service to avoid blocking close messages from client
-                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
             }
         }
     }