Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prometheus metrics server and config #1015

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ var DefaultConfig = func() *commonConfig.KwildConfig {
MaxLogSizeKB: 100_000, // 100 MB uncompressed threshold
MaxLogRolls: 0, // the zero value means retain all (don't delete oldest archived logs)
},
Instrumentation: &commonConfig.InstrumentationConfig{
Prometheus: false,
PromListenAddr: "0.0.0.0:26660",
MaxConnections: 1,
},

ChainConfig: &commonConfig.ChainConfig{
P2P: &commonConfig.P2PConfig{
Expand Down
21 changes: 11 additions & 10 deletions cmd/kwild/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,27 +365,28 @@ func DefaultEmptyConfig() *config.KwildConfig {
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

// EmptyConfig returns a config with all fields set to their zero values.
// This is useful for guaranteeing that all fields are set when merging
// EmptyConfig returns a config with all fields set to their zero values (except
// no nil pointers for the sub-sections structs). This is useful for
// guaranteeing that all fields are set when merging.
func EmptyConfig() *config.KwildConfig {
return &config.KwildConfig{
AppConfig: &config.AppConfig{
ExtensionEndpoints: []string{},
},
ChainConfig: &config.ChainConfig{
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{
RPCServers: "",
},
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/kwild/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func Test_Config_Toml(t *testing.T) {
assert.Equal(t, "localhost:50052", cfg.AppConfig.ExtensionEndpoints[0])
assert.Equal(t, "localhost:50053", cfg.AppConfig.ExtensionEndpoints[1])

assert.Equal(t, true, cfg.Instrumentation.Prometheus)
assert.Equal(t, 6, cfg.Instrumentation.MaxConnections)
assert.Equal(t, "9.8.7.6:20660", cfg.Instrumentation.PromListenAddr)

// TODO: Add bunch of other validations for different types
}

Expand Down
14 changes: 13 additions & 1 deletion cmd/kwild/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,16 @@ discovery_time = "15s"
chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# collect and serve are served under /metrics
prometheus = false

# listen address for prometheus metrics
prometheus_listen_addr = "0.0.0.0:26660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 1
5 changes: 5 additions & 0 deletions cmd/kwild/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`))
flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time")
flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout")

// Instrumentation flags
flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics")
flagSet.StringVar(&cfg.Instrumentation.PromListenAddr, "instrumentation.prometheus-listen-addr", cfg.Instrumentation.PromListenAddr, "listen address for prometheus metrics")
flagSet.IntVar(&cfg.Instrumentation.MaxConnections, "instrumentation.max-open-connections", cfg.Instrumentation.MaxConnections, "maximum number of simultaneous connections")

// TODO: delete in v0.10.0
flagSet.String("app.snapshots.snapshot-dir", "", "Snapshot directory path")
flagSet.MarkDeprecated("app.snapshots.snapshot-dir", "this value is no longer configurable")
Expand Down
12 changes: 12 additions & 0 deletions cmd/kwild/config/test_data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,15 @@ chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# collect and serve are served under /metrics
prometheus = true

# listen address for prometheus metrics
prometheus_listen_addr = "9.8.7.6:20660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 6
10 changes: 9 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
abciTypes "github.com/cometbft/cometbft/abci/types"
cmtEd "github.com/cometbft/cometbft/crypto/ed25519"
cmtlocal "github.com/cometbft/cometbft/rpc/client/local"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

kwildcfg "github.com/kwilteam/kwil-db/cmd/kwild/config"
"github.com/kwilteam/kwil-db/common"
Expand Down Expand Up @@ -112,6 +114,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// FinalizeBlock+Commit. This is not just a constructor, sadly.
cometBftNode := buildCometNode(d, closers, abciApp)

prometheus.MustRegister(
collectors.NewBuildInfoCollector(),
// collectors.NewDBStatsCollector() // TODO: do something like this for pg.DB
)

// Give abci p2p module access to removing peers
p2p.SetRemovePeerFn(cometBftNode.RemovePeer)

Expand Down Expand Up @@ -156,7 +163,8 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
jsonRPCServer, err := rpcserver.NewServer(d.cfg.AppConfig.JSONRPCListenAddress,
*rpcServerLogger, rpcserver.WithTimeout(time.Duration(d.cfg.AppConfig.RPCTimeout)),
rpcserver.WithReqSizeLimit(d.cfg.AppConfig.RPCMaxReqSize),
rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo))
rpcserver.WithCORS(), rpcserver.WithServerInfo(&usersvc.SpecInfo),
rpcserver.WithMetricsNamespace("kwil_json_rpc_user_server"))
if err != nil {
failBuild(err, "unable to create json-rpc server")
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/kwild/server/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config {
nodeCfg.StateSync.DiscoveryTime = time.Duration(userChainCfg.StateSync.DiscoveryTime)
nodeCfg.StateSync.ChunkRequestTimeout = time.Duration(userChainCfg.StateSync.ChunkRequestTimeout)

nodeCfg.Instrumentation = &cmtCfg.InstrumentationConfig{
Prometheus: cfg.Instrumentation.Prometheus,
PrometheusListenAddr: cfg.Instrumentation.PromListenAddr,
MaxOpenConnections: cfg.Instrumentation.MaxConnections,
Namespace: "cometbft",
}

// Light client verification
nodeCfg.StateSync.TrustPeriod = 36000 * time.Second // 10 hours (6s block time)

Expand Down
13 changes: 10 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
type KwildConfig struct {
RootDir string

AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
}

type Logging struct {
Expand All @@ -36,6 +37,12 @@ type Logging struct {
MaxLogRolls int `mapstructure:"retain_max_rolls"`
}

type InstrumentationConfig struct {
Prometheus bool `mapstructure:"prometheus"`
PromListenAddr string `mapstructure:"prometheus_listen_addr"`
MaxConnections int `mapstructure:"max_open_connections"`
}

type AppConfig struct {
JSONRPCListenAddress string `mapstructure:"jsonrpc_listen_addr"`
AdminListenAddress string `mapstructure:"admin_listen_addr"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/near/borsh-go v0.3.1
github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77
github.com/pelletier/go-toml/v2 v2.2.2
github.com/prometheus/client_golang v1.20.1
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
Expand Down Expand Up @@ -116,7 +117,6 @@ require (
github.com/pganalyze/pg_query_go/v5 v5.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
51 changes: 51 additions & 0 deletions internal/services/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/kwilteam/kwil-db/core/log"
jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json"
"github.com/kwilteam/kwil-db/internal/services/jsonrpc/openrpc"
Expand Down Expand Up @@ -56,6 +58,14 @@ type Server struct {
spec json.RawMessage
authSHA []byte
tlsCfg *tls.Config

// UNSTABLE: this is not much more than a placeholder to ensure we can add
// our own metrics to the global prometheus metrics registry.
metrics map[string]Metrics
}

type Metrics interface {
Inc()
Comment on lines +62 to +68
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's some PoC stuff. I marked this counter-type metric as unstable as we should fully specify all of kwild's metrics and organize them better within a hierarchy of namespaces.

}

type serverConfig struct {
Expand All @@ -66,6 +76,7 @@ type serverConfig struct {
specInfo *openrpc.Info
reqSzLimit int
proxyCount int
namespace string
}

type Opt func(*serverConfig)
Expand Down Expand Up @@ -93,6 +104,13 @@ func WithTrustedProxyCount(trustedProxyCount int) Opt {
}
}

// WithMetricsNamespace enables metrics with the provided namespace.
func WithMetricsNamespace(namespace string) Opt {
return func(c *serverConfig) {
c.namespace = namespace
}
}

// WithServerInfo sets the OpenRPC "info" section to use when serving the
// OpenRPC JSON specification either via a spec REST endpoint or the
// rpc.discover JSON-RPC method.
Expand Down Expand Up @@ -171,6 +189,11 @@ var (
}
)

const (
// This is name of the counter for all JSON-RPC requests (on /rpc/v1).
reqCounterName = "jsonrpc_request_counter_UNSTABLE"
)

// NewServer creates a new JSON-RPC server. Use RegisterMethodHandler or
// RegisterSvc to add method handlers.
func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
Expand Down Expand Up @@ -201,6 +224,22 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
opt(cfg)
}

// A more complete and structured metrics system should to be created, but
// this is a start to ensure we are accessing the global metrics system used
// by cometbft. In Grafana or another prom dash,
// 'kwil_json_rpc_user_server_request_counter_UNSTABLE' will be graphable.
var metrics map[string]Metrics
if cfg.namespace != "" {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: cfg.namespace,
Name: reqCounterName,
})
prometheus.MustRegister(counter)
metrics = map[string]Metrics{
reqCounterName: counter,
}
}

mux := http.NewServeMux() // http.DefaultServeMux has the pprof endpoints mounted

disconnectTimeout := cfg.timeout + 5*time.Second // for jsonRPCTimeoutHandler to respond, don't disconnect immediately
Expand Down Expand Up @@ -228,6 +267,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
services: make(map[string]Svc),
specInfo: cfg.specInfo,
tlsCfg: cfg.tlsConfig,
metrics: metrics,
}

if cfg.pass != "" {
Expand All @@ -246,6 +286,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
if cfg.enableCORS {
h = corsHandler(h)
}
h = reqCounter(h, metrics[reqCounterName])
h = realIPHandler(h, cfg.proxyCount) // for effective rate limiting
h = recoverer(h, log) // first, wrap with defer and call next ^

Expand Down Expand Up @@ -356,6 +397,16 @@ func corsHandler(h http.Handler) http.Handler {
})
}

func reqCounter(h http.Handler, counter Metrics) http.Handler {
if counter == nil {
return h
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
counter.Inc()
h.ServeHTTP(w, r)
})
}

func recoverer(h http.Handler, log log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
Expand Down
Loading