-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathresend.go
102 lines (90 loc) · 1.79 KB
/
resend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package dagchain
import (
"encoding/gob"
"net"
"time"
)
// Packet records one peer that a pending (not yet acknowledged) request was
// sent to, so the request can be re-sent to that peer later.
type Packet struct {
// Addr is the peer address; it is matched against the sender in
// deleteResend and resolved to a connection via getConnByAddr.
Addr string
// retrys is not read or written anywhere in this file.
// NOTE(review): presumably a resend-attempt counter — confirm against callers.
retrys int
}
// Pending-resend bookkeeping, keyed by request id (rid). Every access in this
// file happens while holding the package-level lock.
var (
	// sendPackets maps a request id to the peers the request is still
	// waiting on. resend derives the message age from rid/1e9 compared with
	// Unix seconds, i.e. it treats rid as a nanosecond timestamp.
	sendPackets = make(map[int64][]*Packet)

	// sendDatas maps a request id to the request payload that resend
	// re-encodes to each remaining peer.
	sendDatas = make(map[int64]Request)
)
// deleteResend drops the pending-resend entry of peer from for request rid.
//
// If rid is unknown the call is a no-op. Only the first Packet whose Addr
// equals from is removed. Once no peers remain for rid, both the peer list
// (sendPackets) and the stored payload (sendDatas) are deleted.
//
// The whole read-modify-write runs under lock: the slice stored in
// sendPackets shares its backing array with the local copy, so mutating it
// after releasing the lock would race with resend iterating the same array,
// and the later write-back could clobber concurrent updates.
func deleteResend(rid int64, from string) {
	lock.Lock()
	defer lock.Unlock()

	ps, ok := sendPackets[rid]
	if !ok {
		return
	}

	for i, p := range ps {
		if p.Addr == from {
			// Shift the tail left over the removed element and clear the
			// vacated slot so the dropped *Packet is not kept alive.
			last := len(ps) - 1
			copy(ps[i:], ps[i+1:])
			ps[last] = nil
			ps = ps[:last]
			break
		}
	}

	if len(ps) != 0 {
		sendPackets[rid] = ps
		return
	}

	// Nobody left to wait for: forget the request entirely.
	delete(sendPackets, rid)
	delete(sendDatas, rid)
}
// resend loops forever, re-sending pending requests every 10 seconds.
//
// On each pass, for every request id in sendPackets:
//   - if the message is older than maxResendStayTime it is dropped outright;
//   - if it is older than minResendStayTime and its payload is still in
//     sendDatas, the payload is gob-encoded to every remaining peer; peers
//     with no connection (getConnByAddr returns nil) or whose encode fails
//     are removed from the list;
//   - if no peers remain, the request is removed from both maps.
//
// The message age is computed as now - rid/1e9, i.e. rid is treated as a
// nanosecond timestamp.
//
// The function never returns; it is meant to run in its own goroutine.
// NOTE(review): lock is held while writing to network connections, so a slow
// peer blocks every other user of lock for the duration of the pass.
// NOTE(review): a fresh gob.Encoder is created per send; confirm the
// receiving side's decoder tolerates repeated type descriptors on one stream.
func resend(node *Node) {
	for {
		now := time.Now().Unix()

		lock.Lock()
		for rid, ps := range sendPackets {
			age := now - (rid / 1e9)

			// If the message has stayed too long, delete it directly.
			if age > maxResendStayTime {
				delete(sendPackets, rid)
				delete(sendDatas, rid)
				continue
			}

			// The message must have stayed for some time before it is re-sent.
			if age > minResendStayTime {
				if r, ok := sendDatas[rid]; ok {
					// Filter in place instead of deleting while ranging:
					// removing by index inside a range loop skips the next
					// element and can slice past the shrunken length.
					kept := ps[:0]
					for _, p := range ps {
						conn := getConnByAddr(p.Addr, node)
						if conn == nil {
							// No connection for this peer: stop tracking it.
							continue
						}
						if err := gob.NewEncoder(conn).Encode(r); err != nil {
							// The connection is broken: stop tracking this peer.
							continue
						}
						kept = append(kept, p)
					}
					// Clear the vacated tail so dropped packets can be collected.
					for i := len(kept); i < len(ps); i++ {
						ps[i] = nil
					}
					ps = kept
				}
			}

			if len(ps) == 0 {
				delete(sendPackets, rid)
				delete(sendDatas, rid)
			} else {
				sendPackets[rid] = ps
			}
		}
		lock.Unlock()

		time.Sleep(10 * time.Second)
	}
}
// getConnByAddr resolves a peer address to its connection on node.
//
// The seed address maps to node.seedConn; any other address is looked up in
// node.downstreams. It returns nil when the address is not a known downstream.
func getConnByAddr(addr string, node *Node) net.Conn {
	if addr != node.seedAddr {
		if c, found := node.downstreams[addr]; found {
			return c
		}
		return nil
	}
	return node.seedConn
}