-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface #43632
Changes from all commits
b68f0bb
fa26073
331033f
31117c9
7593f57
e612aed
946c46f
8fb66f2
4cb7991
57a79fe
5b5b4d9
a3d4f88
58c2686
1044f1a
8ed0607
a359550
9bed168
95c40f7
b9bddaf
608b790
dbea393
6e9980d
18efd79
eb6bccb
b2c4486
278af8b
922b103
bcc8a66
f6d3b67
38b21b2
2ff218b
d3a3327
e65ef06
92dcb78
c6bd66b
099e38e
0e3574b
5aa29a8
ca9dfbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -228,6 +228,212 @@ struct ArrowDeviceArrayStream { | |
|
||
#endif // ARROW_C_DEVICE_STREAM_INTERFACE | ||
|
||
#ifndef ARROW_C_ASYNC_STREAM_INTERFACE | ||
# define ARROW_C_ASYNC_STREAM_INTERFACE | ||
|
||
// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed | ||
// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler. | ||
// | ||
// The reason for this Task approach instead of the Async interface returning | ||
// the Array directly is to allow for more complex thread handling and reducing | ||
// context switching and data transfers between CPU cores (e.g. from one L1/L2 | ||
// cache to another) if desired. | ||
// | ||
// For example, the `on_next_task` callback can be called when data is ready, while | ||
// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This | ||
// allows for the producer to manage the I/O on one thread which calls `on_next_task` | ||
// and the consumer can determine when the decoding (producer logic in the `extract_data` | ||
// callback of the task) occurs and on which thread, to avoid a CPU core transfer | ||
// (data staying in the L2 cache). | ||
struct ArrowAsyncTask { | ||
// This callback should populate the ArrowDeviceArray associated with this task. | ||
bkietz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// The order of ArrowAsyncTasks provided by the producer enables a consumer to | ||
// ensure the order of data to process. | ||
// | ||
// This function is expected to be synchronous, but should not perform any blocking | ||
// I/O. Ideally it should be as cheap as possible so as to not tie up the consumer | ||
// thread unnecessarily. | ||
// | ||
// Returns: 0 if successful, errno-compatible error otherwise. | ||
// | ||
// If a non-0 value is returned then it should be followed by a call to `on_error` | ||
// on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly | ||
// likely that whatever is calling this function may be entirely disconnected from | ||
// the current control flow. Indicating an error here with a non-zero return allows | ||
// the current flow to be aware of the error occurring, while still allowing any | ||
// logging or error handling to still be centralized in the `on_error` callback of | ||
// the original Async handler. | ||
// | ||
// Rather than a release callback, any required cleanup should be performed as part | ||
// of the invocation of `extract_data`. Ownership of the Array is passed to the consumer | ||
// calling this, and so it must be released separately. | ||
// | ||
// It is only valid to call this method exactly once. | ||
int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out); | ||
|
||
// opaque task-specific data | ||
void* private_data; | ||
}; | ||
|
||
// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async | ||
// producer and consumer. This object allows the consumer to perform backpressure and flow | ||
// control on the asynchronous stream processing. This object must be owned by the | ||
// producer who creates it, and thus is responsible for cleaning it up. | ||
bkietz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
struct ArrowAsyncProducer { | ||
// A consumer must call this function to start receiving on_next_task calls. | ||
// | ||
// It *must* be valid to call this synchronously from within `on_next_task` or | ||
// `on_schema`, but this function *must not* immediately call `on_next_task` so as | ||
// to avoid recursion and reentrant callbacks. | ||
// | ||
// After cancel has been called, additional calls to this function must be NOPs, | ||
// but allowed. While not cancelled, calling this function must register the | ||
// given number of additional arrays/batches to be produced with the producer. | ||
// The producer should only call `on_next_task` at most the registered number | ||
// of arrays before propagating backpressure. | ||
Comment on lines
+289
to
+293
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So is the idea that I will call this method many times? In other words, if I want to allow up to 32 concurrent tasks I would call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's definitely one way you could do it which would be valid. Another possibility would be that you could call The idea is that backpressure is managed by the consumer signalling the producer "hey, I can handle up to N more batches of data right now". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your mermaid diagram only shows There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be pretty easy to toss a call to request inside the branch that calls |
||
// | ||
// Any error encountered by calling request must be propagated by calling the `on_error` | ||
// callback of the ArrowAsyncDeviceStreamHandler. | ||
// | ||
// While not cancelled, any subsequent calls to `on_next_task`, `on_error` or | ||
// `release` should be scheduled by the producer to be called later. | ||
// | ||
// It is invalid for a consumer to call this with a value of n <= 0, producers should | ||
// error if given such a value. | ||
void (*request)(struct ArrowAsyncProducer* self, int64_t n); | ||
|
||
// This cancel callback signals a producer that it must eventually stop making calls | ||
// to on_next_task. It must be idempotent and thread-safe. After calling cancel once, | ||
// subsequent calls must be NOPs. This must not call any consumer-side handlers other | ||
// than `on_error`. | ||
// | ||
// It is not required that calling cancel affect the producer immediately, only that it | ||
// must eventually stop calling on_next_task and subsequently call release on the | ||
// async handler. As such, a consumer must be prepared to receive one or more calls to | ||
// `on_next_task` even after calling cancel if there are still requested arrays pending. | ||
// | ||
// Successful cancellation should *not* result in the producer calling `on_error`, it | ||
// should finish out any remaining tasks and eventually call `release`. | ||
// | ||
// Any error encountered during handling a call to cancel must be reported via the | ||
// on_error callback on the async stream handler. | ||
void (*cancel)(struct ArrowAsyncProducer* self); | ||
|
||
// Any additional metadata tied to a specific stream of data. This must either be NULL | ||
// or a valid pointer to metadata which is encoded in the same way schema metadata | ||
// would be. Non-null metadata must be valid for the lifetime of this object. As an | ||
// example a producer could use this to provide the total number of rows and/or batches | ||
// in the stream if known. | ||
const char* additional_metadata; | ||
|
||
// producer-specific opaque data. | ||
void* private_data; | ||
}; | ||
|
||
// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous | ||
// style of interaction. While ArrowDeviceArrayStream provides producer | ||
// defined callbacks, this is intended to be created by the consumer instead. | ||
// The consumer passes this handler to the producer, which in turn uses the | ||
// callbacks to inform the consumer of events in the stream. | ||
pitrou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
struct ArrowAsyncDeviceStreamHandler { | ||
// Handler for receiving a schema. The passed in stream_schema must be | ||
// released or moved by the handler (producer is giving ownership of the schema to | ||
// the handler, but not ownership of the top level object itself). | ||
// | ||
// With the exception of an error occurring (on_error), this must be the first | ||
// callback function which is called by a producer and must only be called exactly | ||
// once. As such, the producer should provide a valid ArrowAsyncProducer instance | ||
// so the consumer can control the flow. See the documentation on ArrowAsyncProducer | ||
// for how it works. The ArrowAsyncProducer is owned by the producer who calls this | ||
// function and thus the producer is responsible for cleaning it up when calling | ||
// the release callback of this handler. | ||
// | ||
// If there is any additional metadata tied to this stream, it will be provided as | ||
// a non-null value for the `additional_metadata` field of the ArrowAsyncProducer | ||
// which will be valid at least until the release callback is called. | ||
// | ||
// Return value: 0 if successful, `errno`-compatible error otherwise | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the producer supposed to do with an error? (FWIW, gRPC just uses void for its callbacks) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I imagine the consumer should be able to return something to tell the producer to stop (I agree this should be explicit, though, and errno might not be the most precise tool but it might be slightly more informative than a For argument's sake: If a non-zero value is returned, the producer must not call any other callbacks except the release callback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, but it might make more sense to just have a unified cancellation mechanism. And again, what is the producer really supposed to do with the error code? It's just going to drop it on the floor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Callbacks returning an error like this is pretty common in Rust. Normally what the producer does is:
I think it's nice to be able to return an error code here. For example, maybe the consumer can't handle one of the data types reported by the schema. Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It won't prevent it, but the docs explicitly state that if an error is returned from the callback, the producer should only call release after that and not call further callbacks. though there isn't anything to functionally prevent it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, I would suggest that ADBC's async APIs should be compatible with Reactive Streams (RS) semantics in all places except for the semantics of Also, users of ADBC's async APIs in JVM and C# will be able to leverage some very high quality code for these platforms that implement RS semantics, such as j.u.c.SubmissionPublisher, kotlinx.coroutines.reactive, dotnet/reactive, hopefully with minimal modifications, or perhaps no modifications at all (the different semantics of Per RS 1.7, it's required that producer stops on error from consumer callback:
Re:
Indeed, per RS 2.5, consumer MUST go out of its way even to prevent buggy concurrent use by the producer, let alone not to call any of its methods itself concurrently with the producer, or ofter the producer has returned error or completion signal. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it can be useful for monitoring on the middleware/transport level, if the API wraps communication with a remote server. See APPLICATION_ERROR in RSocket protocol. |
||
// | ||
// A producer that receives a non-zero return here should stop producing and eventually | ||
// call release instead. | ||
int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, | ||
struct ArrowSchema* stream_schema); | ||
|
||
// Handler for receiving data. This is called when data is available providing an | ||
// ArrowAsyncTask struct to signify it. The producer indicates the end of the stream | ||
// by passing NULL as the value for the task rather than a valid pointer to a task. | ||
// The task object is only valid for the lifetime of this function call, if a consumer | ||
// wants to utilize it after this function returns, it must copy or move the contents | ||
// of it to a new ArrowAsyncTask object. | ||
// | ||
// The `request` callback of a provided ArrowAsyncProducer must be called in order | ||
// to start receiving calls to this handler. | ||
// | ||
// The metadata argument can be null or can be used by a producer | ||
// to pass arbitrary extra information to the consumer (such as total number | ||
// of rows, context info, or otherwise). The data should be passed using the same | ||
// encoding as the metadata within the ArrowSchema struct itself (defined in | ||
// the spec at | ||
// https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata) | ||
// | ||
// If metadata is non-null then it only needs to exist for the lifetime of this call, | ||
// a consumer who wants it to live after that must copy it to ensure lifetime. | ||
// | ||
// A producer *must not* call this concurrently from multiple different threads. | ||
// | ||
// A consumer must be prepared to receive one or more calls to this callback even | ||
// after calling cancel on the corresponding ArrowAsyncProducer, as cancel does not | ||
// guarantee it happens immediately. | ||
// | ||
// Return value: 0 if successful, `errno`-compatible error otherwise. | ||
// | ||
// If the consumer returns a non-zero return from this method, that indicates to the | ||
// producer that it should stop propagating data as an error occurred. After receiving | ||
// such a return, the only interaction with this object is for the producer to call | ||
// the `release` callback. | ||
int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self, | ||
struct ArrowAsyncTask* task, const char* metadata); | ||
zeroshade marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Handler for encountering an error. The producer should call release after | ||
// this returns to clean up any resources. The `code` passed in can be any error | ||
// code that a producer wants, but should be errno-compatible for consistency. | ||
// | ||
// If the message or metadata are non-null, they will only last as long as this | ||
// function call. The consumer would need to perform a copy of the data if it is | ||
// necessary for them to live past the lifetime of this call. | ||
// | ||
// Error metadata should be encoded as with metadata in ArrowSchema, defined in | ||
// the spec at | ||
// https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata | ||
// | ||
// It is valid for this to be called by a producer with or without a preceding call | ||
// to ArrowAsyncProducer.request. | ||
// | ||
// This callback must not call any methods of an ArrowAsyncProducer object. | ||
void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code, | ||
zeroshade marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const char* message, const char* metadata); | ||
zeroshade marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Release callback to release any resources for the handler. Should always be | ||
// called by a producer when it is done utilizing a handler. No callbacks should | ||
// be called after this is called. | ||
// | ||
// It is valid for the release callback to be called by a producer with or without | ||
// a preceding call to ArrowAsyncProducer.request. | ||
// | ||
// The release callback must not call any methods of an ArrowAsyncProducer object. | ||
void (*release)(struct ArrowAsyncDeviceStreamHandler* self); | ||
|
||
// MUST be populated by the producer BEFORE calling any callbacks other than release. | ||
// This provides the connection between a handler and its producer, and must exist until | ||
// the release callback is called. | ||
struct ArrowAsyncProducer* producer; | ||
|
||
// Opaque handler-specific data | ||
void* private_data; | ||
}; | ||
|
||
#endif // ARROW_C_ASYNC_STREAM_INTERFACE | ||
|
||
#ifdef __cplusplus | ||
} | ||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a consequence of consumer being able to maintain an upper bound on memory consumption in terms of the number record batches in memory at any point in time.
Readers might be unable to make this causal inference. I don't really have a suggestion on how to reword this, just saying that the connection might be unclear to many readers.