Commit

fix
KuilongCui committed Aug 29, 2024
1 parent 8dd876f commit 8f34424
Showing 9 changed files with 34 additions and 39 deletions.
4 changes: 1 addition & 3 deletions Makefile
@@ -23,9 +23,7 @@ install: cupy
 lint: check_pylint_installed
 	@pylint --rcfile=.pylintrc -s n ./llumnix
 
-	@pylint -s n --disable=all \
-		--enable=trailing-whitespace,unused-variable,wrong-import-order,missing-final-newline,line-too-long,\
-		unused-import,singleton-comparison,unnecessary-comprehension ./tests
+	@pylint -s n --disable=unused-argument,redefined-outer-name,super-init-not-called,protected-access,protected-access ./tests
 
 .PHONY: test
 test:
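The lint target now takes the opposite approach for ./tests: instead of disabling everything and whitelisting eight checks, it runs pylint's default checks and blacklists a handful that test code commonly trips (note that protected-access is listed twice; the duplicate is harmless but redundant). A hypothetical sketch of why unused-argument needs disabling in a test suite: pytest injects fixtures through parameters the test body may never read. The fixture name below is invented for illustration:

    def test_server_responds(ray_cluster):  # hypothetical pytest fixture
        # ray_cluster is requested only for its setup/teardown side effects,
        # so pylint's default unused-argument check (W0613) would flag it.
        assert True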
4 changes: 1 addition & 3 deletions tests/backends/vllm/test_llm_engine.py
@@ -11,8 +11,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pytest
-
 from unittest.mock import MagicMock
 
 from vllm.sequence import (Logprob, SequenceGroupOutput, SequenceOutput,
@@ -88,4 +86,4 @@ def test_llm_engine_from_engine_args():
 
     latency_data = LatencyMemData({},{},{})
     llm_engine = MockEngine.from_engine_args(engine_args, instance_id="0", migration_config=None, latency_mem=latency_data)
-    assert llm_engine.executor_class == SimGPUExecutor
\ No newline at end of file
+    assert llm_engine.executor_class == SimGPUExecutor
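The paired deletion and re-addition of the final assert is how a missing-final-newline fix renders in a diff: the old last line lacked a terminating newline (hence the "\ No newline at end of file" marker), so it is deleted and re-added with one. The same pattern closes test_migration.py, test_scheduler.py, and test_migration_coordinator.py below. A minimal sketch of the check pylint's missing-final-newline rule (C0304) performs, using only the standard library:

    from pathlib import Path

    def ends_with_final_newline(path: str) -> bool:
        """Return True if the file is empty or its last byte is a newline."""
        data = Path(path).read_bytes()
        return not data or data.endswith(b"\n")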
3 changes: 1 addition & 2 deletions tests/backends/vllm/test_migration.py
@@ -13,7 +13,6 @@
 
 import pytest
 import torch
-import time
 import ray
 from ray.util.queue import Queue as RayQueue
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
@@ -141,4 +140,4 @@ def test_clear_migration_states():
     _, seq_group = create_dummy_prompt("0",7,block_size)
     llumlet.backend_engine.add_migrating_out_request_last_stage(seq_group)
     llumlet.clear_migration_states(is_migrate_in=False)
-    assert llumlet.backend_engine.get_last_running_request() is not None
\ No newline at end of file
+    assert llumlet.backend_engine.get_last_running_request() is not None
3 changes: 1 addition & 2 deletions tests/backends/vllm/test_migration_backend.py
@@ -17,7 +17,6 @@
 import ray
 
 from vllm.engine.arg_utils import EngineArgs
-from vllm.utils import get_distributed_init_method, get_ip, get_open_port
 
 from llumnix.backends.vllm.worker import MigrationWorker
 from llumnix.arg_utils import EngineManagerArgs
@@ -94,7 +93,7 @@ def test_migrate_cache(backend):
 
     dst_blocks = list(range(num_gpu_blocks))
     random.shuffle(dst_blocks)
-    src_to_dst = {idx: block_num for idx, block_num in enumerate(dst_blocks)}
+    src_to_dst = dict(enumerate(dst_blocks))
     ray.get(worker1.execute_method.remote(
         'migrate_cache',
         src_worker_handle_list=[worker0],
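Replacing the comprehension with dict(enumerate(...)) resolves pylint's unnecessary-comprehension warning (R1721): when a dict comprehension only re-emits the pairs an iterable already yields, handing the iterable to dict() is equivalent and shorter. A quick self-contained check of the equivalence:

    dst_blocks = [2, 0, 1]  # toy stand-in for the shuffled block list
    by_comprehension = {idx: block_num for idx, block_num in enumerate(dst_blocks)}
    by_constructor = dict(enumerate(dst_blocks))
    assert by_comprehension == by_constructor == {0: 2, 1: 0, 2: 1}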
44 changes: 22 additions & 22 deletions tests/backends/vllm/test_scheduler.py
@@ -77,19 +77,19 @@ def test_scheduler_policy():
 
     # all seq_group in waiting queue
     migrating_request = scheduler.get_last_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     migrating_request = scheduler.get_shortest_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     migrating_request = scheduler.get_longest_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     # all seq_group in prefilling stage
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     migrating_request = scheduler.get_last_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     migrating_request = scheduler.get_shortest_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     migrating_request = scheduler.get_longest_running_request()
-    assert migrating_request == None
+    assert migrating_request is None
     append_new_token(out, 1)
     schedule_and_update_computed_tokens(scheduler)
     # all in running queue
@@ -109,11 +109,11 @@ def test_scheduler_num_killed_request():
         _, seq_group = create_dummy_prompt(str(idx), prompt_length=8, block_size=block_size)
         scheduler.add_seq_group(seq_group)
     # remain 0 blocks
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     append_new_token(out, 1)
     assert scheduler._get_num_killed_requests() == 0
     # preempt 2 requests
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     assert scheduler._get_num_killed_requests() == 2
 
 def test_scheduler_running_request():
@@ -123,7 +123,7 @@ def test_scheduler_running_request():
     for idx in range(1, num_seq_group + 1):
         _, seq_group = create_dummy_prompt(str(idx), prompt_length=idx, block_size=block_size)
         scheduler.add_seq_group(seq_group)
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    schedule_and_update_computed_tokens(scheduler)
     assert scheduler.get_num_unfinished_seq_groups() == 4
     scheduler.remove_running_request("1")
     assert scheduler.get_num_unfinished_seq_groups() == 3
@@ -162,46 +162,46 @@ def test_scheduler_should_abort_migration():
     _, seq_group_1 = create_dummy_prompt("1", prompt_length=17, block_size=block_size)
     scheduler.add_seq_group(seq_group_1)
     # remain 0 blocks
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     append_new_token(out, 1)
 
     assert scheduler._get_num_killed_requests() == 0
     # assert scheduler.block_manager.get_num_free_gpu_blocks() == 0
     # all in running queue
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     append_new_token(out, 1)
     assert scheduler._get_num_killed_requests() == 0
     migrating_request = scheduler.get_last_running_request()
     last_stage_time = time.time()
     assert migrating_request.request_id == "1"
     # preempt request 1
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
     append_new_token(out, 1)
-    assert scheduler.should_abort_migration(seq_group_1, last_stage_time) == True
-    assert scheduler.should_abort_migration(seq_group_0, last_stage_time) == False
+    assert scheduler.should_abort_migration(seq_group_1, last_stage_time)
+    assert not scheduler.should_abort_migration(seq_group_0, last_stage_time)
     assert scheduler._get_num_killed_requests() == 1
     scheduler.remove_running_request(seq_group_0)
     scheduler.free_src_request(seq_group_0)
     # free request 0, request 1 prefill
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    _, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
     assert scheduler._get_num_killed_requests() == 0
-    assert scheduler.should_abort_migration(seq_group_1, last_stage_time) == True
+    assert scheduler.should_abort_migration(seq_group_1, last_stage_time)
 
 def test_free_dst_pre_alloc_cache():
     scheduler = initialize_scheduler()
-    blocks = scheduler.pre_alloc("1", 2)
-    blocks = scheduler.pre_alloc("1", 4)
+    scheduler.pre_alloc("1", 2)
+    scheduler.pre_alloc("1", 4)
     assert len(scheduler.pre_alloc_cache_dict["1"]) == 6
     scheduler.free_dst_pre_alloc_cache("1")
-    assert scheduler.pre_alloc_cache_dict.get("1",None) == None
+    assert scheduler.pre_alloc_cache_dict.get("1", None) is None
     assert scheduler.block_manager.get_num_free_gpu_blocks() == 8
 
 def test_get_request_incremental_blocks():
     scheduler = initialize_scheduler()
     block_size = 4
     _, seq_group = create_dummy_prompt("0", prompt_length=16, block_size=block_size)
     scheduler.add_seq_group(seq_group)
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
+    schedule_and_update_computed_tokens(scheduler)
     incremental_blocks = scheduler.get_request_incremental_blocks(seq_group, 2)
-    assert len(incremental_blocks) == 2
\ No newline at end of file
+    assert len(incremental_blocks) == 2
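Two idioms account for most of this file's churn. Comparisons against None now use identity (is None), satisfying singleton-comparison (C0121): equality can be redefined by a class, identity cannot. And return values a test never reads are bound to _ or dropped entirely, satisfying unused-variable (W0612). A small demonstration of why the identity form is the safe one:

    class AlwaysEqual:
        """Toy class whose __eq__ claims equality with everything."""
        def __eq__(self, other):
            return True

    obj = AlwaysEqual()
    assert obj == None        # passes misleadingly via the overridden __eq__
    assert obj is not None    # identity ignores __eq__: obj is a real object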
2 changes: 1 addition & 1 deletion tests/entrypoints/vllm/api_server_manager.py
@@ -45,7 +45,7 @@ async def generate(self, request_id, server_info, *args, **kwargs):
 
     async def abort(self, request_id):
         self._num_aborts += 1
-
+
     def testing_stats(self):
         return {"num_aborted_requests": self._num_aborts}
 
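The only changed line in this hunk, and likewise in test_api_server.py below, is blank. Given the 1 addition & 1 deletion count, this is presumably trailing whitespace being stripped, which pylint's default trailing-whitespace check (C0303) flags; the raw diff would show the removed spaces that this view cannot.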
2 changes: 1 addition & 1 deletion tests/entrypoints/vllm/test_api_server.py
@@ -72,7 +72,7 @@ def test_api_server(api_server, interface: str):
         _query_server = _query_server_generate
     elif interface == 'generate_benchmark':
         _query_server = _query_server_generate_benchmark
-
+
     with Pool(32) as pool:
         # Wait until the server is ready
         prompts = ["warm up"] * 1
6 changes: 3 additions & 3 deletions tests/global_scheduler/test_llm_engine_manager.py
@@ -11,10 +11,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
+
 import ray
 import pytest
 import numpy as np
-import time
 
 from vllm.utils import random_uuid
 
@@ -174,8 +175,7 @@ def test_generate_and_abort(engine_manager, llumlet):
     assert num_requests == 0
 
 def test_get_request_instance():
-    instance_ids, llumlets = init_llumlets(2)
-    instance_id, instance_id_1 = instance_ids[0], instance_ids[1]
+    _, llumlets = init_llumlets(2)
     llumlet, llumlet_1 = llumlets[0], llumlets[1]
     request_id = random_uuid()
     request_id_1 = random_uuid()
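The import shuffle here, echoed in test_migration_coordinator.py below, satisfies wrong-import-order (C0411): standard-library modules first, third-party packages second, each group separated by a blank line. Sketched with this file's own imports:

    import time  # standard library: first group

    import ray  # third-party packages: second group
    import pytest
    import numpy as np

    from vllm.utils import random_uuid  # also third-party; stays below the stdlib group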
5 changes: 3 additions & 2 deletions tests/llumlet/test_migration_coordinator.py
@@ -11,9 +11,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import ray
 from unittest.mock import MagicMock, patch
 
+import ray
+
 from llumnix.llumlet.migration_coordinator import MigrationCoordinator
 from llumnix.llumlet.migrating_request import MigratingRequest
 from llumnix.backends.backend_interface import BackendInterface
@@ -107,4 +108,4 @@ def test_migrate_out_multistage(migrate_out_onestage):
     assert coordinator.migrate_out_onestage.call_count == max_stages + 1
     assert status == MigrationStatus.FINISHED_ABORTED
 
-    ray.shutdown()
\ No newline at end of file
+    ray.shutdown()
