From b97cd55d95f9f868d504d0bf88ce946e747e1db4 Mon Sep 17 00:00:00 2001 From: amanusk Date: Wed, 26 Jun 2019 18:04:19 +0300 Subject: [PATCH] Update deprecated tokio routines * Use tokio-jsoncodec as codec for communication between client and server * Update dependencies to new versions * Update use of deprecated tokio routines and replace the current standard * Run cargo-fmt on files * User Rust-2018 edition rules --- .gitignore | 3 ++ tokio-chat-client/Cargo.toml | 7 ++- tokio-chat-client/src/main.rs | 82 ++++++++++++++++++++--------------- tokio-chat-common/Cargo.toml | 15 ++++--- tokio-chat-common/src/lib.rs | 21 ++++----- tokio-chat-server/Cargo.toml | 9 ++-- tokio-chat-server/src/main.rs | 71 +++++++++++++++++------------- 7 files changed, 116 insertions(+), 92 deletions(-) diff --git a/.gitignore b/.gitignore index a9d37c5..ba7f8f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ target Cargo.lock +*rusty-tags.vi* +*.swp +.*.swp diff --git a/tokio-chat-client/Cargo.toml b/tokio-chat-client/Cargo.toml index 21403a4..7e99883 100644 --- a/tokio-chat-client/Cargo.toml +++ b/tokio-chat-client/Cargo.toml @@ -1,11 +1,14 @@ [package] name = "tokio-chat-client" -version = "0.1.0" +version = "0.2.0" authors = ["John Gallagher "] +edition = "2018" [dependencies] cursive = "0.3.7" -unicode-width = "0.1.4" +unicode-width = "0.1" futures = "0.1" tokio-core = "0.1" +tokio-codec = "0.1" +tokio = "0.1" tokio-chat-common = { path = "../tokio-chat-common" } diff --git a/tokio-chat-client/src/main.rs b/tokio-chat-client/src/main.rs index 199a70e..f64dad0 100644 --- a/tokio-chat-client/src/main.rs +++ b/tokio-chat-client/src/main.rs @@ -8,8 +8,7 @@ //! with the message `* your_chat_username connected`, and you should be able to type messages. //! Start up another instance of this client (probably with a different username) in another //! window to confirm messages are being broadcast to all clients. -//! -//! The 10,000-foot view archiecture of this client is that a thread is spawned to run a tokio +//! The 10,000-foot view archiecture of this client is that a thread is spawned to run a tokio //! reactor with the client connection to the server, that thread is given a //! `std::sync::mpsc::Sender` it can use to send messages back to the GUI thread, and the GUI //! thread is given a `futures::mpsc::sync::Sender` to send the user's chat messages to the tokio @@ -17,28 +16,24 @@ //! //! Note that the GUI code below is mostly unannotated except where it comes into contact with //! tokio-like things. -extern crate futures; -extern crate tokio_core; -extern crate cursive; -extern crate unicode_width; -extern crate tokio_chat_common; -use cursive::Cursive; use cursive::direction::Direction; use cursive::event::{Event, Key}; use cursive::theme::Theme; use cursive::traits::{Boxable, Identifiable, View}; use cursive::views::{EditView, LinearLayout}; +use cursive::Cursive; use std::thread; +use futures::sync::mpsc; +use futures::{Future, Sink, Stream}; use std::net::SocketAddr; -use tokio_core::io::Io; -use tokio_core::reactor::Core; +use tokio_chat_common::{ + ClientMessage, ClientToServerCodec, Handshake, HandshakeCodec, ServerMessage, +}; +use tokio_codec::Framed; use tokio_core::net::TcpStream; -use futures::{Stream, Sink, Future}; -use futures::sync::mpsc; -use tokio_chat_common::{Handshake, HandshakeCodec, ClientMessage, ServerMessage, - ClientToServerCodec}; +use tokio_core::reactor::Core; mod chat_view; use self::chat_view::ChatView; @@ -47,11 +42,12 @@ use self::chat_view::ChatView; // _not_ a `futures::sync::mpsc::Sender`!). This allows us to send closures to be run in the // Cursive GUI context. #[derive(Clone)] -struct GuiEventSender(std::sync::mpsc::Sender>); +struct GuiEventSender(std::sync::mpsc::Sender>); impl GuiEventSender { fn send(&self, f: F) - where F: Fn(&mut GuiWrapper) + Send + 'static + where + F: Fn(&mut GuiWrapper) + Send + 'static, { self.0 .send(Box::new(move |cursive| f(&mut GuiWrapper::new(cursive)))) @@ -69,18 +65,27 @@ impl<'a> GuiWrapper<'a> { // Build up the Cursive UI. `tx` is a `futures::sync::mpsc::Sender` that we use to send // client input to the thread managing the tokio connection to the server. fn build_ui(&mut self, tx: mpsc::Sender) -> GuiEventSender { - self.0.add_layer(LinearLayout::vertical() - .child(ChatView::new(500) - .with_id("chat") - .full_height()) - .child(EditView::new() - .on_submit(move |cursive, s| { - // This is called whenever the user presses "enter" after entering text. - GuiWrapper::new(cursive).handle_entry_input(s, tx.clone()); - }) - .with_id("input") - .full_width())); - for k in &[Key::Home, Key::End, Key::Up, Key::Down, Key::PageDown, Key::PageUp] { + self.0.add_layer( + LinearLayout::vertical() + .child(ChatView::new(500).with_id("chat").full_height()) + .child( + EditView::new() + .on_submit(move |cursive, s| { + // This is called whenever the user presses "enter" after entering text. + GuiWrapper::new(cursive).handle_entry_input(s, tx.clone()); + }) + .with_id("input") + .full_width(), + ), + ); + for k in &[ + Key::Home, + Key::End, + Key::Up, + Key::Down, + Key::PageDown, + Key::PageUp, + ] { let e = Event::Key(*k); self.0.add_global_callback(e, move |s| { { @@ -88,7 +93,9 @@ impl<'a> GuiWrapper<'a> { chat.on_event(e); } - s.find_id::("input").unwrap().take_focus(Direction::front()); + s.find_id::("input") + .unwrap() + .take_focus(Direction::front()); }); } self.0.set_fps(10); @@ -165,20 +172,25 @@ fn run_client(name: String, gui: GuiEventSender, rx: mpsc::Receiver"] +edition = "2018" [dependencies] -serde = "0.8" -serde_derive = "0.8" -serde_json = "0.8" -tokio-core = "0.1" -byteorder = "1.0" +bytes = "0.4.9" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.24" +tokio-codec = "0.1.0" +futures = "0.1" +tokio = "0.1.7" +tokio-jsoncodec = "0.1" diff --git a/tokio-chat-common/src/lib.rs b/tokio-chat-common/src/lib.rs index 1e08c04..b3196d5 100644 --- a/tokio-chat-common/src/lib.rs +++ b/tokio-chat-common/src/lib.rs @@ -2,19 +2,14 @@ //! //! This crate should be straightforward; see tokio-chat-server for a description of the //! client/server protocol. -#[macro_use] -extern crate serde_derive; -extern crate serde; -extern crate serde_json; -extern crate tokio_core; -extern crate byteorder; +use serde::{Deserialize, Serialize}; -mod codec; +use tokio_jsoncodec::Codec as JsonCodec; // Handshake message sent from a client to a server when it first connects, identifying the // username of the client. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Handshake { pub name: String, } @@ -25,9 +20,9 @@ impl Handshake { } } -pub type HandshakeCodec = codec::LengthPrefixedJson; +pub type HandshakeCodec = JsonCodec; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClientMessage(pub String); impl ClientMessage { @@ -37,7 +32,7 @@ impl ClientMessage { } // Enumerate possible messages the server can send to clients. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ServerMessage { // A message from a client (first String) containing arbitrary content (second String). Message(String, String), @@ -51,5 +46,5 @@ pub enum ServerMessage { UserDisconnected(String), } -pub type ServerToClientCodec = codec::LengthPrefixedJson; -pub type ClientToServerCodec = codec::LengthPrefixedJson; +pub type ServerToClientCodec = JsonCodec; +pub type ClientToServerCodec = JsonCodec; diff --git a/tokio-chat-server/Cargo.toml b/tokio-chat-server/Cargo.toml index 1074271..3ffb09b 100644 --- a/tokio-chat-server/Cargo.toml +++ b/tokio-chat-server/Cargo.toml @@ -1,13 +1,14 @@ [package] name = "tokio-chat-server" -version = "0.1.0" +version = "0.2.0" authors = ["John Gallagher "] +edition = "2018" [dependencies] -serde = "0.8" -serde_derive = "0.8" -serde_json = "0.8" +serde = "1.0" futures = "0.1" +tokio = "0.1" +tokio-codec = "0.1" tokio-core = "0.1" byteorder = "1.0" tokio-chat-common = { path = "../tokio-chat-common" } diff --git a/tokio-chat-server/src/main.rs b/tokio-chat-server/src/main.rs index a66f94e..8d15aa9 100644 --- a/tokio-chat-server/src/main.rs +++ b/tokio-chat-server/src/main.rs @@ -20,22 +20,18 @@ //! in this project and then in another window run one or more instances the tokio-chat-client //! binary. -extern crate futures; -extern crate tokio_core; -extern crate tokio_chat_common; - +use futures::stream; +use futures::sync::mpsc; +use futures::{Future, Sink, Stream}; use std::cell::RefCell; -use std::rc::Rc; use std::collections::HashMap; use std::io; use std::net::SocketAddr; -use tokio_core::io::Io; -use tokio_core::reactor::Core; +use std::rc::Rc; +use tokio_chat_common::{ClientMessage, HandshakeCodec, ServerMessage, ServerToClientCodec}; +use tokio_codec::Framed; use tokio_core::net::TcpListener; -use futures::{Stream, Sink, Future}; -use futures::stream; -use futures::sync::mpsc; -use tokio_chat_common::{HandshakeCodec, ClientMessage, ServerMessage, ServerToClientCodec}; +use tokio_core::reactor::Core; // For each client that connects, we hang on to an mpsc::Sender (to send the task managing // that client messages) and the name they gave us during handshaking. @@ -83,12 +79,17 @@ impl ConnectedClients { // easier to insert calls to this method in other contexts. This method itself will never // fail. Perhaps the return type could change to `Box>` once // `!` lands? - fn broadcast(&self, message: ServerMessage) -> Box> { + fn broadcast( + &self, + message: ServerMessage, + ) -> Box> { let client_map = self.0.borrow(); // For each client, clone its `mpsc::Sender` (because sending consumes the sender) and // start sending a clone of `message`. This produces an iterator of Futures. - let all_sends = client_map.values().map(|client| client.tx.clone().send(message.clone())); + let all_sends = client_map + .values() + .map(|client| client.tx.clone().send(message.clone())); // Collect the futures into a stream. We don't care about: // @@ -130,13 +131,14 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); let listener = TcpListener::bind(&addr, &handle).unwrap(); + println!("Listening on: {}", addr); // Create our (currently empty) stash of clients. let clients = ConnectedClients::new(); let server = listener.incoming().for_each(move |(socket, addr)| { // Frame the socket in a codec that will give us a `Handshake`. - let handshake_io = socket.framed(HandshakeCodec::new()); + let handshake_io = Framed::new(socket, HandshakeCodec::new(false)); // `handshake_io` is a stream, but we just want to read a single `Handshake` off of it // then convert the socket into a different kind of stream. `.into_future()` lets us @@ -144,19 +146,22 @@ fn main() { // `handshake_io` itself. // // If an error occurs, we just want the error and can discard the stream. - let handshake = handshake_io.into_future() - .map_err(|(err, _)| err) + let handshake = handshake_io + .into_future() + .map_err(|(err, _)| err.into()) .and_then(move |(h, io)| { // `h` here is an `Option`. If we did not get a `Handshake`, throw // an error. This can happen if a client connects then disconnects, for example. // If we did get a handshake, log the client's name and return both the handshake // and the unframed socket. (`io.into_inner()` removes the framing and gives back // the underlying `Io` handle, which is `socket` in this case. - h.map_or_else(|| Err(io::Error::from(io::ErrorKind::UnexpectedEof)), - move |h| { - println!("CONNECTED from {:?} with name {}", addr, h.name); - Ok((h, io.into_inner())) - }) + h.map_or_else( + || Err(io::Error::from(io::ErrorKind::UnexpectedEof)), + move |h| { + println!("CONNECTED from {:?} with name {}", addr, h.name); + Ok((h, io.into_inner())) + }, + ) }); // If the handshake succeeds, the next step is to broadcast the `UserConnected` message. @@ -172,8 +177,10 @@ fn main() { // Broadcast the message, and send this client's name, `mpsc::Receiver`, and socket // as the `Item` of this future. - clients.broadcast(ServerMessage::UserConnected(name.clone())) + clients + .broadcast(ServerMessage::UserConnected(name.clone())) .map(|()| (name, rx, socket)) + //.map_err(|err| err.into()) }); // After broadcasting the announcment, the next step is to set up the futures that @@ -182,7 +189,8 @@ fn main() { let connection = announce_connect.and_then(|(name, rx, socket)| { // Frame the socket in a codec that lets us receive `ClientMessage`s and send // `ServerMessage`s. - let (to_client, from_client) = socket.framed(ServerToClientCodec::new()).split(); + let (to_client, from_client) = + Framed::new(socket, ServerToClientCodec::new(false)).split(); // For each incoming `ClientMessage`, attach the sending client's `name` and // broadcast the resulting `ServerMessage::Message` to all connected clients. @@ -196,23 +204,22 @@ fn main() { // `Item` type of that future. let writer = rx .map_err(|()| unreachable!("rx can't fail")) - // `fold` seems to be the most straightforward way to handle this. It takes // an initial value of `to_client` (the sending half of the framed socket); // for each message, it tries to send the message, and the future returned // by `to_client.send` gives back `to_client` itself on success, ready for the // next step of the fold. - .fold(to_client, |to_client, msg| { - to_client.send(msg) - }) - + .fold(to_client, |to_client, msg| to_client.send(msg)) // Once the rx stream is exhausted (because the sender has been dropped), we // no longer need the writing half of the socket, so discard it. .map(|_| ()); // Use select to allow either the reading or writing half dropping to drop the other // half. The `map` and `map_err` here effectively force this drop. - reader.select(writer).map(|_| ()).map_err(|(err, _)| err) + reader + .select(writer) + .map(|_| ()) + .map_err(|(err, _)| err.into()) }); // Finally, spawn off the connection. @@ -234,9 +241,11 @@ fn main() { // but we can sidestep this by taking advantage of the fact that `Option` can also // act as an `Iterator` over its single (or no) element, convert that to a `Stream` // via `stream::iter`, then `fold` over the 0-or-1 long stream to send the message. - let msg = clients_inner.remove(&addr) + let msg = clients_inner + .remove(&addr) .map(|client| ServerMessage::UserDisconnected(client.name)); - stream::iter(msg.map(|m| Ok(m))).fold((), move |(), m| clients_inner.broadcast(m)) + stream::iter_result(msg.map(|m| Ok(m))) + .fold((), move |(), m| clients_inner.broadcast(m)) })); Ok(())