forked from nats-io/nats.net.v1
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRxSample.cs
47 lines (38 loc) · 1.28 KB
/
RxSample.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.Rx;
using NATS.Client.Rx.Ops;
// NATS.Client.Rx.Ops (above) can be replaced with: using System.Reactive.Linq;
namespace NATSExamples
{
/// <summary>
/// Console sample for the NATS Rx extensions: observes the "temperatures"
/// subject, prints every reading in Celsius and Fahrenheit, and publishes a
/// random reading once per second until a key is pressed.
/// </summary>
class RxSample
{
    /// <summary>Subject shared by the publisher and the observers.</summary>
    private const string Subject = "temperatures";

    /// <summary>
    /// Entry point. Opens a connection, wires up two observers, starts a
    /// background publisher, and blocks until the user presses a key.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        using (var cn = new ConnectionFactory().CreateConnection())
        using (var cts = new CancellationTokenSource())
        {
            var temperatures =
                cn.Observe(Subject)
                    // BitConverter.ToInt32 throws on payloads shorter than four
                    // bytes, so anything that cannot hold an Int32 is dropped.
                    .Where(m => m.Data?.Length >= sizeof(int))
                    .Select(m => BitConverter.ToInt32(m.Data, 0));

            // Keep both subscription handles so they are released before the
            // connection they were created from.
            using (temperatures.Subscribe(t => Console.WriteLine($"{t}C")))
            // Integer arithmetic: the Fahrenheit value is truncated to whole degrees.
            using (temperatures.Subscribe(t => Console.WriteLine($"{(t * 9 / 5) + 32}F")))
            {
                var publisher = Task.Run(async () =>
                {
                    var rnd = new Random();
                    try
                    {
                        while (!cts.IsCancellationRequested)
                        {
                            // Random reading in the range [-10, 40).
                            cn.Publish(Subject, BitConverter.GetBytes(rnd.Next(-10, 40)));
                            await Task.Delay(1000, cts.Token);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // Expected: the delay was interrupted by cts.Cancel() below.
                    }
                });

                Console.ReadKey();
                cts.Cancel();

                // Wait for the publisher to finish before the connection and the
                // token source are disposed, so it never touches a disposed object.
                try
                {
                    publisher.Wait();
                }
                catch (AggregateException ex)
                {
                    // Report a publisher failure instead of losing it as an
                    // unobserved task exception.
                    Console.Error.WriteLine($"Publisher stopped with an error: {ex.GetBaseException().Message}");
                }
            }
        }
    }
}
}