Skip to content

Commit

Permalink
Add optimistic mode (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramondeklein authored Dec 10, 2024
1 parent 64f8e79 commit c2078b8
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 57 deletions.
6 changes: 3 additions & 3 deletions http-tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func InternalTrace(req *http.Request, resp *http.Response, reqTime, respTime tim
}

// Trace gets trace of http request
func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Request, endpoint string) TraceInfo {
func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Request, backend *Backend) TraceInfo {
// Setup a http request body recorder
reqHeaders := r.Header.Clone()
reqHeaders.Set("Host", r.Host)
Expand Down Expand Up @@ -270,7 +270,7 @@ func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Requ

t.ReqInfo = rq
t.RespInfo = rs
t.NodeName = endpoint
t.NodeName = backend.endpoint
t.CallStats = traceCallStats{
Latency: rs.Time.Sub(rw.StartTime),
Rx: reqBodyRecorder.Size(),
Expand All @@ -286,7 +286,7 @@ func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Requ

// Log only the headers.
func httpTraceHdrs(f http.HandlerFunc, w http.ResponseWriter, r *http.Request, backend *Backend) {
trace := Trace(f, false, w, r, backend.endpoint)
trace := Trace(f, false, w, r, backend)
doTrace(trace, backend)
}

Expand Down
161 changes: 107 additions & 54 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type logMessage struct {
// Endpoint of backend
Endpoint string `json:"Endpoint"`
// Error message
Error error `json:"Error,omitempty"`
Error string `json:"Error,omitempty"`
// Status of endpoint
Status string `json:"Status,omitempty"`
// Downtime so far
Expand All @@ -141,7 +141,7 @@ type logMessage struct {
}

func (l logMessage) String() string {
if l.Error == nil {
if l.Error == "" {
if l.DowntimeDuration > 0 {
return fmt.Sprintf("%s%2s: %s %s is %s Downtime duration: %s",
console.Colorize("LogMsgType", l.Type), "",
Expand All @@ -157,6 +157,7 @@ func (l logMessage) String() string {

// Backend entity to which requests gets load balanced.
type Backend struct {
ctxt context.Context
siteNumber int
endpoint string
proxy *reverse.Proxy
Expand All @@ -165,6 +166,7 @@ type Backend struct {
healthCheckURL string
healthCheckDuration time.Duration
healthCheckTimeout time.Duration
healthOptimistic bool
Stats *BackendStats
}

Expand All @@ -173,12 +175,58 @@ const (
online
)

func (b *Backend) setOffline() {
atomic.StoreInt32(&b.up, offline)
func (b *Backend) setOffline(msg string) (swapped bool) {
if atomic.SwapInt32(&b.up, offline) == offline {
return false
}

now := time.Now().UTC()

b.Stats.Lock()
b.Stats.DowntimeStart = now
b.Stats.UpSince = time.Time{}
b.Stats.Unlock()

if b.healthOptimistic {
go b.healthCheck(false)
}

if globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "down",
Error: msg,
})
}

return true
}

func (b *Backend) setOnline() {
atomic.StoreInt32(&b.up, online)
func (b *Backend) setOnline() (swapped bool) {
if atomic.SwapInt32(&b.up, online) == online {
return false
}
now := time.Now().UTC()

b.Stats.Lock()
b.Stats.UpSince = now
if !b.Stats.DowntimeStart.IsZero() {
downtime := now.Sub(b.Stats.DowntimeStart)
b.Stats.LastDowntime = downtime
b.Stats.CumDowntime += downtime
}
b.Stats.DowntimeStart = time.Time{}
b.Stats.Unlock()

if globalLoggingEnabled {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: b.Stats.LastDowntime,
})
}

return true
}

// Online returns true if backend is up
Expand Down Expand Up @@ -247,10 +295,7 @@ func (b *Backend) ErrorHandler(w http.ResponseWriter, r *http.Request, err error
}
}
if offline {
if globalLoggingEnabled {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
b.setOffline()
b.setOffline(err.Error())
}

writeErrorResponse(w, r, err)
Expand Down Expand Up @@ -312,18 +357,27 @@ func getHealthCheckURL(endpoint, healthCheckPath string, healthCheckPort int) (s
}

// healthCheck - background routine which checks if a backend is up or down.
func (b *Backend) healthCheck(ctxt context.Context) {
func (b *Backend) healthCheck(immediate bool) {
if immediate {
if err := b.doHealthCheck(); err != nil {
console.Errorln(err)
} else if b.healthOptimistic && b.Online() {
return
}
}

rng := rand.New(rand.NewSource(time.Now().UnixNano()))
timer := time.NewTimer(b.healthCheckDuration)
defer timer.Stop()
for {
select {
case <-ctxt.Done():
case <-b.ctxt.Done():
return
case <-timer.C:
err := b.doHealthCheck()
if err != nil {
if err := b.doHealthCheck(); err != nil {
console.Errorln(err)
} else if b.healthOptimistic && b.Online() {
return
}
// Add random jitter to call
timer.Reset(b.healthCheckDuration + time.Duration(rng.Int63n(int64(b.healthCheckDuration))))
Expand Down Expand Up @@ -353,33 +407,16 @@ func (b *Backend) doHealthCheck() error {
resp, err := b.httpClient.Do(req)
respTime := time.Now().UTC()
drainBody(resp)
if err != nil || (err == nil && resp.StatusCode != http.StatusOK) {
if globalLoggingEnabled && (!b.Online() || b.Stats.UpSince.IsZero()) {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
// observed an error, take the backend down.
b.setOffline()
if b.Stats.DowntimeStart.IsZero() {
b.Stats.DowntimeStart = time.Now().UTC()
}
} else {
var downtimeEnd time.Time
if !b.Stats.DowntimeStart.IsZero() {
now := time.Now().UTC()
b.updateDowntime(now.Sub(b.Stats.DowntimeStart))
downtimeEnd = now
}
if globalLoggingEnabled && !b.Online() && !b.Stats.UpSince.IsZero() {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: downtimeEnd.Sub(b.Stats.DowntimeStart),
})
}
b.Stats.UpSince = time.Now().UTC()
b.Stats.DowntimeStart = time.Time{}

switch {
case err != nil:
b.setOffline(err.Error())
case resp.StatusCode != http.StatusOK:
b.setOffline(fmt.Sprintf("response status %d", resp.StatusCode))
default:
b.setOnline()
}

if globalTrace != "application" {
if resp != nil {
traceHealthCheckReq(req, resp, reqTime, respTime, b, err)
Expand All @@ -389,13 +426,6 @@ func (b *Backend) doHealthCheck() error {
return nil
}

func (b *Backend) updateDowntime(downtime time.Duration) {
b.Stats.Lock()
defer b.Stats.Unlock()
b.Stats.LastDowntime = downtime
b.Stats.CumDowntime += downtime
}

// updateCallStats updates the cumulative stats for each call to backend
func (b *Backend) updateCallStats(t shortTraceMsg) {
b.Stats.Lock()
Expand Down Expand Up @@ -438,6 +468,7 @@ type healthCheckOptions struct {
healthCheckPort int
healthCheckDuration time.Duration
healthCheckTimeout time.Duration
healthOptimistic bool
}

func (m *multisite) renewSite(ctx *cli.Context, tlsMaxVersion uint16, opts healthCheckOptions) {
Expand Down Expand Up @@ -503,6 +534,14 @@ func (m *multisite) populate() {
minLatency = fmt.Sprintf("%2s", b.Stats.MinLatency.Round(time.Microsecond))
maxLatency = fmt.Sprintf("%2s", b.Stats.MaxLatency.Round(time.Microsecond))
}
cumDowntime := b.Stats.CumDowntime
lastDowntime := b.Stats.LastDowntime
if !b.Online() {
// show current downtime and cumulative downtime including
// the current downtime
lastDowntime = time.Now().UTC().Sub(b.Stats.DowntimeStart)
cumDowntime += lastDowntime
}
cellText[i*len(site.backends)+j+1] = []string{
humanize.Ordinal(b.siteNumber),
b.endpoint,
Expand All @@ -511,8 +550,8 @@ func (m *multisite) populate() {
strconv.FormatInt(b.Stats.TotCallFailures, 10),
humanize.IBytes(uint64(b.Stats.Rx)),
humanize.IBytes(uint64(b.Stats.Tx)),
b.Stats.CumDowntime.Round(time.Microsecond).String(),
b.Stats.LastDowntime.Round(time.Microsecond).String(),
cumDowntime.Round(time.Microsecond).String(),
lastDowntime.Round(time.Microsecond).String(),
minLatency,
maxLatency,
}
Expand Down Expand Up @@ -886,8 +925,8 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
var backends []*Backend
var prevScheme string
var transport http.RoundTripper
var connStats []*ConnStats
var hostName string

if len(endpoints) == 1 && ctx.GlobalBool("rr-dns-mode") {
console.Infof("RR DNS mode enabled, using %s as hostname", endpoints[0])
// guess it is LB config address
Expand All @@ -906,6 +945,15 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
endpoints = append(endpoints, strings.Replace(target.String(), hostName, ip, 1))
}
}

var connStats []*ConnStats
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
connStats = append(connStats, newConnStats(endpoint))
}
globalConnStats.Store(&connStats)

healthOptimistic := ctx.GlobalBool("health-optimistic")
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
target, err := url.Parse(endpoint)
Expand Down Expand Up @@ -956,15 +1004,13 @@ func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs
if err != nil {
console.Fatalln(err)
}
backend := &Backend{siteNum, endpoint, proxy, &http.Client{
backend := &Backend{ctxt, siteNum, endpoint, proxy, &http.Client{
Transport: proxy.Transport,
}, 0, healthCheckURL, opts.healthCheckDuration, opts.healthCheckTimeout, &stats}
go backend.healthCheck(ctxt)
}, 0, healthCheckURL, opts.healthCheckDuration, opts.healthCheckTimeout, healthOptimistic, &stats}
proxy.ErrorHandler = backend.ErrorHandler
backends = append(backends, backend)
connStats = append(connStats, newConnStats(endpoint))
go backend.healthCheck(true)
}
globalConnStats.Store(&connStats)
return &site{
backends: backends,
}
Expand Down Expand Up @@ -993,6 +1039,7 @@ func sidekickMain(ctx *cli.Context) {
})
log2.SetReportCaller(true)

healthOptimistic := ctx.GlobalBool("health-optimistic")
healthCheckPath := ctx.GlobalString("health-path")
healthReadCheckPath := ctx.GlobalString("read-health-path")
healthCheckPort := ctx.GlobalInt("health-port")
Expand Down Expand Up @@ -1082,6 +1129,7 @@ func sidekickMain(ctx *cli.Context) {
healthCheckPort,
healthCheckDuration,
healthCheckTimeout,
healthOptimistic,
})
m.displayUI(!globalConsoleDisplay)

Expand Down Expand Up @@ -1179,6 +1227,7 @@ func sidekickMain(ctx *cli.Context) {
healthCheckPort,
healthCheckDuration,
healthCheckTimeout,
healthOptimistic,
})
default:
console.Infof("caught signal '%s'\n", signal)
Expand Down Expand Up @@ -1240,6 +1289,10 @@ func main() {
Name: "rr-dns-mode",
Usage: "enable round-robin DNS mode",
},
cli.BoolFlag{
Name: "health-optimistic",
Usage: "only perform health requests when nodes are down",
},
cli.StringFlag{
Name: "auto-tls-host",
Usage: "enable auto TLS mode for the specified host",
Expand Down

0 comments on commit c2078b8

Please sign in to comment.