Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyi-ECNU committed Sep 9, 2024
1 parent e933988 commit f50fa8d
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 260 deletions.
26 changes: 13 additions & 13 deletions llumnix/arg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from llumnix.internal_config import GlobalSchedulerConfig, MigrationConfig
from llumnix.config import LlumnixConfig, get_llumnix_config
from llumnix.config.default import _C
from llumnix.common.config import get_cfg
from llumnix.logger import init_logger

logger = init_logger(__name__)
Expand All @@ -36,6 +35,7 @@ class EngineManagerArgs:
polling_interval: float = None

dispatch_policy: str = None
num_dispatch_instances: int = None

enable_migration: bool = None
enable_defrag: bool = None
Expand Down Expand Up @@ -65,33 +65,29 @@ class EngineManagerArgs:
last_stage_max_blocks: int = None
max_stages: int = None

enable_pd_disagg: bool = False

def __post_init__(self):
    """Back-fill unset fields from the default manager configuration.

    Every dataclass field whose current value is ``None`` is overwritten
    with the attribute of ``_C.MANAGER`` named after the field in upper
    case. Fields holding any other value are left untouched.
    """
    for spec in dataclasses.fields(self):
        name = spec.name
        if getattr(self, name) is not None:
            continue
        setattr(self, name, getattr(_C.MANAGER, name.upper()))

config_file: str = None
def create_global_scheduler_configs(
self,
) -> Tuple[GlobalSchedulerConfig]:

# Provide default configuration.
config_data = get_cfg()
if self.config_file:
config_data.merge_from_file(self.config_file)

# Create the GlobalScheduler Configuration.
global_scheduler_config = GlobalSchedulerConfig(self.initial_instances,
self.load_metric,
self.dispatch_policy,
self.num_dispatch_instances,
self.pair_migration_policy,
self.migrate_out_threshold,
self.enable_defrag,
self.scaling_policy,
self.scale_up_threshold,
self.scale_down_threshold,
config_data.PDD_CONFIG.ENABLE_PREFILL_DISAGGREATION,
config_data.PDD_CONFIG.PREFILL_INSTANCE_NUM)
self.enable_pd_disagg)
return global_scheduler_config

def create_migration_config(self) -> MigrationConfig:
Expand Down Expand Up @@ -168,6 +164,10 @@ def add_cli_args(
default=None,
choices=['balanced', 'load', 'queue', 'flood'],
help='request dispatch policy')
parser.add_argument('--num-available-dispatch-instances',
type=int,
default=None,
help='number of available instances for dispatching')

parser.add_argument('--enable-migration',
action='store_true',
Expand Down Expand Up @@ -273,8 +273,8 @@ def add_cli_args(
type=int,
default=None,
help='drop migration if the number of stages > max_stages')
parser.add_argument("--config-file",
type=str,
default=EngineManagerArgs.config_file,
help="path to the configuration file")
parser.add_argument('--enable-pd-disagg',
type=bool,
default=None,
help='enable prefill decoding disaggregation')
return parser
211 changes: 0 additions & 211 deletions llumnix/common/config.py

This file was deleted.

15 changes: 0 additions & 15 deletions llumnix/common/defaults.py

This file was deleted.

8 changes: 8 additions & 0 deletions llumnix/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
_C.MANAGER.LOAD_METRIC = 'remaining_steps'
# Request dispatch policy with choices: 'balanced', 'load', 'queue', 'flood'
_C.MANAGER.DISPATCH_POLICY = 'load'
# Maximum number of instances that can be used for dispatching. -1 means all instances can be used.
_C.MANAGER.NUM_DISPATCH_INSTANCES = -1

# -----------------------------------------------------------------------------
# MIGRATION CONFIGURATION
Expand Down Expand Up @@ -120,3 +122,9 @@
_C.MANAGER.SCALE_UP_THRESHOLD = 10
# Scale down threshold
_C.MANAGER.SCALE_DOWN_THRESHOLD = 60

# -----------------------------------------------------------------------------
# PREFILL DECODING DISAGGREGATION CONFIGURATION
# -----------------------------------------------------------------------------
# Enable prefill decoding disaggregation
_C.MANAGER.ENABLE_PD_DISAGG = False
6 changes: 3 additions & 3 deletions llumnix/entrypoints/vllm/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ async def is_ready():

logger.info("engine_args: {}".format(engine_args))

if cfg.RAY.LAUNCH_CLUSTER:
if cfg.RAY.LAUNCH_RAY_CLUSTER:
# Launch the ray cluster for multi-node serving.
launch_ray_cluster(cfg.RAY.CLUSTER_PORT)
launch_ray_cluster(cfg.RAY.RAY_CLUSTER_PORT)

# Connect to a ray cluster.
connect_to_ray_cluster(port=cfg.RAY.CLUSTER_PORT)
connect_to_ray_cluster(port=cfg.RAY.RAY_CLUSTER_PORT)

# If no GPU is available, this node is the head pod and runs no llumnix components.
if is_gpu_available():
Expand Down
8 changes: 4 additions & 4 deletions llumnix/global_scheduler/dispatch_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ class DispatchScheduler:
def __init__(self,
dispatch_policy: str,
instance_load_calculator: InstanceLoadCalculator,
num_available_dispatch_instances: int) -> None:
num_dispatch_instances: int) -> None:
self.dispatch_policy = DispatchPolicyFactory.get_policy(dispatch_policy)
self.instance_load_calculator = instance_load_calculator
self.num_instances = 0
self.instance_id_set: Set[str] = set()
self.available_dispatch_instance_set: Set[str] = set()
self.num_available_dispatch_instances = num_available_dispatch_instances
self.num_dispatch_instances = num_dispatch_instances
# instance info args
self.instance_info: Dict[str, InstanceInfo] = {}
self.sorted_instance_infos: List[InstanceInfo] = None
Expand Down Expand Up @@ -59,8 +59,8 @@ def update_instance_infos(self,
def add_instance(self, instance_id: str) -> None:
self.instance_id_set.add(instance_id)
self.num_instances = len(self.instance_id_set)
if self.num_available_dispatch_instances == -1 or (self.num_available_dispatch_instances > 0 and
len(self.available_dispatch_instance_set) < self.num_available_dispatch_instances):
if self.num_dispatch_instances == -1 or (self.num_dispatch_instances > 0 and
len(self.available_dispatch_instance_set) < self.num_dispatch_instances):
self.available_dispatch_instance_set.add(instance_id)
self.instance_num_requests[instance_id] = 0

Expand Down
4 changes: 2 additions & 2 deletions llumnix/global_scheduler/global_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ def __init__(self,
self.dispatch_policy = global_scheduler_config.dispatch_policy
self.dispatch_scheduler = DispatchScheduler(global_scheduler_config.dispatch_policy,
self.instance_load_calculator,
global_scheduler_config.num_available_dispatch_instances)
global_scheduler_config.num_dispatch_instances)
# migrate args
self.migration_scheduler = MigrationScheduler(global_scheduler_config.pair_migration_policy,
global_scheduler_config.migrate_out_load_threshold,
self.instance_load_calculator,
global_scheduler_config.num_available_dispatch_instances)
global_scheduler_config.num_dispatch_instances)
# auto-scaling args
self.scaling_scheduler = ScalingScheduler(global_scheduler_config.scale_up_threshold,
global_scheduler_config.scale_down_threshold,
Expand Down
Loading

0 comments on commit f50fa8d

Please sign in to comment.