/* Package shard allows mappings of resources, e.g., services, to be
dynamically scaled using a consistent hashing algorithm backed by etcd.
*/
package shard
// ConsistentHashRes allows mappings of resources, e.g., services, to be
// dynamically scaled using a consistent hashing algorithm.
// Assume we have a service, one type of resource, and we want clients to
// consistently query the service based on a string token. This is very similar
// to sharding of services, where requests identified by the same token always
// go to the same service provider. However, consistent hashing of a service
// has a benefit over sharding in that we don't have to predetermine
// how many shards we need. Instead, as the request load grows, we just keep
// adding service providers. Because of consistent hashing, the addition of a
// new service provider only redistributes a small portion of the
// token-to-service-provider mapping. And if both clients and service providers
// use the same consistent hashing table, then we can ensure that:
// 1. when clients and service providers have the same consistent hash mapping,
// one token will always map to the same provider, and
// 2. when clients and service providers have different mappings, the mismatch
// will be detected.
//
// The following is an example of using ConsistentHashRes so that requests
// hashed with a string token always map to the same service provider.
//
// On the server side:
//
//   import shard "htc.com/csi/base/shard"
//   import "github.com/csigo/ephemeral"
//
//   var emConn ephemeral.Ephemeral
//   var emRoot string
//   var counter stats.Client
//   // set up the ephemeral connection
//   // ...
//
//   service := NewService(ipport)
//   conhash, err := shard.NewConsistentHashResServer(emConn, emRoot, ipport,
//     shard.ConsistentHashMapReplicaNum, time.Second, counter)
//
// On the client side, we have:
//
//   conhash, err := shard.NewConsistentHashResClient(emConn, emRoot,
//     shard.ConsistentHashMapReplicaNum, time.Second, counter)
//   if serviceID, ok := conhash.Get("token1"); ok {
//     connectToServer(serviceID)
//   }
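//
// A server can also check whether it currently owns a key before serving it.
// This is a minimal sketch, reusing conhash from the server example above:
//
//   if conhash.IsMyKey("token1") {
//     // serve the request locally; otherwise look up the owner via Get
//   }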
import (
"fmt"
"net"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/csigo/ephemeral"
"github.com/facebookgo/stats"
"github.com/golang/groupcache/consistenthash"
"github.com/spaolacci/murmur3"
"golang.org/x/net/context"
)
const (
// ConsistentHashMapReplicaNum is the default number of replicas for
// each consistent hash map entry
ConsistentHashMapReplicaNum = 50
maxHosts = 1000
)
var (
// ErrConnTimedOut is returned when initialization times out
ErrConnTimedOut = fmt.Errorf("[shard] init timeout")
)
// ResFinder is the interface to look up information about resources.
type ResFinder interface {
// Get returns the server ipport for the given key
Get(key string) (string, bool)
// IsMyKey checks whether the given key is handled by the current shard
IsMyKey(key string) bool
// All returns all ipports in the current resource pool
All() map[string]struct{}
}
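// Callers that only need lookups can depend on ResFinder rather than on the
// concrete type. A minimal sketch, where handleLocally and forwardTo are
// hypothetical helpers, not part of this package:
//
//   func route(finder ResFinder, key string) {
//     if finder.IsMyKey(key) {
//       handleLocally(key)
//       return
//     }
//     if ipport, ok := finder.Get(key); ok {
//       forwardTo(ipport, key)
//     }
//   }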
//ConsistentHashRes represents a consistently hashed resource.
type ConsistentHashRes struct {
sync.Mutex
ephemeral ephemeral.Ephemeral // ephemeral tracks online service instances.
root string //znode root path for consistent hashed connections
cmap *consistenthash.Map //consistent hash map
ipport string //ipport for server, "" for client
done chan struct{} //done channel
peers map[string]struct{} //set for maintaining all peers
ctr stats.Client
}
const (
// timeWait is the waiting time before getting data from zookeeper again
timeWait = 200 * time.Millisecond
)
// NewConsistentHashResClient creates a client, an entity that doesn't provide a
// consistently hashed resource but uses one.
func NewConsistentHashResClient(
em ephemeral.Ephemeral,
root string,
replica int,
timeout time.Duration,
counter stats.Client,
) (*ConsistentHashRes, error) {
return newConsistentHashRes(em, root, "", replica, timeout, counter)
}
// NewConsistentHashResServer creates a server, a provider of a consistently hashed resource.
func NewConsistentHashResServer(
em ephemeral.Ephemeral,
root string,
ipport string,
replica int,
timeout time.Duration,
counter stats.Client,
) (*ConsistentHashRes, error) {
return newConsistentHashRes(em, root, ipport, replica, timeout, counter)
}
// GetResources returns a slice of all distinct resources
func (c *ConsistentHashRes) GetResources() (ret []string) {
// lock to avoid racing with setCmap, which replaces c.peers
c.Lock()
defer c.Unlock()
for k := range c.peers {
ret = append(ret, k)
}
return
}
func (c *ConsistentHashRes) setCmap(other *consistenthash.Map,
resMap map[string]struct{}) {
c.Lock()
c.cmap = other
c.peers = resMap
c.Unlock()
}
// createNode creates an ephemeral node under the zookeeper root node
func createNode(em ephemeral.Ephemeral, path string) {
err := em.AddKey(context.Background(), path, "")
// We don't skip ErrPathExists: if the path already exists, another server is
// occupying the same znode, so we should fail loudly to notify.
if err != nil {
// NOTE: we fatal if creating the znode failed
log.Fatalf("[shard] fail to create %v %v", path, err)
}
}
func newConsistentHashRes(
em ephemeral.Ephemeral,
root string,
ipport string,
replica int,
timeout time.Duration,
counter stats.Client,
) (*ConsistentHashRes, error) {
c := &ConsistentHashRes{
ephemeral: em,
root: root,
cmap: consistenthash.New(replica, murmur3.Sum32),
done: make(chan struct{}),
ipport: ipport,
ctr: counter,
}
// ensure root path
if err := ephemeral.EnsurePath(em, root); err != nil {
return nil, err
}
//if I am a server then register
if ipport != "" {
if _, _, err := net.SplitHostPort(ipport); err != nil {
log.Errorf("incoming hostport %s isn't in host:port format, err %v", ipport, err)
return nil, err
}
node := makeNode(root, ipport)
createNode(em, node)
}
ready := make(chan struct{})
var retErr error
//listen to server events
go func() {
readySent := false
receiver := c.ephemeral.List(context.Background(), c.root, true)
zkloop:
// TODO: add maxRetries
for {
var resp *ephemeral.ListResponse
select {
case <-c.done: // signal done received
c.ctr.BumpSum("loop.done", 1)
break zkloop
case resp = <-receiver:
if resp.Err == ephemeral.ErrPathNotFound {
log.Fatalf("[shard] root directory<%s> not found", c.root)
}
if resp.Err != nil {
// TODO: handle conn.close
// when the conn is closed, we will get an ErrSessionExpired; see
// vendor/src/github.com/samuel/go-zookeeper/shard/zk_test.go
// line 370
c.ctr.BumpSum("loop.setwatch.err", 1)
log.Errorf("[shard] fail to watch %v err: %v", c.root, resp.Err)
retErr = resp.Err
time.Sleep(timeWait)
// Re-assign receiver.
receiver = c.ephemeral.List(context.Background(), c.root, true)
continue
}
c.ctr.BumpSum("loop.zkchange", 1)
}
log.Infof("[shard] in consistentres, root:%s, children: %v", c.root, resp.Children)
cmap := consistenthash.New(replica, murmur3.Sum32)
keys := make(map[string]struct{})
// The slice will be reallocated by append() if the capacity is not enough
hosts := make([]string, 0, maxHosts)
for _, child := range resp.Children {
ipport, err := ExtractIPPort(child)
if err != nil {
c.ctr.BumpSum("loop.parse.err", 1)
log.Errorf("[shard] parse error root %v, node %v err %v",
c.root, child, err)
continue
}
if _, ok := keys[ipport]; ok {
c.ctr.BumpSum("loop.dupkey.warn", 1)
log.Infof("[shard] duplicated shard info %v %v", c.root, ipport)
continue
}
keys[ipport] = struct{}{}
hosts = append(hosts, ipport)
}
cmap.Add(hosts...)
//replace the old cmap
c.setCmap(cmap, keys)
//signal ready
if !readySent {
// if ready, clear previous err
retErr = nil
c.ctr.BumpSum("loop.ready", 1)
ready <- struct{}{}
readySent = true
}
}
close(ready)
}()
// wait till ready
select {
case <-ready:
if retErr != nil {
c.ctr.BumpSum("newhash.init.err", 1)
return nil, retErr
}
c.ctr.BumpSum("newhash.ready", 1)
case <-time.After(timeout):
c.ctr.BumpSum("newhash.timeout.err", 1)
log.Errorf("[shard] consistent hash init timeout %v", c.root)
return nil, ErrConnTimedOut
}
return c, nil
}
// TODO: retire separator
const seperator = "_"
// makeNode generates the path of a ZNode
func makeNode(zkRoot string, ipport string) string {
return fmt.Sprintf("%s/%s", zkRoot, ipport)
}
// ExtractIPPort parses the input name and returns the host ipport, which is
// the first token.
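// For example, assuming a legacy node name that carries a suffix after the
// separator, ExtractIPPort("10.0.0.1:8080_0001") returns "10.0.0.1:8080",
// while a plain "10.0.0.1:8080" is returned unchanged.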
func ExtractIPPort(name string) (string, error) {
// TODO: Remove separator. The separator and splitting are needed for backward
// compatibility. Remove the separator check once no server uses the separator anymore.
tokens := strings.Split(name, seperator)
/*
if len(tokens) <= 1 {
return "", fmt.Errorf("[shard] illegal znode name %s", name)
}
*/
return tokens[0], nil
}
// Close shuts down the background watch loop
func (c *ConsistentHashRes) Close() {
c.done <- struct{}{}
}
// HostPort returns the ipport of this connection. If the ipport is empty, then
// it's a client.
func (c *ConsistentHashRes) HostPort() string {
return c.ipport
}
// Get returns the server ipport for the given key
func (c *ConsistentHashRes) Get(key string) (string, bool) {
defer c.ctr.BumpTime("conhash.get").End()
c.Lock()
defer c.Unlock()
if c.cmap.IsEmpty() {
return "", false
}
return c.cmap.Get(key), true
}
// IsMyKey checks whether the given key is handled by the current shard
func (c *ConsistentHashRes) IsMyKey(key string) bool {
// lock to avoid racing with setCmap, which replaces c.cmap
c.Lock()
defer c.Unlock()
return c.ipport != "" && c.cmap.Get(key) == c.ipport
}
// All returns all ipports in the current resource pool.
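// A client may enumerate the current pool, e.g., to warm up connections. A
// minimal sketch, where dial is a hypothetical helper:
//
//   for ipport := range conhash.All() {
//     dial(ipport)
//   }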
func (c *ConsistentHashRes) All() map[string]struct{} {
c.Lock()
defer c.Unlock()
return c.peers
}