diff --git a/cmd/common.go b/cmd/common.go index 46877e14f..b9a1de203 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -92,6 +92,11 @@ var DefaultConfig = func() *commonConfig.KwildConfig { MaxLogSizeKB: 100_000, // 100 MB uncompressed threshold MaxLogRolls: 0, // the zero value means retain all (don't delete oldest archived logs) }, + Instrumentation: &commonConfig.InstrumentationConfig{ + Prometheus: false, + PromListenAddr: "0.0.0.0:26660", + MaxConnections: 1, + }, ChainConfig: &commonConfig.ChainConfig{ P2P: &commonConfig.P2PConfig{ diff --git a/cmd/kwild/config/config.go b/cmd/kwild/config/config.go index 26078ae99..e86bf3444 100644 --- a/cmd/kwild/config/config.go +++ b/cmd/kwild/config/config.go @@ -365,27 +365,28 @@ func DefaultEmptyConfig() *config.KwildConfig { StateSync: &config.StateSyncConfig{}, Consensus: &config.ConsensusConfig{}, }, - Logging: &config.Logging{}, + Logging: &config.Logging{}, + Instrumentation: &config.InstrumentationConfig{}, } } -// EmptyConfig returns a config with all fields set to their zero values. -// This is useful for guaranteeing that all fields are set when merging +// EmptyConfig returns a config with all fields set to their zero values (except +// no nil pointers for the sub-sections structs). This is useful for +// guaranteeing that all fields are set when merging. 
func EmptyConfig() *config.KwildConfig { return &config.KwildConfig{ AppConfig: &config.AppConfig{ ExtensionEndpoints: []string{}, }, ChainConfig: &config.ChainConfig{ - P2P: &config.P2PConfig{}, - RPC: &config.ChainRPCConfig{}, - Mempool: &config.MempoolConfig{}, - StateSync: &config.StateSyncConfig{ - RPCServers: "", - }, + P2P: &config.P2PConfig{}, + RPC: &config.ChainRPCConfig{}, + Mempool: &config.MempoolConfig{}, + StateSync: &config.StateSyncConfig{}, Consensus: &config.ConsensusConfig{}, }, - Logging: &config.Logging{}, + Logging: &config.Logging{}, + Instrumentation: &config.InstrumentationConfig{}, } } diff --git a/cmd/kwild/config/config_test.go b/cmd/kwild/config/config_test.go index 405d1d315..2241de360 100644 --- a/cmd/kwild/config/config_test.go +++ b/cmd/kwild/config/config_test.go @@ -35,6 +35,10 @@ func Test_Config_Toml(t *testing.T) { assert.Equal(t, "localhost:50052", cfg.AppConfig.ExtensionEndpoints[0]) assert.Equal(t, "localhost:50053", cfg.AppConfig.ExtensionEndpoints[1]) + assert.Equal(t, true, cfg.Instrumentation.Prometheus) + assert.Equal(t, 6, cfg.Instrumentation.MaxConnections) + assert.Equal(t, "9.8.7.6:20660", cfg.Instrumentation.PromListenAddr) + // TODO: Add bunch of other validations for different types } diff --git a/cmd/kwild/config/default_config.toml b/cmd/kwild/config/default_config.toml index 0381614cd..828caf536 100644 --- a/cmd/kwild/config/default_config.toml +++ b/cmd/kwild/config/default_config.toml @@ -307,4 +307,16 @@ discovery_time = "15s" chunk_request_timeout = "10s" # Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default), -# the state sync process is aborted and the node will fall back to the regular block sync process. \ No newline at end of file +# the state sync process is aborted and the node will fall back to the regular block sync process. 
+ +[instrumentation] + +# collect prometheus metrics and serve them under /metrics +prometheus = false + +# listen address for prometheus metrics +prometheus_listen_addr = "0.0.0.0:26660" + +# Maximum number of simultaneous connections. +# 0 - unlimited. +max_open_connections = 1 diff --git a/cmd/kwild/config/flags.go b/cmd/kwild/config/flags.go index a674daa74..ac7d82e05 100644 --- a/cmd/kwild/config/flags.go +++ b/cmd/kwild/config/flags.go @@ -114,6 +114,11 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`)) flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time") flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout") + // Instrumentation flags + flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics") + flagSet.StringVar(&cfg.Instrumentation.PromListenAddr, "instrumentation.prometheus-listen-addr", cfg.Instrumentation.PromListenAddr, "listen address for prometheus metrics") + flagSet.IntVar(&cfg.Instrumentation.MaxConnections, "instrumentation.max-open-connections", cfg.Instrumentation.MaxConnections, "maximum number of simultaneous connections") + // TODO: delete in v0.10.0 flagSet.String("app.snapshots.snapshot-dir", "", "Snapshot directory path") flagSet.MarkDeprecated("app.snapshots.snapshot-dir", "this value is no longer configurable") diff --git a/cmd/kwild/config/test_data/config.toml b/cmd/kwild/config/test_data/config.toml index fa132df70..c0063adc8 100644 --- a/cmd/kwild/config/test_data/config.toml +++ b/cmd/kwild/config/test_data/config.toml @@ -301,3 +301,15 @@ chunk_request_timeout = "10s" # Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default), # the state sync process is aborted and the node will fall back to the regular block sync process. 
+ +[instrumentation] + +# collect prometheus metrics and serve them under /metrics +prometheus = true + +# listen address for prometheus metrics +prometheus_listen_addr = "9.8.7.6:20660" + +# Maximum number of simultaneous connections. +# 0 - unlimited. +max_open_connections = 6 diff --git a/cmd/kwild/server/build.go b/cmd/kwild/server/build.go index 08410cf38..3657b404d 100644 --- a/cmd/kwild/server/build.go +++ b/cmd/kwild/server/build.go @@ -20,6 +20,8 @@ import ( abciTypes "github.com/cometbft/cometbft/abci/types" cmtEd "github.com/cometbft/cometbft/crypto/ed25519" cmtlocal "github.com/cometbft/cometbft/rpc/client/local" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" kwildcfg "github.com/kwilteam/kwil-db/cmd/kwild/config" "github.com/kwilteam/kwil-db/common" @@ -112,6 +114,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { // FinalizeBlock+Commit. This is not just a constructor, sadly. cometBftNode := buildCometNode(d, closers, abciApp) + prometheus.MustRegister( + collectors.NewBuildInfoCollector(), + // collectors.NewDBStatsCollector() // TODO: do something like this for pg.DB + ) + // Give abci p2p module access to removing peers p2p.SetRemovePeerFn(cometBftNode.RemovePeer) @@ -156,7 +163,8 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { jsonRPCServer, err := rpcserver.NewServer(d.cfg.AppConfig.JSONRPCListenAddress, *rpcServerLogger, rpcserver.WithTimeout(time.Duration(d.cfg.AppConfig.RPCTimeout)), rpcserver.WithReqSizeLimit(d.cfg.AppConfig.RPCMaxReqSize), - rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo)) + rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo), + rpcserver.WithMetricsNamespace("kwil_json_rpc_user_server")) if err != nil { failBuild(err, "unable to create json-rpc server") } diff --git a/cmd/kwild/server/cometbft.go b/cmd/kwild/server/cometbft.go index b97b7cb29..ad6de5b69 100644 --- 
a/cmd/kwild/server/cometbft.go +++ b/cmd/kwild/server/cometbft.go @@ -105,6 +105,13 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config { nodeCfg.StateSync.DiscoveryTime = time.Duration(userChainCfg.StateSync.DiscoveryTime) nodeCfg.StateSync.ChunkRequestTimeout = time.Duration(userChainCfg.StateSync.ChunkRequestTimeout) + nodeCfg.Instrumentation = &cmtCfg.InstrumentationConfig{ + Prometheus: cfg.Instrumentation.Prometheus, + PrometheusListenAddr: cfg.Instrumentation.PromListenAddr, + MaxOpenConnections: cfg.Instrumentation.MaxConnections, + Namespace: "cometbft", + } + // Light client verification nodeCfg.StateSync.TrustPeriod = 36000 * time.Second // 10 hours (6s block time) diff --git a/common/config/config.go b/common/config/config.go index bfc9df172..9a1c534a4 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -19,9 +19,10 @@ import ( type KwildConfig struct { RootDir string - AppConfig *AppConfig `mapstructure:"app"` - ChainConfig *ChainConfig `mapstructure:"chain"` - Logging *Logging `mapstructure:"log"` + AppConfig *AppConfig `mapstructure:"app"` + ChainConfig *ChainConfig `mapstructure:"chain"` + Logging *Logging `mapstructure:"log"` + Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"` } type Logging struct { @@ -36,6 +37,12 @@ type Logging struct { MaxLogRolls int `mapstructure:"retain_max_rolls"` } +type InstrumentationConfig struct { + Prometheus bool `mapstructure:"prometheus"` + PromListenAddr string `mapstructure:"prometheus_listen_addr"` + MaxConnections int `mapstructure:"max_open_connections"` +} + type AppConfig struct { JSONRPCListenAddress string `mapstructure:"jsonrpc_listen_addr"` AdminListenAddress string `mapstructure:"admin_listen_addr"` diff --git a/go.mod b/go.mod index e7e3fd3b9..7150c5dda 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/near/borsh-go v0.3.1 github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 github.com/pelletier/go-toml/v2 
v2.2.2 + github.com/prometheus/client_golang v1.20.1 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 @@ -116,7 +117,6 @@ require ( github.com/pganalyze/pg_query_go/v5 v5.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.20.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect diff --git a/internal/services/jsonrpc/server.go b/internal/services/jsonrpc/server.go index a97d440cb..a14720ac2 100644 --- a/internal/services/jsonrpc/server.go +++ b/internal/services/jsonrpc/server.go @@ -23,6 +23,8 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/kwilteam/kwil-db/core/log" jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json" "github.com/kwilteam/kwil-db/internal/services/jsonrpc/openrpc" @@ -56,6 +58,14 @@ type Server struct { spec json.RawMessage authSHA []byte tlsCfg *tls.Config + + // UNSTABLE: this is not much more than a placeholder to ensure we can add + // our own metrics to the global prometheus metrics registry. + metrics map[string]Metrics +} + +type Metrics interface { + Inc() } type serverConfig struct { @@ -66,6 +76,7 @@ type serverConfig struct { specInfo *openrpc.Info reqSzLimit int proxyCount int + namespace string } type Opt func(*serverConfig) @@ -93,6 +104,13 @@ func WithTrustedProxyCount(trustedProxyCount int) Opt { } } +// WithMetricsNamespace enables metrics with the provided namespace. +func WithMetricsNamespace(namespace string) Opt { + return func(c *serverConfig) { + c.namespace = namespace + } +} + // WithServerInfo sets the OpenRPC "info" section to use when serving the // OpenRPC JSON specification either via a spec REST endpoint or the // rpc.discover JSON-RPC method. 
@@ -171,6 +189,11 @@ var ( } ) +const ( + // This is the name of the counter for all JSON-RPC requests (on /rpc/v1). + reqCounterName = "jsonrpc_request_counter_UNSTABLE" +) + // NewServer creates a new JSON-RPC server. Use RegisterMethodHandler or // RegisterSvc to add method handlers. func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { @@ -201,6 +224,22 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { opt(cfg) } + // A more complete and structured metrics system should be created, but + // this is a start to ensure we are accessing the global metrics system used + // by cometbft. In Grafana or another prom dash, + // 'kwil_json_rpc_user_server_jsonrpc_request_counter_UNSTABLE' will be graphable. + var metrics map[string]Metrics + if cfg.namespace != "" { + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: cfg.namespace, + Name: reqCounterName, + }) + prometheus.MustRegister(counter) + metrics = map[string]Metrics{ + reqCounterName: counter, + } + } + mux := http.NewServeMux() // http.DefaultServeMux has the pprof endpoints mounted disconnectTimeout := cfg.timeout + 5*time.Second // for jsonRPCTimeoutHandler to respond, don't disconnect immediately @@ -228,6 +267,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { services: make(map[string]Svc), specInfo: cfg.specInfo, tlsCfg: cfg.tlsConfig, + metrics: metrics, } if cfg.pass != "" { @@ -246,6 +286,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) { if cfg.enableCORS { h = corsHandler(h) } + h = reqCounter(h, metrics[reqCounterName]) h = realIPHandler(h, cfg.proxyCount) // for effective rate limiting h = recoverer(h, log) // first, wrap with defer and call next ^ @@ -356,6 +397,16 @@ func corsHandler(h http.Handler) http.Handler { }) } +func reqCounter(h http.Handler, counter Metrics) http.Handler { + if counter == nil { + return h + } + return http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) { + counter.Inc() + h.ServeHTTP(w, r) + }) +} + func recoverer(h http.Handler, log log.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() {