diff --git a/src/facade/redis_parser.cc b/src/facade/redis_parser.cc index e92cf2056e5a..e4a0fcdd784d 100644 --- a/src/facade/redis_parser.cc +++ b/src/facade/redis_parser.cc @@ -3,6 +3,7 @@ // #include "facade/redis_parser.h" +#include #include #include "base/logging.h" @@ -18,6 +19,9 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R *consumed = 0; res->clear(); + DVLOG(2) << "Parsing: " + << absl::CHexEscape(string_view{reinterpret_cast(str.data()), str.size()}); + if (state_ == CMD_COMPLETE_S) { if (InitStart(str[0], res)) { // We recognized a non-INLINE state, starting with a special char. @@ -76,6 +80,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R } if (resultc.first == INPUT_PENDING) { + DCHECK(str.empty()); StashState(res); } return resultc.first; @@ -83,6 +88,8 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R if (resultc.first == OK) { DCHECK(cached_expr_); + DCHECK_EQ(0, small_len_); + if (res != cached_expr_) { DCHECK(!stash_.empty()); @@ -233,15 +240,24 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed { const char* s = reinterpret_cast(str.data()); const char* pos = reinterpret_cast(memchr(s, '\n', str.size())); if (!pos) { - Result r = INPUT_PENDING; - if (str.size() >= 32) { + if (str.size() + small_len_ < sizeof(small_buf_)) { + memcpy(small_buf_ + small_len_, str.data(), str.size()); + small_len_ += str.size(); + return {INPUT_PENDING, str.size()}; + } LOG(WARNING) << "Unexpected format " << string_view{s, str.size()}; - r = BAD_ARRAYLEN; + return ResultConsumed{BAD_ARRAYLEN, 0}; } - return {r, 0}; - } unsigned consumed = pos - s + 1; + if (small_len_ > 0) { + memcpy(small_buf_ + small_len_, str.data(), consumed); + small_len_ += consumed; + s = small_buf_; + pos = small_buf_ + small_len_ - 1; + small_len_ = 0; + } + if (pos[-1] != '\r') { return {BAD_ARRAYLEN, consumed}; } @@ -322,8 +338,10 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed { unsigned min_len = 2 + int(arg_c_ != '_'); - if (str.size() < min_len) { - return {INPUT_PENDING, 0}; + if (small_len_ + str.size() < min_len) { + memcpy(small_buf_ + small_len_, str.data(), str.size()); + small_len_ += str.size(); + return {INPUT_PENDING, str.size()}; } if (arg_c_ == '$') { @@ -354,14 +372,22 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed { DCHECK(!server_mode_); if (arg_c_ == '_') { // Resp3 NIL // '\r','\n' - if (str[0] != '\r' || str[1] != '\n') { + // '_','\r','\n' + DCHECK_GE(small_len_ + str.size(), 2u); + DCHECK_LT(small_len_, 2); + + unsigned consumed = 2 - small_len_; + for (unsigned i = 0; i < consumed; ++i) { + small_buf_[small_len_ + i] = str[i]; + } + if (small_buf_[0] != '\r' || small_buf_[1] != '\n') { return {BAD_STRING, 0}; } cached_expr_->emplace_back(RespExpr::NIL); cached_expr_->back().u = Buffer{}; HandleFinishArg(); - return {OK, 2}; + return {OK, consumed}; } if (arg_c_ == '*') { @@ -425,6 +451,26 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed { uint32_t consumed = 0; + if (small_len_ > 0) { + DCHECK(!is_broken_token_); + DCHECK_EQ(bulk_len_, 0u); + + if (bulk_len_ == 0) { + DCHECK_EQ(small_len_, 1); + DCHECK_GE(str.size(), 1u); + if (small_buf_[0] != '\r' || str[0] != '\n') { + return {BAD_STRING, 0}; + } + consumed = bulk_len_ + 2; + small_len_ = 0; + HandleFinishArg(); + + return {OK, 1}; + } + } + + DCHECK_EQ(small_len_, 0); + if (str.size() >= bulk_len_) { consumed = bulk_len_; if (bulk_len_) { @@ -446,6 +492,10 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed { } HandleFinishArg(); return {OK, consumed + 2}; + } else if (str.size() == 1) { + small_buf_[0] = str[0]; + consumed++; + small_len_ = 1; } return {INPUT_PENDING, consumed}; } @@ -490,6 +540,7 @@ void RedisParser::HandleFinishArg() { } cached_expr_ = parse_stack_.back().second; } + small_len_ = 0; } void RedisParser::ExtendLastString(Buffer str) { diff --git a/src/facade/redis_parser.h b/src/facade/redis_parser.h index 60a8594d9385..560beec95916 100644 --- a/src/facade/redis_parser.h +++ b/src/facade/redis_parser.h @@ -45,8 +45,6 @@ class RedisParser { * part of str because parser caches the intermediate state internally according to 'consumed' * result. * - * Note: A parser does not always guarantee progress, i.e. if a small buffer was passed it may - * returns INPUT_PENDING with consumed == 0. * */ @@ -99,7 +97,9 @@ class RedisParser { State state_ = CMD_COMPLETE_S; bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing. bool server_mode_ = true; + uint8_t small_len_ = 0; char arg_c_ = 0; + uint32_t bulk_len_ = 0; uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0; uint32_t max_arr_len_; @@ -114,6 +114,7 @@ class RedisParser { using Blob = std::vector; std::vector buf_stash_; + char small_buf_[32]; }; } // namespace facade diff --git a/src/facade/redis_parser_test.cc b/src/facade/redis_parser_test.cc index 9555c6aaad1b..851603084278 100644 --- a/src/facade/redis_parser_test.cc +++ b/src/facade/redis_parser_test.cc @@ -146,6 +146,15 @@ TEST_F(RedisParserTest, ClientMode) { ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n")); EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo"))); + + ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("_")); + EXPECT_EQ(1, consumed_); + ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r")); + EXPECT_EQ(1, consumed_); + ASSERT_EQ(RedisParser::OK, Parse("\n")); + EXPECT_EQ(1, consumed_); + EXPECT_THAT(args_, ElementsAre(ArgType(RespExpr::NIL))); + ASSERT_EQ(RedisParser::OK, Parse("*2\r\n_\r\n_\r\n")); } TEST_F(RedisParserTest, Hierarchy) { @@ -183,13 +192,13 @@ TEST_F(RedisParserTest, LargeBulk) { ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half)); ASSERT_EQ(512, consumed_); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r")); - ASSERT_EQ(0, consumed_); - ASSERT_EQ(RedisParser::OK, Parse("\r\n")); - ASSERT_EQ(2, consumed_); + ASSERT_EQ(1, consumed_); + ASSERT_EQ(RedisParser::OK, Parse("\n")); + EXPECT_EQ(1, consumed_); string part1 = absl::StrCat(prefix, half); - ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1)); - ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half)); + EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(part1)); + EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(half)); ASSERT_EQ(RedisParser::OK, Parse("\r\n")); prefix = "*1\r\n$270000000\r\n"; @@ -245,15 +254,15 @@ TEST_F(RedisParserTest, UsedMemory) { TEST_F(RedisParserTest, Eol) { ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r")); - EXPECT_EQ(1, consumed_); - ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("1\r\n$5\r\n")); - EXPECT_EQ(7, consumed_); + EXPECT_EQ(3, consumed_); + ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n$5\r\n")); + EXPECT_EQ(5, consumed_); } TEST_F(RedisParserTest, BulkSplit) { - ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD")); - ASSERT_EQ(12, consumed_); - ASSERT_EQ(RedisParser::OK, Parse("\r\n")); + ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD\r")); + ASSERT_EQ(13, consumed_); + ASSERT_EQ(RedisParser::OK, Parse("\n")); } TEST_F(RedisParserTest, InlineSplit) { @@ -261,8 +270,6 @@ TEST_F(RedisParserTest, InlineSplit) { EXPECT_EQ(1, consumed_); ASSERT_EQ(RedisParser::OK, Parse("\nPING\n\n")); EXPECT_EQ(6, consumed_); - ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n")); - EXPECT_EQ(1, consumed_); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("P")); ASSERT_EQ(RedisParser::OK, Parse("ING\n")); }