Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ZMQ #54

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft

Replace ZMQ #54

wants to merge 17 commits into from

Conversation

magniloquency
Copy link

This PR aims to replace ZMQ in the scaler with a custom solution built using C++ and CFFI

This PR is a work in progress:

  • The code is messy
  • There are bugs
  • Some things are not implemented

Running It

You must have a recent version of g++ installed (the code is built to the gnu++23 standard, but this may change)

Enter the ./scaler/io/cpp directory and run python build.py, this will invoke g++ and build the extension module.

The scaler can now be used as usual.

callback: Callable[[Message], None],
topic: bytes,
exit_callback: Optional[Callable[[], None]] = None,
stop_event: threading.Event = threading.Event(),
daemonic: bool = False,
timeout_seconds: int = -1,
):
raise NotImplementedError
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is TODO

Comment on lines 15 to 36
class BorrowedMessage:
_payload_buffer: "FFITypes.buffer"
_payload: bytes | None
_address: bytes

def __init__(self, obj: "FFITypes.CData"): # Message*
# the message owns the address and it must be freed when we're done with it
self._payload_buffer = ffi.buffer(ffi.gc(obj.payload.data, C.free), obj.payload.len)
self._payload = None

# copy the address
self._address = bytes(ffi.buffer(obj.address.data, obj.address.len))

@property
def payload(self) -> bytes:
if self._payload is None:
self._payload = bytes(self._payload_buffer)
return self._payload

@property
def address(self) -> bytes:
return self._address
Copy link
Author

@magniloquency magniloquency Feb 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recv and Zero-Copy (ZC)

I'll skip over some details to keep this brief, but receiving data with zero-copy comes with some challenges.

Background

Here are the relevant C structs:

struct Bytes {
    uint8_t *data;
    size_t len;
};

struct Message {
    Bytes address;
    Bytes payload;
};

struct Bytes is simply a pointer to some data and a length. In struct Message, address refers to the sending peer's identity, and when a message is received this is set to point into memory owned by that peer. The payload is the actual content of the message, and is a malloc'd buffer owned by this message.

The Address

When one of these messages is received and passed back to Python, the address must be copied immediately, because the data is owned by the peer, which could disappear at any time, leaving a dangling pointer. It may be possible to avoid this copy using a complex reference-counting scheme, akin to std::shared_ptr, but over ffi, however the benefits of this approach may not outweigh the additional cost of the bookkeeping and extra complexity it would add to the implementation.

The Payload

The larger issue is copying the payload. Surely, because the buffer is owned by the message, there's no need to copy? In theory, yes, Python can access this buffer directly without any need to copy.

Using CFFI to Achieve ZC

We can use ffi.buffer(ptr, len) to get an object compatible with Python's buffer protocol, without copying the data. Further, ffi.gc(ptr, cleanup) allows us to pass the pointer back to C for free-ing when the Python object is deleted. Putting these together, ffi.buffer(ffi.gc(ptr, cleanup), len) is exactly what we need for zero-copy receives.

Integration with pycapnp

pycapnp's from_bytes() function supports deserializing from any object supporting the buffer protocol, which sounds like it is perfect for us, except that there is an integration issue.

The lifetime of our buffer is extremely important, because as soon as its dropped the backing memory is freed and the pointer becomes invalid and you would get a use-after-free bug. This is precisely the problem with pycapnp. For whatever reason, it seems that passing an ffi.gc() ptr to pycapnp for deserialization causes it to be freed before pycapnp is actually done with it.

Unfortunately the lifetime of the buffer after it is passed to pycapnp is a little hard to track as it is immediately passed off to some Cython code.

It seems like a silly problem, because if we just leak the memory, all of this works. The problem is entirely about figuring out how to free the buffer at the right time. We can consider some other ways to make the buffer live long enough.

Attempts to Fix the Lifetime

I tried several approaches to fix this out-of-band. All we need to do is keep the Python object alive long enough, so what if we attach it to something else?

We can attach the buffer to the Message class in protocol.python.mixins, unfortunately this still does not seem to be good enough. Message.get_message returns the actual pycapnp object which I assume outlives the Message object somehow.

What if we attach the buffer to the pycapnp object directly? This would be perfect, because it means that the buffer would live for exactly for as long as it needs to.

What type is that pycapnp object? It's a _DynamicStructReader. Unfortunately this class is defined in Cython and there is no way to attach the buffer to it. I also tried using weakref for this via weakref.finalize(), however this also does not work for the same reason (it raises a type error).

How does PyZMQ Achieve ZC?

At this point we must consider how we got here. ZC was working fine with pyzmq, right? Well, I dug into that and my conclusion is... it was never truly zero-copy. This is a bold statement, so let me back this up with some evidence.

The async connector contains this snippet:

payload: zmq.Frame = self._socket.recv(copy=False)
result: Optional[Message] = deserialize(payload.bytes)

The async binder, and so on contain very similar code.

This appears to be zero-copy, and it is - right until payload.bytes is accessed.

When recv(copy=False) is called, zmq reads the message into a C-buffer and passes it back to Python using ffi.buffer(), exactly as we are doing, but with one major caveat. Frame.bytes is an @property that calls a method, and this method copies the data out of the ffi.buffer into a Python bytes object. Here is the relevant code in pyzmq.

PyZMQ wraps the ffi.buffer in a memoryview and calls .tobytes() on it. It's not hard to verify that this creates a copy:

>>> data = b"mybytes"
>>> view = memoryview(data)
>>> might_be_a_copy = view.tobytes()
>>> data == might_be_a_copy # check if the data is the same
True
>>> id(data) == id(might_be_a_copy) # check if they refer to the same object
False

Conclusion

In conclusion, it seems like it should be possible to truly achieve ZC recv's in our implementation, but it will take some investigation to integrate it with pycapnp in a way that avoids use-after-free errors. For now, copying the payload means that we are no worse than the existing PyZMQ implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant