
Commit

Fix migration test
s5u13b committed Oct 21, 2024
1 parent 2c9842d commit 607ac04
Showing 5 changed files with 25 additions and 16 deletions.
4 changes: 2 additions & 2 deletions llumnix/backends/vllm/llm_engine.py
@@ -352,10 +352,10 @@ def commit_dst_request(self, backend_request: SequenceGroupLlumnix) -> None:
logger.info("add seq {} to block table".format(seq.seq_id))
pre_alloc_blocks = self.engine.scheduler.pre_alloc_cache_dict.pop(backend_request.request_id)
self.engine.scheduler.block_manager.add_block_table(pre_alloc_blocks, seq.seq_id)
backend_request.reset_migration_args()
backend_request.reset_migration_args_dst()
if backend_request.status == RequestStatus.RUNNING:
self.add_running_request(backend_request)
else: # backend_request.status == RequestStatus.WAITING
else: # RequestStatus.WAITING
backend_request.waiting_migrating = True
self.add_waiting_request(backend_request)

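In context, the hunk above makes commit_dst_request call the new destination-side reset and then re-queue the migrated-in request according to its status: RUNNING requests go back to the running queue, while WAITING requests are flagged with waiting_migrating and re-inserted into the waiting queue. A minimal, self-contained sketch of that dispatch (the Dummy* classes and this RequestStatus enum are illustrative stand-ins, not llumnix APIs):

from enum import Enum, auto

class RequestStatus(Enum):
    RUNNING = auto()
    WAITING = auto()

class DummyRequest:
    def __init__(self, request_id, status):
        self.request_id = request_id
        self.status = status
        self.waiting_migrating = False

class DummyScheduler:
    def __init__(self):
        self.running, self.waiting = [], []

    def commit_dst_request(self, request):
        # Mirror of the branch in the diff: re-queue based on the request's status.
        if request.status == RequestStatus.RUNNING:
            self.running.append(request)
        else:  # RequestStatus.WAITING
            request.waiting_migrating = True
            self.waiting.append(request)

scheduler = DummyScheduler()
scheduler.commit_dst_request(DummyRequest("req-0", RequestStatus.WAITING))
assert scheduler.waiting[0].waiting_migrating
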
9 changes: 5 additions & 4 deletions llumnix/llumlet/llumlet.py
@@ -159,9 +159,10 @@ async def _migrate_out_one_request(self, migrate_out_request: LlumnixRequest, ds
             self.backend_engine.free_src_request(migrate_out_request)
             migrated_request.append(migrate_out_request.request_id)
             self.backend_engine.remove_migrating_out_request_last_stage(migrate_out_request)
-        elif status == MigrationStatus.FINISHED_SRC_ABORTED:
-            migrate_out_request.reset_migration_args()
-            await migrate_in_ray_actor.execute_migration_method.remote("free_dst_pre_alloc_cache", migrate_out_request.request_id)
+        else: # FINISHED_SRC_ABORTED or FINISHED_DST_ABORTED
+            migrate_out_request.reset_migration_args_src()
+            if status == MigrationStatus.FINISHED_SRC_ABORTED:
+                await migrate_in_ray_actor.execute_migration_method.remote("free_dst_pre_alloc_cache", migrate_out_request.request_id)
         t1 = time.time()
         logger.info("{}->{} migrate done, migrate request {}, migration status: {}, len: {} blocks, cost: {} ms" \
             .format(self.instance_id, dst_instance_id, migrated_request, status, \
@@ -218,7 +219,7 @@ def clear_migration_states(self, is_migrate_in: bool) -> None:
logger.info("clear_migration_states: add request {} back to engine".format(backend_request.request_id))
if backend_request.status == RequestStatus.RUNNING:
self.backend_engine.add_running_request(backend_request)
else: # backend_request.status == RequestStatus.WAITING
else: # RequestStatus.WAITING
self.backend_engine.add_waiting_request(backend_request)

def execute_migration_method(self, method, *args, **kwargs):
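The rewritten branch treats both abort outcomes the same way on the source side (reset_migration_args_src), but only sends the remote free_dst_pre_alloc_cache call when the source itself aborted, presumably because a destination-side abort already releases its own pre-allocated blocks. A small asyncio sketch of that decision logic (finish_migration and the fake_free callback are illustrative stand-ins, not llumnix code):

import asyncio
from enum import Enum, auto

class MigrationStatus(Enum):
    FINISHED_DONE = auto()
    FINISHED_SRC_ABORTED = auto()
    FINISHED_DST_ABORTED = auto()

async def finish_migration(status, request, free_dst_pre_alloc_cache):
    """Illustrative handler: reset source-side state on any abort,
    free the destination's pre-allocation only on a source abort."""
    if status == MigrationStatus.FINISHED_DONE:
        return [request["request_id"]]
    # FINISHED_SRC_ABORTED or FINISHED_DST_ABORTED
    request["migration_args"] = None          # stands in for reset_migration_args_src()
    if status == MigrationStatus.FINISHED_SRC_ABORTED:
        await free_dst_pre_alloc_cache(request["request_id"])
    return []

async def main():
    freed = []
    async def fake_free(request_id):
        freed.append(request_id)
    await finish_migration(MigrationStatus.FINISHED_DST_ABORTED, {"request_id": "r1"}, fake_free)
    await finish_migration(MigrationStatus.FINISHED_SRC_ABORTED, {"request_id": "r2"}, fake_free)
    assert freed == ["r2"]  # only the source-aborted case touches the destination cache

asyncio.run(main())
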
1 change: 1 addition & 0 deletions llumnix/llumlet/migration_coordinator.py
@@ -126,6 +126,7 @@ async def _migrate_out_onestage(self,
             self.backend_engine.add_running_request(migrate_out_request)
             self.backend_engine.remove_migrating_out_request_last_stage(migrate_out_request)
             return MigrationStatus.FINISHED_DST_ABORTED
+
         # do stage send/recv
         migrate_out_request.stage_timestamps.append(time.time())
         migrate_out_request.stage_num_blocks_list.append(stage_block_num)
13 changes: 10 additions & 3 deletions llumnix/llumlet/request.py
@@ -41,12 +41,19 @@ def __init__(self, request_id: int, server_info: ServerInfo, expected_steps: int
         # end-of-migration
         self.eom = False
 
-    def reset_migration_args(self):
+    def reset_migration_args_dst(self):
+        # By default, there is no limit on the number of steps expected for the request.
+        self.expected_steps = math.inf
+
+        self.last_preemption_time = None
+        self.stage_timestamps = []
+        self.stage_num_blocks_list = []
+        self.try_schedule_times = 0
+
+    def reset_migration_args_src(self):
         self.last_preemption_time = None
         self.stage_timestamps = []
         self.stage_num_blocks_list = []
-        # By default, there is no limit on the number of steps expected for the request.
-        self.expected_steps = math.inf
 
     @property
     def inference_type(self) -> RequestInferenceType:
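Together with the two call sites above, the split gives each end of a migration its own reset: the destination (commit_dst_request) uses reset_migration_args_dst, which also lifts the expected_steps limit and clears try_schedule_times, while the source (_migrate_out_one_request) uses reset_migration_args_src after an aborted migration. A condensed, illustrative mirror of the two methods (the initial values in __init__ are made up for the example):

import math

class MigratingRequest:
    """Condensed, illustrative mirror of the two reset methods in the diff."""

    def __init__(self):
        self.expected_steps = 2            # e.g. limited while a migration is in flight
        self.last_preemption_time = 1.0
        self.stage_timestamps = [1.0, 2.0]
        self.stage_num_blocks_list = [8, 4]
        self.try_schedule_times = 3

    def reset_migration_args_dst(self):
        # Destination side: lift the step limit and clear all migration bookkeeping.
        self.expected_steps = math.inf
        self.last_preemption_time = None
        self.stage_timestamps = []
        self.stage_num_blocks_list = []
        self.try_schedule_times = 0

    def reset_migration_args_src(self):
        # Source side: clear per-stage bookkeeping only.
        self.last_preemption_time = None
        self.stage_timestamps = []
        self.stage_num_blocks_list = []

req = MigratingRequest()
req.reset_migration_args_dst()
assert req.expected_steps == math.inf and req.try_schedule_times == 0
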
14 changes: 7 additions & 7 deletions tests/e2e_test/test_migration.py
@@ -41,18 +41,18 @@ def parse_instance_log_file(log_files):
                     speed = float(speed_match.group(1))
                     speed_dict[total_kv_cache_size].append(speed)
 
-    averger_speed = {}
+    average_speed = {}
     for transfer_size, speeds in speed_dict.items():
         if len(speeds) <= 2:
             continue
 
         speeds.sort()
         trimmed_speeds = speeds[1:-1]
-        averger_speed[transfer_size] = sum(trimmed_speeds) / len(trimmed_speeds)
+        average_speed[transfer_size] = sum(trimmed_speeds) / len(trimmed_speeds)
 
-    assert len(averger_speed) > 0, "Migration should have occurred, but it was not detected. "
+    assert len(average_speed) > 0, "Migration should have occurred, but it was not detected. "
 
-    return averger_speed
+    return average_speed
 
 def parse_manager_log_file(log_file):
     df = pd.read_csv(log_file)
@@ -95,13 +95,13 @@ async def run_bench_command(command):

     parse_manager_log_file("manager_instance.csv")
 
-    averger_speed = parse_instance_log_file(instance_output_logs)
+    average_speed = parse_instance_log_file(instance_output_logs)
 
-    sorted_keys = sorted(averger_speed.keys(), key=lambda x: float(x.split()[0]))
+    sorted_keys = sorted(average_speed.keys(), key=lambda x: float(x.split()[0]))
 
     data = [
         ['migration_size'] + sorted_keys,
-        [f'{migration_backend}_speed(GB/s)'] + [f"{averger_speed[key]:.2f}" for key in sorted_keys]
+        [f'{migration_backend}_speed(GB/s)'] + [f"{average_speed[key]:.2f}" for key in sorted_keys]
     ]
 
     with open("performance.txt", "a", encoding="utf-8") as f:
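For reference, the renamed average_speed dictionary is filled by a simple trimmed mean per transfer size: sizes with two or fewer samples are skipped, the slowest and fastest samples are dropped, and the rest are averaged. A self-contained sketch of that aggregation with made-up numbers (the regex-based log parsing is omitted, and the size labels are assumed to look like "<number> <unit>" so the test's sort key works):

from collections import defaultdict

def trimmed_average_speed(speed_dict):
    # speed_dict maps a transfer-size label to the raw speeds observed for it.
    average_speed = {}
    for transfer_size, speeds in speed_dict.items():
        if len(speeds) <= 2:
            continue                      # not enough samples to trim
        speeds = sorted(speeds)
        trimmed = speeds[1:-1]            # drop the slowest and fastest sample
        average_speed[transfer_size] = sum(trimmed) / len(trimmed)
    return average_speed

samples = defaultdict(list)
samples["4.00 GB"] = [3.1, 3.4, 3.3, 9.9]      # 3.1 and 9.9 are discarded
samples["8.00 GB"] = [4.0, 4.2]                # too few samples, skipped
print(trimmed_average_speed(samples))          # averages the two middle samples (about 3.35)
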
