diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 848b08ea73aa..3d55802f7f93 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -802,6 +802,25 @@ TEST_F(DflyEngineTest, DebugObject) { EXPECT_THAT(resp.GetString(), HasSubstr("encoding:listpack")); } +TEST_F(DflyEngineTest, StreamMemInfo) { + for (int i = 1; i < 2; ++i) { + Run({"XADD", "test", std::to_string(i), "var", "val" + std::to_string(i)}); + } + + int64_t stream_mem_first = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM]; + EXPECT_GT(stream_mem_first, 0); + + auto dump = Run({"dump", "test"}); + Run({"del", "test"}); + Run({"restore", "test", "0", facade::ToSV(dump.GetBuf())}); + + int64_t stream_mem_second = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM]; + + // stream_mem_first != stream_mem_second due to a preallocation in XADD command (see + // STREAM_LISTPACK_MAX_PRE_ALLOCATE) + EXPECT_GT(stream_mem_second, 0); +} + // TODO: to test transactions with a single shard since then all transactions become local. // To consider having a parameter in dragonfly engine controlling number of shards // unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case. 
diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index bd926b45baf3..d227cd8231e0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -50,6 +50,7 @@ extern "C" { #include "server/serializer_commons.h" #include "server/server_state.h" #include "server/set_family.h" +#include "server/stream_family.h" #include "server/transaction.h" #include "strings/human_readable.h" @@ -703,6 +704,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { stream* s; + StreamMemTracker mem_tracker; if (config_.append) { if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) { return; } @@ -848,6 +850,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { if (!config_.append) { pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s); } + mem_tracker.UpdateStreamSize(*pv_); } void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index faa1966abb5c..293093e0ec65 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -27,6 +27,18 @@ namespace dfly { using namespace facade; using namespace std; +StreamMemTracker::StreamMemTracker() { + start_size_ = zmalloc_used_memory_tl; +} + +void StreamMemTracker::UpdateStreamSize(PrimeValue& pv) const { + const size_t current = zmalloc_used_memory_tl; + int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_); + pv.AddStreamSize(diff); + // Under any flow we must not end up with this special value. 
+ DCHECK(pv.MallocUsed() != 0); +} + namespace { struct Record { @@ -612,24 +624,6 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) { return 0; } -class StreamMemTracker { - public: - StreamMemTracker() { - start_size_ = zmalloc_used_memory_tl; - } - - void UpdateStreamSize(PrimeValue& pv) const { - const size_t current = zmalloc_used_memory_tl; - int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_); - pv.AddStreamSize(diff); - // Under any flow we must not end up with this special value. - DCHECK(pv.MallocUsed() != 0); - } - - private: - size_t start_size_{0}; -}; - OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) { DCHECK(!args.empty() && args.size() % 2 == 0); auto& db_slice = op_args.GetDbSlice(); diff --git a/src/server/stream_family.h b/src/server/stream_family.h index 4f964ff914c0..715fd36571ed 100644 --- a/src/server/stream_family.h +++ b/src/server/stream_family.h @@ -15,6 +15,19 @@ namespace dfly { class CommandRegistry; struct CommandContext; +class CompactObj; +using PrimeValue = CompactObj; + +class StreamMemTracker { + public: + StreamMemTracker(); + + void UpdateStreamSize(PrimeValue& pv) const; + + private: + size_t start_size_{0}; +}; + class StreamFamily { public: static void Register(CommandRegistry* registry); diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index 37faba1af856..292275a2002c 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -6,6 +6,7 @@ from .instance import DflyInstance, DflyInstanceFactory +@pytest.mark.slow @pytest.mark.opt_only @pytest.mark.parametrize( "type, keys, val_size, elements", @@ -23,7 +24,10 @@ # memory it might force the gh runner to run out of memory (since OOM killer might not even # get a chance to run). 
@dfly_args({"proactor_threads": 4, "maxmemory": "5gb"}) -async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements): +async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements): + dbfilename = f"dump_{tmp_file_name()}" + instance = df_factory.create(dbfilename=dbfilename) + instance.start() # Create a Dragonfly and fill it up with `type` until it reaches `min_rss`, then make sure that # the gap between used_memory and rss is no more than `max_unaccounted_ratio`. min_rss = 3 * 1024 * 1024 * 1024 # 3gb @@ -35,7 +39,7 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e if type == "STREAM": max_unaccounted = max_unaccounted * 3 - client = df_server.client() + client = instance.client() await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly cmd = f"DEBUG POPULATE {keys} k {val_size} RAND TYPE {type} ELEMENTS {elements}" @@ -44,19 +48,27 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e await asyncio.sleep(2) # Wait for another RSS heartbeat update in Dragonfly - info = await client.info("memory") - logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') - assert info["used_memory"] > min_rss, "Weak testcase: too little used memory" - delta = info["used_memory_rss"] - info["used_memory"] - # It could be the case that the machine is configured to use swap if this assertion fails - assert delta > 0 - assert delta < max_unaccounted - - if type != "STRING" and type != "JSON": - # STRINGs keep some of the data inline, so not all of it is accounted in object_used_memory - # We have a very small over-accounting bug in JSON - assert info["object_used_memory"] > keys * elements * val_size - assert info["used_memory"] > info["object_used_memory"] + async def check_memory(): + info = await client.info("memory") + logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') + assert info["used_memory"] > 
min_rss, "Weak testcase: too little used memory" + delta = info["used_memory_rss"] - info["used_memory"] + # It could be the case that the machine is configured to use swap if this assertion fails + assert delta > 0 + assert delta < max_unaccounted + + if type != "STRING" and type != "JSON": + # STRINGs keep some of the data inline, so not all of it is accounted in object_used_memory + # We have a very small over-accounting bug in JSON + assert info["object_used_memory"] > keys * elements * val_size + assert info["used_memory"] > info["object_used_memory"] + + await check_memory() + + await client.execute_command("SAVE", "DF") + await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs") + + await check_memory() @pytest.mark.asyncio