Skip to content

Commit

Permalink
fix timing issue of streaming
Browse files Browse the repository at this point in the history
the stream request and data is handle asynchronously, if the data is
handled when stream is not created yet, the data will be discarded and
an error occur.

Signed-off-by: Abel <[email protected]>
  • Loading branch information
abel-von committed Mar 14, 2024
1 parent 6f12d9c commit 431bfb8
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions src/asynchronous/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,14 @@ impl ReaderDelegate for ServerReader {
async fn handle_msg(&self, msg: GenMessage) {
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
let context = self.context();
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
spawn(async move {
select! {
_ = context.handle_msg(msg) => {}
_ = context.handle_msg(msg, wait_tx) => {}
_ = handler_shutdown_waiter.wait_shutdown() => {}
}
});
wait_rx.await.unwrap_or_default();
}

async fn handle_err(&self, header: MessageHeader, e: Error) {
Expand Down Expand Up @@ -423,7 +425,7 @@ impl HandlerContext {
})
.ok();
}
async fn handle_msg(&self, msg: GenMessage) {
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
let stream_id = msg.header.stream_id;

if (stream_id % 2) != 1 {
Expand All @@ -437,7 +439,7 @@ impl HandlerContext {
}

match msg.header.type_ {
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
Ok(opt_msg) => match opt_msg {
Some(mut resp) => {
// Server: check size before sending to client
Expand Down Expand Up @@ -470,6 +472,8 @@ impl HandlerContext {
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
},
MESSAGE_TYPE_DATA => {
// no need to wait data message handling
drop(wait_tx);
// TODO(wllenyj): Compatible with golang behavior.
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
&& !msg.payload.is_empty()
Expand Down Expand Up @@ -517,7 +521,11 @@ impl HandlerContext {
}
}

async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
async fn handle_request(
&self,
msg: GenMessage,
wait_tx: tokio::sync::oneshot::Sender<()>,
) -> StdResult<Option<Response>, Status> {
//TODO:
//if header.stream_id <= self.last_stream_id {
// return Err;
Expand All @@ -538,10 +546,11 @@ impl HandlerContext {
})?;

if let Some(method) = srv.get_method(&req.method) {
drop(wait_tx);
return self.handle_method(method, req_msg).await;
}
if let Some(stream) = srv.get_stream(&req.method) {
return self.handle_stream(stream, req_msg).await;
return self.handle_stream(stream, req_msg, wait_tx).await;
}
Err(get_status(
Code::UNIMPLEMENTED,
Expand Down Expand Up @@ -597,6 +606,7 @@ impl HandlerContext {
&self,
stream: Arc<dyn StreamHandler + Send + Sync>,
req_msg: Message<Request>,
wait_tx: tokio::sync::oneshot::Sender<()>,
) -> StdResult<Option<Response>, Status> {
let stream_id = req_msg.header.stream_id;
let req = req_msg.payload;
Expand All @@ -608,6 +618,8 @@ impl HandlerContext {

let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA;

drop(wait_tx);

let si = StreamInner::new(
stream_id,
self.tx.clone(),
Expand Down

0 comments on commit 431bfb8

Please sign in to comment.