Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Misc] Check manager and engine arguments in entrypoints #19

Merged
merged 9 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion llumnix/arg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class EngineManagerArgs:
last_stage_max_blocks: int = 16
max_stages: int = 3

def create_engine_manager_configs(
def create_global_scheduler_configs(
self,
) -> Tuple[GlobalSchedulerConfig]:
global_scheduler_config = GlobalSchedulerConfig(self.initial_instances,
Expand Down Expand Up @@ -89,8 +89,16 @@ def from_cli_args(cls, args: argparse.Namespace) -> 'EngineManagerArgs':
attrs = [attr.name for attr in dataclasses.fields(cls)]
# Set the attributes from the parsed arguments.
engine_manager_args = cls(**{attr: getattr(args, attr) for attr in attrs})
cls._check_args(engine_manager_args)
return engine_manager_args

@classmethod
def _check_args(cls, args):
assert args.migration_backend != 'gloo' or (args.migration_backend == 'gloo' \
and not args.disable_init_instance_by_manager and not args.disable_fixed_node_init_instance), \
("When using gloo as migration backend, "
"do not set --disable-init-instance-by-manager and --disable-fixed-node-init-instance.")

@staticmethod
def add_cli_args(
parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
Expand Down
2 changes: 1 addition & 1 deletion llumnix/backends/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def get_latency_mem(backend_type: BackendType, profiling_database: ProfilingData
assert sim_parallel_config in profiling_result.para_dict.keys(), "sim parallel config not in database"
latency_mem: LatencyMemData = profiling_result.para_dict[sim_parallel_config]
return latency_mem
raise ValueError(f'unimplemented backend {backend_type}')
raise ValueError(f'Unsupported backend: {backend_type}')

if __name__ == "__main__":
import argparse
Expand Down
2 changes: 1 addition & 1 deletion llumnix/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def init_backend_engine(instance_id: str, backend_type: BackendType, *args, **kw
from llumnix.backends.vllm.simulator import BackendSimVLLM
backend_engine = BackendSimVLLM(instance_id, *args, **kwargs)
else:
raise ValueError(f'unimplemented backend {backend_type}')
raise ValueError(f'Unsupported backend: {backend_type}')
return backend_engine

def initialize_placement_group(
Expand Down
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
if placement_group:
bundle = placement_group.bundle_specs[rank+1]
if not bundle.get("GPU", 0):
raise Exception("gpu resource cannot be 0")
raise Exception("GPU resource cannot be 0.")
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
Expand Down
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def from_engine_args(
executor_class = LlumnixRayGPUExecutor
executor_class.migration_config = migration_config
else:
raise ValueError('unimplemented executor backend')
raise ValueError('Unsupported executor backend')
# Hack to pass node_id to _init_workers_ray function.
executor_class.node_id = node_id
# Create the LLM engine.
Expand Down
18 changes: 16 additions & 2 deletions llumnix/backends/vllm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
import torch

from vllm.config import ModelConfig, ParallelConfig
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.sampling_params import SamplingType
from vllm.model_executor.layers.sampler import SampleResultType, _multinomial, _greedy_sample, _random_sample,\
_modify_greedy_probs_inplace, _beam_search_sample

from llumnix.logger import init_logger
from llumnix.arg_utils import EngineManagerArgs

logger = init_logger(__name__)


def detect_unsupported_feature(engine_args: EngineArgs) -> None:
unsupported_feature = None
if engine_args.enable_lora:
Expand All @@ -37,7 +39,19 @@ def detect_unsupported_feature(engine_args: EngineArgs) -> None:
elif engine_args.use_v2_block_manager or engine_args.speculative_model:
unsupported_feature = "speculative decoding"
if unsupported_feature:
raise ValueError(f'vllm feature "{unsupported_feature}" is currently unsupported by llumnix.')
raise ValueError(f'Unsupported feature: Llumnix does not support "{unsupported_feature}" currently.')

def check_engine_args(engine_args: AsyncEngineArgs, engine_manager_args: EngineManagerArgs) -> None:
    """Validate vLLM engine arguments for compatibility with Llumnix.

    Args:
        engine_args: Parsed vLLM ``AsyncEngineArgs``.
        engine_manager_args: Parsed Llumnix ``EngineManagerArgs``; its
            ``migration_backend`` may be rewritten to ``'gloo'`` as a side
            effect (see below).

    Raises:
        AssertionError: If the engine/worker are not configured as Ray actors.
        ValueError: Propagated from ``detect_unsupported_feature`` when an
            unsupported vLLM feature is enabled.
    """
    assert engine_args.engine_use_ray and engine_args.worker_use_ray, \
        ("In Llumnix, engine and worker must be ray actor.")
    migration_config = engine_manager_args.create_migration_config()
    engine_config = engine_args.create_engine_config()
    parallel_config = engine_config.parallel_config
    if parallel_config.world_size > 1 and migration_config.migration_backend == 'nccl':
        # Use the module-level logger instead of a bare print (resolves the
        # former "TODO: fix logger").
        logger.warning("Llumnix does not support TP or PP enabled model when the migration backend is nccl, "
                       "change migration backend to gloo.")
        # NOTE(review): only engine_manager_args is rewritten here; the
        # already-created local migration_config still holds 'nccl'. Callers
        # that re-create the migration config afterwards get 'gloo'.
        engine_manager_args.migration_backend = 'gloo'
    detect_unsupported_feature(engine_args)

def _get_dtype_size(dtype: torch.dtype) -> int:
return torch.tensor([], dtype=dtype).element_size()
Expand Down
9 changes: 2 additions & 7 deletions llumnix/backends/vllm/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,7 @@ def get_global_rank(self):
return self.global_rank

def reserve_memory_for_migration(self, migration_config: MigrationConfig, model_config: ModelConfig,
cache_config: CacheConfig, parallel_config: ParallelConfig) -> int:
# TODO(s5u13b): move this to arguments checker
if parallel_config.world_size > 1 and migration_config.migration_backend == 'nccl':
logger.warning("nccl backend is not supported for PP or TP enabled model, use gloo instead.")
migration_config.migration_backend = 'gloo'

cache_config: CacheConfig, parallel_config: ParallelConfig) -> int:
migrate_cache_blocks_size = migration_config.migration_cache_blocks
migrate_num_layers = migration_config.migration_num_layers
dummy_cache_size = migrate_num_layers * migrate_cache_blocks_size * CacheEngine.get_cache_block_size(
Expand All @@ -66,7 +61,7 @@ def reserve_memory_for_migration(self, migration_config: MigrationConfig, model_
cache_config.gpu_memory_utilization -= migrate_ratio

if cache_config.gpu_memory_utilization <= 0:
raise ValueError("nccl migration backend take {:.4f} gpu memory, which is greater than gpu_memory_utilization {:.4f}. "
raise ValueError("Nccl migration backend take {:.4f} gpu memory, which is greater than gpu_memory_utilization {:.4f}. "
"try to increase gpu-memory-utilization or reduce migration-cache-blocks."
.format(migrate_ratio, cache_config.gpu_memory_utilization))

Expand Down
6 changes: 0 additions & 6 deletions llumnix/entrypoints/llumnix_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,8 @@ def init_request_output_queue() -> RayQueue:
def init_llumnix_components(engine_manager_args: EngineManagerArgs,
engine_args,
node_id: str) -> Tuple[LLMEngineManager, List[Llumlet], RayQueue]:
assert engine_args.engine_use_ray and engine_args.worker_use_ray, \
("In Llumnix, engine and worker must be ray actor in orther to run step and migrate concurrently.")
engine_manager = init_manager(engine_manager_args)
# TODO(s5u13b): Add arguments checker for Llumnix.
if engine_manager_args.disable_init_instance_by_manager:
assert engine_manager_args.migration_backend != 'gloo', \
("Llumlet should be initialized by manager when using gloo as migration backend for auto-scaling, "
"please do not set --disable-init-instance-by-manager argument.")
instance_ids, llumlets = init_llumlets(engine_manager_args, engine_args, node_id)
else:
instance_ids, llumlets = retry_manager_method_sync(engine_manager.init_llumlets.remote, 'init_llumlets', engine_args, node_id)
Expand Down
7 changes: 4 additions & 3 deletions llumnix/entrypoints/vllm/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
is_gpu_available, init_llumnix_components)
from llumnix.logger import init_logger
from llumnix.utils import random_uuid

from llumnix.backends.vllm.utils import check_engine_args

logger = init_logger(__name__)
engine_manager = None
Expand Down Expand Up @@ -73,7 +73,7 @@ async def lifespan(fastapi_app: FastAPI):

async def manager_generate(prompt, sampling_params, request_id) -> AsyncStream:
if sampling_params.n > 1 or sampling_params.use_beam_search:
raise ValueError("unsupport multiple sequence decoding")
raise ValueError("Unsupported feature: multiple sequence decoding")
results_generator = AsyncStream(request_id)
request_streams[request_id] = results_generator
# This request's outputs will be put to the request_output_queue of this api server no matter which instance it's running in.
Expand All @@ -89,7 +89,6 @@ async def manager_generate(prompt, sampling_params, request_id) -> AsyncStream:
if manager_available:
manager_available = False
return results_generator

try:
if instance_num_requests:
instance_id = min(instance_num_requests, key=instance_num_requests.get)
Expand Down Expand Up @@ -243,6 +242,8 @@ async def is_ready():
engine_manager_args = EngineManagerArgs.from_cli_args(args)
engine_args = AsyncEngineArgs.from_cli_args(args)

check_engine_args(engine_args, engine_manager_args)

print("engine_args: {}".format(engine_args))

if args.launch_ray_cluster:
Expand Down
3 changes: 0 additions & 3 deletions llumnix/instance_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class InstanceLoadCalculator:
def __init__(self,
load_metric: str,
enable_defrag: bool) -> None:
assert load_metric in ['remaining_steps', 'usage_ratio']
self.load_metric = load_metric
self.enable_defrag = enable_defrag
self.load_computation_strategies: Dict[str, LoadComputationStrategy] = {
Expand Down Expand Up @@ -122,7 +121,6 @@ def compute_instance_load(self, i: InstanceLoadInfo) -> float:

class MigrationLoadComputation(LoadComputationStrategy):
def compute_instance_load(self, i: InstanceLoadInfo) -> float:
assert self.load_metric in ['usage_ratio', 'remaining_steps']
instance_load = -np.inf
if self.load_metric == 'usage_ratio':
instance_load = (i.num_used_gpu_blocks + i.num_blocks_first_waiting_request) / i.num_total_gpu_blocks
Expand All @@ -144,7 +142,6 @@ def compute_instance_load(self, i: InstanceLoadInfo) -> float:

class DispatchAndScalingLoadComputation(LoadComputationStrategy):
def compute_instance_load(self, i: InstanceLoadInfo) -> float:
assert self.load_metric in ['usage_ratio', 'remaining_steps']
instance_load = -np.inf
if self.load_metric == 'usage_ratio':
instance_load = (i.num_used_gpu_blocks + i.num_blocks_all_waiting_requests) / i.num_total_gpu_blocks
Expand Down
2 changes: 1 addition & 1 deletion llumnix/llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async def _check_instance_error(self, migrate_instance_pairs: Tuple[str, str]) -
def from_args(cls,
engine_manager_args: EngineManagerArgs,
profiling_database: ProfilingDatabase=None) -> "LLMEngineManager":
global_scheduler_config = engine_manager_args.create_engine_manager_configs()
global_scheduler_config = engine_manager_args.create_global_scheduler_configs()
# Init manager actor in 'llumnix' namespace to ensure that only one manager can be created.
manager_class = ray.remote(num_cpus=0,
max_restarts=-1,
Expand Down
2 changes: 0 additions & 2 deletions llumnix/llumlet/llumlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ def from_args(cls,
if backend_type == backend_type.VLLM:
if disable_fixed_node_init_instance:
# TODO(s5u13b): Support placement_group lifetime management when the migration backend is gloo.
assert migration_config.migration_backend != 'gloo', 'When the migration backend is gloo, \
do not set --disable-fixed-node-init-instance.'
placement_group = initialize_placement_group(world_size, detached=detached)
kwargs["placement_group"] = placement_group
engine_class = ray.remote(num_cpus=1,
Expand Down
Empty file.
Loading