perf: Let the lexer operate on u8 instead of char to improve read performance. #23

Merged: 3 commits, Dec 16, 2024

Changes from all commits
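The change in a nutshell: lexing over `&str`/`char` forces UTF-8 decoding of every code point, while every pattern and delimiter this lexer matches is ASCII, so raw bytes carry the same information with less work. In valid UTF-8 no byte of a multi-byte character falls below 0x80, so a byte-level scan for an ASCII byte such as `b'\n'` is exact. A minimal illustration of the two styles (not code from this PR):

```rust
// Minimal illustration (not code from this PR): both functions count
// newlines, but the char version must decode every UTF-8 code point,
// while the byte version is a flat scan over the raw buffer.
fn count_newlines_chars(s: &str) -> usize {
    s.chars().filter(|&c| c == '\n').count() // decodes each code point
}

fn count_newlines_bytes(s: &str) -> usize {
    s.bytes().filter(|&b| b == b'\n').count() // raw byte scan
}

fn main() {
    let log = "2015-03-23 08:09:27,194 INFO 哥们儿来点中文试试\n";
    assert_eq!(count_newlines_chars(log), count_newlines_bytes(log));
    assert_eq!(count_newlines_bytes(log), 1);
}
```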
1 change: 1 addition & 0 deletions examples/logs/hive-24h_large.log

@@ -11,6 +11,7 @@
 - UUID:0xddba9b95eeb3cfb9ccb3d8401d1610d42f0e3aad
 - HashIndex:0xad56993d052a6b692268e8aa013dd02e37e082bf
 2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: <PERFLOG method=deserializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
+2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: 哥们儿来点中文试试
 2015-03-23 08:09:27,194 INFO [main] org.apache.hadoop.hive.ql.exec.Utilities: Deserializing MapWork via kryo
 2015-03-23 08:09:28,203 INFO [main] org.apache.hadoop.hive.ql.log.PerfLogger: </PERFLOG method=deserializePlan start=1427098167194 end=1427098168203 duration=1009 from=org.apache.hadoop.hive.ql.exec.Utilities>
 2015-03-23 08:09:28,337 INFO [main] org.apache.hadoop.io.compress.zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2 changes: 1 addition & 1 deletion src/dfa/dfa.rs

@@ -227,10 +227,10 @@ impl DFA {
 
 impl DFA {
     pub fn get_next_state(&self, state: State, c: u8) -> Option<State> {
-        let transitions = &self.transitions[state.0];
         if 128 <= c {
             return None;
         }
+        let transitions = &self.transitions[state.0];
         match &transitions[c as usize] {
             Some(transition) => Some(transition.to_state.clone()),
             None => None,
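Moving the `transitions` lookup below the `128 <= c` guard means the bounds check runs before any table access, and since this is now the only place non-ASCII input is filtered, it helps to see why rejecting bytes >= 128 is enough: every byte of a multi-byte UTF-8 character, lead or continuation, is >= 0x80, so such bytes simply have no transition. A standalone sketch of the idea, with invented names, not the crate's DFA:

```rust
// Standalone sketch (invented names, not the crate's DFA): a transition
// table indexed by byte only needs 128 entries, because in valid UTF-8
// every byte of a multi-byte character is >= 0x80 and can never match.
struct MiniDfa {
    // transitions[state][byte] => next state, ASCII bytes only.
    transitions: Vec<[Option<usize>; 128]>,
}

impl MiniDfa {
    fn get_next_state(&self, state: usize, c: u8) -> Option<usize> {
        if 128 <= c {
            return None; // non-ASCII byte: no transition exists
        }
        self.transitions[state][c as usize]
    }
}

fn main() {
    // A single state that loops on b'a'.
    let mut row = [None; 128];
    row[b'a' as usize] = Some(0);
    let dfa = MiniDfa { transitions: vec![row] };
    assert_eq!(dfa.get_next_state(0, b'a'), Some(0));
    assert_eq!(dfa.get_next_state(0, 0xE5), None); // lead byte of '哥'
}
```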
39 changes: 21 additions & 18 deletions src/lexer/lexer.rs

@@ -27,11 +27,11 @@ pub struct Lexer {
     dfa_state: State,
 
     input_stream: Option<Box<dyn LexerStream>>,
-    buf: Vec<char>,
+    buf: Vec<u8>,
     buf_cursor_pos: usize,
     token_queue: VecDeque<Token>,
 
-    last_delimiter: Option<char>,
+    last_delimiter: Option<u8>,
     last_tokenized_pos: usize,
     match_start_pos: usize,
     match_end_pos: usize,
@@ -48,7 +48,7 @@ pub enum TokenType {
 }
 
 pub struct Token {
-    val: String,
+    buf: Vec<u8>,
     token_type: TokenType,
     line_num: usize,
 }
@@ -60,14 +60,18 @@ impl Debug for Token {
             "[{:?}|{}]: \"{}\"",
             self.token_type,
             self.line_num,
-            self.val.escape_default()
+            self.get_buf_as_string().escape_default()
         )
     }
 }
 
 impl Token {
-    pub fn get_val(&self) -> &str {
-        self.val.as_str()
+    pub fn get_buf(&self) -> &[u8] {
+        self.buf.as_slice()
+    }
+
+    pub fn get_buf_as_string(&self) -> String {
+        String::from_utf8_lossy(&self.buf).to_string()
     }
 
     pub fn get_token_type(&self) -> TokenType {
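Tokens now hold raw bytes, and `get_buf_as_string` decodes lazily via `String::from_utf8_lossy`, which leaves valid UTF-8 untouched and substitutes U+FFFD only for invalid sequences. A quick standalone demonstration (not from the diff):

```rust
// Standalone demonstration (not from the diff): from_utf8_lossy keeps
// valid UTF-8 intact and only substitutes U+FFFD for invalid sequences.
fn main() {
    let valid: &[u8] = "哥们儿".as_bytes();
    assert_eq!(String::from_utf8_lossy(valid), "哥们儿");

    // Cutting the buffer mid-character leaves an invalid tail...
    let truncated = &valid[..4];
    // ...which decodes to the first character plus one replacement char.
    assert_eq!(String::from_utf8_lossy(truncated), "哥\u{FFFD}");
}
```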
@@ -170,7 +174,7 @@ impl Lexer {
             let delimiter = self.last_delimiter.unwrap();
             self.last_delimiter = None;
             match delimiter {
-                '\n' => {
+                b'\n' => {
                     self.generate_token(
                         self.buf_cursor_pos,
                         TokenType::StaticTextWithEndLine,
@@ -246,7 +250,7 @@
             LexerState::EndOfStream => {
                 if self.buf_cursor_pos > self.last_tokenized_pos {
                     let token_type = if self.last_delimiter.is_some()
-                        && self.last_delimiter.unwrap() == '\n'
+                        && self.last_delimiter.unwrap() == b'\n'
                     {
                         // TODO: This seems not possible..
                         TokenType::StaticTextWithEndLine
@@ -314,7 +318,7 @@
         }
     }
 
-    fn get_next_char_from_buffer(&mut self) -> Result<Option<char>> {
+    fn get_next_char_from_buffer(&mut self) -> Result<Option<u8>> {
         let pos = self.buf_cursor_pos;
         if pos == self.buf.len() {
             match self
@@ -332,20 +336,16 @@
         Ok(Some(self.buf[pos]))
     }
 
-    fn capture_delimiter(&mut self, c: char) -> bool {
+    fn capture_delimiter(&mut self, c: u8) -> bool {
         if self.schema_config.has_delimiter(c) {
             self.last_delimiter = Some(c);
             return true;
         }
         false
     }
 
-    fn simulate_var_dfa_and_set_lexer_state(&mut self, c: char, delimiter_dst_state: LexerState) {
-        if false == c.is_ascii() {
-            self.state = LexerState::SeekingToTheNextDelimiter;
-            return;
-        }
-        match self.var_dfa.get_next_state(self.dfa_state.clone(), c as u8) {
+    fn simulate_var_dfa_and_set_lexer_state(&mut self, c: u8, delimiter_dst_state: LexerState) {
+        match self.var_dfa.get_next_state(self.dfa_state.clone(), c) {
            Some(next_dfa_state) => {
                self.dfa_state = next_dfa_state;
                match self.var_dfa.is_accept_state(self.dfa_state.clone()) {
@@ -374,7 +374,10 @@
             return Err(LexerInternalErr("Tokenization end position corrupted"));
         }
         self.token_queue.push_back(Token {
-            val: self.buf[self.last_tokenized_pos..end_pos].iter().collect(),
+            buf: self.buf[self.last_tokenized_pos..end_pos]
+                .iter()
+                .map(|c| c.clone())
+                .collect(),
             line_num: self.line_num,
             token_type,
         });
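The `iter().map(|c| c.clone()).collect()` chain above is a plain byte copy of the slice; `.copied()` or `slice.to_vec()` are equivalent spellings. A standalone check (not part of the diff):

```rust
// Standalone check (not part of the diff): three equivalent ways to copy
// a byte slice into an owned Vec<u8>.
fn main() {
    let buf: Vec<u8> = b"2015-03-23 INFO main".to_vec();
    let (start, end) = (11, 15);

    let a: Vec<u8> = buf[start..end].iter().map(|c| c.clone()).collect();
    let b: Vec<u8> = buf[start..end].iter().copied().collect();
    let c: Vec<u8> = buf[start..end].to_vec(); // the usual idiom

    assert_eq!(a, b);
    assert_eq!(b, c);
    assert_eq!(c, b"INFO".to_vec());
}
```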
@@ -406,7 +409,7 @@
             dst_idx += 1;
             src_idx += 1;
         }
-        self.buf.resize(dst_idx, 0 as char);
+        self.buf.resize(dst_idx, 0);
         self.buf_cursor_pos -= self.last_tokenized_pos;
         self.last_tokenized_pos = 0;
         // No need to reset match_start/end
2 changes: 1 addition & 1 deletion src/lexer/lexer_stream.rs

@@ -1,5 +1,5 @@
 use crate::error_handling::Result;
 
 pub trait LexerStream {
-    fn get_next_char(&mut self) -> Result<Option<char>>;
+    fn get_next_char(&mut self) -> Result<Option<u8>>;
 }
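With the trait yielding `u8`, any byte source can feed the lexer. A standalone sketch of an in-memory implementation, with the crate's `Result` alias swapped for a std-based one here:

```rust
// Standalone sketch (not part of this PR): an in-memory byte source
// behind the byte-oriented trait. The crate's error_handling::Result is
// simplified to an io::Error-based alias for this demo.
type Result<T> = std::result::Result<T, std::io::Error>;

pub trait LexerStream {
    fn get_next_char(&mut self) -> Result<Option<u8>>;
}

struct BytesStream {
    data: Vec<u8>,
    pos: usize,
}

impl LexerStream for BytesStream {
    fn get_next_char(&mut self) -> Result<Option<u8>> {
        if self.pos == self.data.len() {
            return Ok(None); // end of stream
        }
        let c = self.data[self.pos];
        self.pos += 1;
        Ok(Some(c))
    }
}

fn main() -> Result<()> {
    let mut stream = BytesStream { data: b"a\n".to_vec(), pos: 0 };
    assert_eq!(stream.get_next_char()?, Some(b'a'));
    assert_eq!(stream.get_next_char()?, Some(b'\n'));
    assert_eq!(stream.get_next_char()?, None);
    Ok(())
}
```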
38 changes: 18 additions & 20 deletions src/lexer/streams.rs

@@ -1,49 +1,47 @@
 use super::lexer_stream::LexerStream;
 use crate::error_handling::Error::IOError;
 use crate::error_handling::Result;
-use std::io::BufRead;
+use std::io::{self, BufReader, Read};
 
+const BUF_SIZE: usize = 4096 * 8;
+
 pub struct BufferedFileStream {
-    line_it: std::io::Lines<std::io::BufReader<std::fs::File>>,
-    line: Option<Vec<char>>,
+    buf_reader: BufReader<std::fs::File>,
     pos: usize,
+    end: usize,
+    buffer: [u8; BUF_SIZE],
 }
 
 impl BufferedFileStream {
     pub fn new(path: &str) -> Result<Self> {
         match std::fs::File::open(path) {
             Ok(file) => Ok(Self {
-                line_it: std::io::BufReader::new(file).lines(),
-                line: None,
+                buf_reader: BufReader::new(file),
                 pos: 0,
+                end: 0,
+                buffer: [0; BUF_SIZE],
             }),
             Err(e) => Err(IOError(e)),
         }
     }
 }
 
 impl LexerStream for BufferedFileStream {
-    fn get_next_char(&mut self) -> Result<Option<char>> {
-        if self.line.is_none() {
-            let next_line = self.line_it.next();
-            if next_line.is_none() {
-                return Ok(None);
-            }
-            match next_line.unwrap() {
-                Ok(line) => {
-                    self.line = Some(line.chars().collect());
-                    self.line.as_mut().unwrap().push('\n');
+    fn get_next_char(&mut self) -> Result<Option<u8>> {
+        if self.pos == self.end {
+            match self.buf_reader.read(&mut self.buffer) {
+                Ok(byte_read) => {
+                    if 0 == byte_read {
+                        return Ok(None);
+                    }
+                    self.end = byte_read;
+                    self.pos = 0;
                 }
                 Err(e) => return Err(IOError(e)),
             }
         }
 
-        let c = self.line.as_ref().unwrap()[self.pos];
+        let c = self.buffer[self.pos];
        self.pos += 1;
-        if self.pos == self.line.as_ref().unwrap().len() {
-            self.line = None;
-        }
        Ok(Some(c))
    }
 }
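The rewritten stream replaces per-line `String` allocation and `char` collection with a fixed 32 KiB buffer that is refilled only when the cursor reaches its end; `read` returning 0 signals EOF, and a short read just sets a smaller `end`. A standalone sketch of that refill pattern (invented names, tiny buffer so the demo refills several times):

```rust
// Standalone sketch of the refill pattern (not part of this PR): the
// reader tracks pos and end; when pos catches up to end it issues one
// read() into the fixed buffer, and a read of zero bytes means EOF.
// Most calls therefore cost a comparison plus an index, with no per-line
// allocation and no UTF-8 decoding.
use std::io::Read;

struct ByteStream<R: Read> {
    reader: R,
    buf: [u8; 8], // tiny buffer so the demo refills several times
    pos: usize,
    end: usize,
}

impl<R: Read> ByteStream<R> {
    fn next_byte(&mut self) -> std::io::Result<Option<u8>> {
        if self.pos == self.end {
            let n = self.reader.read(&mut self.buf)?;
            if n == 0 {
                return Ok(None); // EOF
            }
            self.end = n; // a short read just means a smaller window
            self.pos = 0;
        }
        let c = self.buf[self.pos];
        self.pos += 1;
        Ok(Some(c))
    }
}

fn main() -> std::io::Result<()> {
    let data = "line one\n哥们儿\n".as_bytes(); // &[u8] implements Read
    let mut s = ByteStream { reader: data, buf: [0; 8], pos: 0, end: 0 };
    let mut newlines = 0;
    while let Some(c) = s.next_byte()? {
        if c == b'\n' {
            newlines += 1;
        }
    }
    assert_eq!(newlines, 2);
    Ok(())
}
```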
8 changes: 4 additions & 4 deletions src/log_parser/log_parser.rs

@@ -124,7 +124,7 @@ impl LogEvent {
     pub fn to_string(&self) -> String {
         let mut result = String::new();
         for token in &self.tokens {
-            result += &token.get_val();
+            result += &token.get_buf_as_string();
         }
         result
     }
@@ -151,7 +151,7 @@ impl Debug for LogEvent {
                     "\t[Var({:?})|{}]: \"{}\"\n",
                     self.schema_config.get_var_schemas()[var_id].name,
                     token.get_line_num(),
-                    token.get_val().escape_default()
+                    token.get_buf_as_string().escape_default()
                 )
                 .as_str()
             }
@@ -160,7 +160,7 @@
                     "\t[Timestamp({})|{}]: \"{}\"\n",
                     ts_id,
                     token.get_line_num(),
-                    token.get_val().escape_default()
+                    token.get_buf_as_string().escape_default()
                 )
                 .as_str()
             }
@@ -169,7 +169,7 @@
                     "\t[{:?}|{}]: \"{}\"\n",
                     token.get_token_type(),
                     token.get_line_num(),
-                    token.get_val().escape_default()
+                    token.get_buf_as_string().escape_default()
                 )
                 .as_str()
             }
6 changes: 3 additions & 3 deletions src/parser/schema_parser/parser.rs

@@ -71,8 +71,8 @@ impl SchemaConfig {
         &self.var_schemas
     }
 
-    pub fn has_delimiter(&self, delimiter: char) -> bool {
-        if false == delimiter.is_ascii() {
+    pub fn has_delimiter(&self, delimiter: u8) -> bool {
+        if 128 <= delimiter {
             return false;
         }
         self.delimiters[delimiter as usize]
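`has_delimiter` is backed by a 128-entry table indexed by ASCII byte, and the `128 <= delimiter` early return encodes that non-ASCII bytes are never delimiters. A standalone sketch with invented names, not the crate's SchemaConfig:

```rust
// Standalone sketch (invented names, not the crate's SchemaConfig):
// delimiters as a 128-entry boolean table indexed by ASCII byte.
struct Delims {
    table: [bool; 128],
}

impl Delims {
    fn new(chars: &[u8]) -> Self {
        let mut table = [false; 128];
        for &c in chars {
            table[c as usize] = true;
        }
        Delims { table }
    }

    fn has_delimiter(&self, delimiter: u8) -> bool {
        if 128 <= delimiter {
            return false; // non-ASCII bytes are never delimiters
        }
        self.table[delimiter as usize]
    }
}

fn main() {
    let delims = Delims::new(b" \t\n\r:,!;%");
    assert!(delims.has_delimiter(b':'));
    assert!(!delims.has_delimiter(b'a'));
    assert!(!delims.has_delimiter(0x93)); // continuation byte of '哥'
}
```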
@@ -191,7 +191,7 @@ mod tests {
 
         let delimiters: Vec<char> = vec![' ', '\t', '\n', '\r', ':', ',', '!', ';', '%'];
         for delimiter in delimiters {
-            assert!(parsed_schema.has_delimiter(delimiter));
+            assert!(parsed_schema.has_delimiter(delimiter as u8));
         }
 
         Ok(())
2 changes: 1 addition & 1 deletion tests/lexer_test.rs

@@ -43,7 +43,7 @@ fn test_lexer_simple() -> Result<()> {
             parsed_line.clear();
             curr_line_num += 1;
         }
-        parsed_line += &token.get_val().to_string();
+        parsed_line += &token.get_buf_as_string().to_string();
         println!("{:?}", token);
     }
     parsed_lines.push(parsed_line.clone());