-
Notifications
You must be signed in to change notification settings - Fork 98
3.3 Futures and promises
Futures and promises implemented by quantum follow as closely as possible the behavior of their STL counterparts. The user is however encouraged to read the interface definitions in order to understand the API fully. (See files under IPromise and IFuture interfaces).
quantum makes the programmer's life easier, by completely wrapping and managing the lifetime of promises and futures for each task invocation. This means that futures and promises don't have to be explicitly created (although that is also possible if desired) or disposed of when going out of scope. The simplest way to set/get a value via a promise is to use the coroutine context (callee thread) to write the value and the dispatcher context (caller thread) to read it. Similar concept applies to an IO task.
Writing a value into a promise never blocks, however reading or waiting on the associated future does. Writing a promise uses the set()
function and reading the future is done via get()
which blocks. The future can also be waited on with wait()
or waitFor()
which will block and once those return, the user can call get()
which will return the value and not block.
The following code snippet shows how to return a promised value from a coroutine. In this case, we are returning a string.
ThreadContext<std::string>::Ptr tctx = dispatcher.post<std::string>(
[](CoroContext<std::string>::Ptr ctx)->int
{
return ctx->set("The quick brown fox"); //Set the value and return
});
std::string s = tctx->get(); //Block until the future is set.
//Alternatively:
tctx->wait(); //Block until the future is set
std::string s = tctx->get(); //Extract the value without blocking
Returning a promise from an asynchronous IO task is very similar to a coroutine. In this case however, the promise object is not wrapped and is passed as-is via the first parameter in the function signature.
ThreadContext<std::string>::Ptr tctx = dispatcher.postAsyncIo<std::string>(
[](ThreadPromise<std::string>::Ptr promise)->int
{
return promise->set("The quick brown fox"); //Set the value
});
std::string s = tctx->get(); //Block until the future is set.
Similarly, if the async IO is posted from within a coroutine:
dispatcher.post<std::string>([](CoroContext<std::string>::Ptr ctx)->int
{
CoroContext<std::string> ctx2 = ctx->postAsyncIo<std::string>(
[](ThreadPromise<std::string>::Ptr promise)->int
{
return promise->set("The quick brown fox"); //Set the value
});
return ctx->set(ctx2->get(ctx)); //Return the value from the async IO thread
});
One difference to note is that when ctx2->get(ctx)
is called inside the coroutine, the ICoroSync interface is passed as a parameter which causes the coroutine to yield instead of blocking.
In order to pass an exception from a coroutine or IO task to the caller thread, the exception pointer must be caught via std::current_exception()
and set inside the promise as follows:
ThreadContext<int>::Ptr tctx = dispatcher.post([](CoroContext<int>::Ptr ctx)->int
{
std::exception_ptr eptr;
try
{
if (some_error_condition)
{
throw std::logic_error("The quick brown fox is slow");
}
...
}
catch (...)
{
//Get pointer to current exception
eptr = std::current_exception();
//In this case we can also call std::rethrow_exception(eptr) and let
//the scheduler catch it again.
return ctx->setException(eptr);
}
return 0; //no error
});
try
{
int number = tctx->get(); //blocks until the exception is thrown
}
catch (std::exception& ex)
{
std::cerr << ex.what() << std::endl; //prints "The quick brown fox is slow"
}
Everyone hates when someone makes them a promise which is not kept. In this case, if the coroutine exists without setting the promise, and the caller happens to wait on the future, an exception of type BrokenPromiseException will be thrown. The user should also check what other exceptions might be thrown by promises and futures here.
ThreadContext<int>::Ptr tctx = dispatcher.post([](CoroContext<int>::Ptr ctx)->int
{
...
return 0; //exit without setting the 'int' promise
});
try
{
std::string s = tctx->get(); //blocks until the exception is thrown
}
catch (std::exception& ex)
{
std::cerr << ex.what() << std::endl; //prints "Broken promise"
}
Alternatively a broken promise can happen if an exception is thrown inside the coroutine and is allowed to propagate out.
ThreadContext<int>::Ptr tctx = dispatcher.post([](CoroContext<int>::Ptr ctx)->int
{
...
function_which_throws(); //this throws and we don't catch it
return ctx->set(55); //set the promise
});
try
{
std::string s = tctx->get(); //blocks until the exception is thrown
}
catch (std::exception& ex)
{
std::cerr << ex.what() << std::endl; //prints "Broken promise"
}
In the case of async IO tasks, the promise object is not wrapped inside a context and is passed directly to the function itself. This means that its lifetime is not managed automatically and must be done so by the programmer. The reason is because, theoretically inside an IO task, the programmer may choose to further pass along the promise into other functions and set it at a later date (for instance calling some boost:asio delegate or the asynchronous function of a 3rd party API which takes a callback or even spinning off another task with std::async()
). In this case the promise will outlive the scope of the IO task.
If however the programmer chooses to end the promise inside the IO task scope, he/she can use an ITerminate::Guard RAII object to ensure the promise will be either set or broken when the IO task completes. The ITerminate::Guard is simply an object which takes in its constructor any object implementing the ITerminate interface and calls the Terminate()
method on it when it goes out of scope. Most classes in quantum implement the ITerminate interface.
The following examples outline this concept:
ThreadContext<std::string>::Ptr tctx = dispatcher.postAsyncIo<std::string>(
[](ThreadPromise<std::string>::Ptr promise)->int
{
...
return 0; //return without setting the promise
});
std::string s = tctx->get(); //This will block indefinitely as the shared state
//between the future and the promise is never deleted
The correct way of ensuring promise termination:
ThreadContext<std::string>::Ptr tctx = dispatcher.postAsyncIo<std::string>(
[](ThreadPromise<std::string>::Ptr promise)->int
{
ITerminate::Guard guard(*promise);
return 0; //return without setting the promise
});
std::string s = tctx->get(); //Throws BrokenPromiseException
Pass the promise along to some 3rd party callback method:
ThreadContext<std::string>::Ptr tctx = dispatcher.postAsyncIo<std::string>(
[](ThreadPromise<std::string>::Ptr promise)->int
{
//Schedule a callback in a 3rd party library and capture promise by copy
third_party_async_call([promise]()
{
//set inside the callback when 3rd-party async operation completes.
//NOTE: This will be running in a thread outside quantum
promise->set("The quick brown fox");
});
return 0; //return immediately without setting the promise
});
std::string s = tctx->get(); //Returns when the future is set
When get()
is called to retrieve a future, the value inside is moved and the future object is invalidated. This means that any subsequent calls to get()
will throw a FutureAlreadyRetrieved exception. If the user desires to access a future more than once, such as from different threads, the method getRef()
should be called instead. This method returns a constant reference to the future value and will not modify its internal state. This is a concept borrowed from std::shared_future.
One very important use-case scenario of futures is streaming values. Traditional usage requires a single value to be passed between a promise and a future, but there are many cases when multiple values need to be extracted and returned, one at a time. For example suppose an async IO task executes a database query and returns thousands of rows back. With the traditional approach, all fetched rows have to be aggregated into a single object, then returned. This takes a lot of time. With the streaming approach, each fetch operation can be returned to the caller immediately thus allowing the application to react faster and reduce delays.
For this to work, the future value must be wrapped inside a Buffer object. The Buffer futures and promises have two new methods for passing data push()
which inserts the next element into the buffer (tail), and pull()
which extracts the oldest element (head) from the buffer. When the writer thread finishes, it must call closeBuffer()
to end transmission.
The following example shows how to make use of a buffered future:
ThreadContext<Buffer<int>>::Ptr ctx = dispatcher.post<Buffer<int>>(
[](CoroContext<Buffer<int>>::Ptr ctx)->int
{
for (int d = 0; d < 5; d++)
{
ctx->push(d);
ctx->yield(); //simulate some arbitrary delay
}
return ctx->closeBuffer();
});
std::vector<int> v;
while (1)
{
bool isBufferClosed = false;
int value = ctx->pull(isBufferClosed);
if (isBufferClosed) break;
v.push_back(value);
}
//'v' now contains {0, 1, 2, 3, 4}