Skip to content

Commit

Permalink
Merge pull request #2 from krojew/tokio-0.3
Browse files Browse the repository at this point in the history
Tokio 0.3
  • Loading branch information
krojew authored Oct 16, 2020
2 parents d2f6fa0 + 9406d09 commit 0c7053f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [x.x.x]

### Changed

- Updated `tokio` to 0.3.

### New

- To string conversion from some enums
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ futures = "0.3"
lazy_static = "1.4"
num_enum = "0.5"
snowflake = "1.3.0"
tokio = { version = "0.2", features = ["rt-threaded", "sync", "tcp", "time", "io-util", "macros"] }
tokio = { version = "0.3", features = ["rt-multi-thread", "sync", "net", "time", "io-util", "macros"] }
tracing = "0.1"
uuid = { version = "0.8", features = ["std", "v4"] }

[dev-dependencies]
tracing-subscriber = "0.2"

[features]
unstable = []
21 changes: 10 additions & 11 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::delay_for;
use tokio::time::sleep;
use tracing::*;

use crate::listeners::ListenerSet;
Expand Down Expand Up @@ -282,10 +282,10 @@ impl ZkIo {
match atype {
ZkTimeout::Ping => {
let duration = self.ping_timeout_duration.clone();
let (future, handle) = abortable(delay_for(duration));
let (future, handle) = abortable(sleep(duration));
self.ping_timeout = Some(handle);

let mut tx = self.ping_tx.clone();
let tx = self.ping_tx.clone();
tokio::spawn(async move {
if future.await.is_ok() {
let _ = tx.send(()).await;
Expand All @@ -294,10 +294,10 @@ impl ZkIo {
}
ZkTimeout::Connect => {
let duration = self.conn_timeout_duration.clone();
let (future, handle) = abortable(delay_for(duration));
let (future, handle) = abortable(sleep(duration));
self.conn_timeout = Some(handle);

let mut tx = self.connect_tx.clone();
let tx = self.connect_tx.clone();
tokio::spawn(async move {
if future.await.is_ok() {
let _ = tx.send(()).await;
Expand Down Expand Up @@ -345,20 +345,19 @@ impl ZkIo {
let (mut rx, tx) = sock.into_split();
self.sock_tx = Some(tx);

let mut data_tx = self.data_tx.clone();
let data_tx = self.data_tx.clone();
tokio::spawn(async move {
let mut buf = BytesMut::with_capacity(4096);
while let Ok(read) = rx.read_buf(&mut buf).await {
let mut buf = [0u8; 4096];
while let Ok(read) = rx.read(&mut buf).await {
trace!("Received {:?} bytes", read);
if data_tx.send(buf).await.is_err() {

if data_tx.send(buf[..read].into()).await.is_err() {
return;
}

if read == 0 {
break;
}

buf = BytesMut::with_capacity(4096);
}

trace!("Exiting read loop");
Expand Down
4 changes: 4 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, Command, Stdio};

use tracing_subscriber;

pub struct ZkCluster {
process: Child,
pub connect_string: String,
Expand All @@ -11,6 +13,8 @@ pub struct ZkCluster {

impl ZkCluster {
pub fn start(instances: usize) -> ZkCluster {
let _ = tracing_subscriber::fmt::try_init();

let mut process = match Command::new("java")
.arg("-jar")
.arg("zk-test-cluster/target/main.jar")
Expand Down

0 comments on commit 0c7053f

Please sign in to comment.