meeting
As explained in the general overview, each 'a-sync' block (see (a-sync coroutines)) or 'compose-a-sync' block (see (a-sync compose)) is a separate unit of computation which appears within itself to proceed sequentially but which also appears to execute concurrently with other 'a-sync' or 'compose-a-sync' blocks running on the same event loop. Each 'a-sync' or 'compose-a-sync' block is therefore in some sense analogous to a thread of execution.
This (a-sync meeting) library file provides a 'meeting' type which can be used to synchronize between such "pseudo-threads" (that is, between a-sync or compose-a-sync blocks). A 'meeting' object is, in terms of communicating sequential processes, an unbuffered (synchronous) channel. Unbuffered channels in the CSP style are a natural fit for use with coroutines running on a single native thread (but not necessarily for native threads running in parallel unless combined with work stealing). They are therefore a natural fit for synchronizing the "pseudo-threads" provided by this library.
Some of the things that can be done by using meetings can also be done using await-generator!. Note also that any one meeting object is strictly for use by "pseudo-threads" which are running on the same event loop, and so 'ipso facto' running in the same native OS thread. To have other native OS threads communicate with an event-loop, use await-task-in-thread!, await-task-in-event-loop!, await-task-in-thread-pool!, await-generator-in-thread!, await-generator-in-event-loop! or await-generator-in-thread-pool! (see (a-sync event-loop) and (a-sync thread-pool)). Having said that, some things, such as having one "pseudo-thread" join on another "pseudo-thread", are more easily done with meeting objects.
A "pseudo-thread" enters a meeting by applying meeting-receive (where it is to receive a datum at the meeting) or meeting-send (where it is to provide the datum) to a meeting object. Once a "pseudo-thread" enters a meeting it cannot leave until another co-operating "pseudo-thread" also enters the meeting so the datum exchange can take place, or until the meeting-close procedure is applied. Once a datum exchange has taken place, a meeting object can be reused for making another exchange (provided the meeting object has not been closed).
Synchronization occurs at the moment that the exchange of the datum takes place. Once one of the "pseudo-threads" leaves the meeting upon meeting-receive or meeting-send (or both) returning, all subsequent events are unsynchronized until another datum exchange is arranged. This means that where two "pseudo-threads" share access to objects other than the datum exchanged and they might be mutated, one "pseudo-thread" is guaranteed to see a value of the unexchanged shared objects which is not earlier than the value they held at the moment of datum exchange, but it may or may not see a later value.
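As an illustration of the join mentioned earlier and of the visibility guarantee just described, the following is a minimal sketch in which one "pseudo-thread" waits for another to finish. The names 'done', 'shared' and 'joined' are hypothetical and exist only for this example, which assumes that both blocks run on the default event loop.

```scheme
(import (a-sync coroutines)
        (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(set-default-event-loop!) ;; if none has yet been set
(define done (make-meeting)) ;; hypothetical name: signals completion
(define shared '())          ;; state shared outside the datum exchanged
(define joined #f)           ;; filled in by the joining block

;; worker: mutates the shared state, then enters the meeting to report
(a-sync (lambda (await resume)
          (set! shared (cons 'worker-result shared))
          (meeting-send await resume done 'finished)))

;; joiner: cannot leave meeting-receive until the worker has sent
(a-sync (lambda (await resume)
          (let ([status (meeting-receive await resume done)])
            ;; the mutation made before the exchange is visible here
            (set! joined (list status shared)))))

(event-loop-run!)
(display joined)
(newline)
```

Because the worker mutates 'shared' before it enters the meeting, the joiner is guaranteed to see that mutation once meeting-receive returns. Any mutation made after the exchange would carry no such guarantee.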
Here is an example of the use of a meeting object:
(set-default-event-loop!) ;; if none has yet been set
(define m1 (make-meeting))
(a-sync (lambda (await resume)
          (let loop ([datum (meeting-receive await resume m1)])
            (when (not (eq? datum 'stop-iteration))
              (display datum)
              (newline)
              (loop (meeting-receive await resume m1))))))
(a-sync (lambda (await resume)
          (let loop ([count 0])
            (if (< count 4)
                (begin
                  (meeting-send await resume m1 count)
                  (loop (+ count 1)))
                (meeting-close m1)))))
(event-loop-run!)
And here is one possible implementation of a parallel-map procedure using a meeting object, for a program with an event loop (this uses await-task-in-thread! for simplicity, but in real life you might want to use await-task-in-thread-pool!):
(import
  (a-sync coroutines)
  (a-sync event-loop)
  (a-sync meeting)
  (a-sync compose)
  (chezscheme))
(set-default-event-loop!) ;; if none has yet been set
(event-loop-block! #t)
(define (parallel-map await resume proc lst)
  (if (null? lst)
      '()
      (let ([head (car lst)]
            [tail (cdr lst)]
            [m1 (make-meeting)])
        (compose-a-sync ([hd (await-task-in-thread! (lambda () (proc head)))])
          (meeting-send m1 hd))
        (let* ([tl (parallel-map await resume proc tail)]
               [hd (meeting-receive await resume m1)])
          (cons hd tl)))))
;; simulate a computational task which takes 1 second to complete
(define (do-work i) (sleep (make-time 'time-duration 0 1)) (* i 2))
(a-sync
  (lambda (await resume)
    (let ([lst (parallel-map await resume do-work (list 1 2 3 4 5))])
      (display lst)(newline)
      (event-loop-block! #f))))
(event-loop-run!)
See the documentation on the meeting-send and meeting-receive procedures below for an example of multiple "pseudo-threads" sending to or receiving from a single meeting object for fan in and fan out, and for an example of those procedures 'selecting' on more than one meeting object.
The (a-sync meeting) library file provides the following procedures:
(make-meeting [loop])
This procedure makes and returns a meeting object. Meetings are objects on which a-sync or compose-a-sync blocks running on the same event loop can synchronize by one passing a datum to another. The 'loop' argument specifies the event loop (as constructed by make-event-loop in the (a-sync event-loop) library file) with respect to which the meeting will be held: it is an error if the meeting-send or meeting-receive procedures are passed a different event loop as an argument. The 'loop' argument is optional - if none is passed, or #f is passed, then the default event loop will be chosen.
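The following sketch shows a meeting held on an explicitly constructed event loop rather than the default one. It assumes that make-event-loop can be called with no arguments and that event-loop-run! accepts the loop as an optional argument, as described for the (a-sync event-loop) library file. The names 'my-loop', 'm' and 'received' are hypothetical.

```scheme
(import (a-sync coroutines)
        (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(define my-loop (make-event-loop)) ;; assumed to take no arguments
(define m (make-meeting my-loop))  ;; meeting held on my-loop, not the default
(define received #f)

;; receiver: must name the same loop that was passed to make-meeting
(a-sync (lambda (await resume)
          (set! received (meeting-receive await resume my-loop m))))

;; sender: likewise passes my-loop before the meeting object
(a-sync (lambda (await resume)
          (meeting-send await resume my-loop m "via explicit loop")))

(event-loop-run! my-loop)
(display received)
(newline)
```

Passing the loop explicitly at every call site is what keeps the meeting and the waiting blocks on the same event loop when a program runs more than one.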
Strictly speaking this procedure can be called in any native OS thread, but since it carries out no synchronization of native threads the user would have to provide her own synchronization if called in other than the thread of the event loop with respect to which the meeting will be held; so it is best if this procedure is called in the thread of that event loop.
This procedure is first available in version 0.13 of this library.
(meeting? obj)
This procedure indicates whether 'obj' is a meeting object constructed by make-meeting.
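A small sketch of how the predicate might be used to validate an argument before waiting on it. The helper 'checked-meeting' is hypothetical and is not part of the library.

```scheme
(import (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(set-default-event-loop!) ;; if none has yet been set
(define m (make-meeting))

;; hypothetical helper: reject anything not constructed by make-meeting
(define (checked-meeting obj)
  (if (meeting? obj)
      obj
      (error 'checked-meeting "not a meeting object" obj)))

(display (meeting? m))        ;; a meeting constructed by make-meeting
(newline)
(display (meeting? 'not-one)) ;; any other object
(newline)
```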
This procedure is first available in version 0.13 of this library.
(meeting-close meeting)
This closes a meeting object. Its purpose is to wake up any "pseudo-thread" (that is, any a-sync or compose-a-sync block) waiting in meeting-send or meeting-receive by causing either procedure to return with a 'stop-iteration value.
Where that is not necessary (say, the receiver already knows how many items are to be sent), then this procedure does not need to be applied. It is not needed in order to release resources.
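Closing is useful in the opposite direction too, where the sender does not know how many items are wanted. The sketch below is one possible arrangement under that assumption: the consumer takes two items and closes the meeting, and the producer stops when meeting-send returns 'stop-iteration. The names 'received' and 'producer-exit' are hypothetical.

```scheme
(import (a-sync coroutines)
        (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(set-default-event-loop!) ;; if none has yet been set
(define m (make-meeting))
(define received '())     ;; items seen by the consumer, newest first
(define producer-exit #f) ;; last value returned by meeting-send

;; producer: keeps sending until meeting-send reports that the
;; meeting has been closed
(a-sync (lambda (await resume)
          (let loop ([count 0])
            (let ([ret (meeting-send await resume m count)])
              (if (eq? ret 'stop-iteration)
                  (set! producer-exit ret)
                  (loop (+ count 1)))))))

;; consumer: takes two items and then closes the meeting
(a-sync (lambda (await resume)
          (let loop ([n 0])
            (if (< n 2)
                (let ([datum (meeting-receive await resume m)])
                  (set! received (cons datum received))
                  (loop (+ n 1)))
                (meeting-close m)))))

(event-loop-run!)
(display (reverse received))
(newline)
(display producer-exit)
(newline)
```

Without the call to meeting-close the producer would remain waiting in meeting-send with no receiver ever arriving.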
This procedure is first available in version 0.13 of this library.
(meeting-ready? meeting)
This indicates whether applying meeting-send or meeting-receive (as the case may be) to the meeting object 'meeting' will return immediately: in other words, this procedure will return #t if another a-sync or compose-a-sync block is already waiting on the object or the meeting object has been closed, otherwise #f.
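The three states described above can be observed with a sketch along the following lines. The helper 'record-ready!' and the variable 'states' are hypothetical and only collect the results for display.

```scheme
(import (a-sync coroutines)
        (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(set-default-event-loop!) ;; if none has yet been set
(define m (make-meeting))
(define states '()) ;; results of meeting-ready?, newest first
(define (record-ready!)
  (set! states (cons (meeting-ready? m) states)))

;; 1. nobody is waiting and the meeting is open
(record-ready!)

;; 2. a sender has entered the meeting and is waiting for a receiver
(a-sync (lambda (await resume)
          (meeting-send await resume m 'datum)))
(record-ready!)

;; 3. the exchange is completed and the meeting is then closed
(a-sync (lambda (await resume)
          (meeting-receive await resume m)))
(event-loop-run!)
(meeting-close m)
(record-ready!)

(display (reverse states))
(newline)
```

Going by the description above, the list displayed should be (#f #t #t).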
This procedure is first available in version 0.13 of this library.
(meeting-send await resume [loop] m0 [m1 ...] datum)
This sends a datum to a receiver which is running on the same event loop as the sender, via one or more meeting objects 'm0 m1 ...'. If no receiver is waiting for the datum, this procedure waits until a receiver calls meeting-receive on one of the meeting objects to request the datum. If a receiver is already waiting, this procedure passes on the datum and returns immediately.
The 'loop' argument is optional. If not supplied, or #f is passed, this procedure will use the default event loop. It is an error if this procedure is given a different event loop than the one which was passed to make-meeting on constructing the 'meeting' objects.
This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments).
With version 0.13 of this library, a sender could not invoke this procedure when another a-sync or compose-a-sync block running on the event loop concerned was already waiting to send on the same 'meeting' object. From version 0.14, multiple senders may wait on a meeting object to permit fan in. The provided datum of each sender will be passed to a receiver (as and when a receiver becomes available) in the order in which this procedure was invoked.
Here is an example of such a fan in:
(set-default-event-loop!) ;; if none has yet been set
(define m1 (make-meeting))
(a-sync (lambda (await resume)
          (meeting-send await resume m1 "Hello from a-sync1")))
(a-sync (lambda (await resume)
          (meeting-send await resume m1 "Hello from a-sync2")))
(a-sync (lambda (await resume)
          (let next ([count 0])
            (when (< count 2)
              (display (meeting-receive await resume m1))
              (newline)
              (next (+ count 1))))))
(event-loop-run!)
In addition, with version 0.13 of this library, only a single meeting object could be passed to this procedure. From version 0.14 this procedure has 'select'-like behavior: multiple meeting objects may be passed and this procedure will send to the first one which becomes available to receive the datum.
Here is an example of selecting on send (here, meeting-send will send to the first meeting which becomes available for receiving, which is m2):
(set-default-event-loop!) ;; if none has yet been set
(define m1 (make-meeting))
(define m2 (make-meeting))
(a-sync (lambda (await resume)
          (meeting-send await resume m1 m2 "Hello")))
(a-sync (lambda (await resume)
          (format #t "meeting-send says: ~a~%"
                  (meeting-receive await resume m2))))
(event-loop-run!)
Once a datum exchange has taken place, the meeting object(s) can be reused for making another exchange (provided the meeting objects have not been closed).
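Reuse makes a simple request and reply arrangement possible with just two meeting objects. The following is a sketch under the assumption that one client and one server share the default event loop. The names 'requests', 'replies' and 'results' are hypothetical.

```scheme
(import (a-sync coroutines)
        (a-sync event-loop)
        (a-sync meeting)
        (chezscheme))

(set-default-event-loop!) ;; if none has yet been set
(define requests (make-meeting)) ;; client -> server
(define replies (make-meeting))  ;; server -> client
(define results '())             ;; replies seen by the client, newest first

;; server: squares each request until the requests meeting is closed
(a-sync (lambda (await resume)
          (let loop ([req (meeting-receive await resume requests)])
            (when (not (eq? req 'stop-iteration))
              (meeting-send await resume replies (* req req))
              (loop (meeting-receive await resume requests))))))

;; client: reuses both meetings for three successive exchanges
(a-sync (lambda (await resume)
          (let loop ([n 1])
            (if (<= n 3)
                (begin
                  (meeting-send await resume requests n)
                  (let ([reply (meeting-receive await resume replies)])
                    (set! results (cons reply results)))
                  (loop (+ n 1)))
                (meeting-close requests)))))

(event-loop-run!)
(display (reverse results))
(newline)
```

Each request is answered before the next one is sent, so the two blocks proceed in strict alternation and the replies arrive in request order.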
This procedure must be called in the native OS thread in which the event loop concerned runs. To have other native OS threads communicate with an event-loop, use await-task-in-thread!, await-task-in-event-loop!, await-task-in-thread-pool!, await-generator-in-thread!, await-generator-in-event-loop! or await-generator-in-thread-pool!.
This procedure always returns #f unless meeting-close has been applied to a meeting object, in which case 'stop-iteration is returned. Note that if multiple meeting objects are passed to this procedure and one of them is then closed, this procedure will return 'stop-iteration and any wait will be abandoned. It is usually a bad idea to close a meeting object on which this procedure is waiting where this procedure is selecting on more than one meeting object.
This procedure is first available in version 0.13 of this library.
(meeting-receive await resume [loop] m0 [m1 ...])
This receives a datum from a sender running on the same event loop as the receiver, via one or more meeting objects 'm0 m1 ...'. If no sender is waiting to pass the datum, this procedure waits until a sender calls meeting-send on one of the meeting objects to provide the datum. If a sender is already waiting, this procedure returns immediately with the datum supplied.
The 'loop' argument is optional. If not supplied, or #f is passed, this procedure will use the default event loop. It is an error if this procedure is given a different event loop than the one which was passed to make-meeting on constructing the 'meeting' objects.
This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments).
With version 0.13 of this library, a receiver could not invoke this procedure when another a-sync or compose-a-sync block running on the event loop concerned was already waiting to receive from the same 'meeting' object. From version 0.14, multiple receivers may wait on a meeting object to permit fan out. The waiting receivers will be released (as and when a sender provides a datum) in the order in which this procedure was invoked.
Here is an example of such a fan out:
(set-default-event-loop!) ;; if none has yet been set
(define m1 (make-meeting))
(a-sync (lambda (await resume)
          (display (meeting-receive await resume m1))
          (newline)))
(a-sync (lambda (await resume)
          (display (meeting-receive await resume m1))
          (newline)))
(a-sync (lambda (await resume)
          (let next ([count 0])
            (when (< count 2)
              (meeting-send await resume m1 count)
              (next (+ count 1))))))
(event-loop-run!)
In addition, with version 0.13 of this library, only a single meeting object could be passed to this procedure. From version 0.14 this procedure has 'select'-like behavior: multiple meeting objects may be passed and this procedure will receive from the first one which sends a datum.
Here is an example of selecting on receive:
(set-default-event-loop!) ;; if none has yet been set
(define m1 (make-meeting))
(define m2 (make-meeting))
(a-sync (lambda (await resume)
          (meeting-send await resume m1 "m1")))
(a-sync (lambda (await resume)
          (meeting-send await resume m2 "m2")))
(a-sync (lambda (await resume)
          (let next ([count 0])
            (when (< count 2)
              (format #t "Message received from ~a~%"
                      (meeting-receive await resume m1 m2))
              (next (+ count 1))))))
(event-loop-run!)
Once a datum exchange has taken place, the meeting object(s) can be reused for making another exchange (provided the meeting objects have not been closed).
This procedure must be called in the native OS thread in which the event loop concerned runs. To have other native OS threads communicate with an event-loop, use await-task-in-thread!, await-task-in-event-loop!, await-task-in-thread-pool!, await-generator-in-thread!, await-generator-in-event-loop! or await-generator-in-thread-pool!.
This procedure always returns the datum value supplied by meeting-send unless meeting-close has been applied to a meeting object, in which case 'stop-iteration is returned. Note that if multiple meeting objects are passed to this procedure and one of them is then closed, this procedure will return 'stop-iteration and any wait will be abandoned. It is usually a bad idea to close a meeting object on which this procedure is waiting where this procedure is selecting on more than one meeting object.
This procedure is first available in version 0.13 of this library.