Skip to content

Commit 206bf13

Browse files
committed
fix(cstream): Switch to async_channel to workaround tokio's try_recv()
See tokio bug tokio-rs/tokio#3350
1 parent a5ad215 commit 206bf13

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ futures = "0.3"
1212
num_enum = "0.5"
1313
lazy_static = "1.4"
1414
tokio = { version = "1.3", features = ["sync"] }
15+
# Tokio's mpsc::sync channel has a bug in try_recv(), in the meantime we use async_channel
16+
async-channel = "1.6.1"
1517

1618
[dev-dependencies]
1719
tokio = { version = "1.3", features = ["macros", "rt-multi-thread"] }

src/ctanker/cstream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::ctanker::*;
2525
use crate::error::Error;
2626

2727
use ::core::pin::Pin;
28+
use async_channel::{bounded, Receiver, Sender, TryRecvError};
2829
use futures::executor::block_on;
2930
use futures::future::{select, Either};
3031
use futures::io::{AsyncRead, AsyncReadExt};
@@ -34,7 +35,6 @@ use futures::FutureExt;
3435
use std::cmp::min;
3536
use std::future::Future;
3637
use std::sync::Mutex;
37-
use tokio::sync::mpsc::{channel, Receiver, Sender};
3838

3939
#[derive(Debug, Clone)]
4040
struct ReadOperation {
@@ -63,7 +63,7 @@ struct TankerStream<UserStream: AsyncRead + Unpin> {
6363

6464
impl<UserStream: AsyncRead + Unpin> TankerStream<UserStream> {
6565
fn new() -> Self {
66-
let (sender, receiver) = channel(1);
66+
let (sender, receiver) = bounded(1);
6767
TankerStream {
6868
user_stream: None,
6969
tanker_stream_handle: std::ptr::null_mut(),
@@ -175,18 +175,18 @@ impl<UserStream: AsyncRead + Unpin> AsyncRead for TankerStream<UserStream> {
175175
}
176176

177177
// Take the next ReadOperation if there is one
178-
match self.receiver.recv().now_or_never() {
179-
Some(Some(read_operation)) => {
178+
match self.receiver.try_recv() {
179+
Ok(read_operation) => {
180180
debug_assert!(
181181
self.read_operation.is_none(),
182182
"Tanker never asks for a ReadOperation if one is in progress"
183183
);
184184
self.read_operation = Some(read_operation);
185185
}
186-
Some(None) => {
186+
Err(TryRecvError::Closed) => {
187187
panic!("error reading channel: closed");
188188
}
189-
None => {} // Channel still open, but no message
189+
Err(TryRecvError::Empty) => {} // Channel still open, but no message
190190
}
191191

192192
// Process the ReadOperation if there is one in progress

0 commit comments

Comments
 (0)