From e18cfa5bfce9ff8f3d3f86204fd9a6045cdbf2eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=97=B2=E5=AE=87?= Date: Thu, 12 Dec 2024 09:53:11 +0000 Subject: [PATCH] refine --- llumnix/backends/bladellm/llm_engine.py | 58 ------------------------- llumnix/llumlet/llumlet.py | 4 -- 2 files changed, 62 deletions(-) diff --git a/llumnix/backends/bladellm/llm_engine.py b/llumnix/backends/bladellm/llm_engine.py index ec7b0ccd..02cedc2f 100644 --- a/llumnix/backends/bladellm/llm_engine.py +++ b/llumnix/backends/bladellm/llm_engine.py @@ -214,56 +214,6 @@ def __init__(self, PrefillAsyncLLMEngine.__init__(self, *args, **kwargs) LLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id) - async def _fetch_post(self, url, data, headers, inst_id): - decode_actor = ray.get_actor("instance_"+inst_id, namespace="llumnix") - response = await decode_actor.exec_entrypoint_method.remote("inner_"+url.split("/")[-1], data) - return json.loads(response) - - async def post_scheduler_update(self, resp, update_output): - self._scheduler.remove_request_from_hanging(list(resp.kv_transfer_done_ids)) - await self._handle_generated_results(update_output) - - if update_output.response is not None: - for req_id, _ in update_output.response.items(): - if req_id in self._single_out_token_reqs: - self._single_out_token_reqs.remove(req_id) - - if update_output.reset: - await self._handle_reset() - elif update_output.response is not None: - for req_id, l_resp in update_output.response.items(): - if req_id in self._back_queue: - # self._back_queue[req_id].put_nowait(l_resp) - await self.trans_wrapper.send(req_id, l_resp) - if l_resp.is_finished: - del self._back_queue[req_id] - - async def _pull_tokens_stream(self): - await self.pd_disagg_initialized.wait() - while not self._stop_event.is_set(): - socks = dict(await self.poller.poll(0)) - if self._recv_from_decode in socks and socks[self._recv_from_decode] & zmq.POLLIN: - prefill_recv_obj: RemoteGenerateStreamResponseLlumnix = await self._recv_from_decode.recv_pyobj() - if prefill_recv_obj.external_id not in self.external_to_reqid: - logger.warning("pd_prefill request {} not found in prefill instance", prefill_recv_obj.external_id) - continue - req_id = self.external_to_reqid[prefill_recv_obj.external_id] - if req_id not in self._back_queue: - logger.warning("pd_prefill request {} not found in back_queue", req_id) - continue - # self._back_queue[req_id].put_nowait(prefill_recv_obj) - - prefill_recv_obj.server_info = pickle.loads(eval(prefill_recv_obj.server_info)) - prefill_recv_obj.request_id = req_id - - await self.trans_wrapper.send(req_id, prefill_recv_obj) - - if prefill_recv_obj.is_finished: - self._remove_request_state(req_id, prefill_recv_obj.external_id) - del self._back_queue[req_id] - await asyncio.sleep(0) - logger.info('stop event is set exit pull token loop') - class DecodeAsyncLLMEngineLlumnix(LLMEngineLlumnixMixin, DecodeAsyncLLMEngine): def __init__(self, instance_id: str, @@ -276,10 +226,6 @@ def __init__(self, DecodeAsyncLLMEngine.__init__(self, *args, **kwargs) LLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id) - def start(self, loop: asyncio.AbstractEventLoop): - LLMEngineLlumnixMixin.start(self, loop) - self.entrypoint = DecodeEntrypoint(self._client, self._args) - class BackendBladeLLM(BackendInterface): def __init__( self, @@ -340,10 +286,6 @@ def abort_request(self, request_id: Union[str, Iterable[str]]) -> None: for req_id in request_ids: self.engine.drop_request(int(req_id)) - def exec_entrypoint_method(self, method, *args, **kwargs): - executor = getattr(self.engine.entrypoint, method) - return asyncio.run_coroutine_threadsafe(executor(*args, **kwargs), self._loop).result() - async def _start_engine_step_loop(self) -> None: pass diff --git a/llumnix/llumlet/llumlet.py b/llumnix/llumlet/llumlet.py index 5016e5a8..98bc6090 100644 --- a/llumnix/llumlet/llumlet.py +++ b/llumnix/llumlet/llumlet.py @@ -256,7 +256,3 @@ def execute_migration_method(self, method, *args, **kwargs): def execute_engine_method(self, method, *args, **kwargs): executor = getattr(self.backend_engine, method) return executor(*args, **kwargs) - - def exec_entrypoint_method(self, method, *args, **kwargs): - executor = getattr(self.backend_engine, "exec_entrypoint_method") - return executor(method, *args, **kwargs)