Skip to content

Commit

Permalink
[Failover][BugFix] Fix duplicate requests caused by failover (#18)
Browse files (browse the repository at this point in the history)
  • Loading branch information
s5u13b authored Aug 29, 2024
1 parent db660ac commit 30b5bf8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
3 changes: 0 additions & 3 deletions llumnix/backends/vllm/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,6 @@ def add_request(self,
server_info: ServerInfo,
*args,
**kwargs) -> None:
# When the manager is unavailable, the API server might dispatch a request that has already been dispatched.
if request_id in self.engine.request_server_info:
return
# Store the server information of each request so that its outputs can be put back to the corresponding API server.
self.engine.request_server_info[request_id] = server_info
self.engine.add_request(request_id, *args, **kwargs)
Expand Down
8 changes: 8 additions & 0 deletions llumnix/entrypoints/vllm/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
log_requests = None
num_finished_requests = 0
WAIT_MANAGER_INTERVAL = 5
manager_available = True


async def _background_process_outputs():
Expand Down Expand Up @@ -78,10 +79,17 @@ async def manager_generate(prompt, sampling_params, request_id) -> AsyncStream:
# This request's outputs will be put into the request_output_queue of this API server, no matter which instance the request is running on.
server_info = ServerInfo(server_id, request_output_queue)
# If the manager is unavailable, the request will be added directly to the llumlet held by the API server.
global manager_available
try:
# Await the remote call so that a RayActorError is raised here and can be caught below.
await engine_manager.generate.remote(request_id, server_info, prompt, sampling_params)
manager_available = True
except ray.exceptions.RayActorError:
# Do not re-generate the request to avoid duplicate requests.
if manager_available:
manager_available = False
return results_generator

try:
if instance_num_requests:
instance_id = min(instance_num_requests, key=instance_num_requests.get)
Expand Down
2 changes: 0 additions & 2 deletions llumnix/llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ async def generate(
except (ray.exceptions.RayActorError, KeyError):
logger.info("[generate] instance {} is dead, regenerate request {}".format(instance_id, request_id))
self.scale_down(instance_id)
if self.num_instances != 0:
asyncio.create_task(self.generate(request_id, server_info, *args, **kwargs))

async def abort(self, request_id: Union[str, Iterable[str]]) -> None:
if isinstance(request_id, str):
Expand Down

0 comments on commit 30b5bf8

Please sign in to comment.