diff --git a/benchmark/benchmark_serving.py b/benchmark/benchmark_serving.py index c915e1f5..d85f1ee3 100644 --- a/benchmark/benchmark_serving.py +++ b/benchmark/benchmark_serving.py @@ -240,8 +240,8 @@ def calculate_cdf(latencies): print(f"{hist=}") print(f"{cumsum=}") -def plot_latency_cdf(req_latencies, prefill_latencies, decode_latencies, results_filename): - fig_filename = os.path.splitext(results_filename)[0] + "_latency.png" +def plot_latency_cdf(req_latencies, prefill_latencies, decode_latencies, log_filename): + fig_filename = os.path.splitext(log_filename)[0] + "_latency.png" fig, (ax_req, ax_prefill, ax_decode) = plt.subplots(1, 3, figsize=(3*7, 4.8)) def plot_single(ax, latencies, is_prefill=False): @@ -286,8 +286,8 @@ def plot_single(ax, latencies, is_prefill=False): plt.suptitle(fig_filename_title, fontsize=6) fig.savefig(fig_filename) -def plot_len_cdf(prompt_lens, response_lens, total_tokens, results_filename): - fig_filename = os.path.splitext(results_filename)[0] + "_len.png" +def plot_len_cdf(prompt_lens, response_lens, total_tokens, log_filename): + fig_filename = os.path.splitext(log_filename)[0] + "_len.png" fig, (ax_prompt, ax_response, ax_total) = plt.subplots(1, 3, figsize=(3*7, 4.8)) def plot_single(ax, lens, x_label_str, title_str): @@ -328,8 +328,8 @@ def plot_single(ax, lens, x_label_str, title_str): plt.suptitle(fig_filename_title, fontsize=6) fig.savefig(fig_filename) -def plot_instance(results_filename_0): - current_dir = os.path.dirname(os.path.abspath(results_filename_0)) +def plot_instance(log_filename_0): + current_dir = os.path.dirname(os.path.abspath(log_filename_0)) log_files = glob.glob(os.path.join(current_dir, '*.log_instance.csv')) log_files.sort(key=os.path.getmtime, reverse=True) df_0 = pd.read_csv(log_files[0]).sort_values(by=["timestamp"]) @@ -347,7 +347,7 @@ def plot_instance(results_filename_0): fig, ax = plt.subplots() ax.plot(timestamp_list_0, instance_num_list_0, color="red", label=f"instance_num(avg {avg_instance_num} /s)") ax.legend(loc='upper left') - fig_filename = os.path.splitext(results_filename_0)[0] + "_instance.png" + fig_filename = os.path.splitext(log_filename_0)[0] + "_instance.png" index1 = fig_filename.rfind('/') index2 = fig_filename.rfind('/', 0, index1) fig_filename_title = fig_filename[index2 + 1:] @@ -355,7 +355,7 @@ def plot_instance(results_filename_0): fig.savefig(fig_filename) return avg_instance_num -def save_all_latencies_npy(all_token_latencies:List[np.ndarray], results_filename): +def save_all_latencies_npy(all_token_latencies:List[np.ndarray], log_filename): dtype = [('timestamp',float),('latency',float)] all_lat_pairs = [] for arr in all_token_latencies: @@ -364,7 +364,7 @@ def save_all_latencies_npy(all_token_latencies:List[np.ndarray], results_filenam all_lat_pairs.append((pair[0],pair[1])) all_lat_pairs = np.array(all_lat_pairs,dtype=dtype) all_lat_pairs = np.sort(all_lat_pairs,order='timestamp') - np.save(os.path.splitext(results_filename)[0], all_lat_pairs) + np.save(os.path.splitext(log_filename)[0], all_lat_pairs) class MeasureLatency: def __init__(self): @@ -423,7 +423,7 @@ async def benchmark( prompts: List[str], allow_variable_generation_length: bool, verbose: bool, - results_filename: str, + log_filename: str, ip_ports: List[int], distribution: str, qps: float, @@ -475,9 +475,9 @@ async def benchmark( m._latencies, m._per_token_latencies, m._inference_latencies, m._request_ids, m._decode_latencies, m._request_lens, log_latencies, fail_on_response_failure) calculate_cdf(m._latencies) - 
plot_latency_cdf(m._latencies, m._prefill_token_latencies, m._decode_token_latencies, results_filename)
-    save_all_latencies_npy(m._all_latencies, results_filename)
-    # avg_instance_num = plot_instance(results_filename)
+    plot_latency_cdf(m._latencies, m._prefill_token_latencies, m._decode_token_latencies, log_filename)
+    save_all_latencies_npy(m._all_latencies, log_filename)
+    # avg_instance_num = plot_instance(log_filename)
     avg_instance_num = 0.0
     return throughput, m._prefill_token_latencies, m._decode_token_latencies, m._inference_latencies, avg_instance_num, m._latencies, m._request_ids, m._decode_latencies, m._request_lens, m._all_decode_latencies
@@ -655,7 +655,7 @@ def main():
     parser.add_argument('-v', '--verbose', action='store_true')
     parser.add_argument('--backend', type=GenerationBackend,
                         choices=[e.name for e in GenerationBackend], default='vLLM')
-    parser.add_argument('--results_filename', type=str, default='benchmark.log')
+    parser.add_argument('--log_filename', type=str, default='benchmark.log')
     parser.add_argument('--ip_ports', nargs='+', required=True, help='List of ip:port')
     parser.add_argument('--random_prompt_lens_mean', type=int)
     parser.add_argument('--random_prompt_lens_range', type=int)
@@ -692,7 +692,7 @@ def main():
     # parser.add_argument('--calculate_begin_ratio', type=float, default=0.5)
     # parser.add_argument('--calculate_end_ratio', type=float, default=0.8)
-    parser.add_argument('--enable_migrate', type=int ,default=0)
+    parser.add_argument('--enable_migration', type=int, default=0)
     parser.add_argument('--priority_ratio', type=float ,default=0.0)
 
     args = parser.parse_args()
@@ -757,7 +757,7 @@ def main():
 
     print('total tokens', sorted(list(total_tokens)))
-    plot_len_cdf(prompt_lens, response_lens, total_tokens, args.results_filename)
+    plot_len_cdf(prompt_lens, response_lens, total_tokens, args.log_filename)
 
     prompts = list(zip(prompts, prompt_lens, response_lens))
@@ -767,7 +767,7 @@ def main():
         prompts,
         args.allow_variable_generation_length,
         args.verbose,
-        args.results_filename,
+        args.log_filename,
         args.ip_ports,
         args.distribution,
         args.qps,
@@ -775,11 +775,11 @@ def main():
         args.log_latencies,
         args.fail_on_response_failure,
     ))
-    file_name = os.path.splitext(args.results_filename)[0] + "_latency_info.json"
+    file_name = os.path.splitext(args.log_filename)[0] + "_latency_info.json"
     results = []
     import datetime
     current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
-    file_name = os.path.splitext(args.results_filename)[0] + "_latency_info.json"
+    file_name = os.path.splitext(args.log_filename)[0] + "_latency_info.json"
     try:
         with open(file_name, 'r') as f:
             results = json.load(f)
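With the rename above, a benchmark run passes `--log_filename` and `--enable_migration` instead of the old flag names. A purely illustrative sketch — only flags visible in this diff are shown, and the script's remaining arguments (prompt/response length settings and so on) are omitted:

```
python benchmark/benchmark_serving.py \
    --backend vLLM \
    --ip_ports 127.0.0.1:8000 \
    --log_filename benchmark.log \
    --enable_migration 1
```
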
diff --git a/docs/Arguments.md b/docs/Arguments.md
index a9e0318d..8071e572 100644
--- a/docs/Arguments.md
+++ b/docs/Arguments.md
@@ -9,26 +9,26 @@
 usage: -m llumnix.entrypoints.vllm.api_server [-h]
                                               [--fixed-node-init]
                                               [--initial-instances INITIAL_INSTANCES]
                                               [--load-metric {consumed_speed,used_ratio}]
+                                              [--polling-interval POLLING_INTERVAL]
                                               [--dispatch-policy {balanced,load,queue}]
-                                              [--enable-migrate]
-                                              [--check-migrate-frequency CHECK_MIGRATE_FREQUENCY]
-                                              [--check-migrate-policy {balanced,prefill_constrained,prefill_relaxed}]
+                                              [--enable-migration]
+                                              [--pair-migration-frequency PAIR_MIGRATION_FREQUENCY]
+                                              [--pair-migration-policy {balanced,prefill_constrained,prefill_relaxed}]
                                               [--migrate-out-threshold MIGRATE_OUT_THRESHOLD]
-                                              [--migrate-policy {LCFS,SJF,LJF}]
-                                              [--enable-prefill-migrate ENABLE_PREFILL_MIGRATE]
+                                              [--request-migration-policy {LCFS,SJF,LJF}]
+                                              [--enable-defrag ENABLE_DEFRAG]
                                               [--enable-scaling]
                                               [--min-instances MIN_INSTANCES]
                                               [--max-instances MAX_INSTANCES]
                                               [--scaling-interval SCALING_INTERVAL]
-                                              [--scale-policy {max_load,avg_load}]
+                                              [--scaling-policy {max_load,avg_load}]
                                               [--scale-up-threshold SCALE_UP_THRESHOLD]
                                               [--scale-down-threshold SCALE_DOWN_THRESHOLD]
                                               [--disable-log-requests-manager]
-                                              [--record-instance-info]
-                                              [--results-filename RESULTS_FILENAME]
-                                              [--gpu-type GPU_TYPE]
+                                              [--log-instance-info]
+                                              [--log-filename LOG_FILENAME]
                                               [--profiling-result-file-path PROFILING_RESULT_FILE_PATH]
-                                              [--polling-interval POLLING_INTERVAL]
+                                              [--gpu-type GPU_TYPE]
                                               [--migration-backend {gloo,rpc}]
                                               [--migration-cache_blocks MIGRATION_CACHE_BLOCKS]
                                               [--last-stage-max-blocks LAST_STAGE_MAX_BLOCKS]
@@ -36,43 +36,47 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
 ```
 
 `--fixed-node-init`
-- Place llumlet and workers on the current node.
+- Fix the placement of the instance to the current node.
 
 `--initial-instances`
-- Number of model instances.
+- Number of model instances created at initialization.
 - Default: 1
 
 `--load-metric`
-- Load metric.
+- Instance load metric.
 - Possible choices: consumed_speed, used_ratio
 - Default: "consumed_speed"
 
+`--polling-interval`
+- Time interval (in seconds) for updating instance info and triggering pair migration.
+- Default: 0.1
+
 `--dispatch-policy`
-- Dispatch policy.
+- Request dispatch policy.
 - Possible choices: balanced, load, queue
 - Default: "load"
 
-`--enable-migrate`
-- Enable migrate request between instances.
+`--enable-migration`
+- Enable migrating requests between instances.
 
-`--check-migrate-frequency`
-- Check migrate frequency.
+`--pair-migration-frequency`
+- Pair migration frequency, counted in instance-info updates.
 - Default: 1
 
-`--check-migrate-policy`
-- Check migrate policy.
+`--pair-migration-policy`
+- Pair migration policy.
 
 `--migrate-out-threshold`
-- Migrate out load threshold.
+- Migrate out instance load threshold.
 - Default: 3.0
 
-`--migrate-policy`
-- Migrate policy.
+`--request-migration-policy`
+- Request migration policy.
 - Possible choices: LCFS, SJF, LJF
-- Default: "LCFS"
+- Default: "SJF"
 
-`--enable-prefill-migrate`
-- Enable prefill migrate.
+`--enable-defrag`
+- Enable defragmentation.
 - Default: False
 
 `--enable-scaling`
@@ -90,49 +94,44 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
 - Interval time of check scaling.
 - Default: 10
 
-`--scale-policy`
-- Scale policy.
+`--scaling-policy`
+- Scaling policy.
 - Possible choices: max_load, avg_load
 - default: "max_load"
 
 `--scale-up-threshold`
-- Scaling up threshold.
+- Scale up threshold.
 - Default: 4
 
 `--scale-down-threshold`
-- Scaling down threshold.
+- Scale down threshold.
 - Default: 100
 
 `--disable-log-requests-manager`
 - Disable logging requests in manager.
-- Default: False
-
-`--record-instance-info`
-- Enable recording instance-info data to a csv file.
-- Default: False
 
-`--results-filename`
-- Results filename.
+`--log-instance-info`
+- Enable logging instance info.
 
-`--gpu-type`
-- GPU type specified when using simulator.
-- Default: "a10"
+`--log-filename`
+- Log filename.
+- Default: "server.log"
 
 `--profiling-result-file-path`
 - Profiling result file path.
 - Default: ""
 
-`--polling-interval`
-- Time interval(s) to update instance info/migration.
-- Default: 0.1
+`--gpu-type`
+- GPU type specified when using simulator.
+- Default: "a10"
 
 `--migration-backend`
-- Communication backend during migration.
+- Communication backend used for migration.
 - Possible choices: gloo, rpc
 - Default: "rpc"
 
 `--migration-cache-blocks`
-- Cache blocks num during migration.
+- Number of cache blocks used during migration.
 - Default: 512
 
 `--last-stage-max-blocks`
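For orientation, the renamed manager flags combine into a launch command like the following. This is an illustrative sketch only: the vLLM model and engine arguments the server also requires are omitted, and the values shown are the defaults documented above, not recommendations:

```
python -m llumnix.entrypoints.vllm.api_server \
    --initial-instances 1 \
    --dispatch-policy load \
    --enable-migration \
    --pair-migration-policy prefill_constrained \
    --request-migration-policy SJF \
    --migration-backend rpc \
    --log-instance-info \
    --log-filename server.log
```
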
diff --git a/llumnix/arg_utils.py b/llumnix/arg_utils.py
index f1602bd7..ec94684d 100644
--- a/llumnix/arg_utils.py
+++ b/llumnix/arg_utils.py
@@ -22,34 +22,36 @@
 @dataclass
 class EngineManagerArgs:
     launch_ray_cluster: bool = True
+    initial_instances: int = 1
     fixed_node_init: bool = False
-    initial_instances: int = 1
 
     load_metric: str = 'consumed_speed'
     polling_interval: float = 0.05
 
     dispatch_policy: str = 'load'
 
-    enable_migrate: bool = False
-    check_migrate_frequency: int = 1
-    check_migrate_policy: str = 'prefill_constrained'
+    enable_migration: bool = True
+    enable_defrag: bool = True
+    pair_migration_frequency: int = 1
+    pair_migration_policy: str = 'prefill_constrained'
     migrate_out_threshold: float = 3.0
-    migrate_policy: str = 'LCFS'
-    enable_prefill_migrate: bool = True
+    request_migration_policy: str = 'SJF'
 
     enable_scaling: bool = False
     min_instances: int = 1
     max_instances: int = 1
     scaling_interval: int = 10
-    scale_policy: str = 'avg_load'
+    scaling_policy: str = 'avg_load'
     scale_up_threshold: float = 10
     scale_down_threshold: float = 60
 
+    log_filename: str = "server.log"
     disable_log_requests_manager: bool = False
-    results_filename: str = "server.log"
-    record_instance_info: bool = False
+    log_instance_info: bool = True
     profiling_result_file_path: str = ""
+    gpu_type: str = "a10"
+
     migration_backend: str = "rpc"
     migration_cache_blocks: int = 512
     last_stage_max_blocks: int = 4
@@ -61,10 +63,10 @@ def create_engine_manager_configs(
         global_scheduler_config = GlobalSchedulerConfig(self.initial_instances,
                                                         self.load_metric,
                                                         self.dispatch_policy,
-                                                        self.check_migrate_policy,
+                                                        self.pair_migration_policy,
                                                         self.migrate_out_threshold,
-                                                        self.enable_prefill_migrate,
-                                                        self.scale_policy,
+                                                        self.enable_defrag,
+                                                        self.scaling_policy,
                                                         self.scale_up_threshold,
                                                         self.scale_down_threshold)
         return global_scheduler_config
@@ -72,7 +74,7 @@ def create_engine_manager_configs(
     def create_migration_configs(
         self,
     ) -> MigrationConfig:
-        migration_config = MigrationConfig(self.migrate_policy,
+        migration_config = MigrationConfig(self.request_migration_policy,
                                            self.migration_backend,
                                            self.migration_cache_blocks,
                                            self.last_stage_max_blocks,
@@ -92,112 +94,112 @@ def add_cli_args(
             parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
         parser.add_argument('--fixed-node-init',
                             action='store_true',
-                            help='place llumlet and workers in current node.')
-
+                            help='fix the placement of the instance to the current node')
         parser.add_argument('--initial-instances',
                             type=int,
                             default=EngineManagerArgs.initial_instances,
-                            help='number of model instance')
+                            help='number of model instances created at initialization')
+
        parser.add_argument('--load-metric',
                             type=str,
                             default=EngineManagerArgs.load_metric,
                             choices=['consumed_speed', 'used_ratio'],
-                            help='load metric')
+                            help='instance load metric')
+        parser.add_argument('--polling-interval',
+                            type=float,
+                            default=EngineManagerArgs.polling_interval,
+                            help='time interval (in seconds) for updating instance info and triggering pair migration')
 
         parser.add_argument('--dispatch-policy',
                             type=str,
                             default=EngineManagerArgs.dispatch_policy,
                             choices=['balanced', 'load', 'queue'],
-                            help='dispatch policy')
+                            help='request dispatch policy')
 
-        parser.add_argument('--enable-migrate',
+        parser.add_argument('--enable-migration',
                             action='store_true',
-                            help='enable migrate request between instance')
-        parser.add_argument('--check-migrate-frequency',
+                            help='enable migrating requests between instances')
+        
parser.add_argument('--pair-migration-frequency', type=int, - default=EngineManagerArgs.check_migrate_frequency, - help='check migrate frequency') - parser.add_argument('--check-migrate-policy', + default=EngineManagerArgs.pair_migration_frequency, + help='pair migration frequency') + parser.add_argument('--pair-migration-policy', type=str, - default=EngineManagerArgs.check_migrate_policy, + default=EngineManagerArgs.pair_migration_policy, choices=['balanced', 'prefill_constrained', 'prefill_relaxed'], - help='check migrate policy') + help='pair migration policy') parser.add_argument('--migrate-out-threshold', type=float, default=EngineManagerArgs.migrate_out_threshold, - help='migrate out load threshold') - parser.add_argument('--migrate-policy', + help='migrate out instance load threshold') + parser.add_argument('--request-migration-policy', type=str, - default=EngineManagerArgs.migrate_policy, + default=EngineManagerArgs.request_migration_policy, choices=['LCFS', 'SJF', 'LJF'], - help='migrate policy') - parser.add_argument('--enable-prefill-migrate', + help='request migration policy') + parser.add_argument('--enable-defrag', type=bool, - default=EngineManagerArgs.enable_prefill_migrate, - help='enable prefill migrate') + default=EngineManagerArgs.enable_defrag, + help='enable defragmentation') parser.add_argument('--enable-scaling', action='store_true', - help='enable auto scaline') + help='enable auto scaling') parser.add_argument('--min-instances', type=int, default=EngineManagerArgs.min_instances, - help='min instances num') + help='minimum number of instances') parser.add_argument('--max-instances', type=int, default=EngineManagerArgs.max_instances, - help='max instances num') + help='maximum number of instances') parser.add_argument('--scaling-interval', type=int, default=EngineManagerArgs.scaling_interval, help='interval time of check scaling') - parser.add_argument('--scale-policy', + parser.add_argument('--scaling-policy', type=str, - default=EngineManagerArgs.scale_policy, + default=EngineManagerArgs.scaling_policy, choices=['max_load', 'avg_load'], - help='scale policy') + help='scaling policy') parser.add_argument('--scale-up-threshold', type=float, default=EngineManagerArgs.scale_up_threshold, - help='scaling up threshold') + help='scale up threshold') parser.add_argument('--scale-down-threshold', type=float, default=EngineManagerArgs.scale_down_threshold, - help='scaling down threshold') + help='scale down threshold') parser.add_argument('--disable-log-requests-manager', action='store_true', - default=EngineManagerArgs.disable_log_requests_manager, help='disable logging requests in manager') - parser.add_argument('--record-instance-info', + parser.add_argument('--log-instance-info', action='store_true', - help='if record instance info') - parser.add_argument('--results-filename', + help='enable logging instance info') + parser.add_argument('--log-filename', type=str, - default=EngineManagerArgs.results_filename, - help='results filename') + default=EngineManagerArgs.log_filename, + help='log filename') parser.add_argument('--profiling-result-file-path', type=str, default=EngineManagerArgs.profiling_result_file_path, help='profiling result file path') + parser.add_argument('--gpu-type', type=str, default=EngineManagerArgs.gpu_type, help='gpu type specified when using simulator') - parser.add_argument('--polling-interval', - type=float, - default=EngineManagerArgs.polling_interval, - help='time interval(s) to update instance info/migration') 
parser.add_argument('--migration-backend', type=str, default=EngineManagerArgs.migration_backend, choices=['gloo','rpc'], - help='communication backend during migration') + help='communication backend of migration') parser.add_argument('--migration-cache_blocks', type=int, default=EngineManagerArgs.migration_cache_blocks, - help='cache blocks num during migration') + help='number of cache blocks in migration') parser.add_argument('--last-stage-max-blocks', type=int, default=EngineManagerArgs.last_stage_max_blocks, diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py index a1970670..1534a025 100644 --- a/llumnix/backends/vllm/llm_engine.py +++ b/llumnix/backends/vllm/llm_engine.py @@ -114,7 +114,7 @@ def _process_model_outputs( def step(self) -> None: output_list = super().step() - instance_info: InstanceInfo = self.scheduler.get_record_instance_info() + instance_info: InstanceInfo = self.scheduler.get_instance_info() if self.scaling_down: instance_info.num_running_request = 1 diff --git a/llumnix/backends/vllm/scheduler.py b/llumnix/backends/vllm/scheduler.py index 36a216c1..7b591b35 100644 --- a/llumnix/backends/vllm/scheduler.py +++ b/llumnix/backends/vllm/scheduler.py @@ -186,7 +186,7 @@ def free_src_request(self, backend_request: SequenceGroup) -> None: self.free_seq(seq) @scheduler_lock - def get_record_instance_info(self) -> InstanceInfo: + def get_instance_info(self) -> InstanceInfo: num_total_gpu_block = self.cache_config.num_gpu_blocks num_free_gpu_block = self.block_manager.get_num_free_gpu_blocks() num_used_gpu_block = num_total_gpu_block - num_free_gpu_block diff --git a/llumnix/config.py b/llumnix/config.py index d70d9c9c..9a241a1d 100644 --- a/llumnix/config.py +++ b/llumnix/config.py @@ -14,12 +14,12 @@ class MigrationConfig: def __init__( self, - migrate_policy: str, + request_migration_policy: str, migration_backend: str, migration_cache_blocks: int, last_stage_max_blocks: int, max_stages: int,) -> None: - self.migrate_policy = migrate_policy + self.request_migration_policy = request_migration_policy self.migration_backend = migration_backend self.migration_cache_blocks = migration_cache_blocks self.last_stage_max_blocks = last_stage_max_blocks @@ -31,10 +31,10 @@ def __init__( initial_instances: int, load_metric: str, dispatch_policy: str, - check_migirate_policy: str, + pair_migration_policy: str, migrate_out_threshold: float, - enable_prefill_migrate: bool, - scale_policy: str, + enable_defrag: bool, + scaling_policy: str, scale_up_threshold: float, scale_down_threshold: float) -> None: self.initial_instances = initial_instances @@ -42,10 +42,10 @@ def __init__( self.dispatch_policy = dispatch_policy - self.check_migrate_policy = check_migirate_policy + self.pair_migration_policy = pair_migration_policy self.migrate_out_load_threshold = migrate_out_threshold*(-1) - self.enable_prefill_migrate = enable_prefill_migrate + self.enable_defrag = enable_defrag - self.scale_policy = scale_policy + self.scaling_policy = scaling_policy self.scale_up_threshold = scale_up_threshold*(-1) self.scale_down_threshold = scale_down_threshold*(-1) diff --git a/llumnix/global_scheduler/global_scheduler.py b/llumnix/global_scheduler/global_scheduler.py index ede34011..d8626d2e 100644 --- a/llumnix/global_scheduler/global_scheduler.py +++ b/llumnix/global_scheduler/global_scheduler.py @@ -18,7 +18,7 @@ from llumnix.instance_info import InstanceLoadCalculator, InstanceInfo from llumnix.global_scheduler.dispatch_scheduler import DispatchScheduler from 
llumnix.global_scheduler.migration_scheduler import MigrationScheduler -from llumnix.global_scheduler.scale_scheduler import ScaleScheduler +from llumnix.global_scheduler.scaling_scheduler import ScalingScheduler logger = init_logger(__name__) @@ -29,22 +29,22 @@ def __init__(self, self.global_scheduler_config = global_scheduler_config # instance load and instance info args self.load_metric = global_scheduler_config.load_metric - self.enable_prefill_migrate = global_scheduler_config.enable_prefill_migrate + self.enable_defrag = global_scheduler_config.enable_defrag self.instance_load_calculator = InstanceLoadCalculator(load_metric=self.load_metric, - enable_prefill_migrate=self.enable_prefill_migrate) + enable_defrag=self.enable_defrag) # dispatch args self.dispatch_policy = global_scheduler_config.dispatch_policy self.dispatch_scheduler = DispatchScheduler(global_scheduler_config.dispatch_policy, self.instance_load_calculator) # migrate args - self.migrate_scheduler = MigrationScheduler(global_scheduler_config.check_migrate_policy, - global_scheduler_config.migrate_out_load_threshold, - self.instance_load_calculator) + self.migration_scheduler = MigrationScheduler(global_scheduler_config.pair_migration_policy, + global_scheduler_config.migrate_out_load_threshold, + self.instance_load_calculator) # auto-scaling args - self.scale_scheduler = ScaleScheduler(global_scheduler_config.scale_up_threshold, - global_scheduler_config.scale_down_threshold, - global_scheduler_config.scale_policy, - self.instance_load_calculator) + self.scaling_scheduler = ScalingScheduler(global_scheduler_config.scale_up_threshold, + global_scheduler_config.scale_down_threshold, + global_scheduler_config.scaling_policy, + self.instance_load_calculator) self.num_instance = 0 self.instance_id_set: Set[str] = set() @@ -63,14 +63,14 @@ def dispatch(self) -> str: instance_id = self.dispatch_scheduler.dispatch() return instance_id - def check_migrate(self) -> List[Tuple[str, str]]: - self.migrate_scheduler.update_instance_infos(self.instance_info) - migrate_instance_pairs = self.migrate_scheduler.check_migrate() + def pair_migration(self) -> List[Tuple[str, str]]: + self.migration_scheduler.update_instance_infos(self.instance_info) + migrate_instance_pairs = self.migration_scheduler.pair_migration() return migrate_instance_pairs def check_scale(self) -> Tuple[str, str]: - self.scale_scheduler.update_instance_infos(self.instance_info) - scale_up_num, scale_down_num = self.scale_scheduler.check_scale() + self.scaling_scheduler.update_instance_infos(self.instance_info) + scale_up_num, scale_down_num = self.scaling_scheduler.check_scale() return scale_up_num, scale_down_num def scale_up(self, instance_id: Union[str, Iterable[str]]) -> None: @@ -100,16 +100,16 @@ def scale_down(self, instance_id: Union[str, Iterable[str]]) -> None: def _add_instance(self, instance_id: str) -> None: self.instance_id_set.add(instance_id) self.num_instance = len(self.instance_id_set) - for scheduler in (self.dispatch_scheduler, self.migrate_scheduler, self.scale_scheduler): + for scheduler in (self.dispatch_scheduler, self.migration_scheduler, self.scaling_scheduler): scheduler.update_instance_infos(self.instance_info) scheduler.add_instance(instance_id) def _remove_instance(self, instance_id: str) -> None: self.instance_id_set.remove(instance_id) self.num_instance = len(self.instance_id_set) - for scheduler in (self.dispatch_scheduler, self.migrate_scheduler, self.scale_scheduler): + for scheduler in (self.dispatch_scheduler, 
self.migration_scheduler, self.scaling_scheduler): scheduler.update_instance_infos(self.instance_info) scheduler.remove_instance(instance_id) def _get_empty_instance_info(self) -> InstanceInfo: - return self.scale_scheduler.get_empty_instance_info() + return self.scaling_scheduler.get_empty_instance_info() diff --git a/llumnix/global_scheduler/migration_scheduler.py b/llumnix/global_scheduler/migration_scheduler.py index 8268dbb4..11405d48 100644 --- a/llumnix/global_scheduler/migration_scheduler.py +++ b/llumnix/global_scheduler/migration_scheduler.py @@ -24,22 +24,22 @@ class MigrationScheduler: def __init__(self, - check_migrate_policy: str, + pair_migration_policy: str, migrate_out_load_threshold: float, instance_load_calculator: InstanceLoadCalculator) -> None: self.migrate_out_load_threshold = migrate_out_load_threshold self.instance_load_calculator = instance_load_calculator - self.enable_prefill_migrate = instance_load_calculator.enable_prefill_migrate - if not self.enable_prefill_migrate: - self.check_migrate_policy \ - = CheckMigratePolicyFactory.get_policy("balanced", - migrate_out_load_threshold=migrate_out_load_threshold, - instance_load_calculator=instance_load_calculator) + self.enable_defrag = instance_load_calculator.enable_defrag + if not self.enable_defrag: + self.pair_migration_policy \ + = PairMigrationPolicyFactory.get_policy("balanced", + migrate_out_load_threshold=migrate_out_load_threshold, + instance_load_calculator=instance_load_calculator) else: - self.check_migrate_policy \ - = CheckMigratePolicyFactory.get_policy(check_migrate_policy, - migrate_out_load_threshold=migrate_out_load_threshold, - instance_load_calculator=instance_load_calculator) + self.pair_migration_policy \ + = PairMigrationPolicyFactory.get_policy(pair_migration_policy, + migrate_out_load_threshold=migrate_out_load_threshold, + instance_load_calculator=instance_load_calculator) self.num_instance = 0 self.instance_id_set: Set[str] = set() @@ -47,9 +47,9 @@ def __init__(self, self.instance_info: Dict[str, InstanceInfo] = None self.sorted_instance_infos: List[InstanceInfo] = None - def check_migrate(self) -> List[Tuple[str, str]]: + def pair_migration(self) -> List[Tuple[str, str]]: self._sort_instance_infos(descending=False) - return self.check_migrate_policy.check_migrate(self.sorted_instance_infos) + return self.pair_migration_policy.pair_migration(self.sorted_instance_infos) def update_instance_infos(self, instance_info: Dict[str, InstanceInfo]) -> None: @@ -73,7 +73,7 @@ def _sort_instance_infos(self, reverse=descending ) -class CheckMigratePolicy(ABC): +class PairMigrationPolicy(ABC): def __init__(self, migrate_out_load_threshold: float, instance_load_calculator: InstanceLoadCalculator) -> None: @@ -81,15 +81,15 @@ def __init__(self, self.instance_load_calculator = instance_load_calculator @abstractmethod - def check_migrate(self, + def pair_migration(self, sorted_instance_infos: List[InstanceInfo] ) -> List[Tuple[str, str]]: raise NotImplementedError -class Balanced(CheckMigratePolicy): - def check_migrate(self, - sorted_instance_infos: List[InstanceInfo] - ) -> List[Tuple[str, str]]: +class Balanced(PairMigrationPolicy): + def pair_migration(self, + sorted_instance_infos: List[InstanceInfo] + ) -> List[Tuple[str, str]]: # migrate in instances left_instance_infos = [i for i in sorted_instance_infos if i.num_killed_request == 0 and i.instance_load_migrate < self.migrate_out_load_threshold] @@ -120,10 +120,10 @@ def _compute_instance_load_after_migrate(self, instance_info: InstanceInfo, is_m 
instance_info_after_migrate.num_free_gpu_block += num_block_last_running_request return self.instance_load_calculator.compute_instance_load(instance_info_after_migrate, action='migrate') -class PrefillConstrained(CheckMigratePolicy): - def check_migrate(self, - sorted_instance_infos: List[InstanceInfo] - ) -> List[Tuple[str, str]]: +class PrefillConstrained(PairMigrationPolicy): + def pair_migration(self, + sorted_instance_infos: List[InstanceInfo] + ) -> List[Tuple[str, str]]: # migrate in instances left_instance_infos = [i for i in sorted_instance_infos if i.num_killed_request == 0 and i.instance_load_migrate < self.migrate_out_load_threshold] @@ -136,10 +136,10 @@ def check_migrate(self, migrate_instance_pairs.append((right_instance_infos[i].instance_id, left_instance_infos[i].instance_id)) return migrate_instance_pairs -class PrefillRelaxed(CheckMigratePolicy): - def check_migrate(self, - sorted_instance_infos: List[InstanceInfo] - ) -> List[Tuple[str, str]]: +class PrefillRelaxed(PairMigrationPolicy): + def pair_migration(self, + sorted_instance_infos: List[InstanceInfo] + ) -> List[Tuple[str, str]]: # migrate in instances left_instance_infos = [i for i in sorted_instance_infos if i.num_killed_request == 0 and i.instance_load_migrate < self.migrate_out_load_threshold] @@ -150,7 +150,7 @@ def check_migrate(self, migrate_instance_pairs.append((right_instance_infos[i].instance_id, left_instance_infos[i].instance_id)) return migrate_instance_pairs -class CheckMigratePolicyFactory: +class PairMigrationPolicyFactory: _POLICY_REGISTRY = { 'balanced': Balanced, 'prefill_constrained': PrefillConstrained, @@ -158,5 +158,5 @@ class CheckMigratePolicyFactory: } @classmethod - def get_policy(cls, policy_name: str, **kwargs) -> CheckMigratePolicy: + def get_policy(cls, policy_name: str, **kwargs) -> PairMigrationPolicy: return cls._POLICY_REGISTRY[policy_name](**kwargs) diff --git a/llumnix/global_scheduler/scale_scheduler.py b/llumnix/global_scheduler/scaling_scheduler.py similarity index 94% rename from llumnix/global_scheduler/scale_scheduler.py rename to llumnix/global_scheduler/scaling_scheduler.py index ee37cc12..99913098 100644 --- a/llumnix/global_scheduler/scale_scheduler.py +++ b/llumnix/global_scheduler/scaling_scheduler.py @@ -21,15 +21,15 @@ logger = init_logger(__name__) -class ScaleScheduler: +class ScalingScheduler: def __init__(self, scale_up_threshold: float, scale_down_threshold: float, - scale_policy: str, + scaling_policy: str, instance_load_calculator: InstanceLoadCalculator) -> None: self.scale_up_threshold = scale_up_threshold self.scale_down_threshold = scale_down_threshold - self.scale_policy = ScalePolicyFactory.get_policy(scale_policy, + self.scaling_policy = ScalePolicyFactory.get_policy(scaling_policy, instance_load_calculator=instance_load_calculator) self.instance_load_calculator = instance_load_calculator @@ -46,10 +46,10 @@ def check_scale(self) -> Tuple[str, str]: if len(self.instance_info.keys()) < self.num_instance: return scale_up_num, scale_down_num now_instances = [self.instance_info[instance_id] for instance_id in self.instance_id_set] - load_metric_up = self.scale_policy.compute_load_metric_up(now_instances) - load_metric_down = self.scale_policy.compute_load_metric_down(now_instances) + load_metric_up = self.scaling_policy.compute_load_metric_up(now_instances) + load_metric_down = self.scaling_policy.compute_load_metric_down(now_instances) if load_metric_up > self.scale_up_threshold: - while self.scale_policy.compute_load_metric_avg(now_instances) > 
self.scale_up_threshold: + while self.scaling_policy.compute_load_metric_avg(now_instances) > self.scale_up_threshold: scale_up_num += 1 now_instances.append(self.get_empty_instance_info()) elif load_metric_down < self.scale_down_threshold: diff --git a/llumnix/instance_info.py b/llumnix/instance_info.py index f8bc1c5e..47ba6352 100644 --- a/llumnix/instance_info.py +++ b/llumnix/instance_info.py @@ -91,14 +91,14 @@ def __init__(self, instance_info: InstanceInfo) -> None: class InstanceLoadCalculator: def __init__(self, load_metric: str, - enable_prefill_migrate: bool) -> None: + enable_defrag: bool) -> None: assert load_metric in ['consumed_speed', 'used_ratio'] self.load_metric = load_metric - self.enable_prefill_migrate = enable_prefill_migrate + self.enable_defrag = enable_defrag self.load_computation_strategies: Dict[str, LoadComputationStrategy] = { - 'migrate': MigrateLoadComputation(load_metric, enable_prefill_migrate), - 'dispatch': DispatchAndScaleLoadComputation(load_metric, enable_prefill_migrate), - 'scale': DispatchAndScaleLoadComputation(load_metric, enable_prefill_migrate), + 'migrate': MigrationLoadComputation(load_metric, enable_defrag), + 'dispatch': DispatchAndScalingLoadComputation(load_metric, enable_defrag), + 'scale': DispatchAndScalingLoadComputation(load_metric, enable_defrag), } def compute_instance_load(self, @@ -112,22 +112,22 @@ def compute_instance_load(self, class LoadComputationStrategy(ABC): def __init__(self, load_metric: str, - enable_prefill_migrate: bool) -> None: + enable_defrag: bool) -> None: self.load_metric = load_metric - self.enable_prefill_migrate = enable_prefill_migrate + self.enable_defrag = enable_defrag @abstractmethod def compute_instance_load(self, i: InstanceLoadInfo) -> float: pass -class MigrateLoadComputation(LoadComputationStrategy): +class MigrationLoadComputation(LoadComputationStrategy): def compute_instance_load(self, i: InstanceLoadInfo) -> float: assert self.load_metric in ['used_ratio', 'consumed_speed'] instance_load = -np.inf if self.load_metric == 'used_ratio': instance_load = i.num_used_gpu_block / i.num_total_gpu_block elif self.load_metric == 'consumed_speed': - if not self.enable_prefill_migrate: + if not self.enable_defrag: num_request = i.num_running_request num_available_gpu_block = i.num_available_gpu_block else: @@ -142,7 +142,7 @@ def compute_instance_load(self, i: InstanceLoadInfo) -> float: instance_load = (num_available_gpu_block / num_request)*(-1) return instance_load -class DispatchAndScaleLoadComputation(LoadComputationStrategy): +class DispatchAndScalingLoadComputation(LoadComputationStrategy): def compute_instance_load(self, i: InstanceLoadInfo) -> float: assert self.load_metric in ['used_ratio', 'consumed_speed'] instance_load = -np.inf diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py index 6e49172c..35d9312e 100644 --- a/llumnix/llm_engine_manager.py +++ b/llumnix/llm_engine_manager.py @@ -54,13 +54,13 @@ def __init__(self, self.log_requests = log_requests self.num_instance = 0 - self.enable_migrate = engine_manager_args.enable_migrate + self.enable_migration = engine_manager_args.enable_migration self.enable_scaling = engine_manager_args.enable_scaling self.max_instances = engine_manager_args.max_instances self.min_instances = engine_manager_args.min_instances logger.info("LLMEngineManager starts") - logger.info("enable_migrate: {}".format(self.enable_migrate)) + logger.info("enable_migration: {}".format(self.enable_migration)) logger.info("num_instance: 
{}".format(self.num_instance)) logger.info("max_instances: {}, min_instances: {}".format(self.max_instances, self.min_instances)) @@ -76,7 +76,7 @@ def __init__(self, asyncio.create_task(self._update_instance_info_loop(self.polling_interval)) # args - self.check_migrate_frequency = engine_manager_args.check_migrate_frequency + self.pair_migration_frequency = engine_manager_args.pair_migration_frequency self.scaling_interval = engine_manager_args.scaling_interval # request states @@ -95,8 +95,8 @@ def __init__(self, self.scaling_down = False self.last_check_scale_time = time.time() + 100 - self.record_instance_info = engine_manager_args.record_instance_info - if self.record_instance_info: + self.log_instance_info = engine_manager_args.log_instance_info + if self.log_instance_info: self._init_instance_info_csv(engine_manager_args) async def generate( @@ -180,11 +180,11 @@ async def _update_instance_info_loop(self, interval: float) -> None: self.global_scheduler.update_instance_infos(instance_info_list) self.num_instance_info_update += 1 # Push migrate when the instance_info have updated a certain number of times. - if self.enable_migrate and self.num_instance_info_update != 0 \ - and self.num_instance_info_update % self.check_migrate_frequency == 0: + if self.enable_migration and self.num_instance_info_update != 0 \ + and self.num_instance_info_update % self.pair_migration_frequency == 0: asyncio.create_task(self._migrate()) - if self.record_instance_info: - self._record_instance_infos_to_csv(instance_info_list) + if self.log_instance_info: + self._log_instance_infos_to_csv(instance_info_list) # pylint: disable=W0703 except Exception as e: logger.error("unexpected exception occurs: {}".format(e)) @@ -224,7 +224,7 @@ async def _post_migrate(self, rets: List[str], call_migrate_instance_pairs: List call_migrate_instance_pairs[i][0], call_migrate_instance_pairs[i][1], migrate_out_request_ids)) async def _migrate(self) -> None: - migrate_instance_pairs = self.global_scheduler.check_migrate() + migrate_instance_pairs = self.global_scheduler.pair_migration() try: migration_tasks = [] call_migrate_instance_pairs: List[Tuple[str, str]] = [] @@ -327,7 +327,7 @@ async def is_ready(self) -> bool: def _init_instance_info_csv(self, engine_manager_args: EngineManagerArgs) -> None: # pylint: disable=consider-using-with - self.instance_info_file = open(engine_manager_args.results_filename + '_instance.csv', 'w', encoding='utf-8') + self.instance_info_file = open(engine_manager_args.log_filename + '_instance.csv', 'w', encoding='utf-8') self.instance_info_csv = csv.writer(self.instance_info_file) self.instance_info_csv.writerow([ 'timestamp', @@ -350,7 +350,7 @@ def _init_instance_info_csv(self, engine_manager_args: EngineManagerArgs) -> Non 'num_block_all_waiting_request', 'waiting_time_first_waiting_request']) - def _record_instance_infos_to_csv(self, instance_infos: List[InstanceInfo]) -> None: + def _log_instance_infos_to_csv(self, instance_infos: List[InstanceInfo]) -> None: for instance_info in instance_infos: self.instance_info_csv.writerow([ instance_info.timestamp, diff --git a/llumnix/llumlet/llumlet.py b/llumnix/llumlet/llumlet.py index 08409e2d..28d9e3d3 100644 --- a/llumnix/llumlet/llumlet.py +++ b/llumnix/llumlet/llumlet.py @@ -46,7 +46,7 @@ def __init__(self, self.migration_coordinator = MigrationCoordinator(self.backend_engine, migration_config.last_stage_max_blocks, migration_config.max_stages) - self.migration_scheduler = LocalMigrationScheduler(migration_config.migrate_policy, + 
self.migration_scheduler = LocalMigrationScheduler(migration_config.request_migration_policy, self.backend_engine) self.log_requests = True diff --git a/llumnix/llumlet/local_migration_scheduler.py b/llumnix/llumlet/local_migration_scheduler.py index f59c5a64..8d3df123 100644 --- a/llumnix/llumlet/local_migration_scheduler.py +++ b/llumnix/llumlet/local_migration_scheduler.py @@ -16,18 +16,18 @@ from llumnix.backends.backend_interface import BackendInterface class LocalMigrationScheduler: - def __init__(self, migrate_policy: str, backend_engine: BackendInterface) -> None: - self.migrate_policy = migrate_policy + def __init__(self, request_migration_policy: str, backend_engine: BackendInterface) -> None: + self.request_migration_policy = request_migration_policy self.backend_engine = backend_engine def get_migrate_out_request(self) -> Optional[MigratingRequest]: # TODO(s5u13b): remove the if-else codes migrate_out_request: MigratingRequest = None - if self.migrate_policy == 'LCFS': + if self.request_migration_policy == 'LCFS': migrate_out_request = self.backend_engine.get_last_running_request() - elif self.migrate_policy in ['SJF', 'LJF']: - if self.migrate_policy == 'LJF': + elif self.request_migration_policy in ['SJF', 'LJF']: + if self.request_migration_policy == 'LJF': migrate_out_request = self.backend_engine.get_longest_running_request() - elif self.migrate_policy == 'SJF': + elif self.request_migration_policy == 'SJF': migrate_out_request = self.backend_engine.get_shortest_running_request() return migrate_out_request
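The `TODO(s5u13b)` above asks for the if-else chain in `get_migrate_out_request` to be removed. A minimal table-driven sketch of that cleanup — assuming only the three policy names and the backend accessors already shown in this diff, and with the `MigratingRequest` import path a placeholder — might look like:

```python
from typing import Callable, Dict, Optional

from llumnix.backends.backend_interface import BackendInterface
# NOTE: import path for MigratingRequest is assumed; adjust to wherever it lives.
from llumnix.llumlet.migrating_request import MigratingRequest


class LocalMigrationScheduler:
    def __init__(self, request_migration_policy: str, backend_engine: BackendInterface) -> None:
        self.request_migration_policy = request_migration_policy
        self.backend_engine = backend_engine
        # One entry per supported policy, each bound to the backend accessor
        # that selects the migrate-out candidate.
        self._request_pickers: Dict[str, Callable[[], Optional[MigratingRequest]]] = {
            'LCFS': backend_engine.get_last_running_request,
            'SJF': backend_engine.get_shortest_running_request,
            'LJF': backend_engine.get_longest_running_request,
        }

    def get_migrate_out_request(self) -> Optional[MigratingRequest]:
        # An unknown policy raises KeyError here instead of silently returning None.
        return self._request_pickers[self.request_migration_policy]()
```

Binding the accessors once in `__init__` keeps policy lookup to a single dictionary access and makes unrecognized policy names fail fast rather than fall through.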