From a27cce81b1d546d07d9a17da6d8b967609197cd6 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 24 Dec 2024 15:33:24 +0200 Subject: [PATCH] feat: add support for meta memcache commands (#4362) This is a stripped down version of supporting the memcache meta requests. a. Not all meta flags are supported, but TTL, flags, arithmetics are supported. b. does not include reply support. c. does not include new semantics that are not part of the older, ascii protocol. The parser interface has not changed significantly, and the meta commands are emulated using the old, high level commands like ADD,REPLACE, INCR etc. See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt for more details regarding the meta commands spec. Signed-off-by: Roman Gershman --- src/facade/dragonfly_connection.cc | 6 + src/facade/memcache_parser.cc | 212 +++++++++++++++++++++++++--- src/facade/memcache_parser.h | 24 +++- src/facade/memcache_parser_test.cc | 48 +++++++ tests/dragonfly/memcache_meta.py | 21 +++ tests/dragonfly/pymemcached_test.py | 1 - tests/dragonfly/requirements.txt | 1 + 7 files changed, 291 insertions(+), 22 deletions(-) create mode 100644 tests/dragonfly/memcache_meta.py diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 39ef714e50fd..7102d49e1f01 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1137,8 +1137,14 @@ auto Connection::ParseMemcache() -> ParserStatus { do { string_view str = ToSV(io_buf_.InputBuffer()); + + if (str.empty()) { + return OK; + } + result = memcache_parser_->Parse(str, &consumed, &cmd); + DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << cmd.type; if (result != MemcacheParser::OK) { io_buf_.ConsumeInput(consumed); break; diff --git a/src/facade/memcache_parser.cc b/src/facade/memcache_parser.cc index 763599ee5ca8..4720b0f67c31 100644 --- a/src/facade/memcache_parser.cc +++ b/src/facade/memcache_parser.cc @@ -6,12 +6,14 @@ #include #include #include +#include #include #include #include #include "base/logging.h" #include "base/stl_util.h" +#include "facade/facade_types.h" namespace facade { using namespace std; @@ -29,16 +31,37 @@ MP::CmdType From(string_view token) { {"quit", MP::QUIT}, {"version", MP::VERSION}, }; - auto it = cmd_map.find(token); - if (it == cmd_map.end()) + if (token.size() == 2) { + // META_COMMANDS + if (token[0] != 'm') + return MP::INVALID; + switch (token[1]) { + case 's': + return MP::META_SET; + case 'g': + return MP::META_GET; + case 'd': + return MP::META_DEL; + case 'a': + return MP::META_ARITHM; + case 'n': + return MP::META_NOOP; + case 'e': + return MP::META_DEBUG; + } return MP::INVALID; + } - return it->second; + if (token.size() > 2) { + auto it = cmd_map.find(token); + if (it == cmd_map.end()) + return MP::INVALID; + return it->second; + } + return MP::INVALID; } -using TokensView = absl::Span; - -MP::Result ParseStore(TokensView tokens, MP::Command* res) { +MP::Result ParseStore(ArgSlice tokens, MP::Command* res) { const size_t num_tokens = tokens.size(); unsigned opt_pos = 3; if (res->type == MP::CAS) { @@ -70,7 +93,7 @@ MP::Result ParseStore(TokensView tokens, MP::Command* res) { return MP::OK; } -MP::Result ParseValueless(TokensView tokens, MP::Command* res) { +MP::Result ParseValueless(ArgSlice tokens, MP::Command* res) { const size_t num_tokens = tokens.size(); size_t key_pos = 0; if (res->type == MP::GAT || res->type == MP::GATS) { @@ -116,6 +139,150 @@ MP::Result ParseValueless(TokensView tokens, MP::Command* res) { return MP::OK; } +bool ParseMetaMode(char m, MP::Command* res) { + if (res->type == MP::SET) { + switch (m) { + case 'E': + res->type = MP::ADD; + break; + case 'A': + res->type = MP::APPEND; + break; + case 'R': + res->type = MP::REPLACE; + break; + case 'P': + res->type = MP::PREPEND; + break; + case 'S': + break; + default: + return false; + } + return true; + } + + if (res->type == MP::INCR) { + switch (m) { + case 'I': + case '+': + break; + case 'D': + case '-': + res->type = MP::DECR; + break; + default: + return false; + } + return true; + } + return false; +} + +// See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt +MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) { + DCHECK(!tokens.empty()); + + if (res->type == MP::META_DEBUG) { + LOG(ERROR) << "meta debug not yet implemented"; + return MP::PARSE_ERROR; + } + + if (tokens[0].size() > 250) + return MP::PARSE_ERROR; + + res->meta = true; + res->key = tokens[0]; + res->bytes_len = 0; + res->flags = 0; + res->expire_ts = 0; + + tokens.remove_prefix(1); + + // We emulate the behavior by returning the high level commands. + // TODO: we should reverse the interface in the future, so that a high level command + // will be represented in MemcacheParser::Command by a meta command with flags. + // high level commands should not be part of the interface in the future. + switch (res->type) { + case MP::META_GET: + res->type = MP::GET; + break; + case MP::META_DEL: + res->type = MP::DELETE; + break; + case MP::META_SET: + if (tokens.empty()) { + return MP::PARSE_ERROR; + } + if (!absl::SimpleAtoi(tokens[0], &res->bytes_len)) + return MP::BAD_INT; + + res->type = MP::SET; + tokens.remove_prefix(1); + break; + case MP::META_ARITHM: + res->type = MP::INCR; + res->delta = 1; + break; + default: + return MP::PARSE_ERROR; + } + + for (size_t i = 0; i < tokens.size(); ++i) { + string_view token = tokens[i]; + + switch (token[0]) { + case 'T': + if (!absl::SimpleAtoi(token.substr(1), &res->expire_ts)) + return MP::BAD_INT; + break; + case 'b': + if (token.size() != 1) + return MP::PARSE_ERROR; + if (!absl::Base64Unescape(res->key, &res->blob)) + return MP::PARSE_ERROR; + res->key = res->blob; + res->base64 = true; + break; + case 'F': + if (!absl::SimpleAtoi(token.substr(1), &res->flags)) + return MP::BAD_INT; + break; + case 'M': + if (token.size() != 2 || !ParseMetaMode(token[1], res)) + return MP::PARSE_ERROR; + break; + case 'D': + if (!absl::SimpleAtoi(token.substr(1), &res->delta)) + return MP::BAD_INT; + break; + case 'q': + res->no_reply = true; + break; + case 'f': + res->return_flags = true; + break; + case 'v': + res->return_value = true; + break; + case 't': + res->return_ttl = true; + break; + case 'l': + res->return_access_time = true; + break; + case 'h': + res->return_hit = true; + break; + default: + LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented + return MP::PARSE_ERROR; + } + } + + return MP::OK; +} + } // namespace auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { @@ -123,6 +290,8 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { auto pos = str.find("\r\n"); *consumed = 0; if (pos == string_view::npos) { + // We need more data to parse the command. For get/gets commands this line can be very long. + // we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len. return INPUT_PENDING; } @@ -131,16 +300,15 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { } *consumed = pos + 2; - std::string_view tokens_expression = str.substr(0, pos); + string_view tokens_expression = str.substr(0, pos); // cas [noreply]\r\n // get *\r\n - absl::InlinedVector tokens = + // ms *\r\n + absl::InlinedVector tokens = absl::StrSplit(tokens_expression, ' ', absl::SkipWhitespace()); - const size_t num_tokens = tokens.size(); - - if (num_tokens == 0) + if (tokens.empty()) return PARSE_ERROR; cmd->type = From(tokens[0]); @@ -148,25 +316,31 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { return UNKNOWN_CMD; } - if (cmd->type <= CAS) { // Store command - if (num_tokens < 5 || tokens[1].size() > 250) { // key length limit + ArgSlice tokens_view{tokens}; + tokens_view.remove_prefix(1); + + if (cmd->type <= CAS) { // Store command + if (tokens_view.size() < 4 || tokens[0].size() > 250) { // key length limit return MP::PARSE_ERROR; } - cmd->key = string_view{tokens[1].data(), tokens[1].size()}; + cmd->key = tokens_view[0]; - TokensView tokens_view{tokens.begin() + 2, num_tokens - 2}; + tokens_view.remove_prefix(1); return ParseStore(tokens_view, cmd); } - if (num_tokens == 1) { - if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION})) { + if (cmd->type >= META_SET) { + return tokens_view.empty() ? MP::PARSE_ERROR : ParseMeta(tokens_view, cmd); + } + + if (tokens_view.empty()) { + if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION, MP::META_NOOP})) { return MP::OK; } return MP::PARSE_ERROR; } - TokensView tokens_view{tokens.begin() + 1, num_tokens - 1}; return ParseValueless(tokens_view, cmd); }; diff --git a/src/facade/memcache_parser.h b/src/facade/memcache_parser.h index 1906c3dedfed..65db0f74786c 100644 --- a/src/facade/memcache_parser.h +++ b/src/facade/memcache_parser.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include @@ -39,6 +40,14 @@ class MemcacheParser { INCR = 32, DECR = 33, FLUSHALL = 34, + + // META_COMMANDS + META_NOOP = 50, + META_SET = 51, + META_DEL = 52, + META_ARITHM = 53, + META_GET = 54, + META_DEBUG = 55, }; // According to https://github.com/memcached/memcached/wiki/Commands#standard-protocol @@ -56,7 +65,18 @@ class MemcacheParser { 0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds uint32_t bytes_len = 0; uint32_t flags = 0; - bool no_reply = false; + bool no_reply = false; // q + bool meta = false; + + // meta flags + bool base64 = false; // b + bool return_flags = false; // f + bool return_value = false; // v + bool return_ttl = false; // t + bool return_access_time = false; // l + bool return_hit = false; // h + // Used internally by meta parsing. + std::string blob; }; enum Result { @@ -64,7 +84,7 @@ class MemcacheParser { INPUT_PENDING, UNKNOWN_CMD, BAD_INT, - PARSE_ERROR, + PARSE_ERROR, // request parse error, but can continue parsing within the same connection. BAD_DELTA, }; diff --git a/src/facade/memcache_parser_test.cc b/src/facade/memcache_parser_test.cc index 9e20d40498e3..a3c14558740c 100644 --- a/src/facade/memcache_parser_test.cc +++ b/src/facade/memcache_parser_test.cc @@ -101,6 +101,54 @@ TEST_F(MCParserTest, NoreplyBasic) { EXPECT_FALSE(cmd_.no_reply); } +TEST_F(MCParserTest, Meta) { + MemcacheParser::Result st = parser_.Parse("ms key1 ", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::INPUT_PENDING, st); + EXPECT_EQ(0, consumed_); + st = parser_.Parse("ms key1 6 T1 F2\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(17, consumed_); + EXPECT_EQ(MemcacheParser::SET, cmd_.type); + EXPECT_EQ("key1", cmd_.key); + EXPECT_EQ(2, cmd_.flags); + EXPECT_EQ(1, cmd_.expire_ts); + + st = parser_.Parse("ms 16nXnNeV150= 5 b ME\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(24, consumed_); + EXPECT_EQ(MemcacheParser::ADD, cmd_.type); + EXPECT_EQ("שלום", cmd_.key); + EXPECT_EQ(5, cmd_.bytes_len); + + st = parser_.Parse("mg 16nXnNeV150= b\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(19, consumed_); + EXPECT_EQ(MemcacheParser::GET, cmd_.type); + EXPECT_EQ("שלום", cmd_.key); + + st = parser_.Parse("ma val b\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(10, consumed_); + EXPECT_EQ(MemcacheParser::INCR, cmd_.type); + + st = parser_.Parse("ma val M- D10\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(15, consumed_); + EXPECT_EQ(MemcacheParser::DECR, cmd_.type); + EXPECT_EQ(10, cmd_.delta); + + st = parser_.Parse("mg key f v t l h\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(18, consumed_); + EXPECT_EQ(MemcacheParser::GET, cmd_.type); + EXPECT_EQ("key", cmd_.key); + EXPECT_TRUE(cmd_.return_flags); + EXPECT_TRUE(cmd_.return_value); + EXPECT_TRUE(cmd_.return_ttl); + EXPECT_TRUE(cmd_.return_access_time); + EXPECT_TRUE(cmd_.return_hit); +} + class MCParserNoreplyTest : public MCParserTest { protected: void RunTest(string_view str, bool noreply) { diff --git a/tests/dragonfly/memcache_meta.py b/tests/dragonfly/memcache_meta.py new file mode 100644 index 000000000000..3b4cf0b4c4f5 --- /dev/null +++ b/tests/dragonfly/memcache_meta.py @@ -0,0 +1,21 @@ +from .instance import DflyInstance +from . import dfly_args +from meta_memcache import ( + Key, + ServerAddress, + CacheClient, + connection_pool_factory_builder, +) + +DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4} + + +@dfly_args(DEFAULT_ARGS) +def test_basic(df_server: DflyInstance): + pool = CacheClient.cache_client_from_servers( + servers=[ + ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")), + ], + connection_pool_factory_fn=connection_pool_factory_builder(), + ) + # TODO: to add integration tests diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index 3d5842341019..a8b405555f23 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -5,7 +5,6 @@ import socket import random import time -import warnings from . import dfly_args from .instance import DflyInstance diff --git a/tests/dragonfly/requirements.txt b/tests/dragonfly/requirements.txt index f6be33893286..cfbbd8262986 100644 --- a/tests/dragonfly/requirements.txt +++ b/tests/dragonfly/requirements.txt @@ -13,6 +13,7 @@ wrapt==1.14.1 pytest-asyncio==0.20.1 pytest-repeat==0.9.3 pymemcache==4.0.0 +meta_memcache==2 prometheus_client==0.17.0 aiohttp==3.10.2 numpy