-
Notifications
You must be signed in to change notification settings - Fork 0
/
ping_pong.go
132 lines (105 loc) · 2.3 KB
/
ping_pong.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package art
import (
"context"
"errors"
"fmt"
"time"
)
var ppKey = "logger"
func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context {
return context.WithValue(ctx, &ppKey, v)
}
func CtxGetPingPong(ctx context.Context) WaitPingPong {
wait, ok := ctx.Value(&ppKey).(WaitPingPong)
if !ok {
panic("not found ping pong")
}
return wait
}
func NewWaitPingPong() WaitPingPong {
wait := make(WaitPingPong, 1)
return wait
}
type WaitPingPong chan struct{}
func (wait WaitPingPong) Ack() {
select {
case wait <- struct{}{}:
default:
}
}
func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, isStop func() bool) error {
waitPingTime := time.Duration(waitPingSeconds) * time.Second
waitPingTimer := time.NewTimer(waitPingTime)
defer waitPingTimer.Stop()
for !isStop() {
select {
case <-waitPingTimer.C:
if isStop() {
return nil
}
return errors.New("wait ping timeout")
case <-waitPing:
err := sendPong()
if err != nil {
return fmt.Errorf("send pong: %v", err)
}
ok := waitPingTimer.Reset(waitPingTime)
if !ok {
waitPingTimer = time.NewTimer(waitPingTime)
}
}
}
return nil
}
func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, isStopped func() bool) error {
sendPingPeriod := time.Duration(sendPingSeconds) * time.Second
waitPongTime := sendPingPeriod * 2
done := make(chan struct{})
defer close(done)
result := make(chan error, 2)
go func() {
sendPingTicker := time.NewTicker(sendPingPeriod)
defer sendPingTicker.Stop()
for !isStopped() {
select {
case <-sendPingTicker.C:
if isStopped() {
result <- nil
return
}
err := sendPing()
if err != nil {
result <- fmt.Errorf("send ping: %v", err)
return
}
case <-done:
return
}
}
result <- nil
}()
go func() {
waitPongTimer := time.NewTimer(waitPongTime)
defer waitPongTimer.Stop()
for !isStopped() {
select {
case <-waitPongTimer.C:
if isStopped() {
result <- nil
return
}
result <- errors.New("wait pong timeout")
return
case <-waitPong:
ok := waitPongTimer.Reset(waitPongTime)
if !ok {
waitPongTimer = time.NewTimer(waitPongTime)
}
case <-done:
return
}
}
result <- nil
}()
return <-result
}