Skip to content

Commit

Permalink
Support async predictors (✨ again ✨) (#2025)
Browse files Browse the repository at this point in the history
* Revert "Revert "Support async predictors (#2010)" (#2022)"

This reverts commit 8333a83.

* Use async stream redirector in setup

so that the sync stream redirector context is only entered once, as this
is a known source of problems associated with stdout/stderr orphaning.

* Do not assert that writes from C are captured during setup

* Do not wrap empty data in Log events

* Exclude invalid output paths from infrastructure errors (#2030)

* Exclude invalid output paths from infrastructure errors

Closes PLAT-380

* Fix words given pluralization

Co-authored-by: F <[email protected]>
Signed-off-by: Dan Buch <[email protected]>

---------

Signed-off-by: Dan Buch <[email protected]>
Co-authored-by: F <[email protected]>

---------

Signed-off-by: Dan Buch <[email protected]>
Co-authored-by: F <[email protected]>
  • Loading branch information
meatballhat and erbridge authored Oct 29, 2024
1 parent f8e3461 commit 5eb31ff
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 49 deletions.
91 changes: 91 additions & 0 deletions python/cog/server/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import multiprocessing
from multiprocessing.connection import Connection
from typing import Any, Optional

from typing_extensions import Buffer

_spawn = multiprocessing.get_context("spawn")


class AsyncConnection:
    """
    Asyncio-friendly wrapper around a `multiprocessing.connection.Connection`.

    The connection's file descriptor is registered as a reader on the event
    loop, which lets the receiving methods await incoming data instead of
    blocking. The sending methods delegate straight to the wrapped connection
    and are synchronous.
    """

    def __init__(self, connection: Connection) -> None:
        self._connection = connection
        self._event = asyncio.Event()
        # Remember the loop and the descriptor so that close() can unregister
        # the reader; fileno() is no longer usable once the connection has
        # been closed.
        self._loop = asyncio.get_event_loop()
        self._fd = self._connection.fileno()
        self._loop.add_reader(self._fd, self._event.set)

    def send(self, obj: Any) -> None:
        """Send a (picklable) object"""

        self._connection.send(obj)

    async def _wait_for_input(self) -> None:
        """Wait until there is an input available to be read"""

        # The reader callback only sets the event, so re-check poll() after
        # every wake-up and clear the event before waiting again.
        while not self._connection.poll():
            await self._event.wait()
            self._event.clear()

    async def recv(self) -> Any:
        """Receive a (picklable) object"""

        await self._wait_for_input()
        return self._connection.recv()

    def fileno(self) -> int:
        """File descriptor or handle of the connection"""
        return self._connection.fileno()

    def close(self) -> None:
        """Close the connection and stop watching its file descriptor"""
        if not self._connection.closed:
            # Unregister before closing: otherwise the loop keeps a reader
            # for a descriptor number that the OS may hand out again. The
            # `closed` guard keeps a repeated close() from touching a
            # descriptor that now belongs to something else.
            self._loop.remove_reader(self._fd)
        self._connection.close()

    async def poll(self, timeout: float = 0.0) -> bool:
        """
        Whether there is an input available to be read.

        Returns True immediately when data is already pending; otherwise
        waits up to `timeout` seconds for input and returns False if none
        arrived in time.
        """

        if self._connection.poll():
            return True

        try:
            await asyncio.wait_for(self._wait_for_input(), timeout=timeout)
        except asyncio.TimeoutError:
            return False
        return self._connection.poll()

    def send_bytes(
        self, buf: Buffer, offset: int = 0, size: Optional[int] = None
    ) -> None:
        """Send the bytes data from a bytes-like object"""

        self._connection.send_bytes(buf, offset, size)

    async def recv_bytes(self, maxlength: Optional[int] = None) -> bytes:
        """
        Receive bytes data as a bytes object.
        """

        await self._wait_for_input()
        return self._connection.recv_bytes(maxlength)

    async def recv_bytes_into(self, buf: Buffer, offset: int = 0) -> int:
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """

        await self._wait_for_input()
        return self._connection.recv_bytes_into(buf, offset)


class LockedConnection:
    """
    Connection wrapper whose `send` calls are serialized by a lock.

    The lock comes from the module's "spawn" multiprocessing context.
    Receiving is passed straight through to the wrapped connection without
    taking the lock.
    """

    def __init__(self, connection: Connection) -> None:
        self._lock = _spawn.Lock()
        self.connection = connection

    def send(self, obj: Any) -> None:
        """Send `obj` over the wrapped connection while holding the lock"""
        guard = self._lock
        with guard:
            self.connection.send(obj)

    def recv(self) -> Any:
        """Receive the next object from the wrapped connection (unlocked)"""
        received = self.connection.recv()
        return received
6 changes: 6 additions & 0 deletions python/cog/server/eventtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@

# From worker parent process
#
@define
class Cancel:
    """
    Event with no payload, listed among the events that come from the
    worker's parent process.

    NOTE(review): presumably asks the worker to cancel the prediction that is
    currently running -- confirm against the code that handles this event.
    """

    # TODO: identify which prediction!
    pass


@define
class PredictionInput:
payload: Dict[str, Any]
Expand Down
101 changes: 100 additions & 1 deletion python/cog/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import threading
import uuid
from types import TracebackType
from typing import Any, Callable, Dict, List, Sequence, TextIO, Union
from typing import Any, BinaryIO, Callable, Dict, List, Sequence, TextIO, Union

import pydantic
from typing_extensions import Self
Expand All @@ -19,6 +19,45 @@
from .errors import CogRuntimeError, CogTimeoutError


class _SimpleStreamWrapper(io.TextIOWrapper):
"""
_SimpleStreamWrapper wraps a binary I/O buffer and provides a TextIOWrapper
interface (primarily write and flush methods) which call a provided
callback function instead of (or, if `tee` is True, in addition to) writing
to the underlying buffer.
"""

def __init__(
self,
buffer: BinaryIO,
callback: Callable[[str, str], None],
tee: bool = False,
) -> None:
super().__init__(buffer, line_buffering=True)

self._callback = callback
self._tee = tee
self._buffer = []

def write(self, s: str) -> int:
length = len(s)
self._buffer.append(s)
if self._tee:
super().write(s)
else:
# If we're not teeing, we have to handle automatic flush on
# newline. When `tee` is true, this is handled by the write method.
if "\n" in s or "\r" in s:
self.flush()
return length

def flush(self) -> None:
self._callback(self.name, "".join(self._buffer))
self._buffer.clear()
if self._tee:
super().flush()


class _StreamWrapper:
def __init__(self, name: str, stream: TextIO) -> None:
self.name = name
Expand Down Expand Up @@ -86,6 +125,66 @@ def original(self) -> TextIO:
return self._original_fp


if sys.version_info < (3, 9):

class _AsyncStreamRedirectorBase(contextlib.AbstractContextManager):
pass
else:

class _AsyncStreamRedirectorBase(
contextlib.AbstractContextManager["AsyncStreamRedirector"]
):
pass


class AsyncStreamRedirector(_AsyncStreamRedirectorBase):
    """
    Context manager that sends STDOUT and STDERR writes to a callback
    function, optionally (`tee=True`) also writing them to the original
    streams.

    In contrast to StreamRedirector, the underlying stream file descriptors
    are left untouched, so only writes made from Python code are captured;
    writes from native code are not. Also in contrast to StreamRedirector, the
    set of redirected streams is fixed: only STDOUT and STDERR are handled.
    """

    def __init__(
        self,
        callback: Callable[[str, str], None],
        tee: bool = False,
    ) -> None:
        self._callback = callback
        self._tee = tee

        # Each stream gets its own wrapper around the current stream's binary
        # buffer; the redirect contexts swap sys.stdout / sys.stderr for them.
        self._stdout_ctx = contextlib.redirect_stdout(
            _SimpleStreamWrapper(sys.stdout.buffer, callback, tee)
        )
        self._stderr_ctx = contextlib.redirect_stderr(
            _SimpleStreamWrapper(sys.stderr.buffer, callback, tee)
        )

    def __enter__(self) -> Self:
        """Start redirecting STDOUT, then STDERR, and return this redirector"""
        for redirect in (self._stdout_ctx, self._stderr_ctx):
            redirect.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Stop redirecting STDOUT, then STDERR; exceptions are not suppressed"""
        for redirect in (self._stdout_ctx, self._stderr_ctx):
            redirect.__exit__(exc_type, exc_value, traceback)

    def drain(self, timeout: float = 0.0) -> None:
        """
        Flush whatever `sys.stdout` and `sys.stderr` currently are.

        No data moves between threads here, so flushing is all that draining
        requires. `timeout` is accepted but not used by this implementation.
        """
        sys.stdout.flush()
        sys.stderr.flush()


if sys.version_info < (3, 9):

class _StreamRedirectorBase(contextlib.AbstractContextManager):
Expand Down
13 changes: 10 additions & 3 deletions python/cog/server/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,17 @@ def _upload_files(self, output: Any) -> Any:
try:
# TODO: clean up output files
return self._file_uploader(output)
except (FileNotFoundError, NotADirectoryError):
# These error cases indicate that an output path returned by a prediction does
# not actually exist, so there is no way for us to even attempt to upload it.
# The error is re-raised without wrapping because this is not considered an
# "infrastructure error", such as happens during an upload of a file that
# **does** exist.
raise
except Exception as error: # pylint: disable=broad-exception-caught
# If something goes wrong uploading a file, it's irrecoverable.
# The re-raised exception will be caught and cause the prediction
# to be failed, with a useful error message.
# Any other errors that occur during file upload are irrecoverable and
# considered "infrastructure errors" because there is a high likelihood that
# the error happened in a layer that is outside the control of the model.
raise FileUploadError("Got error trying to upload output files") from error

def _handle_done(self, f: "Future[Done]") -> None:
Expand Down
Loading

0 comments on commit 5eb31ff

Please sign in to comment.