Skip to content

Commit

Permalink
feat(metrics): add network tagging to context across multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
virajbhartiya committed Dec 11, 2024
1 parent 78e0bf5 commit be5d79a
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 7 deletions.
10 changes: 9 additions & 1 deletion blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -266,7 +268,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.chainSyncCond.L = &ss.chainSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

baseCtx := context.Background()
ctx, err := tag.New(baseCtx, tag.Insert(metrics.Network, buildconstants.NetworkBundle))
if err != nil {
return nil, xerrors.Errorf("failed to create context with network tag: %w", err)
}
ss.ctx, ss.cancel = context.WithCancel(ctx)

ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
Expand Down
20 changes: 16 additions & 4 deletions node/modules/lp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"path/filepath"
"sync"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -21,10 +25,6 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/fx"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)

var rcmgrMetricsOnce sync.Once
Expand Down Expand Up @@ -179,6 +179,7 @@ type rcmgrMetrics struct{}

func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -194,6 +195,7 @@ func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {

func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -209,6 +211,7 @@ func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {

func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -219,6 +222,7 @@ func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {

func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
Expand All @@ -229,46 +233,54 @@ func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {

func (r rcmgrMetrics) AllowPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
}

func (r rcmgrMetrics) BlockPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
}

func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
}

func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
}

func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
}

func (r rcmgrMetrics) AllowService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
}

func (r rcmgrMetrics) BlockService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
}

func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
}
Expand Down
5 changes: 4 additions & 1 deletion node/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
Expand All @@ -48,7 +49,9 @@ func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, er
Handler: h,
ReadHeaderTimeout: 30 * time.Second,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, id))
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.APIInterface, id))
return ctx
},
}
Expand Down
13 changes: 12 additions & 1 deletion paychmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/tag"
)

var log = logging.Logger("paych")
Expand Down Expand Up @@ -70,6 +74,10 @@ type Manager struct {

func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager {
impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api}

// Add network tag to context
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))

return &Manager{
ctx: ctx,
shutdown: shutdown,
Expand All @@ -82,13 +90,16 @@ func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI,

// newManager is used by the tests to supply mocks
func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))

pm := &Manager{
store: pchstore,
sa: &stateAccessor{sm: pchapi},
channels: make(map[string]*channelAccessor),
pchapi: pchapi,
}
pm.ctx, pm.shutdown = context.WithCancel(context.Background())
pm.ctx, pm.shutdown = context.WithCancel(ctx)
return pm, pm.Start()
}

Expand Down
14 changes: 14 additions & 0 deletions storage/paths/db_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/metrics"
Expand Down Expand Up @@ -46,6 +47,12 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex {
}
}

// addNetworkTag adds the network tag to the context for metrics
func addNetworkTag(ctx context.Context) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
return ctx
}

func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {

var sectorEntries []struct {
Expand Down Expand Up @@ -120,6 +127,8 @@ func splitString(str string) []string {
}

func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
ctx = addNetworkTag(ctx)

var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes))

if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert {
Expand Down Expand Up @@ -312,6 +321,8 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
}

func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
ctx = addNetworkTag(ctx)

retryWait := time.Millisecond * 20
retryReportHealth:
_, err := dbi.harmonyDB.Exec(ctx,
Expand Down Expand Up @@ -378,6 +389,7 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool {
}

func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
ctx = addNetworkTag(ctx)

if !dbi.checkFileType(ft) {
return xerrors.Errorf("invalid filetype")
Expand Down Expand Up @@ -678,6 +690,8 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface
}

func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) {
ctx = addNetworkTag(ctx)

var err error
var spaceReq uint64
switch pathType {
Expand Down
11 changes: 11 additions & 0 deletions storage/wdpost/wdpost_changehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/lotus/build/buildconstants"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/tag"
)

const (
Expand Down Expand Up @@ -51,6 +54,7 @@ func (ch *changeHandler) start() {
}

func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
ctx = addNetworkTag(ctx)
// Get the current deadline period
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
if err != nil {
Expand Down Expand Up @@ -210,6 +214,7 @@ func (p *proveHandler) run() {
}

func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
ctx = addNetworkTag(ctx)
// If the post window has expired, abort the current proof
if p.current != nil && newTS.Height() >= p.current.di.Close {
// Cancel the context on the current proof
Expand Down Expand Up @@ -387,6 +392,7 @@ func (s *submitHandler) run() {

// processHeadChange is called when the chain head changes
func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
ctx = addNetworkTag(ctx)
s.currentCtx = ctx
s.currentTS = advance
s.currentDI = di
Expand Down Expand Up @@ -540,3 +546,8 @@ func NextDeadline(currentDeadline *dline.Info) *dline.Info {
func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod(), miner.WPoStChallengeWindow(), miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
}

func addNetworkTag(ctx context.Context) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle))
return ctx
}

0 comments on commit be5d79a

Please sign in to comment.