From bc8702bf134588a1bad129d65a936c8c9c1889dc Mon Sep 17 00:00:00 2001 From: s5u13b Date: Wed, 23 Oct 2024 04:31:32 +0000 Subject: [PATCH] Minors --- llumnix/backends/vllm/scheduler.py | 4 ---- llumnix/llumlet/migration_coordinator.py | 2 +- tests/e2e_test/test_migration.py | 20 +++++++++----------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/llumnix/backends/vllm/scheduler.py b/llumnix/backends/vllm/scheduler.py index 5024671c..3b7f7ed1 100644 --- a/llumnix/backends/vllm/scheduler.py +++ b/llumnix/backends/vllm/scheduler.py @@ -136,10 +136,6 @@ def pre_alloc(self, or block_num * self.cache_config.block_size > self.prompt_limit: return [] blocks = self.block_manager.get_free_blocks(block_num) - # Once dst instance cannot pre alloc, free the pre alloc cache proactively. - if len(blocks) < block_num: - self.free_dst_pre_alloc_cache(request_id) - return [] pre_blocks = self.pre_alloc_cache_dict.get(request_id, []) pre_blocks.extend(blocks) self.pre_alloc_cache_dict[request_id] = pre_blocks diff --git a/llumnix/llumlet/migration_coordinator.py b/llumnix/llumlet/migration_coordinator.py index 9b6b5fb5..dd766b50 100644 --- a/llumnix/llumlet/migration_coordinator.py +++ b/llumnix/llumlet/migration_coordinator.py @@ -115,8 +115,8 @@ async def _migrate_out_onestage(self, if not found: return MigrationStatus.FINISHED_SRC_ABORTED self.backend_engine.add_migrating_out_request_last_stage(migrate_out_request) - stage_block_num = len(incremental_blocks) src_blocks = incremental_blocks[:] + stage_block_num = len(incremental_blocks) dst_blocks = await migrate_in_ray_actor.execute_migration_method \ .remote("migrate_in_pre_alloc", migrate_out_request.request_id, migrate_out_request.status, diff --git a/tests/e2e_test/test_migration.py b/tests/e2e_test/test_migration.py index dee0076b..917d316e 100644 --- a/tests/e2e_test/test_migration.py +++ b/tests/e2e_test/test_migration.py @@ -66,7 +66,7 @@ def parse_manager_log_file(log_file): @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench") @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B']) @pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl']) -@pytest.mark.parametrize("migrated_request_status", ['waiting', 'running']) +@pytest.mark.parametrize("migrated_request_status", ['running', 'waiting']) async def test_migration_benchmark(model, migration_backend, migrated_request_status): if migrated_request_status == 'waiting' and migration_backend != 'rpc': pytest.skip("When the migrated request status is waiting, only test the rpc migration backend.") @@ -104,16 +104,14 @@ async def run_bench_command(command): parse_manager_log_file("manager_instance.csv") average_speed = parse_instance_log_file(instance_output_logs) - - sorted_keys = sorted(average_speed.keys(), key=lambda x: float(x.split()[0])) - - data = [ - ['migration_size'] + sorted_keys, - [f'{migration_backend}_speed(GB/s)'] + [f"{average_speed[key]:.2f}" for key in sorted_keys] - ] - - with open("performance.txt", "a", encoding="utf-8") as f: - f.write(to_markdown_table(data)) + if migrated_request_status == 'running': + sorted_keys = sorted(average_speed.keys(), key=lambda x: float(x.split()[0])) + data = [ + ['migration_size'] + sorted_keys, + [f'{migration_backend}_speed(GB/s)'] + [f"{average_speed[key]:.2f}" for key in sorted_keys] + ] + with open("performance.txt", "a", encoding="utf-8") as f: + f.write(to_markdown_table(data)) shutdown_llumnix_service() clear_ray_state()