Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
s5u13b committed Oct 10, 2024
1 parent dcbf02d commit 987f5fe
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 12 deletions.
7 changes: 6 additions & 1 deletion llumnix/queue/queue_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any
from collections.abc import Iterable

from llumnix.server_info import ServerInfo

class QueueClientBase(ABC):
@abstractmethod
async def put_nowait(self, items: Iterable, server_info: ServerInfo):
async def put_nowait(self, item: Any, server_info: ServerInfo):
raise NotImplementedError

@abstractmethod
async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo):
raise NotImplementedError
10 changes: 7 additions & 3 deletions llumnix/queue/ray_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import Any
from collections.abc import Iterable

from llumnix.server_info import ServerInfo
from llumnix.queue.queue_client_base import QueueClientBase

class RayQueueClient(QueueClientBase):
async def put_nowait(self, items: Iterable, server_info: ServerInfo):
async def put_nowait(self, item: Any, server_info: ServerInfo):
output_queue = server_info.request_output_queue
return await output_queue.actor.put_nowait.remote(item)

async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo):
output_queue = server_info.request_output_queue
return await output_queue.actor.put_nowait.remote(items)
return await output_queue.actor.put_nowait_batch.remote(items)
16 changes: 12 additions & 4 deletions llumnix/queue/zmq_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any
from contextlib import contextmanager
from collections.abc import Iterable

Expand All @@ -22,8 +23,8 @@
from llumnix.server_info import ServerInfo

from llumnix.queue.zmq_utils import (RPC_GET_DATA_TIMEOUT_MS, RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM, RPC_SUCCESS_STR,
RPCClientClosedError, RPC_REQUEST_TYPE, RPCUtilityRequest, RPCPutNoWaitQueueRequest,
get_open_zmq_ipc_path)
RPCClientClosedError, RPC_REQUEST_TYPE, RPCUtilityRequest, RPCPutNoWaitQueueRequest,
RPCPutNoWaitBatchQueueRequest, get_open_zmq_ipc_path)

logger = init_logger(__name__)

Expand Down Expand Up @@ -104,9 +105,16 @@ async def wait_for_server_rpc(self,
rpc_path=rpc_path,
error_message="Unable to start RPC Server")

async def put_nowait(self, items: Iterable, server_info: ServerInfo):
async def put_nowait(self, item: Any, server_info: ServerInfo):
rpc_path = get_open_zmq_ipc_path(server_info.request_output_queue_ip, server_info.request_output_queue_port)
await self._send_one_way_rpc_request(
request=RPCPutNoWaitQueueRequest(items=items),
request=RPCPutNoWaitQueueRequest(item=item),
rpc_path=rpc_path,
error_message="Unable to put items into queue.")

async def put_nowait_batch(self, items: Iterable, server_info: ServerInfo):
rpc_path = get_open_zmq_ipc_path(server_info.request_output_queue_ip, server_info.request_output_queue_port)
await self._send_one_way_rpc_request(
request=RPCPutNoWaitBatchQueueRequest(items=items),
rpc_path=rpc_path,
error_message="Unable to put items into queue.")
15 changes: 13 additions & 2 deletions llumnix/queue/zmq_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import cloudpickle

from llumnix.queue.zmq_utils import (RPC_ZMQ_HWM, RPC_SUCCESS_STR, RPC_SOCKET_LIMIT_CUTOFF,
RPCPutNoWaitQueueRequest, RPCUtilityRequest)
RPCPutNoWaitQueueRequest, RPCPutNoWaitBatchQueueRequest, RPCUtilityRequest)
from llumnix.logger import init_logger

logger = init_logger(__name__)
Expand Down Expand Up @@ -112,6 +112,8 @@ def _make_handler_coro(self, identity,
return self._is_server_ready(identity)
if isinstance(request, RPCPutNoWaitQueueRequest):
return self._put_nowait(identity, request)
if isinstance(request, RPCPutNoWaitBatchQueueRequest):
return self._put_nowait_batch(identity, request)

raise ValueError(f"Unknown RPCRequest type: {request}")

Expand All @@ -121,7 +123,16 @@ async def _is_server_ready(self, identity):

async def _put_nowait(self, identity, put_nowait_queue_request: RPCPutNoWaitQueueRequest):
try:
self.put_nowait(put_nowait_queue_request.items)
self.put_nowait(put_nowait_queue_request.item)
await self.socket.send_multipart(
[identity, cloudpickle.dumps(RPC_SUCCESS_STR)])
# pylint: disable=W0703
except Exception as e:
await self.socket.send_multipart([identity, cloudpickle.dumps(e)])

async def _put_nowait_batch(self, identity, put_nowait_batch_queue_request: RPCPutNoWaitBatchQueueRequest):
try:
self.put_nowait_batch(put_nowait_batch_queue_request.items)
await self.socket.send_multipart(
[identity, cloudpickle.dumps(RPC_SUCCESS_STR)])
# pylint: disable=W0703
Expand Down
6 changes: 5 additions & 1 deletion llumnix/queue/zmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@

@dataclass
class RPCPutNoWaitQueueRequest:
item: Any = None

@dataclass
class RPCPutNoWaitBatchQueueRequest:
items: List[Any] = None

class RPCUtilityRequest(Enum):
IS_SERVER_READY = 1

# pylint: disable=C0103
RPC_REQUEST_TYPE = Union[RPCPutNoWaitQueueRequest, RPCUtilityRequest]
RPC_REQUEST_TYPE = Union[RPCPutNoWaitQueueRequest, RPCPutNoWaitBatchQueueRequest, RPCUtilityRequest]

class RPCClientClosedError(Exception):
"""Exception class raised when the client is used post-close.
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_test/backends/vllm/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from llumnix.llumlet.request import LlumnixRequest, RequestInferenceType
from llumnix.queue.queue_type import QueueType

from tests.unit_test.output_queue.utils import request_output_queue_server
from tests.unit_test.queue.utils import request_output_queue_server
# pylint: disable=unused-import
from tests.conftest import setup_ray_env

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 987f5fe

Please sign in to comment.