diff --git a/bootstrap.go b/bootstrap.go new file mode 100644 index 0000000..14bb75a --- /dev/null +++ b/bootstrap.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "errors" + "sync" + "time" + + tz "github.com/ecadlabs/gotez/v2" + "github.com/ecadlabs/gotez/v2/client" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +type BootstrapPollerConfig struct { + Client Client + ChainID *tz.ChainID + Timeout time.Duration + Interval time.Duration + Reg prometheus.Registerer +} + +type BootstrapPoller struct { + cfg BootstrapPollerConfig + + mtx sync.RWMutex + status client.BootstrappedResponse + + cancel context.CancelFunc + done chan struct{} + metric prometheus.Gauge +} + +func (c *BootstrapPollerConfig) New() *BootstrapPoller { + b := &BootstrapPoller{ + cfg: *c, + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "bootstrapped", + Help: "Returns 1 if the node has synchronized its chain with a few peers.", + }), + } + if c.Reg != nil { + c.Reg.MustRegister(b.metric) + } + return b +} + +func (b *BootstrapPoller) Start() { + ctx, cancel := context.WithCancel(context.Background()) + b.cancel = cancel + b.done = make(chan struct{}) + go b.loop(ctx) +} + +func (b *BootstrapPoller) Stop(ctx context.Context) error { + b.cancel() + select { + case <-b.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (b *BootstrapPoller) Status() client.BootstrappedResponse { + b.mtx.RLock() + defer b.mtx.RUnlock() + return b.status +} + +func (b *BootstrapPoller) loop(ctx context.Context) { + t := time.NewTicker(b.cfg.Interval) + defer func() { + t.Stop() + close(b.done) + }() + for { + c, cancel := context.WithTimeout(ctx, b.cfg.Timeout) + resp, err := b.cfg.Client.IsBootstrapped(c, b.cfg.ChainID) + cancel() + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.WithField("chain_id", b.cfg.ChainID).Warn(err) + } else { + b.mtx.Lock() + b.status = *resp + b.mtx.Unlock() + v := 0.0 + if resp.SyncState == client.SyncStateSynced && resp.Bootstrapped { + v = 1 + } + b.metric.Set(v) + } + + select { + case <-t.C: + case <-ctx.Done(): + return + } + } +} diff --git a/config.go b/config.go index f6edaf5..a18bc68 100644 --- a/config.go +++ b/config.go @@ -7,14 +7,14 @@ import ( ) type Config struct { - Listen string `yaml:"listen"` - URL string `yaml:"url"` - ChainID *tz.ChainID `yaml:"chain_id"` - Timeout time.Duration `yaml:"timeout"` - Tolerance time.Duration `yaml:"tolerance"` - ReconnectDelay time.Duration `yaml:"reconnect_delay"` - UseTimestamps bool `yaml:"use_timestamps"` - CheckBlockDelay bool `yaml:"check_block_delay"` - CheckBootstrapped bool `yaml:"check_bootstrapped"` - CheckSyncState bool `yaml:"check_sync_state"` + Listen string `yaml:"listen"` + URL string `yaml:"url"` + ChainID *tz.ChainID `yaml:"chain_id"` + Timeout time.Duration `yaml:"timeout"` + Tolerance time.Duration `yaml:"tolerance"` + ReconnectDelay time.Duration `yaml:"reconnect_delay"` + UseTimestamps bool `yaml:"use_timestamps"` + BootstrappedPollInterval time.Duration `yaml:"bootstrapped_poll_interval"` + HealthUseBootstrapped bool `yaml:"health_use_bootstrapped"` + HealthUseBlockDelay bool `yaml:"health_use_block_delay"` } diff --git a/go.mod b/go.mod index 0c8d7e1..a622642 100644 --- a/go.mod +++ b/go.mod @@ -12,10 +12,17 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ecadlabs/pretty v0.0.0-20230412124801-f948fc689a04 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect golang.org/x/crypto v0.24.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect ) diff --git a/go.sum b/go.sum index 990fd1e..43f529e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -13,6 +17,14 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= @@ -27,6 +39,8 @@ golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5D golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/health_checker.go b/health_checker.go deleted file mode 100644 index fcf4ccf..0000000 --- a/health_checker.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "context" - "time" - - tz "github.com/ecadlabs/gotez/v2" - "github.com/ecadlabs/gotez/v2/client" - log "github.com/sirupsen/logrus" -) - -type HealthChecker struct { - Monitor *HeadMonitor - Client Client - ChainID *tz.ChainID - Timeout time.Duration - - CheckBlockDelay bool - CheckBootstrapped bool - CheckSyncState bool -} - -type HealthStatus struct { - IsBootstrapped bool `json:"bootstrapped"` - IsSynced bool `json:"synced"` - BlockDelayOk bool `json:"block_delay_ok"` -} - -func (s *HealthStatus) IsOk() bool { - return s.BlockDelayOk && s.IsBootstrapped && s.IsSynced -} - -func (h *HealthChecker) HealthStatus(ctx context.Context) (*HealthStatus, error) { - status := HealthStatus{ - IsBootstrapped: true, - IsSynced: true, - } - if h.CheckBootstrapped || h.CheckSyncState { - c, cancel := context.WithTimeout(ctx, h.Timeout) - defer cancel() - resp, err := h.Client.IsBootstrapped(c, h.ChainID) - if err != nil { - return nil, err - } - if h.CheckBootstrapped { - status.IsBootstrapped = resp.Bootstrapped - } - if h.CheckSyncState { - status.IsSynced = resp.SyncState == client.SyncStateSynced - } - } - if h.CheckBlockDelay { - status.BlockDelayOk = h.Monitor.Status() - } - - if !status.IsOk() { - log.WithFields(log.Fields{ - "chain_id": h.ChainID, - "bootstrapped": status.IsBootstrapped, - "synced": status.IsSynced, - "block_delay_ok": status.BlockDelayOk, - }).Warn("Chain health is not ok") - } - return &status, nil -} diff --git a/main.go b/main.go index fddf567..2d1721a 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "fmt" "net/http" "os" "os/signal" @@ -13,16 +12,19 @@ import ( "github.com/ecadlabs/gotez/v2/client" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "golang.org/x/sys/unix" "gopkg.in/yaml.v3" ) const ( - defaultListen = ":8080" - defaultTimeout = 30 * time.Second - defaultTolerance = 1 * time.Second - defaultReconnectDelay = 10 * time.Second + defaultListen = ":8080" + defaultTimeout = 30 * time.Second + defaultTolerance = 1 * time.Second + defaultReconnectDelay = 10 * time.Second + defaultBootstrappedPollInterval = 10 * time.Second ) type debugLogger log.Logger @@ -43,13 +45,13 @@ func main() { log.SetLevel(ll) conf := Config{ - Listen: defaultListen, - Timeout: defaultTimeout, - Tolerance: defaultTolerance, - ReconnectDelay: defaultReconnectDelay, - CheckBlockDelay: true, - CheckBootstrapped: true, - CheckSyncState: true, + Listen: defaultListen, + Timeout: defaultTimeout, + Tolerance: defaultTolerance, + ReconnectDelay: defaultReconnectDelay, + HealthUseBlockDelay: true, + HealthUseBootstrapped: true, + BootstrappedPollInterval: defaultBootstrappedPollInterval, } buf, err := os.ReadFile(*confPath) @@ -67,38 +69,45 @@ func main() { DebugLogger: (*debugLogger)(log.StandardLogger()), } - mon := HeadMonitor{ + reg := prometheus.NewRegistry() + + mon := (&HeadMonitorConfig{ Client: &cl, ChainID: conf.ChainID, Timeout: conf.Timeout, Tolerance: conf.Tolerance, ReconnectDelay: conf.ReconnectDelay, UseTimestamps: conf.UseTimestamps, - } - checker := HealthChecker{ - Monitor: &mon, - Client: &cl, - ChainID: conf.ChainID, - Timeout: conf.Timeout, - CheckBlockDelay: conf.CheckBlockDelay, - CheckBootstrapped: conf.CheckBootstrapped, - CheckSyncState: conf.CheckSyncState, - } - if conf.CheckBlockDelay { - mon.Start() - defer mon.Stop(context.Background()) - } + Reg: reg, + }).New() + + bs := (&BootstrapPollerConfig{ + Client: &cl, + ChainID: conf.ChainID, + Timeout: conf.Timeout, + Interval: conf.BootstrappedPollInterval, + Reg: reg, + }).New() + + mon.Start() + defer mon.Stop(context.Background()) + + bs.Start() + defer bs.Stop(context.Background()) r := mux.NewRouter() r.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - status, err := checker.HealthStatus(r.Context()) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "%v", err) - return + status := true + if conf.HealthUseBootstrapped { + s := bs.Status() + status = status && s.Bootstrapped && s.SyncState == client.SyncStateSynced + } + if conf.HealthUseBlockDelay { + status = status && mon.Status() } + var code int - if status.IsOk() { + if status { code = http.StatusOK } else { code = http.StatusInternalServerError @@ -108,14 +117,9 @@ func main() { json.NewEncoder(w).Encode(status) }) r.Methods("GET").Path("/sync_status").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - status, err := checker.HealthStatus(r.Context()) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "%v", err) - return - } + status := bs.Status() var code int - if status.IsBootstrapped && status.IsSynced { + if status.Bootstrapped && status.SyncState == client.SyncStateSynced { code = http.StatusOK } else { code = http.StatusInternalServerError @@ -125,14 +129,9 @@ func main() { json.NewEncoder(w).Encode(status) }) r.Methods("GET").Path("/block_delay").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - status, err := checker.HealthStatus(r.Context()) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "%v", err) - return - } + status := mon.Status() var code int - if status.BlockDelayOk { + if status { code = http.StatusOK } else { code = http.StatusInternalServerError @@ -141,6 +140,7 @@ func main() { w.WriteHeader(code) json.NewEncoder(w).Encode(status) }) + r.Methods("GET").Path("/metrics").Handler(promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) r.Use((&Logging{}).Handler) srv := &http.Server{ diff --git a/monitor.go b/monitor.go index eac5559..684b64d 100644 --- a/monitor.go +++ b/monitor.go @@ -9,20 +9,42 @@ import ( tz "github.com/ecadlabs/gotez/v2" "github.com/ecadlabs/gotez/v2/client" "github.com/ecadlabs/gotez/v2/protocol/core" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) -type HeadMonitor struct { +type HeadMonitorConfig struct { Client Client ChainID *tz.ChainID Timeout time.Duration Tolerance time.Duration ReconnectDelay time.Duration UseTimestamps bool + Reg prometheus.Registerer +} + +func (c *HeadMonitorConfig) New() *HeadMonitor { + m := &HeadMonitor{ + cfg: *c, + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "tezos", + Subsystem: "node", + Name: "block_delay_ok", + Help: "Returns 1 if the last block arrived in time.", + }), + } + if c.Reg != nil { + c.Reg.MustRegister(m.metric) + } + return m +} +type HeadMonitor struct { + cfg HeadMonitorConfig status atomic.Bool cancel context.CancelFunc done chan struct{} + metric prometheus.Gauge } func (h *HeadMonitor) Status() bool { @@ -30,14 +52,14 @@ func (h *HeadMonitor) Status() bool { } func (h *HeadMonitor) context(ctx context.Context) (context.Context, context.CancelFunc) { - return context.WithTimeout(ctx, h.Timeout) + return context.WithTimeout(ctx, h.cfg.Timeout) } func (h *HeadMonitor) getMinBlockDelay(c context.Context, block string, protocol *tz.ProtocolHash) (time.Duration, error) { ctx, cancel := h.context(c) defer cancel() - consts, err := h.Client.Constants(ctx, &client.ContextRequest{ - Chain: h.ChainID.String(), + consts, err := h.cfg.Client.Constants(ctx, &client.ContextRequest{ + Chain: h.cfg.ChainID.String(), Block: block, Protocol: protocol, }) @@ -52,8 +74,8 @@ func (h *HeadMonitor) getMinBlockDelay(c context.Context, block string, protocol func (h *HeadMonitor) getShellHeader(c context.Context, block *tz.BlockHash) (*core.ShellHeader, error) { ctx, cancel := h.context(c) defer cancel() - return h.Client.BlockShellHeader(ctx, &client.SimpleRequest{ - Chain: h.ChainID.String(), + return h.cfg.Client.BlockShellHeader(ctx, &client.SimpleRequest{ + Chain: h.cfg.ChainID.String(), Block: block.String(), }) } @@ -61,7 +83,7 @@ func (h *HeadMonitor) getShellHeader(c context.Context, block *tz.BlockHash) (*c func (h *HeadMonitor) getBlockInfo(c context.Context, block string) (*client.BasicBlockInfo, error) { ctx, cancel := h.context(c) defer cancel() - return h.Client.BasicBlockInfo(ctx, h.ChainID.String(), block) + return h.cfg.Client.BasicBlockInfo(ctx, h.cfg.ChainID.String(), block) } func (h *HeadMonitor) Start() { @@ -86,9 +108,10 @@ func (h *HeadMonitor) serve(ctx context.Context) { var err error for { h.status.Store(false) + h.metric.Set(0) if err != nil { log.Error(err) - t := time.After(h.ReconnectDelay) + t := time.After(h.cfg.ReconnectDelay) select { case <-t: case <-ctx.Done(): @@ -113,7 +136,7 @@ func (h *HeadMonitor) serve(ctx context.Context) { continue } var timestamp time.Time - if h.UseTimestamps { + if h.cfg.UseTimestamps { timestamp = sh.Timestamp.Time() } else { timestamp = time.Now() @@ -132,7 +155,7 @@ func (h *HeadMonitor) serve(ctx context.Context) { stream <-chan *client.Head errCh <-chan error ) - stream, errCh, err = h.Client.Heads(ctx, &client.HeadsRequest{Chain: h.ChainID.String()}) + stream, errCh, err = h.cfg.Client.Heads(ctx, &client.HeadsRequest{Chain: h.cfg.ChainID.String()}) if err != nil { if errors.Is(err, context.Canceled) { return @@ -151,14 +174,19 @@ func (h *HeadMonitor) serve(ctx context.Context) { case head := <-stream: var t time.Time - if h.UseTimestamps { + if h.cfg.UseTimestamps { t = head.Timestamp.Time() } else { t = time.Now() } - status := t.Before(timestamp.Add(minBlockDelay + h.Tolerance)) + status := t.Before(timestamp.Add(minBlockDelay + h.cfg.Tolerance)) log.Debugf("%v: %t", t, status) h.status.Store(status) + v := 0.0 + if status { + v = 1 + } + h.metric.Set(v) timestamp = t if head.Proto == protoNum { break @@ -166,8 +194,8 @@ func (h *HeadMonitor) serve(ctx context.Context) { // update constant var proto *core.BlockProtocols - proto, err = h.Client.BlockProtocols(ctx, &client.SimpleRequest{ - Chain: h.ChainID.String(), + proto, err = h.cfg.Client.BlockProtocols(ctx, &client.SimpleRequest{ + Chain: h.cfg.ChainID.String(), Block: head.Hash.String(), }) if err != nil { diff --git a/monitor_test.go b/monitor_test.go index 486c916..5eeb8bb 100644 --- a/monitor_test.go +++ b/monitor_test.go @@ -228,11 +228,11 @@ func TestProtocolUpgrade1(t *testing.T) { }, } - mon := HeadMonitor{ + mon := (&HeadMonitorConfig{ Client: &cl, ChainID: &gotez.ChainID{0}, UseTimestamps: true, - } + }).New() mon.Start() <-cl.done