Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update deprecated tokio routines #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
target
Cargo.lock
*rusty-tags.vi*
*.swp
.*.swp
7 changes: 5 additions & 2 deletions tokio-chat-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
[package]
name = "tokio-chat-client"
version = "0.1.0"
version = "0.2.0"
authors = ["John Gallagher <[email protected]>"]
edition = "2018"

[dependencies]
cursive = "0.3.7"
unicode-width = "0.1.4"
unicode-width = "0.1"
futures = "0.1"
tokio-core = "0.1"
tokio-codec = "0.1"
tokio = "0.1"
tokio-chat-common = { path = "../tokio-chat-common" }
82 changes: 46 additions & 36 deletions tokio-chat-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,32 @@
//! with the message `* your_chat_username connected`, and you should be able to type messages.
//! Start up another instance of this client (probably with a different username) in another
//! window to confirm messages are being broadcast to all clients.
//!
//! The 10,000-foot view archiecture of this client is that a thread is spawned to run a tokio
//! The 10,000-foot view archiecture of this client is that a thread is spawned to run a tokio
//! reactor with the client connection to the server, that thread is given a
//! `std::sync::mpsc::Sender` it can use to send messages back to the GUI thread, and the GUI
//! thread is given a `futures::mpsc::sync::Sender` to send the user's chat messages to the tokio
//! thread. See the comments in tokio-chat-server for a description of the client/server protocol.
//!
//! Note that the GUI code below is mostly unannotated except where it comes into contact with
//! tokio-like things.
extern crate futures;
extern crate tokio_core;
extern crate cursive;
extern crate unicode_width;
extern crate tokio_chat_common;

use cursive::Cursive;
use cursive::direction::Direction;
use cursive::event::{Event, Key};
use cursive::theme::Theme;
use cursive::traits::{Boxable, Identifiable, View};
use cursive::views::{EditView, LinearLayout};
use cursive::Cursive;
use std::thread;

use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use std::net::SocketAddr;
use tokio_core::io::Io;
use tokio_core::reactor::Core;
use tokio_chat_common::{
ClientMessage, ClientToServerCodec, Handshake, HandshakeCodec, ServerMessage,
};
use tokio_codec::Framed;
use tokio_core::net::TcpStream;
use futures::{Stream, Sink, Future};
use futures::sync::mpsc;
use tokio_chat_common::{Handshake, HandshakeCodec, ClientMessage, ServerMessage,
ClientToServerCodec};
use tokio_core::reactor::Core;

mod chat_view;
use self::chat_view::ChatView;
Expand All @@ -47,11 +42,12 @@ use self::chat_view::ChatView;
// _not_ a `futures::sync::mpsc::Sender`!). This allows us to send closures to be run in the
// Cursive GUI context.
#[derive(Clone)]
struct GuiEventSender(std::sync::mpsc::Sender<Box<Fn(&mut Cursive) + Send>>);
struct GuiEventSender(std::sync::mpsc::Sender<Box<dyn Fn(&mut Cursive) + Send>>);

impl GuiEventSender {
fn send<F>(&self, f: F)
where F: Fn(&mut GuiWrapper) + Send + 'static
where
F: Fn(&mut GuiWrapper) + Send + 'static,
{
self.0
.send(Box::new(move |cursive| f(&mut GuiWrapper::new(cursive))))
Expand All @@ -69,26 +65,37 @@ impl<'a> GuiWrapper<'a> {
// Build up the Cursive UI. `tx` is a `futures::sync::mpsc::Sender` that we use to send
// client input to the thread managing the tokio connection to the server.
fn build_ui(&mut self, tx: mpsc::Sender<ClientMessage>) -> GuiEventSender {
self.0.add_layer(LinearLayout::vertical()
.child(ChatView::new(500)
.with_id("chat")
.full_height())
.child(EditView::new()
.on_submit(move |cursive, s| {
// This is called whenever the user presses "enter" after entering text.
GuiWrapper::new(cursive).handle_entry_input(s, tx.clone());
})
.with_id("input")
.full_width()));
for k in &[Key::Home, Key::End, Key::Up, Key::Down, Key::PageDown, Key::PageUp] {
self.0.add_layer(
LinearLayout::vertical()
.child(ChatView::new(500).with_id("chat").full_height())
.child(
EditView::new()
.on_submit(move |cursive, s| {
// This is called whenever the user presses "enter" after entering text.
GuiWrapper::new(cursive).handle_entry_input(s, tx.clone());
})
.with_id("input")
.full_width(),
),
);
for k in &[
Key::Home,
Key::End,
Key::Up,
Key::Down,
Key::PageDown,
Key::PageUp,
] {
let e = Event::Key(*k);
self.0.add_global_callback(e, move |s| {
{
let chat = s.find_id::<ChatView>("chat").unwrap();
chat.on_event(e);
}

s.find_id::<EditView>("input").unwrap().take_focus(Direction::front());
s.find_id::<EditView>("input")
.unwrap()
.take_focus(Direction::front());
});
}
self.0.set_fps(10);
Expand Down Expand Up @@ -165,20 +172,25 @@ fn run_client(name: String, gui: GuiEventSender, rx: mpsc::Receiver<ClientMessag
// Create the event loop and initiate the connection to the remote server
let mut core = Core::new().unwrap();
let handle = core.handle();
let tcp = TcpStream::connect(&addr, &handle);
let tcp = TcpStream::connect(&addr, &handle).map_err(|err| err.into());

// Once we connect, send a `Handshake` with our name.
let handshake = tcp.and_then(|stream| {
let handshake_io = stream.framed(HandshakeCodec::new());
let handshake_io = Framed::new(stream, HandshakeCodec::new(false));

// After sending the handshake, convert the framed stream back into its inner socket.
handshake_io.send(Handshake::new(name)).map(|handshake_io| handshake_io.into_inner())
handshake_io
.send(Handshake::new(name.clone()))
.map(|handshake_io| {
println!("Sending {}", name.clone());
handshake_io.into_inner()
})
});

// Once we've sent our `Handshake`, start listening for messages from either the server (to
// send to the GUI thread) or the GUI thread (to send to the server).
let client = handshake.and_then(|socket| {
let (to_server, from_server) = socket.framed(ClientToServerCodec::new()).split();
let (to_server, from_server) = Framed::new(socket, ClientToServerCodec::new(false)).split();

// For each incoming message...
let reader = from_server.for_each(move |msg| {
Expand All @@ -200,9 +212,7 @@ fn run_client(name: String, gui: GuiEventSender, rx: mpsc::Receiver<ClientMessag
// being fed from the GUI thread instead of from other futures.
let writer = rx
.map_err(|()| unreachable!("rx can't fail"))
.fold(to_server, |to_server, msg| {
to_server.send(msg)
})
.fold(to_server, |to_server, msg| to_server.send(msg))
.map(|_| ());

// Use select to allow either the reading or writing half dropping to drop the other
Expand Down
15 changes: 9 additions & 6 deletions tokio-chat-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
[package]
name = "tokio-chat-common"
version = "0.1.0"
version = "0.2.0"
authors = ["John Gallagher <[email protected]>"]
edition = "2018"

[dependencies]
serde = "0.8"
serde_derive = "0.8"
serde_json = "0.8"
tokio-core = "0.1"
byteorder = "1.0"
bytes = "0.4.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.24"
tokio-codec = "0.1.0"
futures = "0.1"
tokio = "0.1.7"
tokio-jsoncodec = "0.1"
21 changes: 8 additions & 13 deletions tokio-chat-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@
//!
//! This crate should be straightforward; see tokio-chat-server for a description of the
//! client/server protocol.
#[macro_use]
extern crate serde_derive;

extern crate serde;
extern crate serde_json;
extern crate tokio_core;
extern crate byteorder;
use serde::{Deserialize, Serialize};

mod codec;
use tokio_jsoncodec::Codec as JsonCodec;

// Handshake message sent from a client to a server when it first connects, identifying the
// username of the client.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Handshake {
pub name: String,
}
Expand All @@ -25,9 +20,9 @@ impl Handshake {
}
}

pub type HandshakeCodec = codec::LengthPrefixedJson<Handshake, Handshake>;
pub type HandshakeCodec = JsonCodec<Handshake, Handshake>;

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientMessage(pub String);

impl ClientMessage {
Expand All @@ -37,7 +32,7 @@ impl ClientMessage {
}

// Enumerate possible messages the server can send to clients.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServerMessage {
// A message from a client (first String) containing arbitrary content (second String).
Message(String, String),
Expand All @@ -51,5 +46,5 @@ pub enum ServerMessage {
UserDisconnected(String),
}

pub type ServerToClientCodec = codec::LengthPrefixedJson<ClientMessage, ServerMessage>;
pub type ClientToServerCodec = codec::LengthPrefixedJson<ServerMessage, ClientMessage>;
pub type ServerToClientCodec = JsonCodec<ClientMessage, ServerMessage>;
pub type ClientToServerCodec = JsonCodec<ServerMessage, ClientMessage>;
9 changes: 5 additions & 4 deletions tokio-chat-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
[package]
name = "tokio-chat-server"
version = "0.1.0"
version = "0.2.0"
authors = ["John Gallagher <[email protected]>"]
edition = "2018"

[dependencies]
serde = "0.8"
serde_derive = "0.8"
serde_json = "0.8"
serde = "1.0"
futures = "0.1"
tokio = "0.1"
tokio-codec = "0.1"
tokio-core = "0.1"
byteorder = "1.0"
tokio-chat-common = { path = "../tokio-chat-common" }
Loading