diff --git a/communication/Cargo.toml b/communication/Cargo.toml index 54d854ae8..45c797e08 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -19,6 +19,7 @@ default = ["getopts"] [dependencies] getopts = { version = "0.2.14", optional = true } bincode = { version = "1.0", optional = true } +byteorder = "1.5" serde_derive = "1.0" serde = "1.0" abomonation = "0.7" diff --git a/communication/src/networking.rs b/communication/src/networking.rs index fb2b850be..9c3f7a953 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -8,15 +8,19 @@ use std::thread; use std::thread::sleep; use std::time::Duration; -use abomonation::{encode, decode}; +use byteorder::{ReadBytesExt, WriteBytesExt}; // This constant is sent along immediately after establishing a TCP stream, so // that it is easy to sniff out Timely traffic when it is multiplexed with // other traffic on the same port. const HANDSHAKE_MAGIC: u64 = 0xc2f1fb770118add9; +/// The byte order for writing message headers and stream initialization. +type ByteOrder = byteorder::BigEndian; + /// Framing data for each `Vec` transmission, indicating a typed channel, the source and /// destination workers, and the length in bytes. +// *Warning*: Adding, removing and altering fields requires to adjust the implementation below! #[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)] pub struct MessageHeader { /// index of channel. @@ -32,30 +36,51 @@ pub struct MessageHeader { } impl MessageHeader { + + /// The number of `usize` fields in [MessageHeader]. + const FIELDS: usize = 5; + /// Returns a header when there is enough supporting data #[inline] pub fn try_read(bytes: &mut [u8]) -> Option { - unsafe { decode::(bytes) } - .and_then(|(header, remaining)| { - if remaining.len() >= header.length { - Some(header.clone()) - } - else { - None - } - }) + let mut cursor = io::Cursor::new(&bytes[..]); + let mut buffer = [0; Self::FIELDS]; + cursor.read_u64_into::(&mut buffer).ok()?; + let header = MessageHeader { + // Order must match writing order. + channel: buffer[0] as usize, + source: buffer[1] as usize, + target: buffer[2] as usize, + length: buffer[3] as usize, + seqno: buffer[4] as usize, + }; + + if bytes.len() >= header.required_bytes() { + Some(header) + } else { + None + } } /// Writes the header as binary data. #[inline] - pub fn write_to(&self, writer: &mut W) -> ::std::io::Result<()> { - unsafe { encode(self, writer) } + pub fn write_to(&self, writer: &mut W) -> Result<()> { + let mut buffer = [0u8; std::mem::size_of::() * Self::FIELDS]; + let mut cursor = io::Cursor::new(&mut buffer[..]); + // Order must match reading order. + cursor.write_u64::(self.channel as u64)?; + cursor.write_u64::(self.source as u64)?; + cursor.write_u64::(self.target as u64)?; + cursor.write_u64::(self.length as u64)?; + cursor.write_u64::(self.seqno as u64)?; + + writer.write_all(&buffer[..]) } /// The number of bytes required for the header and data. #[inline] pub fn required_bytes(&self) -> usize { - ::std::mem::size_of::() + self.length + std::mem::size_of::() * Self::FIELDS + self.length } } @@ -89,8 +114,8 @@ pub fn start_connections(addresses: Arc>, my_index: usize, noisy: bo match TcpStream::connect(address) { Ok(mut stream) => { stream.set_nodelay(true).expect("set_nodelay call failed"); - unsafe { encode(&HANDSHAKE_MAGIC, &mut stream) }.expect("failed to encode/send handshake magic"); - unsafe { encode(&(my_index as u64), &mut stream) }.expect("failed to encode/send worker index"); + stream.write_u64::(HANDSHAKE_MAGIC).expect("failed to encode/send handshake magic"); + stream.write_u64::(my_index as u64).expect("failed to encode/send worker index"); if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); } break Some(stream); }, @@ -115,12 +140,13 @@ pub fn await_connections(addresses: Arc>, my_index: usize, noisy: bo stream.set_nodelay(true).expect("set_nodelay call failed"); let mut buffer = [0u8;16]; stream.read_exact(&mut buffer)?; - let (magic, mut buffer) = unsafe { decode::(&mut buffer) }.expect("failed to decode magic"); - if magic != &HANDSHAKE_MAGIC { + let mut cursor = io::Cursor::new(buffer); + let magic = cursor.read_u64::().expect("failed to decode magic"); + if magic != HANDSHAKE_MAGIC { return Err(io::Error::new(io::ErrorKind::InvalidData, "received incorrect timely handshake")); } - let identifier = unsafe { decode::(&mut buffer) }.expect("failed to decode worker index").0.clone() as usize; + let identifier = cursor.read_u64::().expect("failed to decode worker index") as usize; results[identifier - my_index - 1] = Some(stream); if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); } }