Skip to content

Commit

Permalink
feat: support memcache meta responses (#4366)
Browse files Browse the repository at this point in the history
Fixes #4348 and #3071

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Dec 25, 2024
1 parent c88c707 commit 966a1a4
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 29 deletions.
6 changes: 3 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ struct Connection::AsyncOperations {
}

void operator()(const PubMessage& msg);
void operator()(Connection::PipelineMessage& msg);
void operator()(const Connection::MCPipelineMessage& msg);
void operator()(PipelineMessage& msg);
void operator()(const MCPipelineMessage& msg);
void operator()(const MonitorMessage& msg);
void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg);
Expand Down Expand Up @@ -479,7 +479,7 @@ void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
self->skip_next_squashing_ = false;
}

void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) {
void Connection::AsyncOperations::operator()(const MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value,
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
self->cc_.get());
Expand Down
5 changes: 4 additions & 1 deletion src/facade/memcache_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
case 'h':
res->return_hit = true;
break;
case 'c':
res->return_version = true;
break;
default:
LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented
return MP::PARSE_ERROR;
Expand All @@ -291,7 +294,7 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
*consumed = 0;
if (pos == string_view::npos) {
// We need more data to parse the command. For get/gets commands this line can be very long.
// we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len.
// we limit maximum buffer capacity in the higher levels using max_client_iobuf_len.
return INPUT_PENDING;
}

Expand Down
2 changes: 2 additions & 0 deletions src/facade/memcache_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class MemcacheParser {
bool return_ttl = false; // t
bool return_access_time = false; // l
bool return_hit = false; // h
bool return_version = false; // c

// Used internally by meta parsing.
std::string blob;
};
Expand Down
51 changes: 37 additions & 14 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,35 +204,48 @@ void SinkReplyBuilder::NextVec(std::string_view str) {
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), all_(0) {
}

void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver,
uint32_t mc_flag) {
ReplyScope scope(this);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);

if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
if (flag_.meta) {
string flags;
if (flag_.return_mcflag)
absl::StrAppend(&flags, " f", mc_flag);
if (flag_.return_version)
absl::StrAppend(&flags, " c", mc_ver);
if (flag_.return_value) {
WritePieces("VA ", value.size(), flags, kCRLF, value, kCRLF);
} else {
WritePieces("HD ", flags, kCRLF);
}
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);

if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
}
}
}

void MCReplyBuilder::SendSimpleString(std::string_view str) {
if (noreply_)
if (flag_.noreply)
return;

ReplyScope scope(this);
WritePieces(str, kCRLF);
}

void MCReplyBuilder::SendStored() {
SendSimpleString("STORED");
SendSimpleString(flag_.meta ? "HD" : "STORED");
}

void MCReplyBuilder::SendLong(long val) {
Expand All @@ -253,11 +266,21 @@ void MCReplyBuilder::SendClientError(string_view str) {
}

void MCReplyBuilder::SendSetSkipped() {
SendSimpleString("NOT_STORED");
SendSimpleString(flag_.meta ? "NS" : "NOT_STORED");
}

void MCReplyBuilder::SendNotFound() {
SendSimpleString("NOT_FOUND");
SendSimpleString(flag_.meta ? "NF" : "NOT_FOUND");
}

void MCReplyBuilder::SendGetEnd() {
if (!flag_.meta)
SendSimpleString("END");
}

void MCReplyBuilder::SendMiss() {
if (flag_.meta)
SendSimpleString("EN");
}

void MCReplyBuilder::SendRaw(std::string_view str) {
Expand Down
38 changes: 35 additions & 3 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class MCReplyBuilder : public SinkReplyBuilder {

void SendClientError(std::string_view str);
void SendNotFound();
void SendMiss();
void SendGetEnd();

void SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag);
void SendSimpleString(std::string_view str) final;
Expand All @@ -179,15 +181,45 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendRaw(std::string_view str);

void SetNoreply(bool noreply) {
noreply_ = noreply;
flag_.noreply = noreply;
}

bool NoReply() const {
return noreply_;
return flag_.noreply;
}

void SetMeta(bool meta) {
flag_.meta = meta;
}

void SetBase64(bool base64) {
flag_.base64 = base64;
}

void SetReturnMCFlag(bool val) {
flag_.return_mcflag = val;
}

void SetReturnValue(bool val) {
flag_.return_value = val;
}

void SetReturnVersion(bool val) {
flag_.return_version = val;
}

private:
bool noreply_ = false;
union {
struct {
uint8_t noreply : 1;
uint8_t meta : 1;
uint8_t base64 : 1;
uint8_t return_value : 1;
uint8_t return_mcflag : 1;
uint8_t return_version : 1;
} flag_;
uint8_t all_;
};
};

// Redis reply builder interface for sending RESP data.
Expand Down
9 changes: 8 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,13 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
char ttl_op[] = "EXAT";

mc_builder->SetNoreply(cmd.no_reply);
mc_builder->SetMeta(cmd.meta);
if (cmd.meta) {
mc_builder->SetBase64(cmd.base64);
mc_builder->SetReturnMCFlag(cmd.return_flags);
mc_builder->SetReturnValue(cmd.return_value);
mc_builder->SetReturnVersion(cmd.return_version);
}

switch (cmd.type) {
case MemcacheParser::REPLACE:
Expand Down Expand Up @@ -1533,7 +1540,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
server_family_.StatsMC(cmd.key, mc_builder);
return;
case MemcacheParser::VERSION:
mc_builder->SendSimpleString("VERSION 1.5.0 DF");
mc_builder->SendSimpleString("VERSION 1.6.0 DF");
return;
default:
mc_builder->SendClientError("bad command line format");
Expand Down
10 changes: 6 additions & 4 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1360,11 +1360,13 @@ void StringFamily::MGet(CmdArgList args, const CommandContext& cmnd_cntx) {
auto* rb = static_cast<MCReplyBuilder*>(builder);
DCHECK(dynamic_cast<CapturingReplyBuilder*>(builder) == nullptr);
for (const auto& entry : res) {
if (!entry)
continue;
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
if (entry) {
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
} else {
rb->SendMiss();
}
}
rb->SendSimpleString("END");
rb->SendGetEnd();
} else {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(res.size());
Expand Down
13 changes: 11 additions & 2 deletions tests/dragonfly/memcache_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
CacheClient,
connection_pool_factory_builder,
)
from meta_memcache.protocol import RequestFlags, Miss, Value, Success

DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4}

Expand All @@ -16,6 +17,14 @@ def test_basic(df_server: DflyInstance):
servers=[
ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")),
],
connection_pool_factory_fn=connection_pool_factory_builder(),
connection_pool_factory_fn=connection_pool_factory_builder(recv_timeout=5),
)
# TODO: to add integration tests

assert pool.set("key1", "value1", 100)
assert pool.set("key1", "value2", 0)
assert pool.get("key1") == "value2"

request_flags = RequestFlags(return_value=False)
response = pool.meta_get(Key("key1"), flags=request_flags)
assert isinstance(response, Success)
assert pool.get("key2") is None
2 changes: 1 addition & 1 deletion tests/dragonfly/pymemcached_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_version(memcached_client: MCClient):
Our real version is being returned in the stats command.
Also verified manually that php client parses correctly the version string that ends with "DF".
"""
assert b"1.5.0 DF" == memcached_client.version()
assert b"1.6.0 DF" == memcached_client.version()
stats = memcached_client.stats()
version = stats[b"version"].decode("utf-8")
assert version.startswith("v") or version == "dev"
Expand Down

0 comments on commit 966a1a4

Please sign in to comment.