diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
new file mode 100644
index 00000000..cdb2aa7e
--- /dev/null
+++ b/.github/workflows/pylint.yml
@@ -0,0 +1,29 @@
+name: Pylint
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.8", "3.9", "3.10"]
+    steps:
+    - uses: actions/checkout@v4
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v3
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install pylint==2.12.2
+    - name: Analysing the code with pylint
+      run: |
+        pylint --rcfile=.pylintrc --output-format=parseable --jobs=8 $( find llumnix/ -type f -name '*.py')
diff --git a/benchmark/benchmark_serving.py b/benchmark/benchmark_serving.py
index b5afe3b7..c915e1f5 100644
--- a/benchmark/benchmark_serving.py
+++ b/benchmark/benchmark_serving.py
@@ -424,7 +424,7 @@ async def benchmark(
     allow_variable_generation_length: bool,
     verbose: bool,
     results_filename: str,
-    ip_ports: list[int],
+    ip_ports: List[int],
     distribution: str,
     qps: float,
     coefficient_variation: float,
diff --git a/llumnix/backends/vllm/executor.py b/llumnix/backends/vllm/executor.py
index 8cfd136f..c3fd183a 100644
--- a/llumnix/backends/vllm/executor.py
+++ b/llumnix/backends/vllm/executor.py
@@ -36,6 +36,7 @@ class LlumnixRayGPUExecutor(RayGPUExecutor):
 
     def _init_workers_ray(self, placement_group: "PlacementGroup",
                           **ray_remote_kwargs):
+        self.last_inference_latency = 0
         if self.parallel_config.tensor_parallel_size == 1:
             # For single GPU case, we use a ray worker with constrained memory.
             num_gpus = self.cache_config.gpu_memory_utilization
@@ -73,7 +74,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                 num_cpus=0,
                 num_gpus=num_gpus,
                 scheduling_strategy=scheduling_strategy,
-                max_concurrency=4,
+                max_concurrency=2,
                 **ray_remote_kwargs,
             )(RayWorkerWrapper).remote(
                 worker_module_name="llumnix.backends.vllm.worker",
@@ -146,10 +147,18 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                           max_concurrent_workers=self.parallel_config.
                           max_parallel_loading_workers)
 
+    def execute_model(self, *args, **kwargs):
+        t0 = time.time()
+        outputs = super().execute_model(*args, **kwargs)
+        t1 = time.time()
+        self.last_inference_latency = (t1 - t0) * 1000
+        return outputs
+
 
 class SimGPUExecutor(GPUExecutor):
     latency_mem: LatencyMemData = None
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
+        self.last_inference_latency = 0
         self.migration_bandwidth = self.latency_mem.migration_bandwidth
         # TODO(ziming) add swap bandwidth
@@ -187,10 +196,13 @@ def execute_model(
         decode_bs = _pad_to_alignment(decode_bs, 8)
         latency = 0
         if prefill_seq_len:
-            latency += model_prefill(prefill_seq_len, *self.latency_mem.prefill_model_params) / 1000
+            latency += self.latency_mem.prefill_latency[prefill_seq_len][0] if prefill_seq_len in self.latency_mem.prefill_latency \
+                else model_prefill(prefill_seq_len, *self.latency_mem.prefill_model_params)
         if decode_bs:
-            latency += model_decode((decode_bs, decode_seq_len), *self.latency_mem.decode_model_params) / 1000
-        time.sleep(latency)
+            decode_meta_data = (decode_bs, decode_seq_len)
+            latency += self.latency_mem.decode_latency[decode_meta_data][0] if decode_meta_data in self.latency_mem.decode_latency \
+                else model_decode((decode_bs, decode_seq_len), *self.latency_mem.decode_model_params)
+        time.sleep(latency/1000)
         sampler_outputs = []
         for meta_data in execute_model_req.seq_group_metadata_list:
             samples = []
diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py
index 64914cfd..a1970670 100644
--- a/llumnix/backends/vllm/llm_engine.py
+++ b/llumnix/backends/vllm/llm_engine.py
@@ -112,9 +112,7 @@ def _process_model_outputs(
         return super()._process_model_outputs(output, scheduled_seq_groups, ignored_seq_groups, seq_group_metadata_list)
 
     def step(self) -> None:
-        t0_inference_begin = time.time()
         output_list = super().step()
-        t1_inference_end = time.time()
 
         instance_info: InstanceInfo = self.scheduler.get_record_instance_info()
 
@@ -126,7 +124,7 @@ def step(self) -> None:
         instance_info.instance_id = self.instance_id
         instance_info.step_id = next(self.step_counter)
         instance_info.timestamp = time.time()
-        instance_info.latency = (t1_inference_end - t0_inference_begin)*1000
+        instance_info.latency = self.model_executor.last_inference_latency
         seq_groups = self.scheduler.running
         if seq_groups:
             tot_blocks = []
diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py
index 21663cd9..344b879b 100644
--- a/llumnix/entrypoints/vllm/api_server.py
+++ b/llumnix/entrypoints/vllm/api_server.py
@@ -49,7 +49,8 @@
 
 async def _background_process_outputs():
     while True:
-        request_outputs = request_output_queue.get_nowait_batch(num_items=request_output_queue.qsize())
+        qsize = await request_output_queue.actor.qsize.remote()
+        request_outputs = await request_output_queue.actor.get_nowait_batch.remote(qsize)
         for request_output in request_outputs:
             request_id = request_output.request_id
             # Request could be dispatched twice when manager is dead, the first request will free the request_streams when finished.
@@ -59,7 +60,6 @@ async def _background_process_outputs():
             if request_output.finished:
                 request_streams[request_id].finish()
                 del request_streams[request_id]
-        await asyncio.sleep(0.01)
 
 # pylint: disable=unused-argument
 @asynccontextmanager
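
Note (outside the patch): the SimGPUExecutor hunk above swaps a purely analytic latency estimate for a profiled-table lookup that falls back to the fitted model, and moves the ms-to-seconds conversion into the final `time.sleep(latency/1000)`. Below is a minimal sketch of that lookup-with-fallback pattern; the helper `estimate_latency_ms`, the table, and the parameters are hypothetical stand-ins, not the actual `LatencyMemData` API.

```python
# Illustrative sketch only: prefer a profiled latency entry, otherwise
# fall back to a fitted analytic model. Names are hypothetical.
from typing import Callable, Dict, Hashable, Sequence, Tuple


def estimate_latency_ms(key: Hashable,
                        profiled_table: Dict[Hashable, Tuple[float, ...]],
                        model_fn: Callable[..., float],
                        fit_params: Sequence[float]) -> float:
    """Return a latency estimate in milliseconds for a prefill/decode key."""
    if key in profiled_table:
        # Profiled entries are tuples; the first element is the latency in ms.
        return profiled_table[key][0]
    # Fall back to the fitted model when the key was never profiled.
    return model_fn(key, *fit_params)


# Example: a profiled prefill length of 128 hits the table, 256 falls back.
table = {128: (42.0,)}
print(estimate_latency_ms(128, table, lambda k, a, b: a * k + b, (0.1, 5.0)))  # 42.0
print(estimate_latency_ms(256, table, lambda k, a, b: a * k + b, (0.1, 5.0)))  # 30.6
```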