-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsender.go
71 lines (63 loc) · 1.65 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package logpeck
import (
"errors"
"strings"
log "github.com/Sirupsen/logrus"
sjson "github.com/bitly/go-simplejson"
)
// Supported sender type names. The configured sender name is lowercased
// before being compared against these values, so they must stay lowercase.
const (
	senderTypeES       = "elasticsearch"
	senderTypeKafka    = "kafka"
	senderTypeInfluxDb = "influxdb"
)
// Sender is the common contract implemented by every output backend that
// NewSender can construct.
type Sender interface {
	// Start prepares the sender for use and reports any failure.
	// NOTE(review): presumably must be called before Send — confirm against
	// the concrete implementations.
	Start() error

	// Send hands one record, expressed as a string-keyed map, to the sender.
	// It has no return value, so delivery failures are not reported to the
	// caller through this method.
	Send(map[string]interface{})

	// Stop shuts the sender down and reports any failure.
	Stop() error
}
// GetSenderConfig reads the "Sender" section of j and converts it into a
// SenderConfig.
//
// Behaviour, in order:
//   - If "Sender" is absent (its Interface() is nil), the zero SenderConfig
//     and a nil error are returned.
//   - "Sender.Name" must be a string; otherwise the conversion error is
//     logged at info level and returned.
//   - If "Sender.Config" is absent, the config is returned with only Name
//     set and a nil error. The name is not validated on this path.
//   - Otherwise "Sender.Config" is re-encoded to JSON and passed to the
//     config constructor chosen by the lowercased name. An unknown name
//     produces an error.
//
// On error the returned SenderConfig may be partially filled (for example,
// Name set but Config not); callers should rely on err.
func GetSenderConfig(j *sjson.Json) (senderConfig SenderConfig, err error) {
	section := j.Get("Sender")
	if section.Interface() == nil {
		return senderConfig, nil
	}

	if senderConfig.Name, err = section.Get("Name").String(); err != nil {
		log.Infof("[GetSenderConfig]err: %v", err)
		return senderConfig, err
	}

	body := section.Get("Config")
	if body.Interface() == nil {
		return senderConfig, nil
	}

	// The type-specific constructors take raw JSON bytes, so the sub-tree is
	// serialised again before it is handed over.
	raw, marshalErr := body.MarshalJSON()
	if marshalErr != nil {
		return senderConfig, marshalErr
	}

	switch kind := strings.ToLower(senderConfig.Name); kind {
	case senderTypeES:
		senderConfig.Config, err = NewElasticSearchSenderConfig(raw)
	case senderTypeKafka:
		senderConfig.Config, err = NewKafkaSenderConfig(raw)
	case senderTypeInfluxDb:
		senderConfig.Config, err = NewInfluxDbSenderConfig(raw)
	default:
		err = errors.New("[GetSenderConfig]sender name error: " + senderConfig.Name)
	}
	return senderConfig, err
}
// NewSender builds the Sender implementation selected by senderConfig.Name.
//
// The name is lowercased and matched against the supported sender types
// (elasticsearch, influxdb, kafka). A nil senderConfig or an unrecognised
// name yields a nil Sender and a non-nil error. Otherwise the sender and
// error produced by the matching constructor are returned unchanged.
func NewSender(senderConfig *SenderConfig) (sender Sender, err error) {
	// Guard first: reading senderConfig.Name through a nil pointer would
	// panic instead of giving the caller an error to handle.
	if senderConfig == nil {
		return nil, errors.New("[NewSender]sender config is nil")
	}

	switch strings.ToLower(senderConfig.Name) {
	case senderTypeES:
		sender, err = NewElasticSearchSender(senderConfig)
	case senderTypeInfluxDb:
		sender, err = NewInfluxDbSender(senderConfig)
	case senderTypeKafka:
		sender, err = NewKafkaSender(senderConfig)
	default:
		err = errors.New("[NewSender]sender name error: " + senderConfig.Name)
	}
	return sender, err
}