forked from petabridge/akka-bootcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTailActor.cs
117 lines (103 loc) · 3.64 KB
/
TailActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
using System;
using System.IO;
using System.Text;
using Akka.Actor;
namespace WinTail
{
/// <summary>
/// Monitors the file at <see cref="_filePath" /> for changes and sends
/// file updates to console.
/// </summary>
public class TailActor : UntypedActor
{
private readonly string _filePath;
private readonly Stream _fileStream;
private readonly StreamReader _fileStreamReader;
private readonly FileObserver _observer;
private readonly IActorRef _reporterActor;
public TailActor(IActorRef reporterActor, string filePath)
{
_reporterActor = reporterActor;
_filePath = filePath;
// start watching file for changes
_observer = new FileObserver(Self, Path.GetFullPath(_filePath));
_observer.Start();
// open the file stream with shared read/write permissions
// (so file can be written to while open)
_fileStream = new FileStream(Path.GetFullPath(_filePath),
FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
_fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);
// read the initial contents of the file and send it to console as first msg
var text = _fileStreamReader.ReadToEnd();
Self.Tell(new InitialRead(_filePath, text));
}
protected override void OnReceive(object message)
{
if (message is FileWrite)
{
// move file cursor forward
// pull results from cursor to end of file and write to output
// (this is assuming a log file type format that is append-only)
var text = _fileStreamReader.ReadToEnd();
if (!string.IsNullOrEmpty(text))
{
_reporterActor.Tell(text);
}
if (text.Contains("X"))
{
throw new Exception();
}
}
else if (message is FileError)
{
var fe = message as FileError;
_reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
}
else if (message is InitialRead)
{
var ir = message as InitialRead;
_reporterActor.Tell(ir.Text);
}
}
#region Message types
/// <summary>
/// Signal that the file has changed, and we need to
/// read the next line of the file.
/// </summary>
public class FileWrite
{
public FileWrite(string fileName)
{
FileName = fileName;
}
public string FileName { get; private set; }
}
/// <summary>
/// Signal that the OS had an error accessing the file.
/// </summary>
public class FileError
{
public FileError(string fileName, string reason)
{
FileName = fileName;
Reason = reason;
}
public string FileName { get; private set; }
public string Reason { get; }
}
/// <summary>
/// Signal to read the initial contents of the file at actor startup.
/// </summary>
public class InitialRead
{
public InitialRead(string fileName, string text)
{
FileName = fileName;
Text = text;
}
public string FileName { get; private set; }
public string Text { get; }
}
#endregion
}
}