From e70351e54399ddeaafdcc9550c9957ddd246febf Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Thu, 7 Mar 2024 20:01:44 +0800 Subject: [PATCH] fix timing issue of streaming the stream request and data is handle asynchronously, if the data is handled when stream is not created yet, the data will be discarded and an error occur. Signed-off-by: Abel --- src/asynchronous/server.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/asynchronous/server.rs b/src/asynchronous/server.rs index 0934831..592d4d4 100644 --- a/src/asynchronous/server.rs +++ b/src/asynchronous/server.rs @@ -381,12 +381,14 @@ impl ReaderDelegate for ServerReader { async fn handle_msg(&self, msg: GenMessage) { let handler_shutdown_waiter = self.handler_shutdown.subscribe(); let context = self.context(); + let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); spawn(async move { select! { - _ = context.handle_msg(msg) => {} + _ = context.handle_msg(msg, wait_tx) => {} _ = handler_shutdown_waiter.wait_shutdown() => {} } }); + wait_rx.await.unwrap_or_default(); } async fn handle_err(&self, header: MessageHeader, e: Error) { @@ -424,7 +426,7 @@ impl HandlerContext { }) .ok(); } - async fn handle_msg(&self, msg: GenMessage) { + async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) { let stream_id = msg.header.stream_id; if (stream_id % 2) != 1 { @@ -438,7 +440,7 @@ impl HandlerContext { } match msg.header.type_ { - MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await { + MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await { Ok(opt_msg) => match opt_msg { Some(mut resp) => { // Server: check size before sending to client @@ -471,6 +473,8 @@ impl HandlerContext { Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await, }, MESSAGE_TYPE_DATA => { + // no need to wait data message handling + drop(wait_tx); // TODO(wllenyj): Compatible with golang behavior. if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED && !msg.payload.is_empty() @@ -518,7 +522,11 @@ impl HandlerContext { } } - async fn handle_request(&self, msg: GenMessage) -> StdResult, Status> { + async fn handle_request( + &self, + msg: GenMessage, + wait_tx: tokio::sync::oneshot::Sender<()>, + ) -> StdResult, Status> { //TODO: //if header.stream_id <= self.last_stream_id { // return Err; @@ -539,10 +547,11 @@ impl HandlerContext { })?; if let Some(method) = srv.get_method(&req.method) { + drop(wait_tx); return self.handle_method(method, req_msg).await; } if let Some(stream) = srv.get_stream(&req.method) { - return self.handle_stream(stream, req_msg).await; + return self.handle_stream(stream, req_msg, wait_tx).await; } Err(get_status( Code::UNIMPLEMENTED, @@ -598,6 +607,7 @@ impl HandlerContext { &self, stream: Arc, req_msg: Message, + wait_tx: tokio::sync::oneshot::Sender<()>, ) -> StdResult, Status> { let stream_id = req_msg.header.stream_id; let req = req_msg.payload; @@ -609,6 +619,8 @@ impl HandlerContext { let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA; + drop(wait_tx); + let si = StreamInner::new( stream_id, self.tx.clone(),