Skip to content

Commit 3a67a2e

Browse files
committed
重构 event, 日志改用 stderr
1 parent 6817ea3 commit 3a67a2e

16 files changed

+281
-236
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ install:
88

99

1010
test-integration:
11-
sudo supervisorctl stop supervisor-event-listener
1211
go build
12+
sudo supervisorctl stop supervisor-event-listener
1313
sudo cp ./supervisor-event-listener /usr/local/bin/
1414
sudo cp ./tests/supervisor-app.ini /etc/supervisor.d/
15-
sudo supervisorctl start supervisor-event-listener
16-
sudo supervisorctl update
17-
sudo supervisorctl start sleep-then-exit
15+
sudo supervisorctl remove supervisor-event-listener
16+
sudo supervisorctl update supervisor-event-listener
17+
sudo supervisorctl tail -f supervisor-event-listener stderr
1818

1919

2020
clean:

config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"strings"
77

88
"github.com/ouqiang/supervisor-event-listener/utils"
9-
"github.com/ouqiang/supervisor-event-listener/utils/tmpfslog"
9+
"github.com/ouqiang/supervisor-event-listener/utils/errlog"
1010
"gopkg.in/ini.v1"
1111
)
1212

@@ -66,7 +66,7 @@ func ParseConfig(configFile string) *Config {
6666
config := &Config{}
6767
config.NotifyType = notifyType
6868

69-
tmpfslog.Info("notifyType: %+v\n", config.NotifyType)
69+
errlog.Info("notifyType: %+v\n", config.NotifyType)
7070
switch notifyType {
7171
case "mail":
7272
config.MailServer = parseMailServer(section)

event/errs.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package event
2+
3+
import (
4+
"errors"
5+
)
6+
7+
var (
8+
ErrParseHeader = errors.New("parse header failed")
9+
ErrParsePayload = errors.New("parse payload failed")
10+
ErrPayloadLength = errors.New("invalid payload length")
11+
)

event/event.go

Lines changed: 0 additions & 155 deletions
This file was deleted.

event/header.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package event
2+
3+
import "strconv"
4+
5+
type Header struct {
6+
Ver string
7+
Server string
8+
Serial int
9+
Pool string
10+
PoolSerial int
11+
EventName string // 事件名称
12+
Len int // Payload长度
13+
}
14+
15+
func ParseHeader(header string) (*Header, error) {
16+
h := &Header{}
17+
fields := parseFields(header)
18+
if len(fields) == 0 {
19+
return h, ErrParseHeader
20+
}
21+
22+
h.Ver = fields["ver"]
23+
h.Server = fields["server"]
24+
h.Serial, _ = strconv.Atoi(fields["serial"])
25+
h.Pool = fields["pool"]
26+
h.PoolSerial, _ = strconv.Atoi(fields["poolserial"])
27+
h.EventName = fields["eventname"]
28+
h.Len, _ = strconv.Atoi(fields["len"])
29+
30+
return h, nil
31+
}

event/message.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package event
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
9+
"github.com/ouqiang/supervisor-event-listener/utils/errlog"
10+
"github.com/pkg/errors"
11+
)
12+
13+
type Message struct {
14+
TS time.Time
15+
Header *Header
16+
Payload *Payload
17+
}
18+
19+
func NewMessage(h *Header, p *Payload) Message {
20+
return Message{
21+
TS: time.Now(),
22+
Header: h,
23+
Payload: p,
24+
}
25+
}
26+
27+
func ReadMessage(reader *bufio.Reader) (Message, error) {
28+
header, err := readHeader(reader)
29+
if err != nil {
30+
errlog.Error("header:%+v err:%+v", header, err)
31+
return Message{}, err
32+
}
33+
payload, err := readPayload(reader, header.Len)
34+
if err != nil {
35+
errlog.Error("payload:%+v err:%+v", payload, err)
36+
return Message{}, err
37+
}
38+
return NewMessage(header, payload), nil
39+
}
40+
41+
// 读取header
42+
func readHeader(reader *bufio.Reader) (*Header, error) {
43+
// 读取Header
44+
data, err := reader.ReadString('\n')
45+
if err != nil {
46+
return nil, err
47+
}
48+
// 解析Header
49+
header, err := ParseHeader(data)
50+
if err != nil {
51+
return nil, err
52+
}
53+
return header, nil
54+
}
55+
56+
// 读取payload
57+
func readPayload(reader *bufio.Reader, payloadLen int) (*Payload, error) {
58+
// 读取payload
59+
buf := make([]byte, payloadLen)
60+
length, err := reader.Read(buf)
61+
if err != nil {
62+
return nil, err
63+
}
64+
if payloadLen != length {
65+
err := ErrPayloadLength
66+
err = errors.Wrapf(err, " payloadLen:%d != length:%d", payloadLen, length)
67+
return nil, err
68+
}
69+
// 解析payload
70+
payload, err := ParsePayload(string(buf))
71+
if err != nil {
72+
return nil, err
73+
}
74+
return payload, nil
75+
}
76+
77+
func (msg *Message) String() string {
78+
tmpl := `Proc: %s
79+
Host: %s
80+
State: %s
81+
PID: %d
82+
Date: %s`
83+
return fmt.Sprintf(tmpl,
84+
msg.Payload.ProcessName,
85+
msg.Payload.IP,
86+
msg.Payload.FromState,
87+
msg.Payload.PID,
88+
msg.TS.Format(time.RFC3339),
89+
)
90+
}
91+
92+
func (msg *Message) ToJson(indent ...int) string {
93+
realIndent := 0
94+
if len(indent) > 0 {
95+
realIndent = indent[0]
96+
}
97+
t := ""
98+
switch realIndent {
99+
case 0:
100+
case 1:
101+
t = " "
102+
case 2:
103+
t = " "
104+
case 3:
105+
t = " "
106+
case 4:
107+
t = " "
108+
default:
109+
t = " "
110+
}
111+
_bytes, _ := json.MarshalIndent(msg, "", t)
112+
return string(_bytes)
113+
}
114+
115+
// Header Supervisord触发事件时会先发送Header,根据Header中len字段去读取Payload

0 commit comments

Comments
 (0)