-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrequest.go
152 lines (135 loc) · 3.62 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package dagchain
import (
"encoding/gob"
"fmt"
"net"
"strconv"
)
// Request is the data structure exchanged between nodes.
//
// All fields are exported because the value is written to and read from
// connections with encoding/gob.
type Request struct {
	// ID identifies a message; the acknowledgement for a NormalRequest
	// carries the same ID so the sender can match it to the original.
	ID int64

	// Command selects how the receiver handles the message. It holds one of
	// the command constants (NormalRequest, ServerPing, ...).
	Command int

	// Data is the command-specific payload. The handler expects a string
	// address for ServerPing and SyncBackupSeeds, and a []string of
	// addresses for BackupSeeds.
	Data interface{}

	// From is the address of the node that sent the message.
	From string
}
// Command codes carried in Request.Command. The numeric values (0 through 5)
// are part of the wire protocol between nodes, so the order of this list must
// not change.
const (
	NormalRequest         = iota // 0: outer application's data
	NormalRequestReceived        // 1: ack
	ServerPing                   // 2: ping to the seed
	ServerPong                   // 3: pong to the ping
	BackupSeeds                  // 4: return the backup seeds
	SyncBackupSeeds              // 5: query for the backup seeds
)
// handle processes a request received on conn according to r.Command:
//
//   - NormalRequestReceived: removes the acknowledged message from the resend
//     queue via deleteResend.
//   - NormalRequest: passes the message to routeSend, then writes a
//     NormalRequestReceived acknowledgement carrying the same ID to conn.
//   - SyncBackupSeeds: replies on conn with a BackupSeeds message listing the
//     current seed and downstream addresses, excluding the requester.
//   - BackupSeeds: merges the received addresses into node.seedBackup.
//   - ServerPing: registers conn as a downstream under the pinged address,
//     drops that address from node.seedBackup and replies with ServerPong.
//   - ServerPong: marks the node as pinged.
//
// The returned string is the downstream address for a ServerPing carrying a
// string payload and "" in every other case. The error is non-nil when a
// SyncBackupSeeds or BackupSeeds payload has the wrong type (instead of
// panicking on data that arrived over the network) or when a reply cannot be
// written to conn.
func (r *Request) handle(node *Node, conn net.Conn) (string, error) {
	switch r.Command {
	case NormalRequestReceived:
		// delete the message from resend queue
		deleteResend(r.ID, r.From)

	case NormalRequest:
		// route the received message to other nodes and outer application
		routeSend(node, r)
		// response with ack
		ack := Request{
			ID:      r.ID,
			Command: NormalRequestReceived,
			From:    node.nodeAddr,
		}
		if err := gob.NewEncoder(conn).Encode(ack); err != nil {
			return "", fmt.Errorf("sending ack for request %d: %w", r.ID, err)
		}

	case SyncBackupSeeds:
		// the address of the requester
		fromAddr, ok := r.Data.(string)
		if !ok {
			return "", fmt.Errorf("sync backup seeds: unexpected payload type %T", r.Data)
		}
		// Collect the current seed and the downstream nodes. To avoid forming
		// a dead loop, the requester's own address is never offered back.
		var addrs []string
		if node.seedAddr != "" && node.seedAddr != fromAddr {
			addrs = append(addrs, node.seedAddr)
		}
		// The ServerPing branch writes node.downstreams under lock, so the
		// map is read under the same lock here.
		lock.Lock()
		for addr := range node.downstreams {
			if len(addrs) >= maxBackupSeedLen {
				break
			}
			if addr != fromAddr {
				addrs = append(addrs, addr)
			}
		}
		lock.Unlock()
		reply := Request{
			Command: BackupSeeds,
			Data:    addrs,
		}
		if err := gob.NewEncoder(conn).Encode(reply); err != nil {
			return "", fmt.Errorf("sending backup seeds to %s: %w", fromAddr, err)
		}

	case BackupSeeds:
		addrs, ok := r.Data.([]string)
		if !ok {
			return "", fmt.Errorf("backup seeds: unexpected payload type %T", r.Data)
		}
		// The strategy of seeds update: while seedBackup is below
		// maxBackupSeedLen a new address is appended; once it is full, a new
		// address may only take the place of a seed whose retry count exceeds
		// seedMaxRetry.
		for _, addr := range addrs {
			if addr == "" {
				continue
			}
			known := false
			stale := -1 // index of the first seed with retry > seedMaxRetry
			for i, seed := range node.seedBackup {
				if seed.addr == addr {
					known = true
					break
				}
				if stale < 0 && seed.retry > seedMaxRetry {
					stale = i
				}
			}
			if known {
				continue
			}
			if len(node.seedBackup) < maxBackupSeedLen {
				node.seedBackup = append(node.seedBackup, &Seed{
					addr:  addr,
					retry: 0,
				})
				continue
			}
			if stale < 0 {
				// Full and no seed is replaceable, so none of the remaining
				// addresses can be stored either.
				break
			}
			// Replace exactly one stale seed; overwriting every stale slot
			// would store the same address several times.
			node.seedBackup[stale] = &Seed{
				addr:  addr,
				retry: 0,
			}
		}
		// node.downstreams is printed under lock for the same reason it is
		// iterated under lock in the SyncBackupSeeds branch.
		lock.Lock()
		fmt.Printf("source seed: %s,current seed:%s,backup seeds:%v,downsteam:%v\n", node.sourceAddr, node.seedAddr, getSeedAddrs(node.seedBackup), node.downstreams)
		lock.Unlock()

	case ServerPing:
		// a downstream node sends its address to its seed node (our node)
		addr, ok := r.Data.(string)
		if !ok {
			// A ping without a string address is ignored rather than
			// reported as an error.
			return "", nil
		}
		// we need to add the downstream node
		lock.Lock()
		node.downstreams[addr] = conn
		lock.Unlock()
		// a node can't appear in downstream and seedBackup at the same time
		for i, seed := range node.seedBackup {
			if seed.addr == addr {
				node.seedBackup = append(node.seedBackup[:i], node.seedBackup[i+1:]...)
				break
			}
		}
		pong := Request{
			Command: ServerPong,
			From:    node.nodeAddr,
		}
		if err := gob.NewEncoder(conn).Encode(pong); err != nil {
			// The downstream is already registered at this point, so the
			// address is still returned alongside the error to let the caller
			// identify it.
			return addr, fmt.Errorf("sending pong to %s: %w", addr, err)
		}
		return addr, nil

	case ServerPong:
		node.pinged = true

	default:
		fmt.Println("unrecognized message type:", r.Command)
	}
	return "", nil
}
// getSeedAddrs renders each seed as "<addr>/<retry>" and returns the strings
// in the same order as seeds. The result is always a non-nil slice, even when
// seeds is empty.
func getSeedAddrs(seeds []*Seed) []string {
	formatted := make([]string, 0, len(seeds))
	for idx := range seeds {
		s := seeds[idx]
		formatted = append(formatted, s.addr+"/"+strconv.Itoa(s.retry))
	}
	return formatted
}