Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
KuilongCui committed Dec 12, 2024
1 parent 4b839d5 commit e18cfa5
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 62 deletions.
58 changes: 0 additions & 58 deletions llumnix/backends/bladellm/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,56 +214,6 @@ def __init__(self,
PrefillAsyncLLMEngine.__init__(self, *args, **kwargs)
LLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id)

async def _fetch_post(self, url, data, headers, inst_id):
    """Call an entrypoint method on another instance's actor and decode the JSON reply.

    The actor is looked up under the name ``"instance_" + inst_id`` in the
    ``"llumnix"`` namespace. The method invoked remotely is named ``"inner_"``
    followed by the last ``/``-separated segment of ``url``.

    Args:
        url: URL-like string; only its final path segment is used.
        data: Payload handed unchanged to the remote method.
        headers: Accepted but not used by this implementation.
        inst_id: Instance id string appended to ``"instance_"``.

    Returns:
        The result of ``json.loads`` applied to the awaited remote response.
    """
    actor_name = "instance_" + inst_id
    target_actor = ray.get_actor(actor_name, namespace="llumnix")
    remote_method = "inner_" + url.split("/")[-1]
    raw_reply = await target_actor.exec_entrypoint_method.remote(remote_method, data)
    return json.loads(raw_reply)

async def post_scheduler_update(self, resp, update_output):
    """Apply one scheduler update and forward the generated responses.

    Steps, in order:

    1. Pass ``resp.kv_transfer_done_ids`` (as a list) to
       ``self._scheduler.remove_request_from_hanging``.
    2. Await ``self._handle_generated_results(update_output)``.
    3. Remove every request id found in ``update_output.response`` from
       ``self._single_out_token_reqs``.
    4. If ``update_output.reset`` is truthy, await ``self._handle_reset()``.
       Otherwise send each response whose id is in ``self._back_queue``
       through ``self.trans_wrapper`` and drop the id from
       ``self._back_queue`` once its response reports ``is_finished``.

    Args:
        resp: Object exposing an iterable ``kv_transfer_done_ids``.
        update_output: Object exposing ``response`` (a mapping of request id
            to response object, or ``None``) and ``reset``.
    """
    self._scheduler.remove_request_from_hanging(list(resp.kv_transfer_done_ids))
    await self._handle_generated_results(update_output)

    if update_output.response is not None:
        # Only the request ids matter here, so iterate the mapping directly
        # instead of unpacking and discarding the values.
        for req_id in update_output.response:
            if req_id in self._single_out_token_reqs:
                self._single_out_token_reqs.remove(req_id)

    if update_output.reset:
        await self._handle_reset()
    elif update_output.response is not None:
        for req_id, l_resp in update_output.response.items():
            if req_id not in self._back_queue:
                continue
            await self.trans_wrapper.send(req_id, l_resp)
            if l_resp.is_finished:
                # The request is complete; stop tracking it.
                del self._back_queue[req_id]

# Coroutine that receives stream responses on self._recv_from_decode and
# forwards them through self.trans_wrapper until self._stop_event is set.
# NOTE(review): this listing has lost its indentation, so the exact nesting
# (e.g. whether the trailing asyncio.sleep(0) sits inside the POLLIN branch or
# at loop level) cannot be confirmed from here - check against the repository.
async def _pull_tokens_stream(self):
# Do not start polling until pd_disagg_initialized has been set.
await self.pd_disagg_initialized.wait()
while not self._stop_event.is_set():
# poll(0): presumably a non-blocking poll (timeout 0) - confirm against the poller's API.
socks = dict(await self.poller.poll(0))
# Only read when the decode socket is reported with the POLLIN flag.
if self._recv_from_decode in socks and socks[self._recv_from_decode] & zmq.POLLIN:
prefill_recv_obj: RemoteGenerateStreamResponseLlumnix = await self._recv_from_decode.recv_pyobj()
# Skip messages whose external id has no entry in external_to_reqid.
# NOTE(review): the "{}" placeholder with a positional argument only formats
# with a brace-style logger (e.g. loguru); stdlib logging expects "%s" - confirm
# which logger this module uses.
if prefill_recv_obj.external_id not in self.external_to_reqid:
logger.warning("pd_prefill request {} not found in prefill instance", prefill_recv_obj.external_id)
continue
# Translate the external id into the local request id.
req_id = self.external_to_reqid[prefill_recv_obj.external_id]
# Skip requests that are no longer tracked in _back_queue.
if req_id not in self._back_queue:
logger.warning("pd_prefill request {} not found in back_queue", req_id)
continue
# self._back_queue[req_id].put_nowait(prefill_recv_obj)

# NOTE(review): eval() followed by pickle.loads() runs on data received over
# the socket; both can execute arbitrary code if the peer is not trusted.
# Presumably server_info arrives as the repr of a bytes object - confirm, and
# consider transporting the raw bytes instead.
prefill_recv_obj.server_info = pickle.loads(eval(prefill_recv_obj.server_info))
# Overwrite the received request_id with the local request id.
prefill_recv_obj.request_id = req_id

await self.trans_wrapper.send(req_id, prefill_recv_obj)

# Once the response is marked finished, drop the request's state and its
# _back_queue entry.
if prefill_recv_obj.is_finished:
self._remove_request_state(req_id, prefill_recv_obj.external_id)
del self._back_queue[req_id]
# sleep(0) yields control to the event loop so other tasks can run.
await asyncio.sleep(0)
logger.info('stop event is set exit pull token loop')

class DecodeAsyncLLMEngineLlumnix(LLMEngineLlumnixMixin, DecodeAsyncLLMEngine):
def __init__(self,
instance_id: str,
Expand All @@ -276,10 +226,6 @@ def __init__(self,
DecodeAsyncLLMEngine.__init__(self, *args, **kwargs)
LLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id)

def start(self, loop: asyncio.AbstractEventLoop):
    """Start the Llumnix mixin on ``loop`` and then create the decode entrypoint.

    Args:
        loop: Event loop forwarded to ``LLMEngineLlumnixMixin.start``.

    After the mixin has been started, ``self.entrypoint`` is set to a
    ``DecodeEntrypoint`` built from ``self._client`` and ``self._args``.
    """
    LLMEngineLlumnixMixin.start(self, loop)
    decode_entrypoint = DecodeEntrypoint(self._client, self._args)
    self.entrypoint = decode_entrypoint

class BackendBladeLLM(BackendInterface):
def __init__(
self,
Expand Down Expand Up @@ -340,10 +286,6 @@ def abort_request(self, request_id: Union[str, Iterable[str]]) -> None:
for req_id in request_ids:
self.engine.drop_request(int(req_id))

def exec_entrypoint_method(self, method, *args, **kwargs):
    """Run a coroutine method of the engine's entrypoint on ``self._loop`` and wait for it.

    Args:
        method: Name of the attribute to look up on ``self.engine.entrypoint``.
        *args: Positional arguments forwarded to that method.
        **kwargs: Keyword arguments forwarded to that method.

    Returns:
        The result of the coroutine, obtained by blocking on the future
        returned by ``asyncio.run_coroutine_threadsafe``.
    """
    entrypoint_method = getattr(self.engine.entrypoint, method)
    pending_coro = entrypoint_method(*args, **kwargs)
    # Submit to the engine's loop from the calling thread, then block for the outcome.
    future = asyncio.run_coroutine_threadsafe(pending_coro, self._loop)
    return future.result()

async def _start_engine_step_loop(self) -> None:
pass

Expand Down
4 changes: 0 additions & 4 deletions llumnix/llumlet/llumlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,3 @@ def execute_migration_method(self, method, *args, **kwargs):
def execute_engine_method(self, method, *args, **kwargs):
    """Call the named method of ``self.backend_engine`` and return its result.

    Args:
        method: Name of the attribute to look up on ``self.backend_engine``.
        *args: Positional arguments forwarded to that method.
        **kwargs: Keyword arguments forwarded to that method.

    Returns:
        Whatever the looked-up method returns.
    """
    return getattr(self.backend_engine, method)(*args, **kwargs)

def exec_entrypoint_method(self, method, *args, **kwargs):
    """Forward an entrypoint call to ``self.backend_engine.exec_entrypoint_method``.

    Args:
        method: Entrypoint method name, passed through as the first argument.
        *args: Positional arguments forwarded after ``method``.
        **kwargs: Keyword arguments forwarded unchanged.

    Returns:
        Whatever the backend engine's ``exec_entrypoint_method`` returns.
    """
    # Plain attribute access replaces getattr() with a constant name; the two
    # are equivalent, and this form is easier to read and to check statically.
    return self.backend_engine.exec_entrypoint_method(method, *args, **kwargs)

0 comments on commit e18cfa5

Please sign in to comment.