Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support memcache meta responses #4366

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ struct Connection::AsyncOperations {
}

void operator()(const PubMessage& msg);
void operator()(Connection::PipelineMessage& msg);
void operator()(const Connection::MCPipelineMessage& msg);
void operator()(PipelineMessage& msg);
void operator()(const MCPipelineMessage& msg);
void operator()(const MonitorMessage& msg);
void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg);
Expand Down Expand Up @@ -479,7 +479,7 @@ void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
self->skip_next_squashing_ = false;
}

void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) {
void Connection::AsyncOperations::operator()(const MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value,
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
self->cc_.get());
Expand Down
5 changes: 4 additions & 1 deletion src/facade/memcache_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
case 'h':
res->return_hit = true;
break;
case 'c':
res->return_version = true;
break;
default:
LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented
return MP::PARSE_ERROR;
Expand All @@ -291,7 +294,7 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
*consumed = 0;
if (pos == string_view::npos) {
// We need more data to parse the command. For get/gets commands this line can be very long.
// we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len.
// we limit maximum buffer capacity in the higher levels using max_client_iobuf_len.
return INPUT_PENDING;
}

Expand Down
2 changes: 2 additions & 0 deletions src/facade/memcache_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class MemcacheParser {
bool return_ttl = false; // t
bool return_access_time = false; // l
bool return_hit = false; // h
bool return_version = false; // c

// Used internally by meta parsing.
std::string blob;
};
Expand Down
51 changes: 37 additions & 14 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,35 +204,48 @@ void SinkReplyBuilder::NextVec(std::string_view str) {
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), all_(0) {
}

void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver,
uint32_t mc_flag) {
ReplyScope scope(this);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);

if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
if (flag_.meta) {
string flags;
if (flag_.return_mcflag)
absl::StrAppend(&flags, " f", mc_flag);
if (flag_.return_version)
absl::StrAppend(&flags, " c", mc_ver);
if (flag_.return_value) {
WritePieces("VA ", value.size(), flags, kCRLF, value, kCRLF);
} else {
WritePieces("HD ", flags, kCRLF);
}
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);

if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
}
}
}

void MCReplyBuilder::SendSimpleString(std::string_view str) {
if (noreply_)
if (flag_.noreply)
return;

ReplyScope scope(this);
WritePieces(str, kCRLF);
}

void MCReplyBuilder::SendStored() {
SendSimpleString("STORED");
SendSimpleString(flag_.meta ? "HD" : "STORED");
}

void MCReplyBuilder::SendLong(long val) {
Expand All @@ -253,11 +266,21 @@ void MCReplyBuilder::SendClientError(string_view str) {
}

void MCReplyBuilder::SendSetSkipped() {
SendSimpleString("NOT_STORED");
SendSimpleString(flag_.meta ? "NS" : "NOT_STORED");
}

void MCReplyBuilder::SendNotFound() {
SendSimpleString("NOT_FOUND");
SendSimpleString(flag_.meta ? "NF" : "NOT_FOUND");
}

void MCReplyBuilder::SendGetEnd() {
if (!flag_.meta)
SendSimpleString("END");
}

void MCReplyBuilder::SendMiss() {
if (flag_.meta)
SendSimpleString("EN");
}

void MCReplyBuilder::SendRaw(std::string_view str) {
Expand Down
38 changes: 35 additions & 3 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class MCReplyBuilder : public SinkReplyBuilder {

void SendClientError(std::string_view str);
void SendNotFound();
void SendMiss();
void SendGetEnd();

void SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag);
void SendSimpleString(std::string_view str) final;
Expand All @@ -179,15 +181,45 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendRaw(std::string_view str);

void SetNoreply(bool noreply) {
noreply_ = noreply;
flag_.noreply = noreply;
}

bool NoReply() const {
return noreply_;
return flag_.noreply;
}

void SetMeta(bool meta) {
flag_.meta = meta;
}

void SetBase64(bool base64) {
flag_.base64 = base64;
}

void SetReturnMCFlag(bool val) {
flag_.return_mcflag = val;
}

void SetReturnValue(bool val) {
flag_.return_value = val;
}

void SetReturnVersion(bool val) {
flag_.return_version = val;
}

private:
bool noreply_ = false;
union {
struct {
uint8_t noreply : 1;
uint8_t meta : 1;
uint8_t base64 : 1;
uint8_t return_value : 1;
uint8_t return_mcflag : 1;
uint8_t return_version : 1;
} flag_;
uint8_t all_;
};
};

// Redis reply builder interface for sending RESP data.
Expand Down
9 changes: 8 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,13 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
char ttl_op[] = "EXAT";

mc_builder->SetNoreply(cmd.no_reply);
mc_builder->SetMeta(cmd.meta);
if (cmd.meta) {
mc_builder->SetBase64(cmd.base64);
mc_builder->SetReturnMCFlag(cmd.return_flags);
mc_builder->SetReturnValue(cmd.return_value);
mc_builder->SetReturnVersion(cmd.return_version);
}

switch (cmd.type) {
case MemcacheParser::REPLACE:
Expand Down Expand Up @@ -1533,7 +1540,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
server_family_.StatsMC(cmd.key, mc_builder);
return;
case MemcacheParser::VERSION:
mc_builder->SendSimpleString("VERSION 1.5.0 DF");
mc_builder->SendSimpleString("VERSION 1.6.0 DF");
return;
default:
mc_builder->SendClientError("bad command line format");
Expand Down
10 changes: 6 additions & 4 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1360,11 +1360,13 @@ void StringFamily::MGet(CmdArgList args, const CommandContext& cmnd_cntx) {
auto* rb = static_cast<MCReplyBuilder*>(builder);
DCHECK(dynamic_cast<CapturingReplyBuilder*>(builder) == nullptr);
for (const auto& entry : res) {
if (!entry)
continue;
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
if (entry) {
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
} else {
rb->SendMiss();
}
}
rb->SendSimpleString("END");
rb->SendGetEnd();
} else {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(res.size());
Expand Down
13 changes: 11 additions & 2 deletions tests/dragonfly/memcache_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
CacheClient,
connection_pool_factory_builder,
)
from meta_memcache.protocol import RequestFlags, Miss, Value, Success

DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4}

Expand All @@ -16,6 +17,14 @@ def test_basic(df_server: DflyInstance):
servers=[
ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")),
],
connection_pool_factory_fn=connection_pool_factory_builder(),
connection_pool_factory_fn=connection_pool_factory_builder(recv_timeout=5),
)
# TODO: to add integration tests

assert pool.set("key1", "value1", 100)
assert pool.set("key1", "value2", 0)
assert pool.get("key1") == "value2"

request_flags = RequestFlags(return_value=False)
response = pool.meta_get(Key("key1"), flags=request_flags)
assert isinstance(response, Success)
assert pool.get("key2") is None
2 changes: 1 addition & 1 deletion tests/dragonfly/pymemcached_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_version(memcached_client: MCClient):
Our real version is being returned in the stats command.
Also verified manually that php client parses correctly the version string that ends with "DF".
"""
assert b"1.5.0 DF" == memcached_client.version()
assert b"1.6.0 DF" == memcached_client.version()
stats = memcached_client.stats()
version = stats[b"version"].decode("utf-8")
assert version.startswith("v") or version == "dev"
Expand Down
Loading