Skip to content

Commit

Permalink
Merge pull request suharev7#44 from athre0z/fix-extensive-mem-usage
Browse files Browse the repository at this point in the history
Fix extensive memory usage in `ClickhouseTransport::poll`
  • Loading branch information
suharev7 authored Jul 30, 2019
2 parents 6608407 + 32f9888 commit f7d055e
Showing 1 changed file with 51 additions and 27 deletions.
78 changes: 51 additions & 27 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub(crate) struct ClickhouseTransport {
done: bool,
// Buffered read data
rd: Vec<u8>,
// Whether the buffer is known to be incomplete
buf_is_incomplete: bool,
// Current buffer to write to the socket
wr: io::Cursor<Vec<u8>>,
// Queued commands
Expand Down Expand Up @@ -53,6 +55,7 @@ impl ClickhouseTransport {
inner,
done: false,
rd: vec![],
buf_is_incomplete: false,
wr: io::Cursor::new(vec![]),
cmds: VecDeque::new(),
timezone: None,
Expand Down Expand Up @@ -104,6 +107,38 @@ impl ClickhouseTransport {
}
}
}

fn try_parse_msg(&mut self) -> Poll<Option<Packet<()>>, Error> {
let pos;
let ret = {
let mut cursor = Cursor::new(&self.rd);
let res = {
let mut parser = Parser::new(&mut cursor, self.timezone, self.compress);
parser.parse_packet()
};
pos = cursor.position() as usize;

if let Ok(Packet::Hello(_, ref packet)) = res {
self.timezone = Some(packet.timezone);
}

match res {
Ok(val) => Ok(Async::Ready(Some(val))),
Err(e) => e.into(),
}
};

match ret {
Ok(Async::NotReady) => (),
_ => {
// Data is consumed
let tail = self.rd.split_off(pos);
mem::replace(&mut self.rd, tail);
}
}

ret
}
}

impl ClickhouseTransport {
Expand Down Expand Up @@ -135,7 +170,16 @@ impl Stream for ClickhouseTransport {

/// Read a message from the `Transport`
fn poll(&mut self) -> Poll<Option<Packet<()>>, Error> {
// First fill the buffer
// Check whether our currently buffered data is enough for a packet
// before reading any more data. This prevents the buffer from growing
// indefinitely when the sender is faster than we can consume the data
if !self.buf_is_incomplete && !self.rd.is_empty() {
if let ret @ Async::Ready(_) = self.try_parse_msg()? {
return Ok(ret);
}
}

// Fill the buffer!
while !self.done {
match self.inner.read_to_end(&mut self.rd) {
Ok(0) => {
Expand All @@ -153,35 +197,15 @@ impl Stream for ClickhouseTransport {
}
}

// Try to parse some data!
let pos;
let ret = {
let mut cursor = Cursor::new(&self.rd);
let res = {
let mut parser = Parser::new(&mut cursor, self.timezone, self.compress);
parser.parse_packet()
};
pos = cursor.position() as usize;

if let Ok(Packet::Hello(_, ref packet)) = res {
self.timezone = Some(packet.timezone);
}
// Try to parse the new data!
let ret = self.try_parse_msg();

match res {
Ok(val) => Ok(Async::Ready(Some(val))),
Err(e) => e.into(),
}
self.buf_is_incomplete = if let Ok(Async::NotReady) = ret {
true
} else {
false
};

match ret {
Ok(Async::NotReady) => {}
_ => {
// Data is consumed
let tail = self.rd.split_off(pos);
mem::replace(&mut self.rd, tail);
}
}

ret
}
}
Expand Down

0 comments on commit f7d055e

Please sign in to comment.