From 63b38e83ac0f5e774095cacb98ff8d02f08401ec Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Mon, 23 Sep 2024 17:50:44 -0500 Subject: [PATCH] add prometheus metrics server and config --- cmd/common.go | 5 ++++ cmd/kwild/config/config.go | 21 +++++++++-------- cmd/kwild/config/config_test.go | 4 ++++ cmd/kwild/config/default_config.toml | 14 ++++++++++- cmd/kwild/config/flags.go | 5 ++++ cmd/kwild/config/test_data/config.toml | 12 ++++++++++ cmd/kwild/server/build.go | 7 ++++++ cmd/kwild/server/cometbft.go | 7 ++++++ common/config/config.go | 13 ++++++++--- internal/services/jsonrpc/server.go | 32 ++++++++++++++++++++++++++ 10 files changed, 106 insertions(+), 14 deletions(-) diff --git a/cmd/common.go b/cmd/common.go index 46877e14f5..b9a1de203e 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -92,6 +92,11 @@ var DefaultConfig = func() *commonConfig.KwildConfig { MaxLogSizeKB: 100_000, // 100 MB uncompressed threshold MaxLogRolls: 0, // the zero value means retain all (don't delete oldest archived logs) }, + Instrumentation: &commonConfig.InstrumentationConfig{ + Prometheus: false, + PromListenAddr: "0.0.0.0:26660", + MaxConnections: 1, + }, ChainConfig: &commonConfig.ChainConfig{ P2P: &commonConfig.P2PConfig{ diff --git a/cmd/kwild/config/config.go b/cmd/kwild/config/config.go index 26078ae991..e86bf3444c 100644 --- a/cmd/kwild/config/config.go +++ b/cmd/kwild/config/config.go @@ -365,27 +365,28 @@ func DefaultEmptyConfig() *config.KwildConfig { StateSync: &config.StateSyncConfig{}, Consensus: &config.ConsensusConfig{}, }, - Logging: &config.Logging{}, + Logging: &config.Logging{}, + Instrumentation: &config.InstrumentationConfig{}, } } -// EmptyConfig returns a config with all fields set to their zero values. -// This is useful for guaranteeing that all fields are set when merging +// EmptyConfig returns a config with all fields set to their zero values (except +// no nil pointers for the sub-sections structs). This is useful for +// guaranteeing that all fields are set when merging. func EmptyConfig() *config.KwildConfig { return &config.KwildConfig{ AppConfig: &config.AppConfig{ ExtensionEndpoints: []string{}, }, ChainConfig: &config.ChainConfig{ - P2P: &config.P2PConfig{}, - RPC: &config.ChainRPCConfig{}, - Mempool: &config.MempoolConfig{}, - StateSync: &config.StateSyncConfig{ - RPCServers: "", - }, + P2P: &config.P2PConfig{}, + RPC: &config.ChainRPCConfig{}, + Mempool: &config.MempoolConfig{}, + StateSync: &config.StateSyncConfig{}, Consensus: &config.ConsensusConfig{}, }, - Logging: &config.Logging{}, + Logging: &config.Logging{}, + Instrumentation: &config.InstrumentationConfig{}, } } diff --git a/cmd/kwild/config/config_test.go b/cmd/kwild/config/config_test.go index 405d1d3157..b2a46f59d7 100644 --- a/cmd/kwild/config/config_test.go +++ b/cmd/kwild/config/config_test.go @@ -35,6 +35,10 @@ func Test_Config_Toml(t *testing.T) { assert.Equal(t, "localhost:50052", cfg.AppConfig.ExtensionEndpoints[0]) assert.Equal(t, "localhost:50053", cfg.AppConfig.ExtensionEndpoints[1]) + assert.Equal(t, true, cfg.Instrumentation.Prometheus) + assert.Equal(t, 6, cfg.Instrumentation.MaxConnections) + assert.Equal(t, "tcp://9.8.7.6:20660", cfg.Instrumentation.PromListenAddr) + // TODO: Add bunch of other validations for different types } diff --git a/cmd/kwild/config/default_config.toml b/cmd/kwild/config/default_config.toml index 0381614cd3..dfc5b7c032 100644 --- a/cmd/kwild/config/default_config.toml +++ b/cmd/kwild/config/default_config.toml @@ -307,4 +307,16 @@ discovery_time = "15s" chunk_request_timeout = "10s" # Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default), -# the state sync process is aborted and the node will fall back to the regular block sync process. \ No newline at end of file +# the state sync process is aborted and the node will fall back to the regular block sync process. + +[instrumentation] + +# collect and serve are served under /metrics +prometheus = false + +# listen address for prometheus metrics +prometheus_listen_addr = "tcp://0.0.0.0:26660" + +# Maximum number of simultaneous connections. +# 0 - unlimited. +max_open_connections = 1 diff --git a/cmd/kwild/config/flags.go b/cmd/kwild/config/flags.go index a674daa745..ac7d82e05a 100644 --- a/cmd/kwild/config/flags.go +++ b/cmd/kwild/config/flags.go @@ -114,6 +114,11 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`)) flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time") flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout") + // Instrumentation flags + flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics") + flagSet.StringVar(&cfg.Instrumentation.PromListenAddr, "instrumentation.prometheus-listen-addr", cfg.Instrumentation.PromListenAddr, "listen address for prometheus metrics") + flagSet.IntVar(&cfg.Instrumentation.MaxConnections, "instrumentation.max-open-connections", cfg.Instrumentation.MaxConnections, "maximum number of simultaneous connections") + // TODO: delete in v0.10.0 flagSet.String("app.snapshots.snapshot-dir", "", "Snapshot directory path") flagSet.MarkDeprecated("app.snapshots.snapshot-dir", "this value is no longer configurable") diff --git a/cmd/kwild/config/test_data/config.toml b/cmd/kwild/config/test_data/config.toml index fa132df701..ce73dd0411 100644 --- a/cmd/kwild/config/test_data/config.toml +++ b/cmd/kwild/config/test_data/config.toml @@ -301,3 +301,15 @@ chunk_request_timeout = "10s" # Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default), # the state sync process is aborted and the node will fall back to the regular block sync process. + +[instrumentation] + +# collect and serve are served under /metrics +prometheus = true + +# listen address for prometheus metrics +prometheus_listen_addr = "tcp://9.8.7.6:20660" + +# Maximum number of simultaneous connections. +# 0 - unlimited. +max_open_connections = 6 diff --git a/cmd/kwild/server/build.go b/cmd/kwild/server/build.go index 08410cf384..6e5ac5ffdf 100644 --- a/cmd/kwild/server/build.go +++ b/cmd/kwild/server/build.go @@ -20,6 +20,8 @@ import ( abciTypes "github.com/cometbft/cometbft/abci/types" cmtEd "github.com/cometbft/cometbft/crypto/ed25519" cmtlocal "github.com/cometbft/cometbft/rpc/client/local" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" kwildcfg "github.com/kwilteam/kwil-db/cmd/kwild/config" "github.com/kwilteam/kwil-db/common" @@ -112,6 +114,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { // FinalizeBlock+Commit. This is not just a constructor, sadly. cometBftNode := buildCometNode(d, closers, abciApp) + prometheus.MustRegister( + collectors.NewBuildInfoCollector(), + // collectors.NewDBStatsCollector() // TODO: do something like this for pg.DB + ) + // Give abci p2p module access to removing peers p2p.SetRemovePeerFn(cometBftNode.RemovePeer) diff --git a/cmd/kwild/server/cometbft.go b/cmd/kwild/server/cometbft.go index b97b7cb29c..ad6de5b69e 100644 --- a/cmd/kwild/server/cometbft.go +++ b/cmd/kwild/server/cometbft.go @@ -105,6 +105,13 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config { nodeCfg.StateSync.DiscoveryTime = time.Duration(userChainCfg.StateSync.DiscoveryTime) nodeCfg.StateSync.ChunkRequestTimeout = time.Duration(userChainCfg.StateSync.ChunkRequestTimeout) + nodeCfg.Instrumentation = &cmtCfg.InstrumentationConfig{ + Prometheus: cfg.Instrumentation.Prometheus, + PrometheusListenAddr: cfg.Instrumentation.PromListenAddr, + MaxOpenConnections: cfg.Instrumentation.MaxConnections, + Namespace: "cometbft", + } + // Light client verification nodeCfg.StateSync.TrustPeriod = 36000 * time.Second // 10 hours (6s block time) diff --git a/common/config/config.go b/common/config/config.go index bfc9df172c..9a1c534a42 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -19,9 +19,10 @@ import ( type KwildConfig struct { RootDir string - AppConfig *AppConfig `mapstructure:"app"` - ChainConfig *ChainConfig `mapstructure:"chain"` - Logging *Logging `mapstructure:"log"` + AppConfig *AppConfig `mapstructure:"app"` + ChainConfig *ChainConfig `mapstructure:"chain"` + Logging *Logging `mapstructure:"log"` + Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"` } type Logging struct { @@ -36,6 +37,12 @@ type Logging struct { MaxLogRolls int `mapstructure:"retain_max_rolls"` } +type InstrumentationConfig struct { + Prometheus bool `mapstructure:"prometheus"` + PromListenAddr string `mapstructure:"prometheus_listen_addr"` + MaxConnections int `mapstructure:"max_open_connections"` +} + type AppConfig struct { JSONRPCListenAddress string `mapstructure:"jsonrpc_listen_addr"` AdminListenAddress string `mapstructure:"admin_listen_addr"` diff --git a/internal/services/jsonrpc/server.go b/internal/services/jsonrpc/server.go index a97d440cb1..d23d171554 100644 --- a/internal/services/jsonrpc/server.go +++ b/internal/services/jsonrpc/server.go @@ -23,6 +23,8 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/kwilteam/kwil-db/core/log" jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json" "github.com/kwilteam/kwil-db/internal/services/jsonrpc/openrpc" @@ -56,6 +58,14 @@ type Server struct { spec json.RawMessage authSHA []byte tlsCfg *tls.Config + + // UNSTABLE: this is not much more than a placeholder to ensure we can add + // our own metrics to the global prometheus metrics registry. + metrics map[string]Metrics +} + +type Metrics interface { + Inc() } type serverConfig struct { @@ -171,9 +181,21 @@ var ( } ) +type Mux interface { + Handle() +} + // NewServer creates a new JSON-RPC server. Use RegisterMethodHandler or // RegisterSvc to add method handlers. func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "json-rpc-server", + Name: "JSON RPC Server counter (UNSTABLE)", + }) + metrics := map[string]Metrics{ + "req_counter": counter, + } + addr, isUNIX, err := checkAddr(addr) if err != nil { return nil, err @@ -228,6 +250,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { services: make(map[string]Svc), specInfo: cfg.specInfo, tlsCfg: cfg.tlsConfig, + metrics: metrics, } if cfg.pass != "" { @@ -246,6 +269,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { if cfg.enableCORS { h = corsHandler(h) } + h = reqCounter(h, metrics) h = realIPHandler(h, cfg.proxyCount) // for effective rate limiting h = recoverer(h, log) // first, wrap with defer and call next ^ @@ -356,6 +380,14 @@ func corsHandler(h http.Handler) http.Handler { }) } +func reqCounter(h http.Handler, metrics map[string]Metrics) http.Handler { + reqCounter := metrics["req_count"] + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqCounter.Inc() + h.ServeHTTP(w, r) + }) +} + func recoverer(h http.Handler, log log.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() {