[Misc] Check manager and engine arguments in entrypoints #19

Merged
merged 9 commits into from Aug 29, 2024
Changes from 4 commits
8 changes: 8 additions & 0 deletions llumnix/arg_utils.py
@@ -89,8 +89,16 @@ def from_cli_args(cls, args: argparse.Namespace) -> 'EngineManagerArgs':
attrs = [attr.name for attr in dataclasses.fields(cls)]
# Set the attributes from the parsed arguments.
engine_manager_args = cls(**{attr: getattr(args, attr) for attr in attrs})
cls._check_args(engine_manager_args)
return engine_manager_args

@classmethod
def _check_args(cls, args):
assert not (args.migration_backend == 'gloo'
and (args.disable_init_instance_by_manager or args.disable_fixed_node_init_instance)), \
("When using gloo as the migration backend, "
"do not set --disable-init-instance-by-manager or --disable-fixed-node-init-instance.")

@staticmethod
def add_cli_args(
parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
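For illustration, a minimal sketch of how the new `_check_args` hook surfaces an invalid flag combination as soon as the CLI arguments are parsed. The flag spellings follow the attribute names and may not match the parser exactly, and the sketch assumes the remaining manager arguments have usable defaults:

```python
import argparse

from llumnix.arg_utils import EngineManagerArgs

parser = argparse.ArgumentParser()
parser = EngineManagerArgs.add_cli_args(parser)

# Hypothetical invalid combination: gloo migration backend together with
# --disable-init-instance-by-manager should be rejected up front.
args = parser.parse_args([
    "--migration-backend", "gloo",
    "--disable-init-instance-by-manager",
])

try:
    engine_manager_args = EngineManagerArgs.from_cli_args(args)  # runs _check_args
except AssertionError as e:
    print(f"Invalid arguments: {e}")
```

Validating at argument-parsing time keeps the failure close to the user-facing entrypoint instead of deep inside component initialization.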
2 changes: 1 addition & 1 deletion llumnix/backends/profiling.py
@@ -198,7 +198,7 @@ def get_latency_mem(backend_type: BackendType, profiling_database: ProfilingData
assert sim_parallel_config in profiling_result.para_dict.keys(), "sim parallel config not in database"
latency_mem: LatencyMemData = profiling_result.para_dict[sim_parallel_config]
return latency_mem
raise ValueError(f'unimplemented backend {backend_type}')
raise ValueError(f'Unsupported backend: {backend_type}')

if __name__ == "__main__":
import argparse
2 changes: 1 addition & 1 deletion llumnix/backends/utils.py
@@ -29,7 +29,7 @@ def init_backend_engine(instance_id: str, backend_type: BackendType, *args, **kw
from llumnix.backends.vllm.simulator import BackendSimVLLM
backend_engine = BackendSimVLLM(instance_id, *args, **kwargs)
else:
raise ValueError(f'unimplemented backend {backend_type}')
raise ValueError(f'Unsupported backend: {backend_type}')
return backend_engine

def initialize_placement_group(
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/executor.py
@@ -65,7 +65,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
if placement_group:
bundle = placement_group.bundle_specs[rank+1]
if not bundle.get("GPU", 0):
raise Exception("gpu resource cannot be 0")
raise Exception("GPU resource cannot be 0.")
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
5 changes: 1 addition & 4 deletions llumnix/backends/vllm/llm_engine.py
@@ -78,7 +78,7 @@ def from_engine_args(
executor_class = LlumnixRayGPUExecutor
executor_class.migration_config = migration_config
else:
raise ValueError('unimplemented executor backend')
raise ValueError('Unsupported executor backend')
# Hack to pass node_id to _init_workers_ray function.
executor_class.node_id = node_id
# Create the LLM engine.
@@ -251,9 +251,6 @@ def add_request(self,
server_info: ServerInfo,
*args,
**kwargs) -> None:
# When manager is unavailable, api server might dispatch the request that has already been dispatched.
if request_id in self.engine.request_server_info:
return
# Store the server information of each request to put the request outputs back to the corresponding api server correctly.
self.engine.request_server_info[request_id] = server_info
self.engine.add_request(request_id, *args, **kwargs)
10 changes: 8 additions & 2 deletions llumnix/backends/vllm/utils.py
@@ -16,7 +16,7 @@
import torch

from vllm.config import ModelConfig, ParallelConfig
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.sampling_params import SamplingType
from vllm.model_executor.layers.sampler import SampleResultType, _multinomial, _greedy_sample, _random_sample,\
@@ -26,6 +26,7 @@

logger = init_logger(__name__)


def detect_unsupported_feature(engine_args: EngineArgs) -> None:
unsupported_feature = None
if engine_args.enable_lora:
@@ -37,7 +38,12 @@ def detect_unsupported_feature(engine_args: EngineArgs) -> None:
elif engine_args.use_v2_block_manager or engine_args.speculative_model:
unsupported_feature = "speculative decoding"
if unsupported_feature:
raise ValueError(f'vllm feature "{unsupported_feature}" is currently unsupported by llumnix.')
raise ValueError(f'Unsupported feature: Llumnix does not currently support "{unsupported_feature}".')

def check_engine_args(engine_args: AsyncEngineArgs) -> None:
assert engine_args.engine_use_ray and engine_args.worker_use_ray, \
("In Llumnix, engine and worker must be ray actor.")
detect_unsupported_feature(engine_args)

def _get_dtype_size(dtype: torch.dtype) -> int:
return torch.tensor([], dtype=dtype).element_size()
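As a rough usage sketch, the new `check_engine_args` guard can be exercised directly; the engine-argument values below are illustrative, not defaults:

```python
from vllm.engine.arg_utils import AsyncEngineArgs

from llumnix.backends.vllm.utils import check_engine_args

# Llumnix requires both the engine and the workers to run as Ray actors, and
# rejects vLLM features it does not yet support (LoRA, speculative decoding, ...).
engine_args = AsyncEngineArgs(
    model="facebook/opt-125m",   # illustrative model
    engine_use_ray=True,
    worker_use_ray=True,
)

check_engine_args(engine_args)  # raises AssertionError or ValueError on violations
```

Running this check in the api server entrypoint (see the api_server.py hunk below) makes misconfigurations fail fast, before any Ray actors are created.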
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/worker.py
@@ -67,7 +67,7 @@ def reserve_memory_for_migration(self, migration_config: MigrationConfig, model_
cache_config.gpu_memory_utilization -= migrate_ratio

if cache_config.gpu_memory_utilization <= 0:
raise ValueError("nccl migration backend take {:.4f} gpu memory, which is greater than gpu_memory_utilization {:.4f}. "
raise ValueError("Nccl migration backend take {:.4f} gpu memory, which is greater than gpu_memory_utilization {:.4f}. "
"try to increase gpu-memory-utilization or reduce migration-cache-blocks."
.format(migrate_ratio, cache_config.gpu_memory_utilization))

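The numbers below are made up, but they show the arithmetic behind this guard: the migration cache's share is subtracted from the configured utilization, and the error fires only when nothing is left for the KV cache:

```python
gpu_memory_utilization = 0.90   # value configured by the user (assumed)
migrate_ratio = 0.05            # assumed share reserved for the migration cache

gpu_memory_utilization -= migrate_ratio   # 0.85 remains for the engine's KV cache

if gpu_memory_utilization <= 0:
    raise ValueError("Migration cache reservation exceeds gpu_memory_utilization.")
```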
6 changes: 0 additions & 6 deletions llumnix/entrypoints/llumnix_utils.py
@@ -183,14 +183,8 @@ def init_request_output_queue() -> RayQueue:
def init_llumnix_components(engine_manager_args: EngineManagerArgs,
engine_args,
node_id: str) -> Tuple[LLMEngineManager, List[Llumlet], RayQueue]:
assert engine_args.engine_use_ray and engine_args.worker_use_ray, \
("In Llumnix, engine and worker must be ray actor in orther to run step and migrate concurrently.")
engine_manager = init_manager(engine_manager_args)
# TODO(s5u13b): Add arguments checker for Llumnix.
if engine_manager_args.disable_init_instance_by_manager:
assert engine_manager_args.migration_backend != 'gloo', \
("Llumlet should be initialized by manager when using gloo as migration backend for auto-scaling, "
"please do not set --disable-init-instance-by-manager argument.")
instance_ids, llumlets = init_llumlets(engine_manager_args, engine_args, node_id)
else:
instance_ids, llumlets = retry_manager_method_sync(engine_manager.init_llumlets.remote, 'init_llumlets', engine_args, node_id)
14 changes: 12 additions & 2 deletions llumnix/entrypoints/vllm/api_server.py
@@ -32,7 +32,7 @@
is_gpu_available, init_llumnix_components)
from llumnix.logger import init_logger
from llumnix.utils import random_uuid

from llumnix.backends.vllm.utils import check_engine_args

logger = init_logger(__name__)
engine_manager = None
@@ -46,6 +46,7 @@
log_requests = None
num_finished_requests = 0
WAIT_MANAGER_INTERVAL = 5
manager_first_dead = True


async def _background_process_outputs():
Expand All @@ -72,16 +73,23 @@ async def lifespan(fastapi_app: FastAPI):

async def manager_generate(prompt, sampling_params, request_id) -> AsyncStream:
if sampling_params.n > 1 or sampling_params.use_beam_search:
raise ValueError("unsupport multiple sequence decoding")
raise ValueError("Unsupported feature: multiple sequence decoding")
results_generator = AsyncStream(request_id)
request_streams[request_id] = results_generator
# This request's outputs will be put to the request_output_queue of this api server no matter which instance it's running in.
server_info = ServerInfo(server_id, request_output_queue)
# If manager is unavailable, request will be directly added to the llumlet held by api server.
global manager_first_dead
try:
# await to catch exception
await engine_manager.generate.remote(request_id, server_info, prompt, sampling_params)
if not manager_first_dead:
manager_first_dead = True
except ray.exceptions.RayActorError:
# Do not re-generate the request to avoid duplicate requests.
if manager_first_dead:
manager_first_dead = False
return results_generator
try:
if instance_num_requests:
instance_id = min(instance_num_requests, key=instance_num_requests.get)
@@ -235,6 +243,8 @@ async def is_ready():
engine_manager_args = EngineManagerArgs.from_cli_args(args)
engine_args = AsyncEngineArgs.from_cli_args(args)

check_engine_args(engine_args)

print("engine_args: {}".format(engine_args))

if args.launch_ray_cluster:
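As far as the diff shows, the `manager_first_dead` flag makes the fallback path one-shot: the request that first observes the manager failure is returned without being re-dispatched (the manager may already have forwarded it), and later requests fall back to an instance held directly by the api server. A simplified sketch of that pattern follows; the names `ManagerClient` and `local_fallback` are illustrative, not part of the codebase:

```python
import ray


class ManagerClient:
    """Illustrative wrapper showing the one-shot fallback pattern."""

    def __init__(self, engine_manager, local_fallback):
        self.engine_manager = engine_manager   # Ray actor handle of the manager
        self.local_fallback = local_fallback   # coroutine dispatching to a local llumlet
        self.manager_alive = True              # plays the role of manager_first_dead

    async def dispatch(self, request_id, server_info, prompt, sampling_params):
        try:
            # await so that a dead manager actor surfaces as RayActorError here.
            await self.engine_manager.generate.remote(
                request_id, server_info, prompt, sampling_params)
            self.manager_alive = True
        except ray.exceptions.RayActorError:
            if self.manager_alive:
                # First failure: do not re-dispatch, to avoid duplicate requests.
                self.manager_alive = False
                return
            # Subsequent failures: dispatch directly to a local instance.
            await self.local_fallback(request_id, server_info, prompt, sampling_params)
```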
3 changes: 0 additions & 3 deletions llumnix/instance_info.py
@@ -92,7 +92,6 @@ class InstanceLoadCalculator:
def __init__(self,
load_metric: str,
enable_defrag: bool) -> None:
assert load_metric in ['remaining_steps', 'usage_ratio']
self.load_metric = load_metric
self.enable_defrag = enable_defrag
self.load_computation_strategies: Dict[str, LoadComputationStrategy] = {
@@ -122,7 +121,6 @@ def compute_instance_load(self, i: InstanceLoadInfo) -> float:

class MigrationLoadComputation(LoadComputationStrategy):
def compute_instance_load(self, i: InstanceLoadInfo) -> float:
assert self.load_metric in ['usage_ratio', 'remaining_steps']
instance_load = -np.inf
if self.load_metric == 'usage_ratio':
instance_load = (i.num_used_gpu_blocks + i.num_blocks_first_waiting_request) / i.num_total_gpu_blocks
@@ -144,7 +142,6 @@ def compute_instance_load(self, i: InstanceLoadInfo) -> float:

class DispatchAndScalingLoadComputation(LoadComputationStrategy):
def compute_instance_load(self, i: InstanceLoadInfo) -> float:
assert self.load_metric in ['usage_ratio', 'remaining_steps']
instance_load = -np.inf
if self.load_metric == 'usage_ratio':
instance_load = (i.num_used_gpu_blocks + i.num_blocks_all_waiting_requests) / i.num_total_gpu_blocks
2 changes: 0 additions & 2 deletions llumnix/llm_engine_manager.py
@@ -122,8 +122,6 @@ async def generate(
except (ray.exceptions.RayActorError, KeyError):
logger.info("[generate] instance {} is dead, regenerate request {}".format(instance_id, request_id))
self.scale_down(instance_id)
if self.num_instances != 0:
asyncio.create_task(self.generate(request_id, server_info, *args, **kwargs))

async def abort(self, request_id: Union[str, Iterable[str]]) -> None:
if isinstance(request_id, str):
2 changes: 0 additions & 2 deletions llumnix/llumlet/llumlet.py
@@ -66,8 +66,6 @@ def from_args(cls,
if backend_type == backend_type.VLLM:
if disable_fixed_node_init_instance:
# TODO(s5u13b): Support placement_group lifetime management when the migration backend is gloo.
assert migration_config.migration_backend != 'gloo', 'When the migration backend is gloo, \
do not set --disable-fixed-node-init-instance.'
placement_group = initialize_placement_group(world_size, detached=detached)
kwargs["placement_group"] = placement_group
engine_class = ray.remote(num_cpus=1,