Skip to content

Commit

Permalink
feat(metrics): implement AddNetworkTag function for consistent networ…
Browse files Browse the repository at this point in the history
…k tagging across multiple modules
  • Loading branch information
virajbhartiya committed Dec 12, 2024
1 parent 65a02f6 commit 775cfac
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 42 deletions.
6 changes: 6 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build/buildconstants"
)

// Distribution
Expand Down Expand Up @@ -851,3 +852,8 @@ func Timer(ctx context.Context, m *stats.Float64Measure) func() time.Duration {
return time.Since(start)
}
}

func AddNetworkTag(ctx context.Context) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(Network, buildconstants.NetworkBundle))
return ctx
}
22 changes: 11 additions & 11 deletions node/modules/lp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {

func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -212,7 +212,7 @@ func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {

func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -223,7 +223,7 @@ func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {

func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -234,54 +234,54 @@ func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {

func (r rcmgrMetrics) AllowPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
}

func (r rcmgrMetrics) BlockPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
}

func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
}

func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
}

func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
}

func (r rcmgrMetrics) AllowService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
}

func (r rcmgrMetrics) BlockService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
}

func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
}
Expand Down
3 changes: 2 additions & 1 deletion node/modules/paych.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.uber.org/fx"

"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
Expand All @@ -17,7 +18,7 @@ import (
func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm stmgr.StateManagerAPI, pchstore *paychmgr.Store, api paychmgr.PaychAPI) *paychmgr.Manager {
ctx := helpers.LifecycleCtx(mctx, lc)
ctx, shutdown := context.WithCancel(ctx)

ctx = metrics.AddNetworkTag(ctx)
return paychmgr.NewManager(ctx, shutdown, sm, pchstore, api)
}

Expand Down
4 changes: 3 additions & 1 deletion node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/jpillora/backoff"
"go.opencensus.io/tag"
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/lf3"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/metrics"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -289,7 +291,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
)

ctx := helpers.LifecycleCtx(mctx, lc)

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr})

if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions node/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
Expand Down Expand Up @@ -50,7 +49,7 @@ func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, er
ReadHeaderTimeout: 30 * time.Second,
BaseContext: func(listener net.Listener) context.Context {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx = metrics.AddNetworkTag(ctx)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.APIInterface, id))
return ctx
},
Expand Down
4 changes: 0 additions & 4 deletions paychmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ type Manager struct {

func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager {
impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api}

// Add network tag to context
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))

return &Manager{
ctx: ctx,
shutdown: shutdown,
Expand Down
15 changes: 4 additions & 11 deletions storage/paths/db_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics"
Expand Down Expand Up @@ -47,12 +46,6 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
}
}

// addNetworkTag adds the network tag to the context for metrics
func addNetworkTag(ctx context.Context) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
return ctx
}

func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {

var sectorEntries []struct {
Expand Down Expand Up @@ -127,7 +120,7 @@ func splitString(str string) []string {
}

func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)

var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))

Expand Down Expand Up @@ -321,7 +314,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
}

func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)

retryWait := time.Millisecond * 20
retryReportHealth:
Expand Down Expand Up @@ -389,7 +382,7 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool {
}

func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)

if !dbi.checkFileType(ft) {
return xerrors.Errorf("invalid filetype")
Expand Down Expand Up @@ -690,7 +683,7 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface
}

func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)

var err error
var spaceReq uint64
Expand Down
13 changes: 2 additions & 11 deletions storage/wdpost/wdpost_changehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ import (
"context"
"sync"

"go.opencensus.io/tag"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
Expand Down Expand Up @@ -55,7 +52,6 @@ func (ch *changeHandler) start() {
}

func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
ctx = addNetworkTag(ctx)
// Get the current deadline period
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
if err != nil {
Expand Down Expand Up @@ -215,7 +211,7 @@ func (p *proveHandler) run() {
}

func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)
// If the post window has expired, abort the current proof
if p.current != nil && newTS.Height() >= p.current.di.Close {
// Cancel the context on the current proof
Expand Down Expand Up @@ -393,7 +389,7 @@ func (s *submitHandler) run() {

// processHeadChange is called when the chain head changes
func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
ctx = addNetworkTag(ctx)
ctx = metrics.AddNetworkTag(ctx)
s.currentCtx = ctx
s.currentTS = advance
s.currentDI = di
Expand Down Expand Up @@ -547,8 +543,3 @@ func NextDeadline(currentDeadline *dline.Info) *dline.Info {
func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod(), miner.WPoStChallengeWindow(), miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
}

func addNetworkTag(ctx context.Context) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
return ctx
}
1 change: 0 additions & 1 deletion storage/wdpost/wdpost_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
}

ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange")

s.update(ctx, nil, chg.Val)

span.End()
Expand Down

0 comments on commit 775cfac

Please sign in to comment.