diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index b3f82758e544..effb1dbe41d4 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -280,7 +280,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
 
 void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
-    FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
@@ -318,7 +317,10 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(s); });
+  CmdSerializer serializer([&](std::string s) {
+    Write(s);
+    ThrottleIfNeeded();
+  });
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
 
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index d3454d8ab229..e5653da93028 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1825,11 +1825,29 @@ async def node1size0():
         assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
 
 
+@pytest.mark.parametrize(
+    "huge_values_threshold, seed_during_migration",
+    [
+        pytest.param(10, True),
+        pytest.param(1_000, True),
+        pytest.param(1_000_000, True),
+        pytest.param(10, False),
+        pytest.param(1_000, False),
+        pytest.param(1_000_000, False),
+    ],
+)
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
 @pytest.mark.asyncio
-async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
+async def test_cluster_migration_huge_container(
+    df_factory: DflyInstanceFactory, huge_values_threshold: int, seed_during_migration: bool
+):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(
+            port=BASE_PORT + i,
+            admin_port=BASE_PORT + i + 1000,
+            serialization_max_chunk_size=huge_values_threshold,
+        )
+        for i in range(2)
     ]
 
     df_factory.start_all(instances)
@@ -1840,16 +1858,45 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     logging.debug("Generating huge containers")
-    seeder = StaticSeeder(
-        key_target=10,
-        data_size=10_000_000,
-        collection_size=10_000,
-        variance=1,
-        samples=1,
-        types=["LIST", "HASH", "SET", "ZSET", "STRING"],
-    )
-    await seeder.run(nodes[0].client)
-    source_data = await StaticSeeder.capture(nodes[0].client)
+    # Insert data into containers with a Gaussian key distribution: some will be small and others big
+    stop = False
+
+    async def insert_data(client):
+        nonlocal stop
+        for i in itertools.count(start=1):
+            if stop:
+                break
+            key = int(random.gauss(mu=0, sigma=100))
+            val = "#" * i  # Create large entries for RSS to grow
+            await client.rpush(f"l:{key}", val)
+            await client.sadd(f"s:{key}", val)
+            await client.hset(f"h:{key}", val, val)
+            await client.zadd(f"z:{key}", {val: i})
+        logging.debug("Stopped feeding data")
+
+    insert_task = asyncio.create_task(insert_data(instances[0].cluster_client()))
+
+    async def get_rss(client, field):
+        info = await client.info("memory")
+        return info[field]
+
+    rss = 0
+    while True:
+        rss = await get_rss(nodes[0].client, "used_memory_rss")
+        logging.debug(f"Current rss: {rss}")
+        if rss > 1_000_000_000:
+            break
+        await asyncio.sleep(1)
+
+    async def stop_seed():
+        nonlocal stop
+        stop = True
+        logging.debug("Waiting for task")
+        await insert_task
+        logging.debug("Done waiting for task")
+
+    if not seed_during_migration:
+        await stop_seed()
 
     nodes[0].migrations = [
         MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
@@ -1860,8 +1907,24 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     logging.debug("Waiting for migration to finish")
     await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
 
-    target_data = await StaticSeeder.capture(nodes[1].client)
-    assert source_data == target_data
+    if seed_during_migration:
+        await stop_seed()
+    else:
+        # Only verify memory growth if we haven't pushed new data during migration
+        new_rss = await get_rss(nodes[0].client, "used_memory_peak_rss")
+        logging.debug(f"new rss {new_rss}, previous rss {rss}")
+        assert new_rss < rss * 1.1
+
+    for i in range(-500, 500):
+        l = await nodes[1].client.lrange(f"l:{i}", 0, -1)
+        s = await nodes[1].client.smembers(f"s:{i}")
+        h = await nodes[1].client.hkeys(f"h:{i}")
+        z = await nodes[1].client.zrange(f"z:{i}", 0, -1)
+        assert set(l) == s
+        assert set(h) == s
+        assert set(z) == s
+
+    await instances[0].cluster_client().close()
 
 
 def parse_lag(replication_info: str):