From a49e9ac28547b91163a67549c60a74eb12de5b28 Mon Sep 17 00:00:00 2001 From: jiuker Date: Wed, 10 Jul 2024 15:43:58 +0800 Subject: [PATCH] refactor refactor --- main.go | 235 +++++++++++++++++++++++++++++++++++------------------ metrics.go | 4 + 2 files changed, 159 insertions(+), 80 deletions(-) diff --git a/main.go b/main.go index a0ad09f..7e35b5e 100644 --- a/main.go +++ b/main.go @@ -31,11 +31,13 @@ import ( "net/http/pprof" "net/url" "os" + "os/signal" "sort" "strconv" "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/dustin/go-humanize" @@ -63,17 +65,18 @@ const ( ) var ( - globalQuietEnabled bool - globalDebugEnabled bool - globalLoggingEnabled bool - globalTrace string - globalJSONEnabled bool - globalConsoleDisplay bool - globalErrorsOnly bool - globalStatusCodes []int - globalConnStats []*ConnStats - log2 *logrus.Logger - globalHostBalance string + globalQuietEnabled bool + globalDebugEnabled bool + globalLoggingEnabled bool + globalTrace string + globalJSONEnabled bool + globalConsoleDisplay bool + globalErrorsOnly bool + globalStatusCodes []int + globalConnStatsRWMutex sync.RWMutex + globalConnStats []*ConnStats + log2 *logrus.Logger + globalHostBalance string ) const ( @@ -305,14 +308,20 @@ func getHealthCheckURL(endpoint, healthCheckPath string, healthCheckPort int) (s } // healthCheck - background routine which checks if a backend is up or down. -func (b *Backend) healthCheck() { +func (b *Backend) healthCheck(ctxt context.Context) { + ticker := time.NewTicker(b.healthCheckDuration) + defer ticker.Stop() for { - err := b.doHealthCheck() - if err != nil { - console.Fatalln(err) - } - time.Sleep(b.healthCheckDuration) + select { + case <-ctxt.Done(): + return + case <-ticker.C: + err := b.doHealthCheck() + if err != nil { + console.Errorln(err) + } + } } } @@ -393,6 +402,9 @@ func (b *Backend) updateCallStats(t shortTraceMsg) { b.Stats.MinLatency = time.Duration(int64(math.Min(float64(b.Stats.MinLatency), float64(t.CallStats.Latency)))) b.Stats.Rx += int64(t.CallStats.Rx) b.Stats.Tx += int64(t.CallStats.Tx) + // automatically update the global stats + // Read/Write Lock is not required here + globalConnStatsRWMutex.RLock() for _, c := range globalConnStats { if c == nil { continue @@ -407,38 +419,105 @@ func (b *Backend) updateCallStats(t shortTraceMsg) { c.setTotalCalls(b.Stats.TotCalls) c.setTotalCallFailures(b.Stats.TotCallFailures) } + globalConnStatsRWMutex.RUnlock() } type multisite struct { - sites []*site + sites []*site + healthCanceler context.CancelFunc + rwLocker sync.RWMutex +} + +func (m *multisite) renewSite(ctx *cli.Context, healthCheckPath string, healthReadCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) { + ctxt, cancel := context.WithCancel(context.Background()) + var sites []*site + for i, siteStrs := range ctx.Args() { + if i == len(ctx.Args())-1 { + healthCheckPath = healthReadCheckPath + } + site := configureSite(ctxt, ctx, i+1, strings.Split(siteStrs, ","), healthCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout) + sites = append(sites, site) + } + m.rwLocker.Lock() + defer m.rwLocker.Unlock() + m.sites = sites + // cancel the previous health checker + if m.healthCanceler != nil { + m.healthCanceler() + } + m.healthCanceler = cancel +} +func (m *multisite) displayUI(show bool) { + if !show { + return + } + go func() { + // Clear screen before we start the table UI + clearScreen() + + ticker := time.NewTicker(500 * time.Millisecond) + for range ticker.C { + m.populate() + } + }() } -func (m *multisite) populate(cellText [][]string) { +func (m *multisite) populate() { + m.rwLocker.RLock() + defer m.rwLocker.RUnlock() + + dspOrder := []col{colGreen} // Header + for i := 0; i < len(m.sites); i++ { + for range m.sites[i].backends { + dspOrder = append(dspOrder, colGrey) + } + } + var printColors []*color.Color + for _, c := range dspOrder { + printColors = append(printColors, getPrintCol(c)) + } + + tbl := console.NewTable(printColors, []bool{ + false, false, false, false, false, false, + false, false, false, false, false, + }, 0) + + cellText := make([][]string, len(dspOrder)) + for i := range dspOrder { + cellText[i] = make([]string, len(headers)) + } + cellText[0] = headers for i, site := range m.sites { for j, b := range site.backends { + b.Stats.Lock() minLatency := "0s" maxLatency := "0s" if b.Stats.MaxLatency > 0 { minLatency = fmt.Sprintf("%2s", b.Stats.MinLatency.Round(time.Microsecond)) maxLatency = fmt.Sprintf("%2s", b.Stats.MaxLatency.Round(time.Microsecond)) } - cellText[i*len(site.backends)+j][0] = humanize.Ordinal(b.siteNumber) - cellText[i*len(site.backends)+j][1] = b.endpoint - cellText[i*len(site.backends)+j][2] = b.getServerStatus() - cellText[i*len(site.backends)+j][3] = strconv.FormatInt(b.Stats.TotCalls, 10) - cellText[i*len(site.backends)+j][4] = strconv.FormatInt(b.Stats.TotCallFailures, 10) - cellText[i*len(site.backends)+j][5] = humanize.IBytes(uint64(b.Stats.Rx)) - cellText[i*len(site.backends)+j][6] = humanize.IBytes(uint64(b.Stats.Tx)) - cellText[i*len(site.backends)+j][7] = b.Stats.CumDowntime.Round(time.Microsecond).String() - cellText[i*len(site.backends)+j][8] = b.Stats.LastDowntime.Round(time.Microsecond).String() - cellText[i*len(site.backends)+j][9] = minLatency - cellText[i*len(site.backends)+j][10] = maxLatency + cellText[i*len(site.backends)+j+1][0] = humanize.Ordinal(b.siteNumber) + cellText[i*len(site.backends)+j+1][1] = b.endpoint + cellText[i*len(site.backends)+j+1][2] = b.getServerStatus() + cellText[i*len(site.backends)+j+1][3] = strconv.FormatInt(b.Stats.TotCalls, 10) + cellText[i*len(site.backends)+j+1][4] = strconv.FormatInt(b.Stats.TotCallFailures, 10) + cellText[i*len(site.backends)+j+1][5] = humanize.IBytes(uint64(b.Stats.Rx)) + cellText[i*len(site.backends)+j+1][6] = humanize.IBytes(uint64(b.Stats.Tx)) + cellText[i*len(site.backends)+j+1][7] = b.Stats.CumDowntime.Round(time.Microsecond).String() + cellText[i*len(site.backends)+j+1][8] = b.Stats.LastDowntime.Round(time.Microsecond).String() + cellText[i*len(site.backends)+j+1][9] = minLatency + cellText[i*len(site.backends)+j+1][10] = maxLatency + b.Stats.Unlock() } } + console.RewindLines(len(cellText) + 2) + tbl.DisplayTable(cellText) } func (m *multisite) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SideKick") // indicate sidekick is serving + m.rwLocker.RLock() + defer m.rwLocker.RUnlock() for _, s := range m.sites { if s.Online() { if r.URL.Path == healthPath { @@ -766,7 +845,7 @@ func IsLoopback(addr string) bool { return net.ParseIP(host).IsLoopback() } -func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) *site { +func configureSite(ctxt context.Context, ctx *cli.Context, siteNum int, siteStrs []string, healthCheckPath string, healthCheckPort int, healthCheckDuration, healthCheckTimeout time.Duration) *site { var endpoints []string if ellipses.HasEllipses(siteStrs...) { @@ -790,6 +869,27 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck var backends []*Backend var prevScheme string var transport http.RoundTripper + globalConnStatsRWMutex.Lock() + defer globalConnStatsRWMutex.Unlock() + // reset connstats + globalConnStats = []*ConnStats{} + if len(endpoints) == 1 { + // guess it is LB config address + target, err := url.Parse(endpoints[0]) + if err != nil { + console.Fatalln(fmt.Errorf("Unable to parse input arg %s: %s", endpoints[0], err)) + } + hostName := target.Hostname() + ips, err := net.LookupHost(hostName) + if err != nil { + console.Fatalln(fmt.Errorf("Unable to lookup host %s", hostName)) + } + // set the new endpoints + endpoints = []string{} + for _, ip := range ips { + endpoints = append(endpoints, strings.Replace(target.String(), hostName, ip, 1)) + } + } for _, endpoint := range endpoints { endpoint = strings.TrimSuffix(endpoint, slashSeparator) target, err := url.Parse(endpoint) @@ -843,7 +943,7 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck backend := &Backend{siteNum, endpoint, proxy, &http.Client{ Transport: proxy.Transport, }, 0, healthCheckURL, healthCheckDuration, healthCheckTimeout, &stats} - go backend.healthCheck() + go backend.healthCheck(ctxt) proxy.ErrorHandler = backend.ErrorHandler backends = append(backends, backend) globalConnStats = append(globalConnStats, newConnStats(endpoint)) @@ -922,16 +1022,6 @@ func sidekickMain(ctx *cli.Context) { healthReadCheckPath = slashSeparator + healthReadCheckPath } - var sites []*site - for i, siteStrs := range ctx.Args() { - if i == len(ctx.Args())-1 { - healthCheckPath = healthReadCheckPath - } - - site := configureSite(ctx, i+1, strings.Split(siteStrs, ","), healthCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout) - sites = append(sites, site) - } - if globalConsoleDisplay { console.SetColor("LogMsgType", color.New(color.FgHiMagenta)) console.SetColor("TraceMsgType", color.New(color.FgYellow)) @@ -960,42 +1050,9 @@ func sidekickMain(ctx *cli.Context) { console.Fatalln(err) } - m := &multisite{sites} - if !globalConsoleDisplay { - dspOrder := []col{colGreen} // Header - for i := 0; i < len(sites); i++ { - for range sites[i].backends { - dspOrder = append(dspOrder, colGrey) - } - } - var printColors []*color.Color - for _, c := range dspOrder { - printColors = append(printColors, getPrintCol(c)) - } - - tbl := console.NewTable(printColors, []bool{ - false, false, false, false, false, false, - false, false, false, false, false, - }, 0) - - cellText := make([][]string, len(dspOrder)) - for i := range dspOrder { - cellText[i] = make([]string, len(headers)) - } - cellText[0] = headers - - go func() { - // Clear screen before we start the table UI - clearScreen() - - ticker := time.NewTicker(500 * time.Millisecond) - for range ticker.C { - m.populate(cellText[1:]) - console.RewindLines(len(cellText) + 2) - tbl.DisplayTable(cellText) - } - }() - } + m := &multisite{} + m.renewSite(ctx, healthCheckPath, healthReadCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout) + m.displayUI(!globalConsoleDisplay) router.PathPrefix(slashSeparator).Handler(m) server := &http.Server{ @@ -1017,8 +1074,26 @@ func sidekickMain(ctx *cli.Context) { } server.TLSConfig = tlsConfig } - if err := server.ListenAndServe(); err != nil { - console.Fatalln(err) + go func() { + if err := server.ListenAndServe(); err != nil { + console.Fatalln(err) + } + }() + osSignalChannel := make(chan os.Signal, 1) + signal.Notify( + osSignalChannel, + syscall.SIGTERM, + syscall.SIGINT, + syscall.SIGHUP, + ) + for signal := range osSignalChannel { + switch signal { + case syscall.SIGHUP: + m.renewSite(ctx, healthCheckPath, healthReadCheckPath, healthCheckPort, healthCheckDuration, healthCheckTimeout) + default: + console.Infof("caught signal '%s'\n", signal) + os.Exit(1) + } } } diff --git a/metrics.go b/metrics.go index 15a3baf..3c6a376 100644 --- a/metrics.go +++ b/metrics.go @@ -50,6 +50,10 @@ func (c *sidekickCollector) Describe(ch chan<- *prometheus.Desc) { // Collect is called by the Prometheus registry when collecting metrics. func (c *sidekickCollector) Collect(ch chan<- prometheus.Metric) { + // automatically read the global stats + // Read/Write Lock is not required here + globalConnStatsRWMutex.RLock() + defer globalConnStatsRWMutex.RUnlock() for _, c := range globalConnStats { if c == nil { continue