-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator_simple.go
56 lines (48 loc) · 1.07 KB
/
aggregator_simple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main
// simpleAggregator is an aggregator whose only job is to tokenize incoming
// data: every blob received on the inbox is passed through the tokenizer and
// the resulting token is handed to the outbox.
type simpleAggregator struct {
	// t is the tokenizer installed via use; start resets its key and uses it
	// to tokenize each incoming blob.
	t tokenizer
	// inbox is the channel, set by connect, from which blobs are read.
	inbox chan serializer
	// outbox is the channel, set by connect, to which tokens are written.
	outbox chan token
	// done is closed by stop to tell the goroutine launched by start to exit.
	done chan empty
}
// newSimpleAggregator returns a fresh simpleAggregator as an aggregator. Only
// the done channel is initialized (unbuffered); the tokenizer and the inbox
// and outbox channels are left at their zero values until use and connect are
// called.
func newSimpleAggregator() aggregator {
	agg := new(simpleAggregator)
	agg.done = make(chan empty)
	return agg
}
// setConfig is a no-op: the simple aggregator ignores the given config.
// NOTE(review): presumably present only to satisfy the aggregator interface;
// confirm against the interface definition.
func (s *simpleAggregator) setConfig(c *config) {
	// Intentionally empty.
}
// use installs t as the tokenizer that start will reset and then apply to
// every incoming blob. It simply stores t; no validation is performed.
func (s *simpleAggregator) use(t tokenizer) {
	s.t = t
}
// connect wires the aggregator to its neighbours: blobs are later read from
// inbox and the resulting tokens are written to outbox. The channels are only
// stored here; nothing is read or written until start is called.
func (s *simpleAggregator) connect(inbox chan serializer, outbox chan token) {
	s.inbox, s.outbox = inbox, outbox
}
// start resets the tokenizer's key and then launches the goroutine that
// drives the aggregator. A failed key reset is reported through l.Fatalf
// (NOTE(review): presumably a standard logger, in which case this terminates
// the process; confirm against l's declaration).
//
// The goroutine repeatedly takes a blob from the inbox, tokenizes it, and
// sends the resulting token to the outbox. Blobs that fail to tokenize are
// logged and skipped. The goroutine exits when the done channel is closed
// (see stop), even while it is waiting to hand a token to the outbox, or when
// the inbox is closed.
func (s *simpleAggregator) start() {
	if err := s.t.resetKey(); err != nil {
		l.Fatalf("Failed to reset tokenizer key: %v", err)
	}

	go func() {
		for {
			select {
			case <-s.done:
				return
			case b, ok := <-s.inbox:
				if !ok {
					// Receiving from a closed channel never blocks and yields
					// the zero value, so without this check the loop would
					// spin forever tokenizing empty blobs.
					l.Println("Inbox closed; stopping aggregator.")
					return
				}

				tok, err := s.t.tokenize(b)
				if err != nil {
					l.Printf("Failed to tokenize blob: %v", err)
					continue
				}
				l.Println("Tokenized blob.")

				// Keep watching done while sending: if the receiver on the
				// outbox has gone away, a bare send would block forever and
				// stop could never terminate this goroutine.
				select {
				case s.outbox <- tok:
					l.Println("Sent token to forwarder.")
				case <-s.done:
					return
				}
			}
		}
	}()
}
// stop signals the goroutine launched by start to exit by closing the done
// channel, then logs that the aggregator was stopped. It does not wait for
// the goroutine to finish. Because closing an already-closed channel panics,
// stop must be called at most once per aggregator.
func (s *simpleAggregator) stop() {
	// Closing (rather than sending on) done lets every receiver observe the
	// signal without a matching send.
	close(s.done)

	l.Println("Stopped aggregator.")
}