The goal of this sample is to show you how you can use the `PipeTo` extension method within Akka.NET to allow a single actor to make many asynchronous calls simultaneously using the .NET Task Parallel Library (TPL).
This is actually a fairly small and simple example, but we've documented it extensively so you can understand all of our assumptions and modeling with Akka.NET. Don't confuse "detailed" with "complex."
In this sample we're going to ask the user to provide us with the URL of a valid RSS or ATOM feed, and we're going to:
- Validate that the URL resolves to an actual RSS / ATOM feed;
- Parse all of the `<img>` tags out of the bodies of the items in the feed; and
- Asynchronously download all images for each blog post in parallel using a single actor, even though Akka.NET actors can only process one message at a time!
The goal of this is to show you that, yes - even though actors can only process one message at a time they can still leverage `async` methods and `Task<T>` objects to do multiple things in parallel.
Note: with some fairly small changes, you could modify this sample to create an offline, local backup of every blog post in a remote RSS or ATOM feed.
This code can also process multiple RSS or ATOM feeds in parallel without any modification - you'd just need to change the user interface to be able to provide multiple feed URLs at once.
Maybe you should give one of these suggestions a try once you've had a chance to grok the sample? ;)
This sample should illustrate the following concepts:
- That although the actors are only processing a single RSS / ATOM feed, you could easily have them process hundreds of feeds in parallel without writing any additional actor code. All you'd have to do is send additional feed uris to the `FeedValidatorActor`. This is what makes actors fundamentally powerful - any process you've written using actors can be automatically parallelized.
- That even though actors can only process one message at a time, they can still run asynchronous tasks in the background just like any other class. The trick to doing this effectively, however, is to make sure that the results of those asynchronous tasks are delivered back to the actor as messages. This is what `PipeTo` does.
Why can't you use `async` and `await` inside the actor's `OnReceive` methods?
As we discussed in "Akka.NET: What is an Actor?" - the mailbox pushes messages into your actor's `OnReceive` method as soon as the previous iteration of the `OnReceive` function exits. So whenever you `await` an `async` operation inside the `OnReceive` method, you prematurely exit the `OnReceive` method and the mailbox will push a new message into it.
`Await` breaks the "actors process one message at a time" guarantee, and suddenly your actor's context might be different. Variables such as the `Sender` of the previous message may be different, or the actor might even be shutting down when the `await` call returns to the previous context.
So just don't do it. `Await` is evil inside an actor. `Await` is just syntactic sugar anyway. Use `ContinueWith` and `PipeTo` instead. Turn the results of `async` operations into messages that get delivered into your actor's inbox and you can take advantage of `Task` and TPL methods just like you did before.
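To make the "results become messages" idea concrete without depending on Akka.NET itself, here is a minimal TPL-only sketch. The `BlockingCollection<object>` is a stand-in for an actor's mailbox, and `PipeToMailbox` is a hypothetical helper written for this illustration; it is not Akka.NET's `PipeTo`, only an approximation of the same idea.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class MailboxSketch
{
    // Hypothetical helper (NOT Akka.NET's PipeTo): when the task finishes,
    // put its result, or its failure, into the mailbox as a message.
    public static Task PipeToMailbox<T>(this Task<T> task, BlockingCollection<object> mailbox)
    {
        return task.ContinueWith(t =>
        {
            if (t.Status == TaskStatus.RanToCompletion)
            {
                mailbox.Add(t.Result);
            }
            else
            {
                // Faulted or canceled: deliver the failure as a message too.
                mailbox.Add((object)t.Exception ?? "task was canceled");
            }
        });
    }

    // Starts `count` simulated async calls at once, then handles their
    // results one message at a time, the way a single actor would.
    public static List<string> Run(int count)
    {
        var mailbox = new BlockingCollection<object>();
        for (var i = 0; i < count; i++)
        {
            var id = i; // local copy so each lambda captures its own value
            Task.Delay(50) // stands in for a slow network call
                .ContinueWith(_ => "result " + id)
                .PipeToMailbox(mailbox);
        }

        var handled = new List<string>();
        while (handled.Count < count)
        {
            // Take() blocks until the next message arrives.
            handled.Add(mailbox.Take().ToString());
        }
        return handled;
    }
}
```

Calling `MailboxSketch.Run(3)` starts three delays concurrently, yet the loop at the bottom still handles exactly one message at a time. The order in which the results arrive is not guaranteed.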
What are those `TaskContinuationOptions.AttachedToParent` and `TaskContinuationOptions.ExecuteSynchronously` flags you keep using on `ContinueWith` inside an actor?
Those are a set of bitmasked options specific to the `Task` class, and in this case we're telling the TPL to make sure that the `ContinueWith`, known as a "task continuation," gets executed immediately after the parent task returns its asynchronous result and executes using the same thread as its parent.
Sometimes your continuations can be executed on a totally different thread or scheduled to run at a different time and this can cause unpredictable behavior inside actors - because the actors themselves might be running on a different thread than they were when they started the asynchronous `Task`.
The `ContinueWith` will still run on a different thread than the actor, but it makes sure it's the same thread as the original `Task<T>`.
It's best to include these flags whenever you're doing a `ContinueWith` inside an Actor. This makes the behavior of continuations and `PipeTo` very predictable and reliable.
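One detail worth checking when you pass these options: `TaskContinuationOptions` is a flags enum, so two options are combined with bitwise OR (`|`). Bitwise AND (`&`) of two different single-bit flags evaluates to `TaskContinuationOptions.None`, which silently drops both. The small sketch below only exercises the enum arithmetic; the class and method names are made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationFlagsSketch
{
    // Bitwise AND keeps only the bits set in BOTH operands. These two flags
    // share no bits, so the result is TaskContinuationOptions.None.
    public static TaskContinuationOptions CombinedWithAnd()
    {
        return TaskContinuationOptions.AttachedToParent
             & TaskContinuationOptions.ExecuteSynchronously;
    }

    // Bitwise OR sets both flags, which is what "use both options" means.
    public static TaskContinuationOptions CombinedWithOr()
    {
        return TaskContinuationOptions.AttachedToParent
             | TaskContinuationOptions.ExecuteSynchronously;
    }
}
```

`CombinedWithAnd()` compares equal to `TaskContinuationOptions.None`, while `CombinedWithOr().HasFlag(TaskContinuationOptions.ExecuteSynchronously)` is `true`.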
Do I need to worry about closing over (closures) my actor's internal state when using `PipeTo`?
Yes, you need to close over any state whose value might change between messages that you need to use inside your `ContinueWith` or `PipeTo` calls.
So for instance, the `Sender` property of your actor will almost definitely change between messages. You'll need to use a C# closure for this property in order to guarantee that any asynchronous methods that depend on this property get the right value.
Here's an example:
```csharp
//time to kick off the feed parsing process, and send the results to ourselves
Receive<BeginProcessFeed>(feed =>
{
    //local variable for closure: captures the current Sender before it can change
    var senderClosure = Sender;
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
    //reply back to the sender
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(senderClosure);
});
```
Doing a closure is as simple as stuffing the property into a local variable (`var`) and using that local variable in your call instead of the field or property defined on your actor.
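The difference between reading the property late and capturing a local copy can be shown with plain TPL types. In this sketch `ClosureSketch` is a hypothetical stand-in for an actor, and its `Sender` is just a string that we change by hand to simulate the next message arriving before the asynchronous work completes.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an actor whose Sender changes with every message.
public sealed class ClosureSketch
{
    public string Sender { get; set; }

    // Reads the property when the continuation runs: may see a later sender.
    public Task<string> ReplyToProperty(Task work)
    {
        return work.ContinueWith(_ => Sender);
    }

    // Copies the property into a local first: the lambda captures the copy.
    public Task<string> ReplyToLocal(Task work)
    {
        var senderClosure = Sender;
        return work.ContinueWith(_ => senderClosure);
    }

    public static string[] Demo()
    {
        var actor = new ClosureSketch { Sender = "first" };
        var work = new TaskCompletionSource<bool>();

        var viaProperty = actor.ReplyToProperty(work.Task);
        var viaLocal = actor.ReplyToLocal(work.Task);

        actor.Sender = "second"; // the "next message" arrives
        work.SetResult(true);    // only now does the async work finish

        return new[] { viaProperty.Result, viaLocal.Result };
    }
}
```

`ClosureSketch.Demo()` returns `"second"` for the continuation that read the property and `"first"` for the one that captured the local, which is why the sample copies `Sender` before calling `PipeTo`.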
Everything in this sample, including reading from and writing to the `Console`, is done using Akka.NET actors.
Here's how the actor hierarchy is organized in this sample:
- `/user/` is the root actor for all user-defined actors. Any time you call `ActorSystem.ActorOf` you're going to create a child of the `/user/` actor. This is built into Akka.NET.
- `/user/consoleReader` is an instance of a `ConsoleReaderActor` (source) responsible for prompting the end-user for command-line input. If the user types "exit" on the command line, this actor will also call `ActorSystem.ShutDown` - which will terminate the application. There is only ever a single instance of this actor, because there's only one instance of the command line to read from.
- `/user/consoleWriter/` is an instance of a `ConsoleWriterActor` (source) responsible for receiving status updates from all other actors in this sample and writing them to the console in a serial fashion. In the event of a completed feed parse or a failed URL validation, the `ConsoleWriterActor` will tell the `ConsoleReaderActor` to prompt the user for a new RSS / ATOM feed URL. There is only ever a single instance of this actor, because there should only be one actor responsible for writing output to the console (single writer pattern.)
- `/user/feedValidator` is an instance of a `FeedValidatorActor` (source) responsible for receiving input from the `ConsoleReaderActor` and validating whether or not the user-provided URL is:
  - A valid absolute URI and
  - Actually hosts a valid RSS or Atom feed at the address, determined using Quick and Dirty Feed Parser's asynchronous methods and `PipeTo` (relevant source.)
- `/user/feedValidator/[feedCoordinator]` is an instance of the `FeedParserCoordinator` actor (source) created by the `FeedValidatorActor` in the event that a user-supplied feed URL passes validation. A `FeedParserCoordinator` is responsible for coordinating the downloading of RSS / ATOM feed items, parsing of said items, and the downloading of all images found in the feed concurrently. There is one of these actors per RSS or Atom feed. Technically, if you sent a list of 100 different feed URLs to the `FeedValidatorActor` then it would create 100 different `FeedParserCoordinator` instances to process each feed in parallel. This actor is responsible for dispatching work to its children and determining when its children have finished processing the contents of the provided feed.
- `/user/feedValidator/[feedCoordinator]/[feedParser]` hosts an instance of a `FeedParserActor` (source), which gets created during the `FeedParserCoordinator.PreStart` call. This actor is responsible for:
  - Asynchronously downloading the content of the RSS / ATOM feed using Quick and Dirty Feed Parser's async methods and using `PipeTo` to deliver the downloaded feed back to itself as a new message (relevant source.)
  - Parsing all `<img>` tags from each RSS / ATOM item in the feed using the HTML Agility Pack.
  - Sending the full URLs for each parsed image to its sibling `HttpDownloaderActor`, which will begin downloading each image into memory.
  - Reporting back to its parent, the `FeedParserCoordinator`, the number of remaining feed items that need to be processed and the number of images that need to be processed.
- `/user/feedValidator/[feedCoordinator]/[httpDownloader]` is an instance of a `HttpDownloaderActor` (source) which gets created during the `FeedParserCoordinator.PreStart` call. This actor is responsible for:
  - Asynchronously downloading all image URLs sent to it by the `FeedParserActor` using the `HttpClient` - each download is done asynchronously using `Task` instances, `ContinueWith` for minor post-processing, and `PipeTo` to deliver the completed results back into the `HttpDownloaderActor` as messages. The fact that this single actor can process many image downloads in parallel is the entire point of this code sample. Please see the (relevant source.)
  - Reporting successful or failed download attempts back to the `FeedParserCoordinator`.
Also worth pointing out is the use of statically defined names and dynamic names.
Any time you intend to have a single instance of a specific actor per-process, you should use a static name so it can be easily referred to via `ActorSelection` throughout your application. If you intend to have many instances of an actor, particularly if they're not top-level actors, then you can use dynamic names.
As a best practice, we define all names and paths for looking up actors inside a static metadata class, in this case the `ActorNames` class (source.)
The first data flow involves simply reading input from the console, via the `ConsoleReaderActor`:
The `ConsoleReaderActor` receives a message of type `ConsoleReaderActor.ReadFromConsoleClean` from the `Main` method, and this tells the `ConsoleReaderActor` that it's time to print the instructions for the app and request an RSS / ATOM feed Uri from the end-user.
If the `ConsoleReaderActor` receives the string literal "exit" from the end-user, it will call `ActorSystem.ShutDown` and terminate the `Program.MyActorSystem` instance, which will cause `MyActorSystem.AwaitTermination` to complete and allow the `Main` function to exit and terminate the console app.
However, if the `ConsoleReaderActor` receives any other string it will `Tell` that string to the `FeedValidatorActor`.
The `FeedValidatorActor` receives the `string` message from the `ConsoleReaderActor` and immediately checks to see if the string is a valid Uri.
- If the string is not a valid uri, the `FeedValidatorActor` sends a `ConsoleWriterActor.ConsoleWriteFailureMessage` back to the `ConsoleWriterActor`.
- If the string is a valid uri, the `FeedValidatorActor` will call the following code block to validate that the uri points to a live RSS or ATOM feed.
```csharp
IsValidRssOrAtomFeed(feedUri)
    .ContinueWith(rssValidationResult => new IsValidFeed(feedUri, rssValidationResult.Result),
        // flags are combined with bitwise OR; AND-ing them would yield TaskContinuationOptions.None
        TaskContinuationOptions.AttachedToParent | TaskContinuationOptions.ExecuteSynchronously)
    .PipeTo(Self);
```
This calls a method within Quick and Dirty Feed Parser which returns `Task<bool>` - the `FeedValidatorActor` then continues this `Task<bool>` with a simple function that wraps the `bool` result and the original `feedUri` string into an `IsValidFeed` instance. This `IsValidFeed` object is then piped into the `FeedValidatorActor`'s inbox via the `PipeTo` method.
If the `FeedValidatorActor` had to validate hundreds of feeds, a single instance of this actor could process hundreds of feeds concurrently because the long-running `IsValidRssOrAtomFeed` method is being run asynchronously on an I/O completion port and the result of that `Task` is placed into the `FeedValidatorActor`'s mailbox just like any other message.
That's how you're supposed to use async within an actor - turn all asynchronous operations into functions that eventually produce a new message for the actor to process.
- If the feed is not a valid RSS or ATOM feed, the `FeedValidatorActor` sends a `ConsoleWriterActor.ConsoleWriteFailureMessage` back to the `ConsoleWriterActor`.
- If the feed is a valid RSS or ATOM feed, the `FeedValidatorActor` creates a new `FeedParserCoordinator` actor instance and passes in the `feedUri` as a constructor argument.
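The first check in this flow, deciding whether the user's string is a valid absolute URI, needs nothing beyond `System.Uri`. The sketch below is one plausible way to write it and is not taken from the sample's source; the class name is hypothetical, and restricting the scheme to HTTP/HTTPS is an assumption made here because feeds are fetched over the web.

```csharp
using System;

public static class FeedUriSketch
{
    // Returns true only for absolute http:// or https:// URIs.
    // The scheme restriction is an assumption of this sketch; without it,
    // some platforms accept a rooted file path as an absolute (file) URI.
    public static bool IsValidAbsoluteHttpUri(string input)
    {
        Uri uri;
        return Uri.TryCreate(input, UriKind.Absolute, out uri)
            && (uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps);
    }
}
```

A check like this is synchronous and cheap, so it can run directly inside the message handler. Only the second check, which actually contacts the remote server, needs the `Task` plus `PipeTo` treatment.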
When the `FeedParserCoordinator` is created, it immediately creates a `FeedParserActor` and `HttpDownloaderActor` during its `FeedParserCoordinator.PreStart` phase. Once both children are started, the `FeedParserCoordinator` sends a `FeedParserActor.BeginProcessFeed` message to the `FeedParserActor` to begin the feed download and parsing process.
Once the `FeedParserActor` receives the `FeedParserActor.BeginProcessFeed` message, it immediately attempts to download and parse the feed using Quick and Dirty Feed Parser - and the results are piped to the `FeedParserActor` asynchronously as an `IFeed` message. Technically this means that `FeedParserActor` could process multiple feeds in parallel, even though we're really only using the actor to parse a single feed once.
If the feed is empty or did not parse properly, the `FeedParserActor` notifies its parent, the `FeedParserCoordinator`, that the job is finished. The `FeedParserCoordinator` will then signal the `ConsoleReaderActor` that the app is ready for additional input and will self-terminate.
If the feed has items, the `FeedParserActor` will notify the `FeedParserCoordinator` that there are N RSS / ATOM feed items waiting to be processed and will then begin sending each of those items back to itself as a distinct `ParseFeedItem` message.
For each `ParseFeedItem` message the `FeedParserActor` receives, the actor will:
- Use the HTML Agility Pack to find any `<img>` tags in the text of the feed item and extract the urls of those images.
- Report back to the `FeedParserCoordinator` for each discovered image (to help with job tracking.)
- Send the URL of each image to the `HttpDownloaderActor` for download.
- Tell the `FeedParserCoordinator` that we've completed HTML parsing for one page (job tracking.)
And so now we get to the important part - seeing the `HttpDownloaderActor` asynchronously download all of the images at once.
The `HttpDownloaderActor` receives a `HttpDownloaderActor.DownloadImage` message from the `FeedParserActor` and asynchronously kicks off an `HttpClient.GetAsync` task to begin downloading the image. This is where `PipeTo` comes in for a major performance boost. (Source.)
While the HTTP download and post-processing happens for each individual image, the `HttpDownloaderActor` is still able to receive and process additional `HttpDownloaderActor.DownloadImage` or `HttpDownloaderActor.ImageDownloadResult` messages while those downloads run on different threads.
Once the post-processing for an HTTP download is done, regardless of success or failure the result is wrapped inside a `HttpDownloaderActor.ImageDownloadResult` object and piped back into the actor's mailbox as a new message. That's how you do async within actors - turn the output of `Task<T>` objects into messages.
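The "regardless of success or failure" part is the piece that is easy to get wrong: a faulted download should still produce a message instead of an unhandled exception. Here is a rough TPL-only sketch of that wrapping step. `DownloadResultSketch` is a hypothetical type invented for this illustration (it is not the sample's `ImageDownloadResult`), and the download itself is represented by any `Task<byte[]>`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical result message; the sample's real message type may differ.
public sealed class DownloadResultSketch
{
    public string Url { get; }
    public bool Succeeded { get; }
    public int ByteCount { get; }

    public DownloadResultSketch(string url, bool succeeded, int byteCount)
    {
        Url = url;
        Succeeded = succeeded;
        ByteCount = byteCount;
    }

    // Turns a download task into a task that ALWAYS completes with a result
    // message, whether the download succeeded, faulted, or was canceled.
    public static Task<DownloadResultSketch> Wrap(string url, Task<byte[]> download)
    {
        return download.ContinueWith(t =>
            t.Status == TaskStatus.RanToCompletion
                ? new DownloadResultSketch(url, true, t.Result.Length)
                : new DownloadResultSketch(url, false, 0));
    }
}
```

Because the wrapped task never faults, whatever delivers its result to the actor only ever has one message shape to deal with, and the actor can branch on `Succeeded` in an ordinary message handler.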
Once the `HttpDownloaderActor` receives a `HttpDownloaderActor.ImageDownloadResult` message from the asynchronous `Task`, it will report either a success or failure to the `ConsoleWriterActor` and let the `FeedParserCoordinator` know that it has finished processing at least one additional image.
While the `FeedParserActor` and the `HttpDownloaderActor` are both processing their workloads, they're periodically reporting results back to the `FeedParserCoordinator` - it's ultimately the job of the `FeedParserCoordinator` to know:
- how much work needs to be done total and
- how much work still needs to be done right now.
Once the number of completed items is equal to the number of total expected items, the processing job is considered to be "complete." This `FeedParserCoordinator` instance will report its results back to the `ConsoleWriterActor` and signal to the `ConsoleReaderActor` that it's probably time to ask for a new feed url.
Finally, the `FeedParserCoordinator` will shut itself and its children down.
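The bookkeeping described above boils down to two counters. The following is a simplified sketch of that idea, not the coordinator's actual implementation; the class and member names are hypothetical. No locking is used on the assumption that, as inside an actor, the counters are only ever touched while handling one message at a time.

```csharp
using System;

// Hypothetical job tracker: counts expected vs. completed units of work.
public sealed class JobTrackerSketch
{
    public int TotalExpected { get; private set; }
    public int Completed { get; private set; }

    // How much work still needs to be done right now.
    public int Remaining
    {
        get { return TotalExpected - Completed; }
    }

    // "Complete" once every expected unit has been reported as finished.
    public bool IsComplete
    {
        get { return TotalExpected > 0 && Completed >= TotalExpected; }
    }

    // Called when new work is discovered (feed items, images).
    public void AddExpected(int count)
    {
        TotalExpected += count;
    }

    // Called when one unit finishes, whether it succeeded or failed.
    public void MarkCompleted()
    {
        Completed++;
    }
}
```

One caveat with any counter-based scheme like this: newly discovered work has to be registered before the work that discovered it is marked complete, otherwise the two counters can briefly match while items are still outstanding.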
This sample depends on the following NuGet packages in order to run:
- Akka.NET (core only)
- HTML Agility Pack
- Quick and Dirty Feed Parser
- Clone this repository to your local computer - we highly recommend installing GitHub for Windows if you don't already have a Git client installed.
- Open `PipeToSample.sln` in Visual Studio 2012 or later.
- Press `F6` to build the sample - this solution has NuGet package restore enabled, so any third party dependencies will automatically be downloaded and added as references.
- Press `F5` to run the sample.
From there the console application will provide you with extensive instructions. Here's what you should see upon first run:
And if you provide the sample with a valid RSS or ATOM feed url, such as http://www.aaronstannard.com/feed.xml, you should see output that resembles the following:
Take a look through the messages and do a quick sanity check ;)
If you have any questions about this sample, please create a GitHub issue for us!