forked from petabridge/akkadotnet-code-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FeedParserActor.cs
152 lines (123 loc) · 5.74 KB
/
FeedParserActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
using System;
using Akka.Actor;
using HtmlAgilityPack;
using QDFeedParser;
namespace PipeTo.App.Actors
{
/// <summary>
/// Uses QDFeedParser's <see cref="IFeedFactory"/> to parse RSS / Atom Feeds. https://github.com/Aaronontheweb/qdfeed
///
/// And also uses HTML Agility Pack to parse img tags from any downloaded HTML. http://htmlagilitypack.codeplex.com/
/// </summary>
public class FeedParserActor : ReceiveActor
{
#region Message types
/// <summary>
/// Request to start parsing a feed; sent by <see cref="FeedParserCoordinator"/>.
/// Immutable once constructed.
/// </summary>
public class BeginProcessFeed
{
    private readonly Uri _feedUri;

    /// <param name="feedUri">Location of the RSS / Atom feed to download and parse.</param>
    public BeginProcessFeed(Uri feedUri)
    {
        _feedUri = feedUri;
    }

    /// <summary>
    /// The feed location handed to the constructor.
    /// </summary>
    public Uri FeedUri
    {
        get { return _feedUri; }
    }
}
/// <summary>
/// A single feed item whose content needs to be scanned for image tags.
/// Immutable once constructed.
/// </summary>
public class ParseFeedItem
{
    private readonly string _feedUri;
    private readonly BaseFeedItem _feedItem;

    /// <param name="feedUri">Identifier of the feed this item belongs to.</param>
    /// <param name="feedItem">The feed item to parse.</param>
    public ParseFeedItem(string feedUri, BaseFeedItem feedItem)
    {
        _feedUri = feedUri;
        _feedItem = feedItem;
    }

    /// <summary>
    /// Identifier of the feed the item came from.
    /// </summary>
    public string FeedUri
    {
        get { return _feedUri; }
    }

    /// <summary>
    /// The feed item to be parsed.
    /// </summary>
    public BaseFeedItem FeedItem
    {
        get { return _feedItem; }
    }
}
#endregion
//Factory used to asynchronously create the feed from its URI
private readonly IFeedFactory _feedFactory;
//Actor that is told about every image found in the feed content.
//Only ever assigned in the constructor, so it is readonly like its sibling fields.
private readonly ActorRef _downloadActor;
//Actor path used to select the console writer when reporting status
private readonly string _consoleWriterActorPath;

/// <summary>
/// Creates a parser that reports status to the console writer at the path given by
/// ActorNames.ConsoleWriterActor.Path.
/// </summary>
/// <param name="feedFactory">Factory used to download and parse feeds.</param>
/// <param name="downloadActor">Actor that receives the image download requests.</param>
public FeedParserActor(IFeedFactory feedFactory, ActorRef downloadActor)
    : this(feedFactory, downloadActor, ActorNames.ConsoleWriterActor.Path)
{
}

/// <summary>
/// Creates a parser that reports status to the actor selected by <paramref name="consoleWriterActorPath"/>.
/// </summary>
/// <param name="feedFactory">Factory used to download and parse feeds.</param>
/// <param name="downloadActor">Actor that receives the image download requests.</param>
/// <param name="consoleWriterActorPath">Actor path of the console writer that status messages are sent to.</param>
public FeedParserActor(IFeedFactory feedFactory, ActorRef downloadActor, string consoleWriterActorPath)
{
    _feedFactory = feedFactory;
    _downloadActor = downloadActor;
    _consoleWriterActorPath = consoleWriterActorPath;

    //Set our Receive functions
    Initialize();
}
/// <summary>
/// Registers this actor's message handlers, in order: <see cref="BeginProcessFeed"/>,
/// <see cref="IFeed"/> and <see cref="ParseFeedItem"/>. Called from the constructor.
/// </summary>
public void Initialize()
{
    //Lambdas (rather than method groups) keep overload resolution between the
    //Receive overloads unambiguous on older compilers.

    //time to kick off the feed parsing process, and send the results to ourselves
    Receive<BeginProcessFeed>(feed => HandleBeginProcessFeed(feed));

    //this is the type of message we receive when the BeginProcessFeed's PipeTo operation finishes
    Receive<IFeed>(feed => HandleFeed(feed));

    //every feed item with HTML content is sent back to ourselves as one of these
    Receive<ParseFeedItem>(item => HandleParseFeedItem(item));
}

/// <summary>
/// Starts the asynchronous feed download and pipes its result back to this actor.
/// </summary>
private void HandleBeginProcessFeed(BeginProcessFeed feed)
{
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));

    //NOTE(review): no handler is registered here for a failed download; presumably a faulted
    //task is piped back as a failure message that goes unhandled - confirm how the
    //coordinator should learn about failed feeds.
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(Self);
}

/// <summary>
/// Reports the number of pages to process to the parent, then queues every item
/// that has content for parsing and marks the others as complete.
/// </summary>
private void HandleFeed(IFeed feed)
{
    SendMessage("Feed download successful.", PipeToSampleStatusCode.Success);
    SendMessage(string.Format("Have to download and parse {0} pages", feed.Items.Count));

    //We have to download at least one HTML page per feed URL.
    Context.Parent.Tell(new FeedParserCoordinator.RemainingDownloadCount(feed.FeedUri, feed.Items.Count, 0));

    //for each item in the feed, we need to process it.
    foreach (var item in feed.Items)
    {
        //Check to see if there's any HTML content...
        if (!string.IsNullOrEmpty(item.Content))
        {
            //We're going to self process each of these items
            Self.Tell(new ParseFeedItem(feed.FeedUri, item));
        }
        else
        {
            //Whoops, no HTML. We can mark this download as complete then.
            Context.Parent.Tell(new FeedParserCoordinator.DownloadComplete(feed.FeedUri, 1, 0));
        }
    }

    //No content in this feed. No need for further processing.
    if (feed.Items.Count == 0)
    {
        Context.Parent.Tell(new FeedParserCoordinator.EmptyFeed());
    }
}

/// <summary>
/// Finds every img tag with a src attribute in the item's content, announces each
/// one to the parent and the download actor, then marks the item's page as complete.
/// </summary>
private void HandleParseFeedItem(ParseFeedItem item)
{
    SendMessage(string.Format("Processing {0} for {1}", item.FeedItem.Link, item.FeedUri));

    //time to use the HTML agility pack to process this content
    var doc = new HtmlDocument();
    doc.LoadHtml(item.FeedItem.Content);

    //find all of the IMG tags via XPATH; the query is evaluated once and the result reused
    var imgNodes = doc.DocumentNode.SelectNodes("//img[@src]");

    //a null result is treated as "no images found"
    if (imgNodes != null)
    {
        foreach (var imgNode in imgNodes)
        {
            var imgUrl = imgNode.Attributes["src"].Value;
            SendMessage(string.Format("Found image {0} inside {1}", imgUrl, item.FeedItem.Link));

            //Let the coordinator know that we expect download results for more images...
            Context.Parent.Tell(new FeedParserCoordinator.RemainingDownloadCount(item.FeedUri, 0, 1));

            //And let the download actor know that it has work to do
            _downloadActor.Tell(new HttpDownloaderActor.DownloadImage(item.FeedUri, imgUrl));
        }
    }

    //Let the parent know that we've finished processing this HTML document
    Context.Parent.Tell(new FeedParserCoordinator.DownloadComplete(item.FeedUri, 1, 0));
}
#region Messaging methods
/// <summary>
/// Builds a status message and sends it to the console writer actor.
/// </summary>
/// <param name="message">Text of the status update.</param>
/// <param name="pipeToSampleStatus">Status code attached to the message; defaults to Normal.</param>
private void SendMessage(string message, PipeToSampleStatusCode pipeToSampleStatus = PipeToSampleStatusCode.Normal)
{
    //build the status update first...
    var statusUpdate = StatusMessageHelper.CreateMessage(message, pipeToSampleStatus);

    //...then look up the ConsoleWriterActor by path and hand it over
    var consoleWriter = Context.ActorSelection(_consoleWriterActorPath);
    consoleWriter.Tell(statusUpdate);
}
#endregion
}
}