Commit

perf: Let the lexer operate on u8 instead of char to improve read performance. (#23)
LinZhihao-723 authored Dec 16, 2024
1 parent dcc68f3 commit 9d058af
Showing 8 changed files with 50 additions and 48 deletions.
1 change: 1 addition & 0 deletions examples/logs/hive-24h_large.log
@@ -11,6 +11,7 @@
- UUID:0xddba9b95eeb3cfb9ccb3d8401d1610d42f0e3aad
- HashIndex:0xad56993d052a6b692268e8aa013dd02e37e082bf
2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: <PERFLOG method=deserializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: 哥们儿来点中文试试
2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.exec.Utilities: Deserializing MapWork via kryo
2015-03-23 08:09:28,203 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: </PERFLOG method=deserializePlan start=1427098167194 end=1427098168203 duration=1009 from=org.apache.hadoop.hive.ql.exec.Utilities>
2015-03-23 08:09:28,337 INFO [main] org.apache.hadoop.io.compress.zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2 changes: 1 addition & 1 deletion src/dfa/dfa.rs
@@ -227,10 +227,10 @@ impl DFA {

impl DFA {
pub fn get_next_state(&self, state: State, c: u8) -> Option<State> {
let transitions = &self.transitions[state.0];
if 128 <= c {
return None;
}
let transitions = &self.transitions[state.0];
match &transitions[c as usize] {
Some(transition) => Some(transition.to_state.clone()),
None => None,
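The byte guard added to get_next_state above matters because each state's transition table only has entries for the 128 ASCII values; any byte at or above 128 (part of a multi-byte UTF-8 sequence) can never match a variable pattern. A minimal standalone sketch of the same idea, with illustrative names that are not the crate's own:

struct MiniDfa {
    // transitions[state][byte] -> next state; sized for ASCII bytes only.
    transitions: Vec<[Option<usize>; 128]>,
}

impl MiniDfa {
    fn get_next_state(&self, state: usize, c: u8) -> Option<usize> {
        if c >= 128 {
            // Non-ASCII bytes fall outside the table and never match.
            return None;
        }
        self.transitions[state][c as usize]
    }
}

fn main() {
    let dfa = MiniDfa { transitions: vec![[None; 128]] };
    assert_eq!(dfa.get_next_state(0, b'a'), None);
    assert_eq!(dfa.get_next_state(0, 0xE5), None); // leading byte of a multi-byte UTF-8 character
}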
39 changes: 21 additions & 18 deletions src/lexer/lexer.rs
@@ -27,11 +27,11 @@ pub struct Lexer {
dfa_state: State,

input_stream: Option<Box<dyn LexerStream>>,
buf: Vec<char>,
buf: Vec<u8>,
buf_cursor_pos: usize,
token_queue: VecDeque<Token>,

last_delimiter: Option<char>,
last_delimiter: Option<u8>,
last_tokenized_pos: usize,
match_start_pos: usize,
match_end_pos: usize,
@@ -48,7 +48,7 @@ pub enum TokenType {
}

pub struct Token {
val: String,
buf: Vec<u8>,
token_type: TokenType,
line_num: usize,
}
@@ -60,14 +60,18 @@ impl Debug for Token {
"[{:?}|{}]: \"{}\"",
self.token_type,
self.line_num,
self.val.escape_default()
self.get_buf_as_string().escape_default()
)
}
}

impl Token {
pub fn get_val(&self) -> &str {
self.val.as_str()
pub fn get_buf(&self) -> &[u8] {
self.buf.as_slice()
}

pub fn get_buf_as_string(&self) -> String {
String::from_utf8_lossy(&self.buf).to_string()
}

pub fn get_token_type(&self) -> TokenType {
@@ -170,7 +174,7 @@ impl Lexer {
let delimiter = self.last_delimiter.unwrap();
self.last_delimiter = None;
match delimiter {
'\n' => {
b'\n' => {
self.generate_token(
self.buf_cursor_pos,
TokenType::StaticTextWithEndLine,
@@ -246,7 +250,7 @@
LexerState::EndOfStream => {
if self.buf_cursor_pos > self.last_tokenized_pos {
let token_type = if self.last_delimiter.is_some()
&& self.last_delimiter.unwrap() == '\n'
&& self.last_delimiter.unwrap() == b'\n'
{
// TODO: This seems not possible..
TokenType::StaticTextWithEndLine
@@ -314,7 +318,7 @@
}
}

fn get_next_char_from_buffer(&mut self) -> Result<Option<char>> {
fn get_next_char_from_buffer(&mut self) -> Result<Option<u8>> {
let pos = self.buf_cursor_pos;
if pos == self.buf.len() {
match self
@@ -332,20 +336,16 @@
Ok(Some(self.buf[pos]))
}

fn capture_delimiter(&mut self, c: char) -> bool {
fn capture_delimiter(&mut self, c: u8) -> bool {
if self.schema_config.has_delimiter(c) {
self.last_delimiter = Some(c);
return true;
}
false
}

fn simulate_var_dfa_and_set_lexer_state(&mut self, c: char, delimiter_dst_state: LexerState) {
if false == c.is_ascii() {
self.state = LexerState::SeekingToTheNextDelimiter;
return;
}
match self.var_dfa.get_next_state(self.dfa_state.clone(), c as u8) {
fn simulate_var_dfa_and_set_lexer_state(&mut self, c: u8, delimiter_dst_state: LexerState) {
match self.var_dfa.get_next_state(self.dfa_state.clone(), c) {
Some(next_dfa_state) => {
self.dfa_state = next_dfa_state;
match self.var_dfa.is_accept_state(self.dfa_state.clone()) {
@@ -374,7 +374,10 @@ impl Lexer {
return Err(LexerInternalErr("Tokenization end position corrupted"));
}
self.token_queue.push_back(Token {
val: self.buf[self.last_tokenized_pos..end_pos].iter().collect(),
buf: self.buf[self.last_tokenized_pos..end_pos]
.iter()
.map(|c| c.clone())
.collect(),
line_num: self.line_num,
token_type,
});
@@ -406,7 +409,7 @@
dst_idx += 1;
src_idx += 1;
}
self.buf.resize(dst_idx, 0 as char);
self.buf.resize(dst_idx, 0);
self.buf_cursor_pos -= self.last_tokenized_pos;
self.last_tokenized_pos = 0;
// No need to reset match_start/end
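With the buffer now byte-based, tokens carry raw bytes and are converted to text only on demand via String::from_utf8_lossy, so multi-byte UTF-8 content (such as the Chinese line added to the sample log) passes through tokenization untouched, and invalid byte sequences degrade to U+FFFD instead of failing the read. A small sketch of that conversion behaviour, independent of the crate's code:

fn main() {
    // Valid multi-byte UTF-8 round-trips losslessly.
    let token_buf: Vec<u8> = "PerfLogger: 哥们儿来点中文试试".as_bytes().to_vec();
    assert_eq!(String::from_utf8_lossy(&token_buf), "PerfLogger: 哥们儿来点中文试试");

    // Invalid bytes are replaced with U+FFFD rather than causing an error.
    let broken = vec![b'l', b'o', b'g', 0xFF, b'!'];
    assert_eq!(String::from_utf8_lossy(&broken), "log\u{FFFD}!");
}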
2 changes: 1 addition & 1 deletion src/lexer/lexer_stream.rs
@@ -1,5 +1,5 @@
use crate::error_handling::Result;

pub trait LexerStream {
fn get_next_char(&mut self) -> Result<Option<char>>;
fn get_next_char(&mut self) -> Result<Option<u8>>;
}
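Since the trait now hands out raw bytes, any byte source can implement it without decoding. A self-contained sketch of an in-memory implementation; the trait is redeclared locally for illustration and std::io::Result stands in for the crate's error_handling::Result:

use std::io::Result;

trait LexerStream {
    fn get_next_char(&mut self) -> Result<Option<u8>>;
}

struct SliceStream<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> LexerStream for SliceStream<'a> {
    fn get_next_char(&mut self) -> Result<Option<u8>> {
        if self.pos == self.data.len() {
            return Ok(None);
        }
        let c = self.data[self.pos];
        self.pos += 1;
        Ok(Some(c))
    }
}

fn main() -> Result<()> {
    let mut stream = SliceStream { data: "2015-03-23 08:09:27,194 INFO main\n".as_bytes(), pos: 0 };
    let mut bytes = Vec::new();
    while let Some(b) = stream.get_next_char()? {
        bytes.push(b);
    }
    println!("{}", String::from_utf8_lossy(&bytes));
    Ok(())
}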
38 changes: 18 additions & 20 deletions src/lexer/streams.rs
@@ -1,49 +1,47 @@
use super::lexer_stream::LexerStream;
use crate::error_handling::Error::IOError;
use crate::error_handling::Result;
use std::io::BufRead;
use std::io::{self, BufReader, Read};

const BUF_SIZE: usize = 4096 * 8;

pub struct BufferedFileStream {
line_it: std::io::Lines<std::io::BufReader<std::fs::File>>,
line: Option<Vec<char>>,
buf_reader: BufReader<std::fs::File>,
pos: usize,
end: usize,
buffer: [u8; BUF_SIZE],
}

impl BufferedFileStream {
pub fn new(path: &str) -> Result<Self> {
match std::fs::File::open(path) {
Ok(file) => Ok(Self {
line_it: std::io::BufReader::new(file).lines(),
line: None,
buf_reader: BufReader::new(file),
pos: 0,
end: 0,
buffer: [0; BUF_SIZE],
}),
Err(e) => Err(IOError(e)),
}
}
}

impl LexerStream for BufferedFileStream {
fn get_next_char(&mut self) -> Result<Option<char>> {
if self.line.is_none() {
let next_line = self.line_it.next();
if next_line.is_none() {
return Ok(None);
}
match next_line.unwrap() {
Ok(line) => {
self.line = Some(line.chars().collect());
self.line.as_mut().unwrap().push('\n');
fn get_next_char(&mut self) -> Result<Option<u8>> {
if self.pos == self.end {
match self.buf_reader.read(&mut self.buffer) {
Ok(byte_read) => {
if 0 == byte_read {
return Ok(None);
}
self.end = byte_read;
self.pos = 0;
}
Err(e) => return Err(IOError(e)),
}
}

let c = self.line.as_ref().unwrap()[self.pos];
let c = self.buffer[self.pos];
self.pos += 1;
if self.pos == self.line.as_ref().unwrap().len() {
self.line = None;
}
Ok(Some(c))
}
}
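The old stream went through BufReader::lines(), which allocates a fresh String and validates UTF-8 for every line; the new one refills a single fixed byte buffer and hands out bytes from it. A rough standalone comparison of the two shapes (path and function names are illustrative only):

use std::fs::File;
use std::io::{BufRead, BufReader, Read, Result};

const BUF_SIZE: usize = 4096 * 8;

// Old shape: one heap-allocated, UTF-8-validated String per line.
fn total_bytes_via_lines(path: &str) -> Result<usize> {
    let mut total = 0;
    for line in BufReader::new(File::open(path)?).lines() {
        total += line?.len() + 1; // approximate: lines() strips the trailing newline
    }
    Ok(total)
}

// New shape: one reusable byte buffer, refilled only when the cursor reaches its end.
fn total_bytes_via_buffer(path: &str) -> Result<usize> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut buffer = [0u8; BUF_SIZE];
    let mut total = 0;
    loop {
        let bytes_read = reader.read(&mut buffer)?;
        if bytes_read == 0 {
            return Ok(total);
        }
        total += bytes_read;
    }
}

fn main() -> Result<()> {
    let path = "examples/logs/hive-24h_large.log";
    println!("lines(): {} bytes", total_bytes_via_lines(path)?);
    println!("buffer:  {} bytes", total_bytes_via_buffer(path)?);
    Ok(())
}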
8 changes: 4 additions & 4 deletions src/log_parser/log_parser.rs
@@ -124,7 +124,7 @@ impl LogEvent {
pub fn to_string(&self) -> String {
let mut result = String::new();
for token in &self.tokens {
result += &token.get_val();
result += &token.get_buf_as_string();
}
result
}
@@ -151,7 +151,7 @@ impl Debug for LogEvent {
"\t[Var({:?})|{}]: \"{}\"\n",
self.schema_config.get_var_schemas()[var_id].name,
token.get_line_num(),
token.get_val().escape_default()
token.get_buf_as_string().escape_default()
)
.as_str()
}
@@ -160,7 +160,7 @@
"\t[Timestamp({})|{}]: \"{}\"\n",
ts_id,
token.get_line_num(),
token.get_val().escape_default()
token.get_buf_as_string().escape_default()
)
.as_str()
}
@@ -169,7 +169,7 @@
"\t[{:?}|{}]: \"{}\"\n",
token.get_token_type(),
token.get_line_num(),
token.get_val().escape_default()
token.get_buf_as_string().escape_default()
)
.as_str()
}
6 changes: 3 additions & 3 deletions src/parser/schema_parser/parser.rs
@@ -71,8 +71,8 @@ impl SchemaConfig {
&self.var_schemas
}

pub fn has_delimiter(&self, delimiter: char) -> bool {
if false == delimiter.is_ascii() {
pub fn has_delimiter(&self, delimiter: u8) -> bool {
if 128 <= delimiter {
return false;
}
self.delimiters[delimiter as usize]
@@ -207,7 +207,7 @@ mod tests {

let delimiters: Vec<char> = vec![' ', '\t', '\n', '\r', ':', ',', '!', ';', '%'];
for delimiter in delimiters {
assert!(parsed_schema.has_delimiter(delimiter));
assert!(parsed_schema.has_delimiter(delimiter as u8));
}

Ok(())
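has_delimiter now takes a byte and rejects anything at or above 128 before indexing the per-byte table, which is why the test above casts its char delimiters with as u8. A minimal sketch of that lookup-table pattern, with names that are illustrative rather than the crate's:

struct Delimiters {
    table: [bool; 128],
}

impl Delimiters {
    fn new(delims: &[u8]) -> Self {
        let mut table = [false; 128];
        for &d in delims {
            if d < 128 {
                table[d as usize] = true;
            }
        }
        Self { table }
    }

    fn has_delimiter(&self, delimiter: u8) -> bool {
        if delimiter >= 128 {
            return false;
        }
        self.table[delimiter as usize]
    }
}

fn main() {
    let delims = Delimiters::new(b" \t\n\r:,!;%");
    assert!(delims.has_delimiter(b' '));
    assert!(!delims.has_delimiter(b'a'));
    assert!(!delims.has_delimiter(0xE5)); // leading byte of a multi-byte UTF-8 sequence
}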
2 changes: 1 addition & 1 deletion tests/lexer_test.rs
@@ -43,7 +43,7 @@ fn test_lexer_simple() -> Result<()> {
parsed_line.clear();
curr_line_num += 1;
}
parsed_line += &token.get_val().to_string();
parsed_line += &token.get_buf_as_string().to_string();
println!("{:?}", token);
}
parsed_lines.push(parsed_line.clone());
