Skip to content

Commit

Permalink
add prometheus metrics server and config
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Sep 23, 2024
1 parent 5dc5f27 commit 63b38e8
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 14 deletions.
5 changes: 5 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ var DefaultConfig = func() *commonConfig.KwildConfig {
MaxLogSizeKB: 100_000, // 100 MB uncompressed threshold
MaxLogRolls: 0, // the zero value means retain all (don't delete oldest archived logs)
},
Instrumentation: &commonConfig.InstrumentationConfig{
Prometheus: false,
PromListenAddr: "0.0.0.0:26660",
MaxConnections: 1,
},

ChainConfig: &commonConfig.ChainConfig{
P2P: &commonConfig.P2PConfig{
Expand Down
21 changes: 11 additions & 10 deletions cmd/kwild/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,27 +365,28 @@ func DefaultEmptyConfig() *config.KwildConfig {
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

// EmptyConfig returns a config with all fields set to their zero values.
// This is useful for guaranteeing that all fields are set when merging
// EmptyConfig returns a config with all fields set to their zero values (except
// no nil pointers for the sub-sections structs). This is useful for
// guaranteeing that all fields are set when merging.
func EmptyConfig() *config.KwildConfig {
return &config.KwildConfig{
AppConfig: &config.AppConfig{
ExtensionEndpoints: []string{},
},
ChainConfig: &config.ChainConfig{
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{
RPCServers: "",
},
P2P: &config.P2PConfig{},
RPC: &config.ChainRPCConfig{},
Mempool: &config.MempoolConfig{},
StateSync: &config.StateSyncConfig{},
Consensus: &config.ConsensusConfig{},
},
Logging: &config.Logging{},
Logging: &config.Logging{},
Instrumentation: &config.InstrumentationConfig{},
}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/kwild/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func Test_Config_Toml(t *testing.T) {
assert.Equal(t, "localhost:50052", cfg.AppConfig.ExtensionEndpoints[0])
assert.Equal(t, "localhost:50053", cfg.AppConfig.ExtensionEndpoints[1])

assert.Equal(t, true, cfg.Instrumentation.Prometheus)
assert.Equal(t, 6, cfg.Instrumentation.MaxConnections)
assert.Equal(t, "tcp://9.8.7.6:20660", cfg.Instrumentation.PromListenAddr)

// TODO: Add bunch of other validations for different types
}

Expand Down
14 changes: 13 additions & 1 deletion cmd/kwild/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,16 @@ discovery_time = "15s"
chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# collect and serve are served under /metrics
prometheus = false

# listen address for prometheus metrics
prometheus_listen_addr = "tcp://0.0.0.0:26660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 1
5 changes: 5 additions & 0 deletions cmd/kwild/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ to instead run a dedicated seeder like https://github.com/kwilteam/cometseed.`))
flagSet.Var(&cfg.ChainConfig.StateSync.DiscoveryTime, "chain.statesync.discovery-time", "Chain state sync discovery time")
flagSet.Var(&cfg.ChainConfig.StateSync.ChunkRequestTimeout, "chain.statesync.chunk-request-timeout", "Chain state sync chunk request timeout")

// Instrumentation flags
flagSet.BoolVar(&cfg.Instrumentation.Prometheus, "instrumentation.prometheus", cfg.Instrumentation.Prometheus, "collect and serve prometheus metrics")
flagSet.StringVar(&cfg.Instrumentation.PromListenAddr, "instrumentation.prometheus-listen-addr", cfg.Instrumentation.PromListenAddr, "listen address for prometheus metrics")
flagSet.IntVar(&cfg.Instrumentation.MaxConnections, "instrumentation.max-open-connections", cfg.Instrumentation.MaxConnections, "maximum number of simultaneous connections")

// TODO: delete in v0.10.0
flagSet.String("app.snapshots.snapshot-dir", "", "Snapshot directory path")
flagSet.MarkDeprecated("app.snapshots.snapshot-dir", "this value is no longer configurable")
Expand Down
12 changes: 12 additions & 0 deletions cmd/kwild/config/test_data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,15 @@ chunk_request_timeout = "10s"

# Note: If the requested chunk is not received for a duration of 2 minutes (hard-coded default),
# the state sync process is aborted and the node will fall back to the regular block sync process.

[instrumentation]

# collect and serve are served under /metrics
prometheus = true

# listen address for prometheus metrics
prometheus_listen_addr = "tcp://9.8.7.6:20660"

# Maximum number of simultaneous connections.
# 0 - unlimited.
max_open_connections = 6
7 changes: 7 additions & 0 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
abciTypes "github.com/cometbft/cometbft/abci/types"
cmtEd "github.com/cometbft/cometbft/crypto/ed25519"
cmtlocal "github.com/cometbft/cometbft/rpc/client/local"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

kwildcfg "github.com/kwilteam/kwil-db/cmd/kwild/config"
"github.com/kwilteam/kwil-db/common"
Expand Down Expand Up @@ -112,6 +114,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// FinalizeBlock+Commit. This is not just a constructor, sadly.
cometBftNode := buildCometNode(d, closers, abciApp)

prometheus.MustRegister(
collectors.NewBuildInfoCollector(),
// collectors.NewDBStatsCollector() // TODO: do something like this for pg.DB
)

// Give abci p2p module access to removing peers
p2p.SetRemovePeerFn(cometBftNode.RemovePeer)

Expand Down
7 changes: 7 additions & 0 deletions cmd/kwild/server/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func newCometConfig(cfg *config.KwildConfig) *cmtCfg.Config {
nodeCfg.StateSync.DiscoveryTime = time.Duration(userChainCfg.StateSync.DiscoveryTime)
nodeCfg.StateSync.ChunkRequestTimeout = time.Duration(userChainCfg.StateSync.ChunkRequestTimeout)

nodeCfg.Instrumentation = &cmtCfg.InstrumentationConfig{
Prometheus: cfg.Instrumentation.Prometheus,
PrometheusListenAddr: cfg.Instrumentation.PromListenAddr,
MaxOpenConnections: cfg.Instrumentation.MaxConnections,
Namespace: "cometbft",
}

// Light client verification
nodeCfg.StateSync.TrustPeriod = 36000 * time.Second // 10 hours (6s block time)

Expand Down
13 changes: 10 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
type KwildConfig struct {
RootDir string

AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
AppConfig *AppConfig `mapstructure:"app"`
ChainConfig *ChainConfig `mapstructure:"chain"`
Logging *Logging `mapstructure:"log"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
}

type Logging struct {
Expand All @@ -36,6 +37,12 @@ type Logging struct {
MaxLogRolls int `mapstructure:"retain_max_rolls"`
}

type InstrumentationConfig struct {
Prometheus bool `mapstructure:"prometheus"`
PromListenAddr string `mapstructure:"prometheus_listen_addr"`
MaxConnections int `mapstructure:"max_open_connections"`
}

type AppConfig struct {
JSONRPCListenAddress string `mapstructure:"jsonrpc_listen_addr"`
AdminListenAddress string `mapstructure:"admin_listen_addr"`
Expand Down
32 changes: 32 additions & 0 deletions internal/services/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/kwilteam/kwil-db/core/log"
jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json"
"github.com/kwilteam/kwil-db/internal/services/jsonrpc/openrpc"
Expand Down Expand Up @@ -56,6 +58,14 @@ type Server struct {
spec json.RawMessage
authSHA []byte
tlsCfg *tls.Config

// UNSTABLE: this is not much more than a placeholder to ensure we can add
// our own metrics to the global prometheus metrics registry.
metrics map[string]Metrics
}

type Metrics interface {
Inc()
}

type serverConfig struct {
Expand Down Expand Up @@ -171,9 +181,21 @@ var (
}
)

type Mux interface {
Handle()
}

// NewServer creates a new JSON-RPC server. Use RegisterMethodHandler or
// RegisterSvc to add method handlers.
func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "json-rpc-server",
Name: "JSON RPC Server counter (UNSTABLE)",
})
metrics := map[string]Metrics{
"req_counter": counter,
}

addr, isUNIX, err := checkAddr(addr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -228,6 +250,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
services: make(map[string]Svc),
specInfo: cfg.specInfo,
tlsCfg: cfg.tlsConfig,
metrics: metrics,
}

if cfg.pass != "" {
Expand All @@ -246,6 +269,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
if cfg.enableCORS {
h = corsHandler(h)
}
h = reqCounter(h, metrics)
h = realIPHandler(h, cfg.proxyCount) // for effective rate limiting
h = recoverer(h, log) // first, wrap with defer and call next ^

Expand Down Expand Up @@ -356,6 +380,14 @@ func corsHandler(h http.Handler) http.Handler {
})
}

func reqCounter(h http.Handler, metrics map[string]Metrics) http.Handler {
reqCounter := metrics["req_count"]
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqCounter.Inc()
h.ServeHTTP(w, r)
})
}

func recoverer(h http.Handler, log log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
Expand Down

0 comments on commit 63b38e8

Please sign in to comment.