Skip to content

Commit

Permalink
feat: add support for meta memcache commands (#4362)
Browse files Browse the repository at this point in the history
This is a stripped down version of supporting the memcache meta requests.

a. Not all meta flags are supported, but TTL, flags, arithmetics are supported.
b. does not include reply support.
c. does not include new semantics that are not part of the older, ascii protocol.

The parser interface has not changed significantly, and the meta commands are emulated
using the old, high level commands like ADD,REPLACE, INCR etc.

See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt for more details
regarding the meta commands spec.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Dec 24, 2024
1 parent 6946820 commit a27cce8
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 22 deletions.
6 changes: 6 additions & 0 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,14 @@ auto Connection::ParseMemcache() -> ParserStatus {

do {
string_view str = ToSV(io_buf_.InputBuffer());

if (str.empty()) {
return OK;
}

result = memcache_parser_->Parse(str, &consumed, &cmd);

DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << cmd.type;
if (result != MemcacheParser::OK) {
io_buf_.ConsumeInput(consumed);
break;
Expand Down
212 changes: 193 additions & 19 deletions src/facade/memcache_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
#include <absl/strings/ascii.h>
#include <absl/strings/escaping.h>
#include <absl/strings/numbers.h>
#include <absl/strings/str_split.h>
#include <absl/types/span.h>

#include "base/logging.h"
#include "base/stl_util.h"
#include "facade/facade_types.h"

namespace facade {
using namespace std;
Expand All @@ -29,16 +31,37 @@ MP::CmdType From(string_view token) {
{"quit", MP::QUIT}, {"version", MP::VERSION},
};

auto it = cmd_map.find(token);
if (it == cmd_map.end())
if (token.size() == 2) {
// META_COMMANDS
if (token[0] != 'm')
return MP::INVALID;
switch (token[1]) {
case 's':
return MP::META_SET;
case 'g':
return MP::META_GET;
case 'd':
return MP::META_DEL;
case 'a':
return MP::META_ARITHM;
case 'n':
return MP::META_NOOP;
case 'e':
return MP::META_DEBUG;
}
return MP::INVALID;
}

return it->second;
if (token.size() > 2) {
auto it = cmd_map.find(token);
if (it == cmd_map.end())
return MP::INVALID;
return it->second;
}
return MP::INVALID;
}

using TokensView = absl::Span<std::string_view>;

MP::Result ParseStore(TokensView tokens, MP::Command* res) {
MP::Result ParseStore(ArgSlice tokens, MP::Command* res) {
const size_t num_tokens = tokens.size();
unsigned opt_pos = 3;
if (res->type == MP::CAS) {
Expand Down Expand Up @@ -70,7 +93,7 @@ MP::Result ParseStore(TokensView tokens, MP::Command* res) {
return MP::OK;
}

MP::Result ParseValueless(TokensView tokens, MP::Command* res) {
MP::Result ParseValueless(ArgSlice tokens, MP::Command* res) {
const size_t num_tokens = tokens.size();
size_t key_pos = 0;
if (res->type == MP::GAT || res->type == MP::GATS) {
Expand Down Expand Up @@ -116,13 +139,159 @@ MP::Result ParseValueless(TokensView tokens, MP::Command* res) {
return MP::OK;
}

bool ParseMetaMode(char m, MP::Command* res) {
if (res->type == MP::SET) {
switch (m) {
case 'E':
res->type = MP::ADD;
break;
case 'A':
res->type = MP::APPEND;
break;
case 'R':
res->type = MP::REPLACE;
break;
case 'P':
res->type = MP::PREPEND;
break;
case 'S':
break;
default:
return false;
}
return true;
}

if (res->type == MP::INCR) {
switch (m) {
case 'I':
case '+':
break;
case 'D':
case '-':
res->type = MP::DECR;
break;
default:
return false;
}
return true;
}
return false;
}

// See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt
MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
DCHECK(!tokens.empty());

if (res->type == MP::META_DEBUG) {
LOG(ERROR) << "meta debug not yet implemented";
return MP::PARSE_ERROR;
}

if (tokens[0].size() > 250)
return MP::PARSE_ERROR;

res->meta = true;
res->key = tokens[0];
res->bytes_len = 0;
res->flags = 0;
res->expire_ts = 0;

tokens.remove_prefix(1);

// We emulate the behavior by returning the high level commands.
// TODO: we should reverse the interface in the future, so that a high level command
// will be represented in MemcacheParser::Command by a meta command with flags.
// high level commands should not be part of the interface in the future.
switch (res->type) {
case MP::META_GET:
res->type = MP::GET;
break;
case MP::META_DEL:
res->type = MP::DELETE;
break;
case MP::META_SET:
if (tokens.empty()) {
return MP::PARSE_ERROR;
}
if (!absl::SimpleAtoi(tokens[0], &res->bytes_len))
return MP::BAD_INT;

res->type = MP::SET;
tokens.remove_prefix(1);
break;
case MP::META_ARITHM:
res->type = MP::INCR;
res->delta = 1;
break;
default:
return MP::PARSE_ERROR;
}

for (size_t i = 0; i < tokens.size(); ++i) {
string_view token = tokens[i];

switch (token[0]) {
case 'T':
if (!absl::SimpleAtoi(token.substr(1), &res->expire_ts))
return MP::BAD_INT;
break;
case 'b':
if (token.size() != 1)
return MP::PARSE_ERROR;
if (!absl::Base64Unescape(res->key, &res->blob))
return MP::PARSE_ERROR;
res->key = res->blob;
res->base64 = true;
break;
case 'F':
if (!absl::SimpleAtoi(token.substr(1), &res->flags))
return MP::BAD_INT;
break;
case 'M':
if (token.size() != 2 || !ParseMetaMode(token[1], res))
return MP::PARSE_ERROR;
break;
case 'D':
if (!absl::SimpleAtoi(token.substr(1), &res->delta))
return MP::BAD_INT;
break;
case 'q':
res->no_reply = true;
break;
case 'f':
res->return_flags = true;
break;
case 'v':
res->return_value = true;
break;
case 't':
res->return_ttl = true;
break;
case 'l':
res->return_access_time = true;
break;
case 'h':
res->return_hit = true;
break;
default:
LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented
return MP::PARSE_ERROR;
}
}

return MP::OK;
}

} // namespace

auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
cmd->no_reply = false; // re-initialize
auto pos = str.find("\r\n");
*consumed = 0;
if (pos == string_view::npos) {
// We need more data to parse the command. For get/gets commands this line can be very long.
// we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len.
return INPUT_PENDING;
}

Expand All @@ -131,42 +300,47 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
}
*consumed = pos + 2;

std::string_view tokens_expression = str.substr(0, pos);
string_view tokens_expression = str.substr(0, pos);

// cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n
// get <key>*\r\n
absl::InlinedVector<std::string_view, 32> tokens =
// ms <key> <datalen> <flags>*\r\n
absl::InlinedVector<string_view, 32> tokens =
absl::StrSplit(tokens_expression, ' ', absl::SkipWhitespace());

const size_t num_tokens = tokens.size();

if (num_tokens == 0)
if (tokens.empty())
return PARSE_ERROR;

cmd->type = From(tokens[0]);
if (cmd->type == INVALID) {
return UNKNOWN_CMD;
}

if (cmd->type <= CAS) { // Store command
if (num_tokens < 5 || tokens[1].size() > 250) { // key length limit
ArgSlice tokens_view{tokens};
tokens_view.remove_prefix(1);

if (cmd->type <= CAS) { // Store command
if (tokens_view.size() < 4 || tokens[0].size() > 250) { // key length limit
return MP::PARSE_ERROR;
}

cmd->key = string_view{tokens[1].data(), tokens[1].size()};
cmd->key = tokens_view[0];

TokensView tokens_view{tokens.begin() + 2, num_tokens - 2};
tokens_view.remove_prefix(1);
return ParseStore(tokens_view, cmd);
}

if (num_tokens == 1) {
if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION})) {
if (cmd->type >= META_SET) {
return tokens_view.empty() ? MP::PARSE_ERROR : ParseMeta(tokens_view, cmd);
}

if (tokens_view.empty()) {
if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION, MP::META_NOOP})) {
return MP::OK;
}
return MP::PARSE_ERROR;
}

TokensView tokens_view{tokens.begin() + 1, num_tokens - 1};
return ParseValueless(tokens_view, cmd);
};

Expand Down
24 changes: 22 additions & 2 deletions src/facade/memcache_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

Expand Down Expand Up @@ -39,6 +40,14 @@ class MemcacheParser {
INCR = 32,
DECR = 33,
FLUSHALL = 34,

// META_COMMANDS
META_NOOP = 50,
META_SET = 51,
META_DEL = 52,
META_ARITHM = 53,
META_GET = 54,
META_DEBUG = 55,
};

// According to https://github.com/memcached/memcached/wiki/Commands#standard-protocol
Expand All @@ -56,15 +65,26 @@ class MemcacheParser {
0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds
uint32_t bytes_len = 0;
uint32_t flags = 0;
bool no_reply = false;
bool no_reply = false; // q
bool meta = false;

// meta flags
bool base64 = false; // b
bool return_flags = false; // f
bool return_value = false; // v
bool return_ttl = false; // t
bool return_access_time = false; // l
bool return_hit = false; // h
// Used internally by meta parsing.
std::string blob;
};

enum Result {
OK,
INPUT_PENDING,
UNKNOWN_CMD,
BAD_INT,
PARSE_ERROR,
PARSE_ERROR, // request parse error, but can continue parsing within the same connection.
BAD_DELTA,
};

Expand Down
Loading

0 comments on commit a27cce8

Please sign in to comment.