diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py index c99fdad2..22ff9ce3 100644 --- a/llumnix/backends/vllm/llm_engine.py +++ b/llumnix/backends/vllm/llm_engine.py @@ -251,9 +251,6 @@ def add_request(self, server_info: ServerInfo, *args, **kwargs) -> None: - # When manager is unavailable, api server might dispatch the request that has already been dispatched. - if request_id in self.engine.request_server_info: - return # Store the server information of each request to put the request outputs back to the corresponding api server correctly. self.engine.request_server_info[request_id] = server_info self.engine.add_request(request_id, *args, **kwargs) diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py index 429630e3..ae9c0a4b 100644 --- a/llumnix/entrypoints/vllm/api_server.py +++ b/llumnix/entrypoints/vllm/api_server.py @@ -46,6 +46,7 @@ log_requests = None num_finished_requests = 0 WAIT_MANAGER_INTERVAL = 5 +manager_available = True async def _background_process_outputs(): @@ -78,10 +79,17 @@ async def manager_generate(prompt, sampling_params, request_id) -> AsyncStream: # This request's outputs will be put to the request_output_queue of this api server no matter which instance it's running in. server_info = ServerInfo(server_id, request_output_queue) # If manager is unavailable, request will be directly added to the llumlet held by api server. + global manager_available try: # await to catch exception await engine_manager.generate.remote(request_id, server_info, prompt, sampling_params) + manager_available = True except ray.exceptions.RayActorError: + # Do not re-generate the request to avoid duplicate requests. + if manager_available: + manager_available = False + return results_generator + try: if instance_num_requests: instance_id = min(instance_num_requests, key=instance_num_requests.get) diff --git a/llumnix/llm_engine_manager.py b/llumnix/llm_engine_manager.py index 70bda6d0..2c253574 100644 --- a/llumnix/llm_engine_manager.py +++ b/llumnix/llm_engine_manager.py @@ -122,8 +122,6 @@ async def generate( except (ray.exceptions.RayActorError, KeyError): logger.info("[generate] instance {} is dead, regenerate request {}".format(instance_id, request_id)) self.scale_down(instance_id) - if self.num_instances != 0: - asyncio.create_task(self.generate(request_id, server_info, *args, **kwargs)) async def abort(self, request_id: Union[str, Iterable[str]]) -> None: if isinstance(request_id, str):