-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpeck_task.go
111 lines (100 loc) · 2.05 KB
/
peck_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package logpeck
import (
"errors"
log "github.com/Sirupsen/logrus"
)
// PeckTask bundles a task's configuration and run state with the stages that
// Process drives for each input string: filter, extractor, optional
// aggregator and sender.
type PeckTask struct {
// Config is the task configuration, stored by value (NewPeckTask copies it
// from the caller's *PeckTaskConfig).
Config PeckTaskConfig
// Stat is the task state; Stat.Stop is toggled by Start/Stop and makes
// Process a no-op while true.
Stat PeckTaskStat
// filter decides, via Drop, whether an input string is discarded.
filter PeckFilter
// extractor turns an input string into a field map via Extract.
extractor Extractor
// sender is started/stopped with the task and receives field maps via Send.
sender Sender
// aggregator, when IsEnable reports true, records fields and only yields a
// dump for sending once IsDeadline reports true.
aggregator *Aggregator
}
// NewPeckTask builds a PeckTask from config c and an optional previous state s.
//
// c must not be nil; a nil c yields an error instead of a nil-pointer panic.
// When s is nil, a fresh PeckTaskStat named after c.Name is used with Stop set
// to true, so the task does nothing in Process until Start is called.
//
// *c and the chosen stat are copied by value into the returned task. Note that
// the sender and aggregator are constructed from pointers into the caller's c
// (&c.Sender, &c.Aggregator), not into the task's own copy.
//
// Errors from NewExtractor and NewSender are returned unchanged.
func NewPeckTask(c *PeckTaskConfig, s *PeckTaskStat) (*PeckTask, error) {
	if c == nil {
		return nil, errors.New("nil peck task config")
	}

	stat := s
	if stat == nil {
		stat = &PeckTaskStat{
			Name: c.Name,
			Stop: true,
		}
	}

	// Stages are built in the same order as before: extractor, filter,
	// sender, aggregator.
	extractor, err := NewExtractor(c.Extractor)
	if err != nil {
		return nil, err
	}
	filter := NewPeckFilter(c.Keywords)
	sender, err := NewSender(&c.Sender)
	if err != nil {
		return nil, err
	}
	aggregator := NewAggregator(&c.Aggregator)

	task := &PeckTask{
		Config:     *c,
		Stat:       *stat,
		filter:     *filter,
		extractor:  extractor,
		sender:     sender,
		aggregator: aggregator,
	}
	log.Infof("[PeckTask] new peck task %#v", task)
	return task, nil
}
// Start starts the task's sender and, only if that succeeds, clears
// Stat.Stop so that Process begins handling input. A sender start error is
// returned as-is and leaves Stat.Stop untouched.
func (p *PeckTask) Start() error {
	startErr := p.sender.Start()
	if startErr == nil {
		p.Stat.Stop = false
		return nil
	}
	return startErr
}
// Stop stops the task's sender and, only if that succeeds, sets Stat.Stop so
// that Process becomes a no-op. A sender stop error is returned as-is and
// leaves Stat.Stop untouched.
func (p *PeckTask) Stop() error {
	stopErr := p.sender.Stop()
	if stopErr == nil {
		p.Stat.Stop = true
		return nil
	}
	return stopErr
}
// IsStop reports the current value of Stat.Stop, i.e. whether the task is
// marked as stopped.
func (p *PeckTask) IsStop() bool {
	stopped := p.Stat.Stop
	return stopped
}
// Process runs one input string through the task pipeline.
//
// Nothing happens when the task is stopped (Stat.Stop) or when the filter
// drops the content. Otherwise the content is passed to the extractor; if
// extraction fails the content is discarded instead of forwarding whatever
// the extractor returned alongside the error (ProcessTest likewise treats an
// extraction error as a failure).
//
// With the aggregator disabled, the extracted fields go straight to the
// sender. With it enabled, the fields are recorded and the sender only
// receives the aggregator's dump once IsDeadline reports true for the
// timestamp returned by Record.
func (p *PeckTask) Process(content string) {
	// Short-circuit keeps the filter from being consulted while stopped.
	if p.Stat.Stop || p.filter.Drop(content) {
		return
	}

	fields, err := p.extractor.Extract(content)
	if err != nil {
		// Dropped silently, like filtered content: this runs once per input
		// and has no error return to report through.
		return
	}

	if !p.aggregator.IsEnable() {
		p.sender.Send(fields)
		return
	}

	timestamp := p.aggregator.Record(fields)
	if p.aggregator.IsDeadline(timestamp) {
		fields = p.aggregator.Dump(timestamp)
		p.sender.Send(fields)
	}
}
// ProcessTest applies the filter and extractor to content without touching
// the aggregator or sender, and without checking Stat.Stop.
//
// It returns the extracted fields on success. If the filter drops the content
// it returns an empty map and a "Discarded" error; if extraction fails it
// returns an empty map and the extractor's error.
func (p *PeckTask) ProcessTest(content string) (map[string]interface{}, error) {
	if dropped := p.filter.Drop(content); dropped {
		return map[string]interface{}{}, errors.New("Discarded")
	}

	extracted, extractErr := p.extractor.Extract(content)
	if extractErr == nil {
		return extracted, nil
	}
	return map[string]interface{}{}, extractErr
}