-
Notifications
You must be signed in to change notification settings - Fork 1
/
consul.go
195 lines (162 loc) · 5.07 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// Package consul implements a high-level wrapper around the Consul HTTP client for easy service discovery.
// It provides additional features, such as time-based lookups and a retry policy.
package consul
import (
"errors"
"net/http"
"sync"
"time"
consul "github.com/hashicorp/consul/api"
"gopkg.in/vinxi/balancer.v0"
)
var (
// DefaultBalancer stores the round-robin balancer used by default.
// New assigns it to the Balancer field of every Consul instance it creates.
DefaultBalancer = balancer.DefaultBalancer
// ConsulMaxWaitTime defines the maximum time GetNodes waits for server nodes
// to become available before treating the lookup as a timeout error.
ConsulMaxWaitTime = 5 * time.Second
// ConsulWaitTimeInterval defines the polling interval GetNodes uses while
// waiting for server nodes to become available.
ConsulWaitTimeInterval = 100 * time.Millisecond
)
var (
// ErrDiscoveryTimeout is returned by GetNodes when the discovery wait time
// is exceeded without any server node becoming available.
ErrDiscoveryTimeout = errors.New("consul: cannot discover servers due to timeout")
)
// Consul is a convenience wrapper around the Consul API with additional capabilities.
// Service discovery is performed in the background by the goroutine launched by Start.
type Consul struct {
// RWMutex guards the nodes field, which is written by the background
// update goroutine and read by GetNodes.
sync.RWMutex
// ms synchronizes access to the started field.
ms sync.Mutex
// started stores whether the Consul discovery goroutine has been started.
started bool
// quit is used internally to signal the Consul servers update goroutine to exit:
// Stop closes it and the update loop returns once it observes the close.
quit chan bool
// nodes caches the server node URLs provided by Consul servers for the specific service.
nodes []string
// Config stores the Consul client vinxi config options used for discovery.
Config *Config
// Retrier stores the retry strategy to be used if the Consul discovery process fails.
Retrier Retrier
// Balancer stores the balancer used to distribute traffic
// load across the multiple servers provided by Consul.
// When nil, the first available node is always used.
Balancer balancer.Balancer
}
// New builds a Consul provider middleware bound to the given config.
// The returned instance uses DefaultRetrier as retry strategy and
// DefaultBalancer as balancer; the background discovery goroutine is
// not launched here.
func New(config *Config) *Consul {
	provider := new(Consul)
	provider.Config = config
	provider.Retrier = DefaultRetrier
	provider.Balancer = DefaultBalancer
	return provider
}
// nextConsulServer selects the Consul server config for the given iteration index.
// The boolean result reports whether more servers follow the returned one.
// An index past the end of the list falls back to the first server and
// reports that no more servers are left.
func (c *Consul) nextConsulServer(index int) (*consul.Config, bool) {
	instances := c.Config.Instances
	last := len(instances) - 1

	// Out of range: wrap around to the first configured server.
	if index > last {
		return instances[0], false
	}

	return instances[index], index != last
}
// UpdateNodes fetches the service entries of the configured service from the
// Consul servers and returns them converted by Config.Mapper.
//
// The lookup runs under the configured Retrier. Every attempt targets the next
// server listed in Config.Instances, wrapping around to the first server once
// the last one has been tried, so that all servers take part in the rotation.
//
// On failure it returns a nil slice together with the error reported by the
// retrier; the mapper is only invoked for successful lookups.
func (c *Consul) UpdateNodes() ([]string, error) {
	var (
		next    int
		entries []*consul.ServiceEntry
	)

	// Fetch Consul service's nodes retrying using the configured strategy.
	err := NewRetrier(c.Retrier).Run(func() error {
		config, more := c.nextConsulServer(next)
		if more {
			next++
		} else {
			// Last server reached: restart the rotation from the first one.
			// (Resetting and then incrementing would skip server 0 forever.)
			next = 0
		}

		var err error
		entries, _, err = NewClient(config).Health(c.Config.Service, c.Config.Tag, c.Config.QueryOptions)
		return err
	})
	if err != nil {
		return nil, err
	}

	return c.Config.Mapper(entries), nil
}
// Stop stops the Consul servers update interval goroutine.
//
// It is safe to call Stop on an instance that is not started, and to call it
// more than once: only a started instance has its quit channel closed, which
// avoids the panics caused by closing a nil or already closed channel.
func (c *Consul) Stop() {
	c.ms.Lock()
	defer c.ms.Unlock()

	if !c.started {
		return
	}

	if c.quit != nil {
		close(c.quit)
	}
	c.started = false
}
// Start starts the Consul servers update interval goroutine.
//
// Start is idempotent: when the goroutine is already running it returns
// without doing anything. The mutex is released on every path, the started
// flag is recorded, and a fresh quit channel is created for each run because
// Stop closes the channel and a closed channel cannot be reused.
func (c *Consul) Start() {
	c.ms.Lock()
	defer c.ms.Unlock()

	if c.started {
		return
	}

	c.quit = make(chan bool)
	c.started = true
	go c.updateInterval(c.Config.RefreshTime)
}
// updateInterval periodically asks the Consul servers for the list of available
// server nodes and caches any non-empty result in c.nodes.
//
// The loop runs until the quit channel is closed. The wait between two updates
// also watches the quit channel, so a stop request is honored immediately
// instead of after a full interval. A nil quit channel never fires, in which
// case the loop runs indefinitely.
func (c *Consul) updateInterval(interval time.Duration) {
	// Read the channel once, under the lifecycle mutex, so this goroutine keeps
	// watching the channel that was current when it started.
	c.ms.Lock()
	quit := c.quit
	c.ms.Unlock()

	for {
		// Bail out before doing any work if a stop was already requested.
		select {
		case <-quit:
			return
		default:
		}

		// TODO: handle error
		if nodes, err := c.UpdateNodes(); err == nil && len(nodes) > 0 {
			c.Lock()
			c.nodes = nodes
			c.Unlock()
		}

		// Wait for the next refresh, waking up early on stop.
		timer := time.NewTimer(interval)
		select {
		case <-quit:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
// GetNodes returns a list of server nodes hostnames for the configured service.
//
// It makes sure the background fetcher is started and returns the cached nodes
// right away when some are available. Otherwise it polls the cache every
// ConsulWaitTimeInterval until nodes show up, failing with ErrDiscoveryTimeout
// once ConsulMaxWaitTime has elapsed.
func (c *Consul) GetNodes() ([]string, error) {
	// Start the Consul background fetcher, if stopped.
	c.Start()

	// snapshot reads the cached nodes under the read lock.
	snapshot := func() []string {
		c.RLock()
		defer c.RUnlock()
		return c.nodes
	}

	// Fast path: do not make callers wait for a tick when nodes are cached.
	if nodes := snapshot(); len(nodes) > 0 {
		return nodes, nil
	}

	// Both timers are stopped on return so no runtime timers are left behind.
	ticker := time.NewTicker(ConsulWaitTimeInterval)
	defer ticker.Stop()
	timeout := time.NewTimer(ConsulMaxWaitTime)
	defer timeout.Stop()

	// Wait until Consul nodes are available.
	for {
		select {
		case <-ticker.C:
			if nodes := snapshot(); len(nodes) > 0 {
				return nodes, nil
			}
		case <-timeout.C:
			return nil, ErrDiscoveryTimeout
		}
	}
}
// getTargetHost returns the next target host available with optional balancing.
//
// When a balancer is configured the choice is delegated to it. Without a
// balancer the first node is used; an empty node list yields an error instead
// of an index-out-of-range panic.
func (c *Consul) getTargetHost(nodes []string) (string, error) {
	if c.Balancer != nil {
		return c.Balancer.Balance(nodes)
	}

	if len(nodes) == 0 {
		return "", errors.New("consul: no server nodes available")
	}
	return nodes[0], nil
}
// HandleHTTP is the middleware entry point: it picks a target server for the
// configured service among the nodes discovered via Consul and rewrites the
// request host accordingly before delegating to the next handler h.
//
// When no Consul instances are configured the request is forwarded untouched.
// When discovery fails or yields no nodes it replies with 502 Bad Gateway and
// does not call h. When the target host cannot be selected the request is
// forwarded untouched.
func (c *Consul) HandleHTTP(w http.ResponseWriter, r *http.Request, h http.Handler) {
	if len(c.Config.Instances) == 0 {
		h.ServeHTTP(w, r)
		return
	}

	// Retrieve latest service server from Consul.
	nodes, err := c.GetNodes()
	if err != nil || len(nodes) == 0 {
		// err may be nil when the node list is simply empty, so never
		// dereference it unconditionally.
		msg := "consul: no server nodes available"
		if err != nil {
			msg = err.Error()
		}
		w.WriteHeader(http.StatusBadGateway)
		// Best effort: nothing useful can be done if the client went away.
		_, _ = w.Write([]byte(msg))
		return
	}

	// Balance traffic using the configured balancer, if enabled.
	target, err := c.getTargetHost(nodes)
	if err != nil {
		h.ServeHTTP(w, r)
		return
	}

	// Define the URL to forward the request.
	r.Host = target
	r.URL.Host = target
	h.ServeHTTP(w, r)
}