Skip to content

Commit

Permalink
updates from PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Sep 13, 2024
1 parent ff7fcbe commit d68f0e8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
3 changes: 2 additions & 1 deletion cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ struct ArrowAsyncProducer {

// This cancel callback signals a producer that it must eventually stop making calls
// to on_next_task. It must be idempotent and thread-safe. After calling cancel once,
// subsequent calls must be NOPs.
// subsequent calls must be NOPs. This must not call any consumer-side handlers other
// than `on_error`.
//
// It is not required that calling cancel affect the producer immediately, only that it
// must eventually stop calling on_next_task and subsequently call release on the
Expand Down
35 changes: 22 additions & 13 deletions docs/source/format/CDeviceDataInterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ The structure has the following fields:
Unless the ``on_error`` handler is called, this will always get called exactly once and will be
the first method called on this object. As such the producer *MUST* provide an ``ArrowAsyncProducer``
object when calling this function to allow the consumer to manage back-pressure and flow control.
The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up before or after
The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after*
calling the release callback on this object.

A producer that receives a non-zero result here must not subsequently call anything other than
Expand All @@ -760,6 +760,11 @@ The structure has the following fields:
better consumer-focused thread control as far as receiving the data. A call to this function
simply indicates that data is available via the provided task.

The producer signals the end of the stream by passing ``NULL`` for the ``ArrowAsyncTask``
pointer instead of a valid address. This task object is only valid during the lifetime of
this function call. If the consumer wants to use the task beyond the scope of this method, it
must copy or move its contents to a new ArrowAsyncTask object.

The ``const char*`` parameter exists for producers to provide any extra contextual information
they want. This is encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``,
the lifetime is only the scope of the call to this function. A consumer who wants to maintain
Expand All @@ -770,9 +775,6 @@ The structure has the following fields:
The :c:member:`ArrowAsyncProducer.request` callback must be called to start receiving calls to this
handler.

This function *MUST* be able to be called re-entrantly on the same thread to allow for the
:c:member:`ArrowAsyncProducer.request` callback to potentially call this method recursively.

.. c:member:: void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*)
*Mandatory.* Handler to be called when an error is encountered by the producer. After calling
Expand Down Expand Up @@ -819,7 +821,10 @@ This producer-provided structure has the following fields:
*Mandatory.* A callback to populate the provided ``ArrowDeviceArray`` with the available data.
The order of ArrowAsyncTasks provided by the producer enables a consumer to know the order of
the data to process.
the data to process. If the consumer does not care about the data that is owned by this task,
it must still call ``get_data`` so that the producer can perform any required cleanup. ``NULL``
should be passed as the device array pointer to indicate that the consumer doesn't want the
actual data, letting the task perform necessary cleanup.

If a non-zero value is returned from this, it should be followed only by the producer calling
the ``on_error`` callback of the ``ArrowAsyncDeviceStreamHandler``. Because calling this method
Expand All @@ -829,14 +834,14 @@ This producer-provided structure has the following fields:
:c:member:`ArrowAsyncDeviceStreamHandler.on_error` callback.

Rather than having a separate release callback, any required cleanup should be performed as part
of the invocation of this callback, with ownership of the Array being passed to the consumer-provided
parameter calling this, and must be released separately.
of the invocation of this callback. Ownership of the Array is given to the pointer passed in as
a parameter, and this array must be released separately.

It is only valid to call this method exactly once.

.. c:member:: void* ArrowArrayTask.private_data
*Optional.* An opaque pointer to consumer-provided private data.
*Optional.* An opaque pointer to producer-provided private data.

Consumers *MUST NOT* process this member. Lifetime of this member is handled by
the producer who created this object, and should be cleaned up if necessary during
Expand Down Expand Up @@ -873,7 +878,8 @@ This producer-provided and managed object has the following fields:
*Mandatory.* This function signals to the producer that it must *eventually* stop calling
``on_next_task``. Calls to ``cancel`` must be idempotent and thread-safe. After calling
it once, subsequent calls *MUST* be a NOP.
it once, subsequent calls *MUST* be a NOP. This *MUST NOT* call any consumer-side handlers
other than ``on_error``.

It is not required that calling ``cancel`` affect the producer *immediately*, only that it
must eventually stop calling ``on_next_task`` and then subsequently call ``release``
Expand Down Expand Up @@ -940,10 +946,13 @@ calling ``release`` on the stream handler object.
Thread safety
'''''''''''''

All handler functions should only be called in a serialized manner, but are not guaranteed
to be called from the same thread every time. A producer should wait for handler callbacks to
return before calling the next handler callback, and before calling the ``release`` callback.
As a result, back-pressure is managed by how long it takes for the ``on_next`` handler to return.
All handler functions on the ``ArrowAsyncDeviceStreamHandler`` should only be called in a
serialized manner, but are not guaranteed to be called from the same thread every time. A
producer should wait for handler callbacks to return before calling the next handler callback,
and before calling the ``release`` callback.

Back-pressure is managed by the consumer making calls to :c:member:`ArrowAsyncProducer.request`
to indicate how many arrays it is ready to receive.

The ``ArrowAsyncDeviceStreamHandler`` object should be able to handle callbacks as soon as
it is passed to the producer, any initialization should be performed before it is provided.
Expand Down

0 comments on commit d68f0e8

Please sign in to comment.