diff --git a/src/io/transport.rs b/src/io/transport.rs index 762ebfee..7569abbe 100644 --- a/src/io/transport.rs +++ b/src/io/transport.rs @@ -25,6 +25,8 @@ pub(crate) struct ClickhouseTransport { done: bool, // Buffered read data rd: Vec, + // Whether the buffer is known to be incomplete + buf_is_incomplete: bool, // Current buffer to write to the socket wr: io::Cursor>, // Queued commands @@ -53,6 +55,7 @@ impl ClickhouseTransport { inner, done: false, rd: vec![], + buf_is_incomplete: false, wr: io::Cursor::new(vec![]), cmds: VecDeque::new(), timezone: None, @@ -104,6 +107,38 @@ impl ClickhouseTransport { } } } + + fn try_parse_msg(&mut self) -> Poll>, Error> { + let pos; + let ret = { + let mut cursor = Cursor::new(&self.rd); + let res = { + let mut parser = Parser::new(&mut cursor, self.timezone, self.compress); + parser.parse_packet() + }; + pos = cursor.position() as usize; + + if let Ok(Packet::Hello(_, ref packet)) = res { + self.timezone = Some(packet.timezone); + } + + match res { + Ok(val) => Ok(Async::Ready(Some(val))), + Err(e) => e.into(), + } + }; + + match ret { + Ok(Async::NotReady) => (), + _ => { + // Data is consumed + let tail = self.rd.split_off(pos); + mem::replace(&mut self.rd, tail); + } + } + + ret + } } impl ClickhouseTransport { @@ -135,7 +170,16 @@ impl Stream for ClickhouseTransport { /// Read a message from the `Transport` fn poll(&mut self) -> Poll>, Error> { - // First fill the buffer + // Check whether our currently buffered data is enough for a packet + // before reading any more data. This prevents the buffer from growing + // indefinitely when the sender is faster than we can consume the data + if !self.buf_is_incomplete && !self.rd.is_empty() { + if let ret @ Async::Ready(_) = self.try_parse_msg()? { + return Ok(ret); + } + } + + // Fill the buffer! while !self.done { match self.inner.read_to_end(&mut self.rd) { Ok(0) => { @@ -153,35 +197,15 @@ impl Stream for ClickhouseTransport { } } - // Try to parse some data! - let pos; - let ret = { - let mut cursor = Cursor::new(&self.rd); - let res = { - let mut parser = Parser::new(&mut cursor, self.timezone, self.compress); - parser.parse_packet() - }; - pos = cursor.position() as usize; - - if let Ok(Packet::Hello(_, ref packet)) = res { - self.timezone = Some(packet.timezone); - } + // Try to parse the new data! + let ret = self.try_parse_msg(); - match res { - Ok(val) => Ok(Async::Ready(Some(val))), - Err(e) => e.into(), - } + self.buf_is_incomplete = if let Ok(Async::NotReady) = ret { + true + } else { + false }; - match ret { - Ok(Async::NotReady) => {} - _ => { - // Data is consumed - let tail = self.rd.split_off(pos); - mem::replace(&mut self.rd, tail); - } - } - ret } }