Skip to content

Commit

Permalink
Fix extensive memory usage in poll
Browse files Browse the repository at this point in the history
... by checking whether another full packet can be decoded from the existing buffer before trying to read more data from the socket.
  • Loading branch information
athre0z committed Jul 30, 2019
1 parent 6608407 commit 32f9888
Showing 1 changed file with 51 additions and 27 deletions.
78 changes: 51 additions & 27 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub(crate) struct ClickhouseTransport {
done: bool,
// Buffered read data
rd: Vec<u8>,
// Whether the buffer is known to be incomplete
buf_is_incomplete: bool,
// Current buffer to write to the socket
wr: io::Cursor<Vec<u8>>,
// Queued commands
Expand Down Expand Up @@ -53,6 +55,7 @@ impl ClickhouseTransport {
inner,
done: false,
rd: vec![],
buf_is_incomplete: false,
wr: io::Cursor::new(vec![]),
cmds: VecDeque::new(),
timezone: None,
Expand Down Expand Up @@ -104,6 +107,38 @@ impl ClickhouseTransport {
}
}
}

fn try_parse_msg(&mut self) -> Poll<Option<Packet<()>>, Error> {
let pos;
let ret = {
let mut cursor = Cursor::new(&self.rd);
let res = {
let mut parser = Parser::new(&mut cursor, self.timezone, self.compress);
parser.parse_packet()
};
pos = cursor.position() as usize;

if let Ok(Packet::Hello(_, ref packet)) = res {
self.timezone = Some(packet.timezone);
}

match res {
Ok(val) => Ok(Async::Ready(Some(val))),
Err(e) => e.into(),
}
};

match ret {
Ok(Async::NotReady) => (),
_ => {
// Data is consumed
let tail = self.rd.split_off(pos);
mem::replace(&mut self.rd, tail);
}
}

ret
}
}

impl ClickhouseTransport {
Expand Down Expand Up @@ -135,7 +170,16 @@ impl Stream for ClickhouseTransport {

/// Read a message from the `Transport`
fn poll(&mut self) -> Poll<Option<Packet<()>>, Error> {
// First fill the buffer
// Check whether our currently buffered data is enough for a packet
// before reading any more data. This prevents the buffer from growing
// indefinitely when the sender is faster than we can consume the data
if !self.buf_is_incomplete && !self.rd.is_empty() {
if let ret @ Async::Ready(_) = self.try_parse_msg()? {
return Ok(ret);
}
}

// Fill the buffer!
while !self.done {
match self.inner.read_to_end(&mut self.rd) {
Ok(0) => {
Expand All @@ -153,35 +197,15 @@ impl Stream for ClickhouseTransport {
}
}

// Try to parse some data!
let pos;
let ret = {
let mut cursor = Cursor::new(&self.rd);
let res = {
let mut parser = Parser::new(&mut cursor, self.timezone, self.compress);
parser.parse_packet()
};
pos = cursor.position() as usize;

if let Ok(Packet::Hello(_, ref packet)) = res {
self.timezone = Some(packet.timezone);
}
// Try to parse the new data!
let ret = self.try_parse_msg();

match res {
Ok(val) => Ok(Async::Ready(Some(val))),
Err(e) => e.into(),
}
self.buf_is_incomplete = if let Ok(Async::NotReady) = ret {
true
} else {
false
};

match ret {
Ok(Async::NotReady) => {}
_ => {
// Data is consumed
let tail = self.rd.split_off(pos);
mem::replace(&mut self.rd, tail);
}
}

ret
}
}
Expand Down

0 comments on commit 32f9888

Please sign in to comment.