From 816e81ddf507d862c3656092faf18c900ab9ce38 Mon Sep 17 00:00:00 2001
From: Biao Sun <39851894+s5u13b@users.noreply.github.com>
Date: Tue, 23 Jul 2024 20:08:54 +0800
Subject: [PATCH 1/2] Add pylint to CI (#3)

---
 .github/workflows/pylint.yml | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
 create mode 100644 .github/workflows/pylint.yml

diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
new file mode 100644
index 00000000..cdb2aa7e
--- /dev/null
+++ b/.github/workflows/pylint.yml
@@ -0,0 +1,29 @@
+name: Pylint
+
+on:
+  push:
+    branches:
+    - main
+  pull_request:
+    branches:
+    - main
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.8", "3.9", "3.10"]
+    steps:
+    - uses: actions/checkout@v4
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v3
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install pylint==2.12.2
+    - name: Analysing the code with pylint
+      run: |
+        pylint --rcfile=.pylintrc --output-format=parseable --jobs=8 $( find llumnix/ -type f -name '*.py')

From fa393d4865eda42890dac1490aacda7fc5f3b334 Mon Sep 17 00:00:00 2001
From: Ziming Huang <48115868+ZeldaHuang@users.noreply.github.com>
Date: Thu, 25 Jul 2024 16:22:46 +0800
Subject: [PATCH 2/2] [Misc] Improve simulator&&api_server performance (#6)

---
 benchmark/benchmark_serving.py         |  2 +-
 llumnix/backends/vllm/executor.py      | 20 ++++++++++++++++----
 llumnix/backends/vllm/llm_engine.py    |  4 +---
 llumnix/entrypoints/vllm/api_server.py |  4 ++--
 4 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/benchmark/benchmark_serving.py b/benchmark/benchmark_serving.py
index b5afe3b7..c915e1f5 100644
--- a/benchmark/benchmark_serving.py
+++ b/benchmark/benchmark_serving.py
@@ -424,7 +424,7 @@ async def benchmark(
     allow_variable_generation_length: bool,
     verbose: bool,
     results_filename: str,
-    ip_ports: list[int],
+    ip_ports: List[int],
    distribution: str,
     qps: float,
     coefficient_variation: float,
diff --git a/llumnix/backends/vllm/executor.py b/llumnix/backends/vllm/executor.py
index 8cfd136f..c3fd183a 100644
--- a/llumnix/backends/vllm/executor.py
+++ b/llumnix/backends/vllm/executor.py
@@ -36,6 +36,7 @@ class LlumnixRayGPUExecutor(RayGPUExecutor):
 
     def _init_workers_ray(self, placement_group: "PlacementGroup",
                           **ray_remote_kwargs):
+        self.last_inference_latency = 0
         if self.parallel_config.tensor_parallel_size == 1:
             # For single GPU case, we use a ray worker with constrained memory.
             num_gpus = self.cache_config.gpu_memory_utilization
@@ -73,7 +74,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                 num_cpus=0,
                 num_gpus=num_gpus,
                 scheduling_strategy=scheduling_strategy,
-                max_concurrency=4,
+                max_concurrency=2,
                 **ray_remote_kwargs,
             )(RayWorkerWrapper).remote(
                 worker_module_name="llumnix.backends.vllm.worker",
@@ -146,10 +147,18 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                           max_concurrent_workers=self.parallel_config.
                           max_parallel_loading_workers)
 
+    def execute_model(self, *args, **kwargs):
+        t0 = time.time()
+        outputs = super().execute_model(*args, **kwargs)
+        t1 = time.time()
+        self.last_inference_latency = (t1 - t0) * 1000
+        return outputs
+
 class SimGPUExecutor(GPUExecutor):
     latency_mem: LatencyMemData = None
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
+        self.last_inference_latency = 0
         self.migration_bandwidth = self.latency_mem.migration_bandwidth
         # TODO(ziming) add swap bandwidth
@@ -187,10 +196,13 @@ def execute_model(
             decode_bs = _pad_to_alignment(decode_bs, 8)
         latency = 0
         if prefill_seq_len:
-            latency += model_prefill(prefill_seq_len, *self.latency_mem.prefill_model_params) / 1000
+            latency += self.latency_mem.prefill_latency[prefill_seq_len][0] if prefill_seq_len in self.latency_mem.prefill_latency \
+                       else model_prefill(prefill_seq_len, *self.latency_mem.prefill_model_params)
         if decode_bs:
-            latency += model_decode((decode_bs, decode_seq_len), *self.latency_mem.decode_model_params) / 1000
-        time.sleep(latency)
+            decode_meta_data = (decode_bs, decode_seq_len)
+            latency += self.latency_mem.decode_latency[decode_meta_data][0] if decode_meta_data in self.latency_mem.decode_latency \
+                       else model_decode((decode_bs, decode_seq_len), *self.latency_mem.decode_model_params)
+        time.sleep(latency/1000)
         sampler_outputs = []
         for meta_data in execute_model_req.seq_group_metadata_list:
             samples = []
diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py
index c3311d1a..cee6cd1a 100644
--- a/llumnix/backends/vllm/llm_engine.py
+++ b/llumnix/backends/vllm/llm_engine.py
@@ -177,9 +177,7 @@ def send_blocks(self, dst_ray_actor: "ray.actor.ActorHandle", src_blocks: List[i
                    src_worker_handle_list=self.worker_handle_list))
 
     def step(self) -> Tuple[List[RequestOutput], InstanceInfo, List[ServerInfo]]:
-        t0_inference_begin = time.time()
         output_list = self.engine.step()
-        t1_inference_end = time.time()
 
         instance_info: InstanceInfo = self.engine.scheduler.get_record_instance_info()
 
@@ -191,7 +189,7 @@ def step(self) -> Tuple[List[RequestOutput], InstanceInfo, List[ServerInfo]]:
         instance_info.instance_id = self.instance_id
         instance_info.step_id = next(self.step_counter)
         instance_info.timestamp = time.time()
-        instance_info.latency = (t1_inference_end - t0_inference_begin)*1000
+        instance_info.latency = self.engine.model_executor.last_inference_latency
         seq_groups = self.engine.scheduler.running
         if seq_groups:
             tot_blocks = []
diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py
index 21663cd9..344b879b 100644
--- a/llumnix/entrypoints/vllm/api_server.py
+++ b/llumnix/entrypoints/vllm/api_server.py
@@ -49,7 +49,8 @@
 
 async def _background_process_outputs():
     while True:
-        request_outputs = request_output_queue.get_nowait_batch(num_items=request_output_queue.qsize())
+        qsize = await request_output_queue.actor.qsize.remote()
+        request_outputs = await request_output_queue.actor.get_nowait_batch.remote(qsize)
         for request_output in request_outputs:
             request_id = request_output.request_id
             # Request could be dispatched twice when manager is dead, the first request will free the request_streams when finished.
@@ -59,7 +60,6 @@ async def _background_process_outputs():
             if request_output.finished:
                 request_streams[request_id].finish()
                 del request_streams[request_id]
-        await asyncio.sleep(0.01)
 
 # pylint: disable=unused-argument
 @asynccontextmanager
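
Note on the SimGPUExecutor change in llumnix/backends/vllm/executor.py above: execute_model now reads the step latency from a profiled lookup table and only falls back to the fitted model_prefill/model_decode curves when the requested bucket is missing, accumulating milliseconds and converting to seconds once for the final sleep. A minimal sketch of that lookup-with-fallback pattern (the table contents, coefficients, and helper names here are illustrative, not the actual LatencyMemData profile):

    import time
    from typing import Dict, Tuple

    # Illustrative profiled tables: prefill keyed by padded sequence length,
    # decode keyed by (batch_size, sequence_length); values are in milliseconds.
    prefill_latency: Dict[int, Tuple[float]] = {512: (42.0,)}
    decode_latency: Dict[Tuple[int, int], Tuple[float]] = {(8, 128): (9.5,)}

    def model_prefill(seq_len: int, a: float, b: float) -> float:
        # Fitted fallback curve; coefficients would come from offline profiling.
        return a * seq_len + b

    def estimate_prefill_ms(seq_len: int, params: Tuple[float, float] = (0.08, 1.0)) -> float:
        # Prefer the profiled entry; fall back to the fitted curve for unseen lengths.
        if seq_len in prefill_latency:
            return prefill_latency[seq_len][0]
        return model_prefill(seq_len, *params)

    latency_ms = estimate_prefill_ms(512) + estimate_prefill_ms(300)
    time.sleep(latency_ms / 1000)  # table and curve are both in ms; convert once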
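
Note on the api_server.py change above: _background_process_outputs awaits the output queue's backing Ray actor (qsize, then get_nowait_batch) instead of calling the synchronous wrappers and sleeping between polls, so fetching a batch never blocks the event loop. A rough sketch of that drain pattern, assuming the queue is a ray.util.queue.Queue whose .actor handle exposes the qsize and get_nowait_batch methods the patch relies on:

    import asyncio

    import ray
    from ray.util.queue import Queue

    async def drain(output_queue: Queue) -> None:
        # Ask the backing actor how many items are ready, then fetch them in one
        # batch; awaiting the ObjectRefs yields to the event loop instead of blocking.
        while True:
            qsize = await output_queue.actor.qsize.remote()
            batch = await output_queue.actor.get_nowait_batch.remote(qsize)
            for item in batch:
                print("processed", item)

    async def main() -> None:
        ray.init()
        queue = Queue()
        for i in range(3):
            queue.put(i)
        try:
            await asyncio.wait_for(drain(queue), timeout=2.0)
        except asyncio.TimeoutError:
            pass

    if __name__ == "__main__":
        asyncio.run(main())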