From 966a1a46fddf4e691c501e1935f569a9f1ee7106 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 25 Dec 2024 09:46:50 +0200 Subject: [PATCH] feat: support memcache meta responses (#4366) Fixes #4348 and #3071 Signed-off-by: Roman Gershman --- src/facade/dragonfly_connection.cc | 6 ++-- src/facade/memcache_parser.cc | 5 ++- src/facade/memcache_parser.h | 2 ++ src/facade/reply_builder.cc | 51 +++++++++++++++++++++-------- src/facade/reply_builder.h | 38 +++++++++++++++++++-- src/server/main_service.cc | 9 ++++- src/server/string_family.cc | 10 +++--- tests/dragonfly/memcache_meta.py | 13 ++++++-- tests/dragonfly/pymemcached_test.py | 2 +- 9 files changed, 107 insertions(+), 29 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 7102d49e1f01..da7d884fc9ca 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -421,8 +421,8 @@ struct Connection::AsyncOperations { } void operator()(const PubMessage& msg); - void operator()(Connection::PipelineMessage& msg); - void operator()(const Connection::MCPipelineMessage& msg); + void operator()(PipelineMessage& msg); + void operator()(const MCPipelineMessage& msg); void operator()(const MonitorMessage& msg); void operator()(const AclUpdateMessage& msg); void operator()(const MigrationRequestMessage& msg); @@ -479,7 +479,7 @@ void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) { self->skip_next_squashing_ = false; } -void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) { +void Connection::AsyncOperations::operator()(const MCPipelineMessage& msg) { self->service_->DispatchMC(msg.cmd, msg.value, static_cast(self->reply_builder_.get()), self->cc_.get()); diff --git a/src/facade/memcache_parser.cc b/src/facade/memcache_parser.cc index 4720b0f67c31..31b63cc7c571 100644 --- a/src/facade/memcache_parser.cc +++ b/src/facade/memcache_parser.cc @@ -274,6 +274,9 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) { case 'h': res->return_hit = true; break; + case 'c': + res->return_version = true; + break; default: LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented return MP::PARSE_ERROR; @@ -291,7 +294,7 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { *consumed = 0; if (pos == string_view::npos) { // We need more data to parse the command. For get/gets commands this line can be very long. - // we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len. + // we limit maximum buffer capacity in the higher levels using max_client_iobuf_len. return INPUT_PENDING; } diff --git a/src/facade/memcache_parser.h b/src/facade/memcache_parser.h index 65db0f74786c..a16919130830 100644 --- a/src/facade/memcache_parser.h +++ b/src/facade/memcache_parser.h @@ -75,6 +75,8 @@ class MemcacheParser { bool return_ttl = false; // t bool return_access_time = false; // l bool return_hit = false; // h + bool return_version = false; // c + // Used internally by meta parsing. std::string blob; }; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 72e873f8bc1e..9a909b902d12 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -204,27 +204,40 @@ void SinkReplyBuilder::NextVec(std::string_view str) { vecs_.push_back(iovec{const_cast(str.data()), str.size()}); } -MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) { +MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), all_(0) { } void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag) { ReplyScope scope(this); - WritePieces("VALUE ", key, " ", mc_flag, " ", value.size()); - if (mc_ver) - WritePieces(" ", mc_ver); - - if (value.size() <= kMaxInlineSize) { - WritePieces(kCRLF, value, kCRLF); + if (flag_.meta) { + string flags; + if (flag_.return_mcflag) + absl::StrAppend(&flags, " f", mc_flag); + if (flag_.return_version) + absl::StrAppend(&flags, " c", mc_ver); + if (flag_.return_value) { + WritePieces("VA ", value.size(), flags, kCRLF, value, kCRLF); + } else { + WritePieces("HD ", flags, kCRLF); + } } else { - WritePieces(kCRLF); - WriteRef(value); - WritePieces(kCRLF); + WritePieces("VALUE ", key, " ", mc_flag, " ", value.size()); + if (mc_ver) + WritePieces(" ", mc_ver); + + if (value.size() <= kMaxInlineSize) { + WritePieces(kCRLF, value, kCRLF); + } else { + WritePieces(kCRLF); + WriteRef(value); + WritePieces(kCRLF); + } } } void MCReplyBuilder::SendSimpleString(std::string_view str) { - if (noreply_) + if (flag_.noreply) return; ReplyScope scope(this); @@ -232,7 +245,7 @@ void MCReplyBuilder::SendSimpleString(std::string_view str) { } void MCReplyBuilder::SendStored() { - SendSimpleString("STORED"); + SendSimpleString(flag_.meta ? "HD" : "STORED"); } void MCReplyBuilder::SendLong(long val) { @@ -253,11 +266,21 @@ void MCReplyBuilder::SendClientError(string_view str) { } void MCReplyBuilder::SendSetSkipped() { - SendSimpleString("NOT_STORED"); + SendSimpleString(flag_.meta ? "NS" : "NOT_STORED"); } void MCReplyBuilder::SendNotFound() { - SendSimpleString("NOT_FOUND"); + SendSimpleString(flag_.meta ? "NF" : "NOT_FOUND"); +} + +void MCReplyBuilder::SendGetEnd() { + if (!flag_.meta) + SendSimpleString("END"); +} + +void MCReplyBuilder::SendMiss() { + if (flag_.meta) + SendSimpleString("EN"); } void MCReplyBuilder::SendRaw(std::string_view str) { diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index f148bf97f145..4000558c7fd0 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -171,6 +171,8 @@ class MCReplyBuilder : public SinkReplyBuilder { void SendClientError(std::string_view str); void SendNotFound(); + void SendMiss(); + void SendGetEnd(); void SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag); void SendSimpleString(std::string_view str) final; @@ -179,15 +181,45 @@ class MCReplyBuilder : public SinkReplyBuilder { void SendRaw(std::string_view str); void SetNoreply(bool noreply) { - noreply_ = noreply; + flag_.noreply = noreply; } bool NoReply() const { - return noreply_; + return flag_.noreply; + } + + void SetMeta(bool meta) { + flag_.meta = meta; + } + + void SetBase64(bool base64) { + flag_.base64 = base64; + } + + void SetReturnMCFlag(bool val) { + flag_.return_mcflag = val; + } + + void SetReturnValue(bool val) { + flag_.return_value = val; + } + + void SetReturnVersion(bool val) { + flag_.return_version = val; } private: - bool noreply_ = false; + union { + struct { + uint8_t noreply : 1; + uint8_t meta : 1; + uint8_t base64 : 1; + uint8_t return_value : 1; + uint8_t return_mcflag : 1; + uint8_t return_version : 1; + } flag_; + uint8_t all_; + }; }; // Redis reply builder interface for sending RESP data. diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 3a1cde50f5e3..df22f6b8ccd3 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1490,6 +1490,13 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va char ttl_op[] = "EXAT"; mc_builder->SetNoreply(cmd.no_reply); + mc_builder->SetMeta(cmd.meta); + if (cmd.meta) { + mc_builder->SetBase64(cmd.base64); + mc_builder->SetReturnMCFlag(cmd.return_flags); + mc_builder->SetReturnValue(cmd.return_value); + mc_builder->SetReturnVersion(cmd.return_version); + } switch (cmd.type) { case MemcacheParser::REPLACE: @@ -1533,7 +1540,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va server_family_.StatsMC(cmd.key, mc_builder); return; case MemcacheParser::VERSION: - mc_builder->SendSimpleString("VERSION 1.5.0 DF"); + mc_builder->SendSimpleString("VERSION 1.6.0 DF"); return; default: mc_builder->SendClientError("bad command line format"); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 67a711098ccf..537611c5f39f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -1360,11 +1360,13 @@ void StringFamily::MGet(CmdArgList args, const CommandContext& cmnd_cntx) { auto* rb = static_cast(builder); DCHECK(dynamic_cast(builder) == nullptr); for (const auto& entry : res) { - if (!entry) - continue; - rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag); + if (entry) { + rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag); + } else { + rb->SendMiss(); + } } - rb->SendSimpleString("END"); + rb->SendGetEnd(); } else { auto* rb = static_cast(builder); rb->StartArray(res.size()); diff --git a/tests/dragonfly/memcache_meta.py b/tests/dragonfly/memcache_meta.py index 3b4cf0b4c4f5..87bbd1fcf648 100644 --- a/tests/dragonfly/memcache_meta.py +++ b/tests/dragonfly/memcache_meta.py @@ -6,6 +6,7 @@ CacheClient, connection_pool_factory_builder, ) +from meta_memcache.protocol import RequestFlags, Miss, Value, Success DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4} @@ -16,6 +17,14 @@ def test_basic(df_server: DflyInstance): servers=[ ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")), ], - connection_pool_factory_fn=connection_pool_factory_builder(), + connection_pool_factory_fn=connection_pool_factory_builder(recv_timeout=5), ) - # TODO: to add integration tests + + assert pool.set("key1", "value1", 100) + assert pool.set("key1", "value2", 0) + assert pool.get("key1") == "value2" + + request_flags = RequestFlags(return_value=False) + response = pool.meta_get(Key("key1"), flags=request_flags) + assert isinstance(response, Success) + assert pool.get("key2") is None diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index a8b405555f23..96263dd7f2ca 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -137,7 +137,7 @@ def test_version(memcached_client: MCClient): Our real version is being returned in the stats command. Also verified manually that php client parses correctly the version string that ends with "DF". """ - assert b"1.5.0 DF" == memcached_client.version() + assert b"1.6.0 DF" == memcached_client.version() stats = memcached_client.stats() version = stats[b"version"].decode("utf-8") assert version.startswith("v") or version == "dev"