Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
refactor
  • Loading branch information
jiuker committed Jul 10, 2024
1 parent 55054df commit a49e9ac
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 80 deletions.
235 changes: 155 additions & 80 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"net/http/pprof"
"net/url"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -63,17 +65,18 @@ const (
)

var (
globalQuietEnabled bool
globalDebugEnabled bool
globalLoggingEnabled bool
globalTrace string
globalJSONEnabled bool
globalConsoleDisplay bool
globalErrorsOnly bool
globalStatusCodes []int
globalConnStats []*ConnStats
log2 *logrus.Logger
globalHostBalance string
globalQuietEnabled bool
globalDebugEnabled bool
globalLoggingEnabled bool
globalTrace string
globalJSONEnabled bool
globalConsoleDisplay bool
globalErrorsOnly bool
globalStatusCodes []int
globalConnStatsRWMutex sync.RWMutex
globalConnStats []*ConnStats
log2 *logrus.Logger
globalHostBalance string
)

const (
Expand Down Expand Up @@ -305,14 +308,20 @@ func getHealthCheckURL(endpoint, healthCheckPath string, healthCheckPort int) (s
}

// healthCheck - background routine which checks if a backend is up or down.
func (b *Backend) healthCheck() {
func (b *Backend) healthCheck(ctxt context.Context) {
ticker := time.NewTicker(b.healthCheckDuration)
defer ticker.Stop()
for {
err := b.doHealthCheck()
if err != nil {
console.Fatalln(err)
}

time.Sleep(b.healthCheckDuration)
select {
case <-ctxt.Done():
return
case <-ticker.C:
err := b.doHealthCheck()
if err != nil {
console.Errorln(err)
}
}
}
}

Expand Down Expand Up @@ -393,6 +402,9 @@ func (b *Backend) updateCallStats(t shortTraceMsg) {
b.Stats.MinLatency = time.Duration(int64(math.Min(float64(b.Stats.MinLatency), float64(t.CallStats.Latency))))
b.Stats.Rx += int64(t.CallStats.Rx)
b.Stats.Tx += int64(t.CallStats.Tx)
// automatically update the global stats
// Read/Write Lock is not required here
globalConnStatsRWMutex.RLock()
for _, c := range globalConnStats {
if c == nil {
continue
Expand All @@ -407,38 +419,105 @@ func (b *Backend) updateCallStats(t shortTraceMsg) {
c.setTotalCalls(b.Stats.TotCalls)
c.setTotalCallFailures(b.Stats.TotCallFailures)
}
globalConnStatsRWMutex.RUnlock()
}

type multisite struct {
sites []*site
sites []*site
healthCanceler context.CancelFunc
rwLocker sync.RWMutex
}

func (m *multisite) renewSite(ctx *cli.Context, healthCheckPath string, healthReadCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) {
ctxt, cancel := context.WithCancel(context.Background())
var sites []*site
for i, siteStrs := range ctx.Args() {
if i == len(ctx.Args())-1 {
healthCheckPath = healthReadCheckPath
}
site := configureSite(ctxt, ctx, i+1, strings.Split(siteStrs, ","), healthCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout)
sites = append(sites, site)
}
m.rwLocker.Lock()
defer m.rwLocker.Unlock()
m.sites = sites
// cancel the previous health checker
if m.healthCanceler != nil {
m.healthCanceler()
}
m.healthCanceler = cancel
}
func (m *multisite) displayUI(show bool) {
if !show {
return
}
go func() {
// Clear screen before we start the table UI
clearScreen()

ticker := time.NewTicker(500 * time.Millisecond)
for range ticker.C {
m.populate()
}
}()
}

func (m *multisite) populate(cellText [][]string) {
func (m *multisite) populate() {
m.rwLocker.RLock()
defer m.rwLocker.RUnlock()

dspOrder := []col{colGreen} // Header
for i := 0; i < len(m.sites); i++ {
for range m.sites[i].backends {
dspOrder = append(dspOrder, colGrey)
}
}
var printColors []*color.Color
for _, c := range dspOrder {
printColors = append(printColors, getPrintCol(c))
}

tbl := console.NewTable(printColors, []bool{
false, false, false, false, false, false,
false, false, false, false, false,
}, 0)

cellText := make([][]string, len(dspOrder))
for i := range dspOrder {
cellText[i] = make([]string, len(headers))
}
cellText[0] = headers
for i, site := range m.sites {
for j, b := range site.backends {
b.Stats.Lock()
minLatency := "0s"
maxLatency := "0s"
if b.Stats.MaxLatency > 0 {
minLatency = fmt.Sprintf("%2s", b.Stats.MinLatency.Round(time.Microsecond))
maxLatency = fmt.Sprintf("%2s", b.Stats.MaxLatency.Round(time.Microsecond))
}
cellText[i*len(site.backends)+j][0] = humanize.Ordinal(b.siteNumber)
cellText[i*len(site.backends)+j][1] = b.endpoint
cellText[i*len(site.backends)+j][2] = b.getServerStatus()
cellText[i*len(site.backends)+j][3] = strconv.FormatInt(b.Stats.TotCalls, 10)
cellText[i*len(site.backends)+j][4] = strconv.FormatInt(b.Stats.TotCallFailures, 10)
cellText[i*len(site.backends)+j][5] = humanize.IBytes(uint64(b.Stats.Rx))
cellText[i*len(site.backends)+j][6] = humanize.IBytes(uint64(b.Stats.Tx))
cellText[i*len(site.backends)+j][7] = b.Stats.CumDowntime.Round(time.Microsecond).String()
cellText[i*len(site.backends)+j][8] = b.Stats.LastDowntime.Round(time.Microsecond).String()
cellText[i*len(site.backends)+j][9] = minLatency
cellText[i*len(site.backends)+j][10] = maxLatency
cellText[i*len(site.backends)+j+1][0] = humanize.Ordinal(b.siteNumber)
cellText[i*len(site.backends)+j+1][1] = b.endpoint
cellText[i*len(site.backends)+j+1][2] = b.getServerStatus()
cellText[i*len(site.backends)+j+1][3] = strconv.FormatInt(b.Stats.TotCalls, 10)
cellText[i*len(site.backends)+j+1][4] = strconv.FormatInt(b.Stats.TotCallFailures, 10)
cellText[i*len(site.backends)+j+1][5] = humanize.IBytes(uint64(b.Stats.Rx))
cellText[i*len(site.backends)+j+1][6] = humanize.IBytes(uint64(b.Stats.Tx))
cellText[i*len(site.backends)+j+1][7] = b.Stats.CumDowntime.Round(time.Microsecond).String()
cellText[i*len(site.backends)+j+1][8] = b.Stats.LastDowntime.Round(time.Microsecond).String()
cellText[i*len(site.backends)+j+1][9] = minLatency
cellText[i*len(site.backends)+j+1][10] = maxLatency
b.Stats.Unlock()
}
}
console.RewindLines(len(cellText) + 2)
tbl.DisplayTable(cellText)
}

func (m *multisite) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SideKick") // indicate sidekick is serving
m.rwLocker.RLock()
defer m.rwLocker.RUnlock()
for _, s := range m.sites {
if s.Online() {
if r.URL.Path == healthPath {
Expand Down Expand Up @@ -766,7 +845,7 @@ func IsLoopback(addr string) bool {
return net.ParseIP(host).IsLoopback()
}

func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) *site {
func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs []string, healthCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) *site {
var endpoints []string

if ellipses.HasEllipses(siteStrs...) {
Expand All @@ -790,6 +869,27 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck
var backends []*Backend
var prevScheme string
var transport http.RoundTripper
globalConnStatsRWMutex.Lock()
defer globalConnStatsRWMutex.Unlock()
// reset connstats
globalConnStats = []*ConnStats{}
if len(endpoints) == 1 {
// guess it is LB config address
target, err := url.Parse(endpoints[0])
if err != nil {
console.Fatalln(fmt.Errorf("Unable to parse input arg %s: %s", endpoints[0], err))
}
hostName := target.Hostname()
ips, err := net.LookupHost(hostName)
if err != nil {
console.Fatalln(fmt.Errorf("Unable to lookup host %s", hostName))
}
// set the new endpoints
endpoints = []string{}
for _, ip := range ips {
endpoints = append(endpoints, strings.Replace(target.String(), hostName, ip, 1))
}
}
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
target, err := url.Parse(endpoint)
Expand Down Expand Up @@ -843,7 +943,7 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck
backend := &Backend{siteNum, endpoint, proxy, &http.Client{
Transport: proxy.Transport,
}, 0, healthCheckURL, healthCheckDuration, healthCheckTimeout, &stats}
go backend.healthCheck()
go backend.healthCheck(ctxt)
proxy.ErrorHandler = backend.ErrorHandler
backends = append(backends, backend)
globalConnStats = append(globalConnStats, newConnStats(endpoint))
Expand Down Expand Up @@ -922,16 +1022,6 @@ func sidekickMain(ctx *cli.Context) {
healthReadCheckPath = slashSeparator + healthReadCheckPath
}

var sites []*site
for i, siteStrs := range ctx.Args() {
if i == len(ctx.Args())-1 {
healthCheckPath = healthReadCheckPath
}

site := configureSite(ctx, i+1, strings.Split(siteStrs, ","), healthCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout)
sites = append(sites, site)
}

if globalConsoleDisplay {
console.SetColor("LogMsgType", color.New(color.FgHiMagenta))
console.SetColor("TraceMsgType", color.New(color.FgYellow))
Expand Down Expand Up @@ -960,42 +1050,9 @@ func sidekickMain(ctx *cli.Context) {
console.Fatalln(err)
}

m := &multisite{sites}
if !globalConsoleDisplay {
dspOrder := []col{colGreen} // Header
for i := 0; i < len(sites); i++ {
for range sites[i].backends {
dspOrder = append(dspOrder, colGrey)
}
}
var printColors []*color.Color
for _, c := range dspOrder {
printColors = append(printColors, getPrintCol(c))
}

tbl := console.NewTable(printColors, []bool{
false, false, false, false, false, false,
false, false, false, false, false,
}, 0)

cellText := make([][]string, len(dspOrder))
for i := range dspOrder {
cellText[i] = make([]string, len(headers))
}
cellText[0] = headers

go func() {
// Clear screen before we start the table UI
clearScreen()

ticker := time.NewTicker(500 * time.Millisecond)
for range ticker.C {
m.populate(cellText[1:])
console.RewindLines(len(cellText) + 2)
tbl.DisplayTable(cellText)
}
}()
}
m := &multisite{}
m.renewSite(ctx, healthCheckPath, healthReadCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout)
m.displayUI(!globalConsoleDisplay)

router.PathPrefix(slashSeparator).Handler(m)
server := &http.Server{
Expand All @@ -1017,8 +1074,26 @@ func sidekickMain(ctx *cli.Context) {
}
server.TLSConfig = tlsConfig
}
if err := server.ListenAndServe(); err != nil {
console.Fatalln(err)
go func() {
if err := server.ListenAndServe(); err != nil {
console.Fatalln(err)
}
}()
osSignalChannel := make(chan os.Signal, 1)
signal.Notify(
osSignalChannel,
syscall.SIGTERM,
syscall.SIGINT,
syscall.SIGHUP,
)
for signal := range osSignalChannel {
switch signal {
case syscall.SIGHUP:
m.renewSite(ctx, healthCheckPath, healthReadCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout)
default:
console.Infof("caught signal '%s'\n", signal)
os.Exit(1)
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (c *sidekickCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect is called by the Prometheus registry when collecting metrics.
func (c *sidekickCollector) Collect(ch chan<- prometheus.Metric) {
// automatically read the global stats
// Read/Write Lock is not required here
globalConnStatsRWMutex.RLock()
defer globalConnStatsRWMutex.RUnlock()
for _, c := range globalConnStats {
if c == nil {
continue
Expand Down

0 comments on commit a49e9ac

Please sign in to comment.