You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have a nagging feeling that the family of Arrow protocols and formats misses a generic protocol for streaming columnar data across ArrowArrays, such as to pack data from multiple arrays into a contiguous chunk of GPU memory. The goal is similar to Dissociated IPC, but I think a more flexible design is possible.
The times and places where the concerns of Dissociated IPC are relevant is also when asynchronous programming model is often used (such as, because GPU operations are asynchronous with CPU), and therefore I think it makes sense to consider asynchronous streaming with cancels and back pressure, as well as off-loading a-la Dissociated IPC in a single protocol design space.
Dissociated IPC’s <free_data> signal semantically always coincides with other async streaming signals, as will be detailed below, thus a separate type of message is not needed.
Please don't take the description below too literally, it's more of a directional sketch and the invitation for discussion than a concrete proposal.
Also, this is inspired by @westonpace's "columnar container format", and can be seen as an attempt to do the same, but "for the wire/transfer" rather than "for the disk/storage".
Protocol description
I think it makes the most sense to mostly inherit reactive streaming semantics from RSocket, specifically the semantics of ERROR, CANCEL, LEASE, REQUEST_N, and COMPLETE signals, indicating and how they interact with payload streaming: see Request Stream sequences.
Array streaming warrants one modification to RSocket semantics, however. RSocket’s flow control is defined in terms of payload count that Requester (I call it Receiver below) requests from Responder (I call it Sender) using REQUEST_N signal. However, the most reasonable unit for flow control in ArrowArray streaming is the number of records/rows, and there will typically be many rows in a single payload.
Data plane characteristics negotiation
Before array streaming begins, Sender and Receiver should negotiate the data plane concerns to minimise copies and optimise for the array production that precedes sending and the array consumption that follows reception, respectively.
Transfer modes
1. Standard (in-band): the data transfer happens over the same transport as negotiation and control messaging. This is the default for Arrow Flight. This default transport will typically be gRPC, QUIC, or WebTransport.
2. Off-loaded Receiver read: Sender sends to Receiver the addresses of memory regions (in its CPU or GPU) containing the array buffers via a PAYLOAD message. Receiver then reads the memory regions via out-of-band transfer. Sender assumes that the memory regions can be freed (or RDMA unpin, etc.) after Receiver sends any subsequent signal to Sender: either ERROR, CANCEL, REQUEST_N (which implies that the Receiver completed processing of this payload and ready to receive the next), or COMPLETE.
Note: This mode is optimal for operation pipelining on Receiver’s side. Receiver’s GPU can pipeline processing operations with RDMA reads in CUDA. This is the mode is supported in Dissociated IPC. However, this mode “breaks the pipeline” on Sender’s side.
When Receiver doesn’t have RDMA access to Sender’s memory, but they are still connected by some transport faster than UDP (or multiple transports in parallel), this mode is still favourable to sending the Array buffers in-band, albeit there won’t be any special win in Receiver’s GPU pipelining.
3. Off-loaded Sender write: Receiver sends to Sender the addresses of memory regions in its CPU or GPU memory via REQUEST_N message (alongside the requested number of rows). The memory regions MUST be large enough to fit this number of rows.
Then Sender writes array buffers to these memory regions via an out-of-band transfer. Sender can write fewer rows than than requested (it’s not guaranteed to have that many rows). Sender sends to Receiver the number of rows written via a subsequent PAYLOAD message.
Upon receiving such a PAYLOAD message (as well as ERROR or COMPLETE signal), Receiver can take over the ownership of the memory regions back again and can free the resources that were needed to enable remote writing: free memory, RDMA unpin, etc.
Note: This mode is optimal for pipelining on Sender’s side. Sender’s GPU can pipeline RDMA writes or send() right after the processing operations that produce the Array buffers. However, RDMA writes are rarely available, so in practice it can mostly be used when Sender and Receiver are on the same PCIe bus, i.e., in the same node.
4. "Simple" off-loading: Off-loading data plane to a different connection (a different port, and, perhaps, over a different transport, such as raw QUIC vs. gRPC) than the standard (in-band) connection, on which the control plane remains. There could perhaps be some performance or Array buffer alignment (see below) reasons to do this.
Note: This is what Dissociated IPC also supports, by introducing the possibility of separate Data and Metadata streams, but Dissociated IPC makes this mode a different "degree of freedom" than the "off-loaded Receiver read" mode. I don't understand the need for this, and it looks unnecessarily over-complicated for me.
UCX or OFI/libfabric negotiation
Ideally, UCX or OFI/libfabric negotiation (rndv protocol) and connection establishment should happen only once per stream, together with the negotiation of Sender and Receiver which transfer mode they should use, rather than before each Array sent in the stream.
Batch sizes and alignment
Both Sender and Receiver may have minimum and/or optimal and/or maximum batch sizes. They should try agree to both use the batch size optimal for the slower side, and then Receiver requests this agreed-upon number of rows in each of its REQUEST_N messages.
Receiver can specify that it has a minimum batch size for the off-loaded modes. If Sender has highly irregular Array sizes, it should accumulate them internally until it has prepared (e.g., read from disk) a sufficient number of rows for the minimum batch size. Only the trailing PAYLOAD may have insufficient rows, and it may need fall-back to the standard (in-band) mode.
Finally, Receiver may have requirements regarding Array buffer alignment. If the standard (in-band) transport doesn’t guarantee frame’s data bytes alignment without an extra memory copy (i.e., the 64-byte alignment of the first buffer in the PAYLOAD message), Sender and Receiver can try to avoid this copy by establishing a separate connection (such as, on a different UDP port) with the transport protocol library configured to ensure such alignment.
Flow control
Both Sender and Receiver must keep track of the total cumulative number of rows that Receiver requested via REQUEST_N messages since the opening of the stream and the total cumulative number of rows that Receiver has claimed to deliver via PAYLOAD messages.
Sender must never send PAYLOADs that make the latter number (the number delivered rows) exceed the former (the number of requested rows).
In the “Off-loaded Receiver read” mode, if Receiver needs to indicate to Sender that it has read rows from the memory regions (and thus the Sender can free the associated resources internally), but the Receiver is still fine with the remaining number of requested rows and doesn’t want to increase it, Receiver should send REQUEST_N message with n=0 and the addresses of "consumed" memory regions.
In addition, in the “Off-loaded Sender write” mode, both Sender and Receiver must keep track of the sequences (one sequence for each Array buffer, to be precise) of Receiver’s memory regions that it offered via REQUEST_N messages, and the “high watermark” of written bytes in these memory regions. The high watermarks (one for each Array buffer) “flow through” these memory region sequences. Sender may also copy this information (watermarks) in PAYLOAD messages, to double-check with Receiver, mainly as a sanity measure.
“Multiplexing”: Array splitting into multiple streams
I think it’s most reasonable that Sender and Receiver negotiate just one transfer mode per Array stream. If multiple modes are needed, Sender can split the Array into groups of buffers that need to be transferred in different modes, and then the original semantics “reconstructed” in the Receiver’s application.
It’s also most likely that non-standard transfer modes (modes 2 and 3) can only work sanely for fixed-size layout buffers only. For variable-size layouts, it’s too complex to pack buffers from multiple subsequent Arrays into the same contiguous memory regions. I also don’t know what would be the use-cases for this, anyway.
Thus, during the initial negotiation phase, Sender and Receiver can decide how they split Arrays into multiple buffer groups, and transfer them separately over different "synthetic" Array streams, possibly in different transfer modes. However, these Array streams should all share CANCEL, ERROR, COMPLETE, and REQUEST_N signals, happening on the standard (in-band) transport.
The cumulative number of delivered rows (as claimed by Sender in PAYLOAD messages) in this case is assumed to be the minimum such count across all streams.
It can be pointless to “splice” variable-size layout Array buffers that cover more rows than the number of rows in fixed-size layout buffers that Sender delivers in parallel. For example, in Run End Encoding layout, a single run may "suddenly" cover millions of rows (all having the same value in the corresponding column), while fixed-sized layout buffers are delivered in parallel in batches of just 64k. It makes sense to permit such variable-size layout buffers to “over-deliver” rows. The generality is not lost because, as noted in the previous paragraph, Sender and Receiver should track the minimum delivered rows across streams. If Receiver's application needs this, it can keep track of scan pointers within these variable-sized layout buffers and advance them in step with the minimum delivered row count.
Note that semantically, this method of treating variable-sized layouts is close to DictionaryBatches, whose sizes are not predictable relative to the sizes of RecordBatches that they prepend or interleave, such as when the whole-stream DictionaryBatch with lots of rows may prepend a smaller first RecordBatch. Thus, DictionaryBatches can be transferred over their own separate synthetic stream.
The generality is also preserved here, unless someone wants to deliver DictionaryBatches in modes 2 or 3, in which case they have to keep track of two different flow controls: one of the number of "covered" array rows (equal to the culumative number of rows in all Arrays up to and including the next Array in the "original" stream), and DictionaryBatches own records requested/delivered flow control. However, I'm not sure if there is any real need for delivering DictionaryBatches in modes 2 or 3.
Sender assumes that the buffers can be freed (or RDMA unpin, etc.) after Receiver sends any subsequent signal to Sender: either ERROR, CANCEL, REQUEST_N (which implies that the Receiver completed processing of this payload and ready to receive the next), or COMPLETE.
Upon reflection, this creates a race condition if REQUEST_N is in flight while Sender sends another PAYLOAD. So, Receiver should explicitly mention Sender's memory regions that it already consumed in its REQUEST_N messages.
Describe the enhancement requested
I have a nagging feeling that the family of Arrow protocols and formats misses a generic protocol for streaming columnar data across ArrowArrays, such as to pack data from multiple arrays into a contiguous chunk of GPU memory. The goal is similar to Dissociated IPC, but I think a more flexible design is possible.
The times and places where the concerns of Dissociated IPC are relevant is also when asynchronous programming model is often used (such as, because GPU operations are asynchronous with CPU), and therefore I think it makes sense to consider asynchronous streaming with cancels and back pressure, as well as off-loading a-la Dissociated IPC in a single protocol design space.
Dissociated IPC’s
<free_data>
signal semantically always coincides with other async streaming signals, as will be detailed below, thus a separate type of message is not needed.Please don't take the description below too literally, it's more of a directional sketch and the invitation for discussion than a concrete proposal.
Also, this is inspired by @westonpace's "columnar container format", and can be seen as an attempt to do the same, but "for the wire/transfer" rather than "for the disk/storage".
Protocol description
I think it makes the most sense to mostly inherit reactive streaming semantics from RSocket, specifically the semantics of ERROR, CANCEL, LEASE, REQUEST_N, and COMPLETE signals, indicating and how they interact with payload streaming: see Request Stream sequences.
Array streaming warrants one modification to RSocket semantics, however. RSocket’s flow control is defined in terms of payload count that Requester (I call it Receiver below) requests from Responder (I call it Sender) using REQUEST_N signal. However, the most reasonable unit for flow control in ArrowArray streaming is the number of records/rows, and there will typically be many rows in a single payload.
Data plane characteristics negotiation
Before array streaming begins, Sender and Receiver should negotiate the data plane concerns to minimise copies and optimise for the array production that precedes sending and the array consumption that follows reception, respectively.
Transfer modes
1. Standard (in-band): the data transfer happens over the same transport as negotiation and control messaging. This is the default for Arrow Flight. This default transport will typically be gRPC, QUIC, or WebTransport.
2. Off-loaded Receiver read: Sender sends to Receiver the addresses of memory regions (in its CPU or GPU) containing the array buffers via a PAYLOAD message. Receiver then reads the memory regions via out-of-band transfer. Sender assumes that the memory regions can be freed (or RDMA unpin, etc.) after Receiver sends any subsequent signal to Sender: either ERROR, CANCEL, REQUEST_N (which implies that the Receiver completed processing of this payload and ready to receive the next), or COMPLETE.
Note: This mode is optimal for operation pipelining on Receiver’s side. Receiver’s GPU can pipeline processing operations with RDMA reads in CUDA. This is the mode is supported in Dissociated IPC. However, this mode “breaks the pipeline” on Sender’s side.
When Receiver doesn’t have RDMA access to Sender’s memory, but they are still connected by some transport faster than UDP (or multiple transports in parallel), this mode is still favourable to sending the Array buffers in-band, albeit there won’t be any special win in Receiver’s GPU pipelining.
3. Off-loaded Sender write: Receiver sends to Sender the addresses of memory regions in its CPU or GPU memory via REQUEST_N message (alongside the requested number of rows). The memory regions MUST be large enough to fit this number of rows.
Then Sender writes array buffers to these memory regions via an out-of-band transfer. Sender can write fewer rows than than requested (it’s not guaranteed to have that many rows). Sender sends to Receiver the number of rows written via a subsequent PAYLOAD message.
Upon receiving such a PAYLOAD message (as well as ERROR or COMPLETE signal), Receiver can take over the ownership of the memory regions back again and can free the resources that were needed to enable remote writing: free memory, RDMA unpin, etc.
Note: This mode is optimal for pipelining on Sender’s side. Sender’s GPU can pipeline RDMA writes or
send()
right after the processing operations that produce the Array buffers. However, RDMA writes are rarely available, so in practice it can mostly be used when Sender and Receiver are on the same PCIe bus, i.e., in the same node.4. "Simple" off-loading: Off-loading data plane to a different connection (a different port, and, perhaps, over a different transport, such as raw QUIC vs. gRPC) than the standard (in-band) connection, on which the control plane remains. There could perhaps be some performance or Array buffer alignment (see below) reasons to do this.
Note: This is what Dissociated IPC also supports, by introducing the possibility of separate Data and Metadata streams, but Dissociated IPC makes this mode a different "degree of freedom" than the "off-loaded Receiver read" mode. I don't understand the need for this, and it looks unnecessarily over-complicated for me.
UCX or OFI/libfabric negotiation
Ideally, UCX or OFI/libfabric negotiation (rndv protocol) and connection establishment should happen only once per stream, together with the negotiation of Sender and Receiver which transfer mode they should use, rather than before each Array sent in the stream.
Batch sizes and alignment
Both Sender and Receiver may have minimum and/or optimal and/or maximum batch sizes. They should try agree to both use the batch size optimal for the slower side, and then Receiver requests this agreed-upon number of rows in each of its REQUEST_N messages.
Receiver can specify that it has a minimum batch size for the off-loaded modes. If Sender has highly irregular Array sizes, it should accumulate them internally until it has prepared (e.g., read from disk) a sufficient number of rows for the minimum batch size. Only the trailing PAYLOAD may have insufficient rows, and it may need fall-back to the standard (in-band) mode.
Finally, Receiver may have requirements regarding Array buffer alignment. If the standard (in-band) transport doesn’t guarantee frame’s data bytes alignment without an extra memory copy (i.e., the 64-byte alignment of the first buffer in the PAYLOAD message), Sender and Receiver can try to avoid this copy by establishing a separate connection (such as, on a different UDP port) with the transport protocol library configured to ensure such alignment.
Flow control
Both Sender and Receiver must keep track of the total cumulative number of rows that Receiver requested via REQUEST_N messages since the opening of the stream and the total cumulative number of rows that Receiver has claimed to deliver via PAYLOAD messages.
Sender must never send PAYLOADs that make the latter number (the number delivered rows) exceed the former (the number of requested rows).
In the “Off-loaded Receiver read” mode, if Receiver needs to indicate to Sender that it has read rows from the memory regions (and thus the Sender can free the associated resources internally), but the Receiver is still fine with the remaining number of requested rows and doesn’t want to increase it, Receiver should send REQUEST_N message with n=0 and the addresses of "consumed" memory regions.
In addition, in the “Off-loaded Sender write” mode, both Sender and Receiver must keep track of the sequences (one sequence for each Array buffer, to be precise) of Receiver’s memory regions that it offered via REQUEST_N messages, and the “high watermark” of written bytes in these memory regions. The high watermarks (one for each Array buffer) “flow through” these memory region sequences. Sender may also copy this information (watermarks) in PAYLOAD messages, to double-check with Receiver, mainly as a sanity measure.
“Multiplexing”: Array splitting into multiple streams
I think it’s most reasonable that Sender and Receiver negotiate just one transfer mode per Array stream. If multiple modes are needed, Sender can split the Array into groups of buffers that need to be transferred in different modes, and then the original semantics “reconstructed” in the Receiver’s application.
It’s also most likely that non-standard transfer modes (modes 2 and 3) can only work sanely for fixed-size layout buffers only. For variable-size layouts, it’s too complex to pack buffers from multiple subsequent Arrays into the same contiguous memory regions. I also don’t know what would be the use-cases for this, anyway.
Thus, during the initial negotiation phase, Sender and Receiver can decide how they split Arrays into multiple buffer groups, and transfer them separately over different "synthetic" Array streams, possibly in different transfer modes. However, these Array streams should all share CANCEL, ERROR, COMPLETE, and REQUEST_N signals, happening on the standard (in-band) transport.
The cumulative number of delivered rows (as claimed by Sender in PAYLOAD messages) in this case is assumed to be the minimum such count across all streams.
It can be pointless to “splice” variable-size layout Array buffers that cover more rows than the number of rows in fixed-size layout buffers that Sender delivers in parallel. For example, in Run End Encoding layout, a single run may "suddenly" cover millions of rows (all having the same value in the corresponding column), while fixed-sized layout buffers are delivered in parallel in batches of just 64k. It makes sense to permit such variable-size layout buffers to “over-deliver” rows. The generality is not lost because, as noted in the previous paragraph, Sender and Receiver should track the minimum delivered rows across streams. If Receiver's application needs this, it can keep track of scan pointers within these variable-sized layout buffers and advance them in step with the minimum delivered row count.
Note that semantically, this method of treating variable-sized layouts is close to DictionaryBatches, whose sizes are not predictable relative to the sizes of RecordBatches that they prepend or interleave, such as when the whole-stream DictionaryBatch with lots of rows may prepend a smaller first RecordBatch. Thus, DictionaryBatches can be transferred over their own separate synthetic stream.
The generality is also preserved here, unless someone wants to deliver DictionaryBatches in modes 2 or 3, in which case they have to keep track of two different flow controls: one of the number of "covered" array rows (equal to the culumative number of rows in all Arrays up to and including the next Array in the "original" stream), and DictionaryBatches own records requested/delivered flow control. However, I'm not sure if there is any real need for delivering DictionaryBatches in modes 2 or 3.
cc @zeroshade @raulcd @bkietz @pitrou @lidavidm
Component(s)
FlightRPC, Format
The text was updated successfully, but these errors were encountered: