Skip to content

Commit

Permalink
feat: add logging to metrics and waitgroups for goroutines (#80)
Browse files Browse the repository at this point in the history
* fix: update cosmos dependencies

* fix: metrics count

* update dependencies
  • Loading branch information
randyahx authored Sep 13, 2023
1 parent ba7c138 commit c447869
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 39 deletions.
9 changes: 8 additions & 1 deletion attest/attest_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AttestMonitor struct {
attestedChallengeIds map[uint64]bool // used to save the last attested challenge id
dataProvider DataProvider
metricService *metrics.MetricService
wg sync.WaitGroup
}

func NewAttestMonitor(executor *executor.Executor, dataProvider DataProvider, metricService *metrics.MetricService) *AttestMonitor {
Expand Down Expand Up @@ -51,14 +52,20 @@ func (a *AttestMonitor) UpdateAttestedChallengeIdLoop() {
if queryCount > MaxQueryCount {
a.attestedChallengeIds = make(map[uint64]bool, 0)
}

a.wg.Wait()
}
}

// updateAttestedCacheAndEventStatus only updates new entries
func (a *AttestMonitor) updateAttestedCacheAndEventStatus(old map[uint64]bool, latest []uint64) {
for _, challengeId := range latest {
if _, ok := old[challengeId]; !ok {
go a.updateEventStatus(challengeId)
a.wg.Add(1)
go func(challengeId uint64) {
defer a.wg.Done() // Decrement the WaitGroup when the goroutine is done
a.updateEventStatus(challengeId)
}(challengeId)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
cosmossdk.io/math v1.0.1
github.com/avast/retry-go/v4 v4.3.1
github.com/aws/aws-sdk-go v1.40.45
github.com/bnb-chain/greenfield v0.2.4
github.com/bnb-chain/greenfield-common/go v0.0.0-20230809025353-fd0519705054
github.com/bnb-chain/greenfield-go-sdk v0.2.4
github.com/bnb-chain/greenfield v0.2.5-alpha.3
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f
github.com/bnb-chain/greenfield-go-sdk v0.2.5-alpha.1
github.com/cometbft/cometbft v0.37.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/ethereum/go-ethereum v1.10.26
Expand Down Expand Up @@ -173,7 +173,7 @@ replace (
github.com/cometbft/cometbft => github.com/bnb-chain/greenfield-cometbft v0.0.3
github.com/cometbft/cometbft-db => github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.4
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5-alpha.1
github.com/cosmos/iavl => github.com/bnb-chain/greenfield-iavl v0.20.1
github.com/ferranbt/fastssz => github.com/prysmaticlabs/fastssz v0.0.0-20220110145812-fafb696cae88
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 h1:41iFGWnSlI2
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/bnb-chain/greenfield v0.2.4 h1:3knYY3KbEYoysnTAp3+oh2YyeWG2Au0kXYSdZHyzV+k=
github.com/bnb-chain/greenfield v0.2.4/go.mod h1:7FzduaDVOXpbiWMo0JoH8odXgwEfGJ3ug20BIgPDVKE=
github.com/bnb-chain/greenfield v0.2.5-alpha.3 h1:kjbRRbBJdD35YDYYVMduCzJ/YUwX9KVa8H5Pl8w8ZNQ=
github.com/bnb-chain/greenfield v0.2.5-alpha.3/go.mod h1:/EubHc4hKZqGRwxHgKNZA5twFBYOW74+gM6lQz5bZP8=
github.com/bnb-chain/greenfield-cometbft v0.0.3 h1:tv8NMy3bzX/1urqXGQIIAZSLy83loiI+dG0VKeyh1CY=
github.com/bnb-chain/greenfield-cometbft v0.0.3/go.mod h1:f35mk/r5ab6yvzlqEWZt68LfUje68sYgMpVlt2CUYMk=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1 h1:XcWulGacHVRiSCx90Q8Y//ajOrLNBQWR/KDB89dy3cU=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1/go.mod h1:ey1CiK4bYo1RBNJLRiVbYr5CMdSxci9S/AZRINLtppI=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230809025353-fd0519705054 h1:74pdUdHjo9QNgjSifIgzbDcloqFJ2I+qo715tOXy/oM=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230809025353-fd0519705054/go.mod h1:GEjCahULmz99qx5k8WGWa7cTXIUjoNMNW+J92I+kTWg=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.4 h1:09ST+MTEAyjyBSc4ZjZzHxpNLMnIIkZ518jJVRtrKFc=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.4/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f h1:zJvB2wCd80DQ9Nh/ZNQiP8MrHygSpDoav7OzHyIi/pM=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f/go.mod h1:it3JJVHeG9Wp4QED2GkY/7V9Qo3BuPdoC5/4/U6ecJM=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5-alpha.1 h1:E6AOXDWIcFC6e4KAqI7XzMWn8nZQcGjl1ESBRwrhVh8=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5-alpha.1/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210 h1:GHPbV2bC+gmuO6/sG0Tm8oGal3KKSRlyE+zPscDjlA8=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210/go.mod h1:vhsZxXE9tYJeYB5JR4hPhd6Pc/uPf7j1T8IJ7p9FdeM=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210 h1:FLVOn4+OVbsKi2+YJX5kmD27/4dRu4FW7xCXFhzDO5s=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210/go.mod h1:An0MllWJY6PxibUpnwGk8jOm+a/qIxlKmL5Zyp9NnaM=
github.com/bnb-chain/greenfield-go-sdk v0.2.4 h1:cYts8MZ8+jiW/+3uTRJAT7OKbex5YnViDebizLYcqAs=
github.com/bnb-chain/greenfield-go-sdk v0.2.4/go.mod h1:nd5oQUIuWMi3U0KJRbmM1dJClKLQed5l5GnooiRcFxw=
github.com/bnb-chain/greenfield-go-sdk v0.2.5-alpha.1 h1:8RZD4NJYIEJzz7qqY37gcUDLJCh4IJonuRJoK1nAHWU=
github.com/bnb-chain/greenfield-go-sdk v0.2.5-alpha.1/go.mod h1:h2cdj5hOaPhoFBrI36gYexjCr1aCAQjXBq3zNlAqjO0=
github.com/bnb-chain/greenfield-iavl v0.20.1 h1:y3L64GU99otNp27/xLVBTDbv4eroR6CzoYz0rbaVotM=
github.com/bnb-chain/greenfield-iavl v0.20.1/go.mod h1:oLksTs8dfh7DYIKBro7hbRQ+ewls7ghJ27pIXlbEXyI=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down
29 changes: 23 additions & 6 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/bnb-chain/greenfield-challenger/logging"
"net/http"
"time"

Expand Down Expand Up @@ -277,11 +278,17 @@ func (m *MetricService) IncHeartbeatEvents() {
m.MetricsMap[MetricHeartbeatEvents].(prometheus.Counter).Inc()
}

func (m *MetricService) IncHashVerifierErr() {
func (m *MetricService) IncHashVerifierErr(err error) {
if err != nil {
logging.Logger.Errorf("verifier error count increased, %s", err.Error())
}
m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc()
}

func (m *MetricService) IncHashVerifierSpApiErr() {
func (m *MetricService) IncHashVerifierSpApiErr(err error) {
if err != nil {
logging.Logger.Errorf("verifier sp api error count increased, %s", err.Error())
}
m.MetricsMap[MetricSpAPIErr].(prometheus.Counter).Inc()
}

Expand All @@ -294,12 +301,16 @@ func (m *MetricService) SetBroadcasterDuration(duration time.Duration) {
m.MetricsMap[MetricBroadcasterDuration].(prometheus.Histogram).Observe(duration.Seconds())
}

func (m *MetricService) IncBroadcasterErr() {
func (m *MetricService) IncBroadcasterErr(err error) {
logging.Logger.Errorf("broadcaster error count increased, %s", err.Error())
m.MetricsMap[MetricBroadcasterErr].(prometheus.Counter).Inc()
}

// Vote Collector
func (m *MetricService) IncVoteCollectorErr() {
func (m *MetricService) IncVoteCollectorErr(err error) {
if err != nil {
logging.Logger.Errorf("vote collector error count increased, %s", err.Error())
}
m.MetricsMap[MetricsVoteCollectorErr].(prometheus.Counter).Inc()
}

Expand All @@ -316,7 +327,10 @@ func (m *MetricService) SetCollatorDuration(duration time.Duration) {
m.MetricsMap[MetricCollatorDuration].(prometheus.Histogram).Observe(duration.Seconds())
}

func (m *MetricService) IncCollatorErr() {
func (m *MetricService) IncCollatorErr(err error) {
if err != nil {
logging.Logger.Errorf("collator error count increased, %s", err.Error())
}
m.MetricsMap[MetricCollatorErr].(prometheus.Counter).Inc()
}

Expand All @@ -329,7 +343,10 @@ func (m *MetricService) SetSubmitterDuration(duration time.Duration) {
m.MetricsMap[MetricSubmitterDuration].(prometheus.Histogram).Observe(duration.Seconds())
}

func (m *MetricService) IncSubmitterErr() {
func (m *MetricService) IncSubmitterErr(err error) {
if err != nil {
logging.Logger.Errorf("submitter error count increased, %s", err.Error())
}
m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc()
}

Expand Down
6 changes: 3 additions & 3 deletions submitter/tx_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *TxSubmitter) SubmitTransactionLoop() {
currentHeight := s.executor.GetCachedBlockHeight()
events, err := s.FetchEventsForSubmit(currentHeight)
if err != nil {
s.metricService.IncSubmitterErr()
s.metricService.IncSubmitterErr(err)
logging.Logger.Errorf("tx submitter failed to fetch events for submitting", err)
continue
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *TxSubmitter) submitForSingleEvent(event *model.Event, attestPeriodEnd u
// Calculate event hash and use it to fetch votes and validator bitset
aggregatedSignature, valBitSet, err := s.getSignatureAndBitSet(event)
if err != nil {
s.metricService.IncSubmitterErr()
s.metricService.IncSubmitterErr(err)
return err
}
return s.submitTransactionLoop(event, attestPeriodEnd, aggregatedSignature, valBitSet)
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd
}

if submittedAttempts > common.MaxSubmitAttempts {
s.metricService.IncSubmitterErr()
//s.metricService.IncSubmitterErr(err)
return fmt.Errorf("submitter exceeded max submit attempts for challengeId: %d", event.ChallengeId)
}

Expand Down
16 changes: 10 additions & 6 deletions verifier/hash_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Verifier struct {
dataProvider DataProvider
limiterSemaphore *semaphore.Weighted
metricService *metrics.MetricService
wg sync.WaitGroup
}

func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvider DataProvider, metricService *metrics.MetricService,
Expand Down Expand Up @@ -64,13 +65,12 @@ func (v *Verifier) VerifyHashLoop() {
time.Sleep(VerifyHashLoopInterval)
}
}

func (v *Verifier) verifyHash() error {
// Read unprocessed event from db with lowest challengeId
currentHeight := v.executor.GetCachedBlockHeight()
events, err := v.dataProvider.FetchEventsForVerification(currentHeight)
if err != nil {
v.metricService.IncHashVerifierErr()
v.metricService.IncHashVerifierErr(err)
logging.Logger.Errorf("verifier failed to retrieve the earliest events from db to begin verification, err=%+v", err.Error())
return err
}
Expand Down Expand Up @@ -102,9 +102,10 @@ func (v *Verifier) verifyHash() error {
logging.Logger.Errorf("failed to acquire semaphore: %v", err)
continue
}
v.wg.Add(1)
go func(event *model.Event) {
defer v.limiterSemaphore.Release(1)
// Call verifyForSingleEvent inside the goroutine
defer v.wg.Done()
err = v.verifyForSingleEvent(event)
}(event)

Expand All @@ -124,6 +125,9 @@ func (v *Verifier) verifyHash() error {
v.mtx.Unlock()
}
}

v.wg.Wait()

return nil
}

Expand Down Expand Up @@ -165,10 +169,10 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
return challengeResErr
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)
if challengeResErr != nil {
v.metricService.IncHashVerifierSpApiErr()
v.metricService.IncHashVerifierSpApiErr(err)
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched)
if err != nil {
v.metricService.IncHashVerifierErr()
v.metricService.IncHashVerifierErr(err)
logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId)
}
v.metricService.IncVerifiedChallenges()
Expand Down Expand Up @@ -199,7 +203,7 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
if err != nil {
logging.Logger.Errorf("failed to update event status, challenge id: %d, err: %s",
event.ChallengeId, err)
v.metricService.IncHashVerifierErr()
v.metricService.IncHashVerifierErr(err)
return err
}
// Log duration
Expand Down
6 changes: 3 additions & 3 deletions vote/vote_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *VoteBroadcaster) BroadcastVotesLoop() {
currentHeight := p.executor.GetCachedBlockHeight()
events, heartbeatEventCount, err := p.dataProvider.FetchEventsForSelfVote(currentHeight)
if err != nil {
p.metricService.IncBroadcasterErr()
p.metricService.IncBroadcasterErr(err)
logging.Logger.Errorf("vote processor failed to fetch unexpired events to collate votes, err=%+v", err.Error())
continue
}
Expand All @@ -71,7 +71,7 @@ func (p *VoteBroadcaster) BroadcastVotesLoop() {
if strings.Contains(err.Error(), "Duplicate") {
logging.Logger.Errorf("[non-blocking error] broadcaster was trying to save a duplicated vote after clearing cache for challengeId: %d, err=%+v", event.ChallengeId, err.Error())
} else {
p.metricService.IncBroadcasterErr()
p.metricService.IncBroadcasterErr(err)
logging.Logger.Errorf("broadcaster ran into error trying to construct vote for challengeId: %d, err=%+v", event.ChallengeId, err.Error())
continue
}
Expand All @@ -85,7 +85,7 @@ func (p *VoteBroadcaster) BroadcastVotesLoop() {

err = p.broadcastForSingleEvent(localVote.(*votepool.Vote), event)
if err != nil {
p.metricService.IncBroadcasterErr()
p.metricService.IncBroadcasterErr(err)
continue
}
time.Sleep(50 * time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions vote/vote_collator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (p *VoteCollator) CollateVotesLoop() {
events, err := p.dataProvider.FetchEventsForCollate(currentHeight)
logging.Logger.Infof("vote processor fetched %d events for collate", len(events))
if err != nil {
p.metricService.IncCollatorErr()
p.metricService.IncCollatorErr(err)
logging.Logger.Errorf("vote processor failed to fetch unexpired events to collate votes, err=%+v", err.Error())
time.Sleep(RetryInterval)
continue
Expand Down Expand Up @@ -77,7 +77,7 @@ func (p *VoteCollator) collateForSingleEvent(event *model.Event) error {
}
err = p.dataProvider.UpdateEventStatus(event.ChallengeId, model.EnoughVotesCollected)
if err != nil {
p.metricService.IncCollatorErr()
p.metricService.IncCollatorErr(err)
return err
}

Expand All @@ -93,7 +93,7 @@ func (p *VoteCollator) collateForSingleEvent(event *model.Event) error {
func (p *VoteCollator) prepareEnoughValidVotesForEvent(event *model.Event) error {
validators, err := p.executor.QueryCachedLatestValidators()
if err != nil {
p.metricService.IncCollatorErr()
p.metricService.IncCollatorErr(err)
return err
}
if len(validators) == 1 {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *VoteCollator) queryMoreThanTwoThirdVotesForEvent(event *model.Event, va
eventHash := CalculateEventHash(event, p.config.GreenfieldConfig.ChainIdString)
queriedVotes, err := p.dataProvider.FetchVotesForCollate(hex.EncodeToString(eventHash))
if err != nil {
p.metricService.IncCollatorErr()
p.metricService.IncCollatorErr(err)
logging.Logger.Errorf("failed to query votes for event %d, err=%+v", event.ChallengeId, err.Error())
return err
}
Expand Down
8 changes: 4 additions & 4 deletions vote/vote_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *VoteCollector) collectVotes() error {
eventType := votepool.DataAvailabilityChallengeEvent
queriedVotes, err := p.executor.QueryVotes(eventType)
if err != nil {
p.metricService.IncVoteCollectorErr()
p.metricService.IncVoteCollectorErr(err)
logging.Logger.Errorf("vote collector failed to query votes, err=%+v", err.Error())
return err
}
Expand All @@ -62,15 +62,15 @@ func (p *VoteCollector) collectVotes() error {

validators, err := p.executor.QueryCachedLatestValidators()
if err != nil {
p.metricService.IncVoteCollectorErr()
p.metricService.IncVoteCollectorErr(err)
logging.Logger.Errorf("vote collector ran into error querying validators, err=%+v", err.Error())
return err
}

for _, v := range queriedVotes {
exists, err := p.dataProvider.IsVoteExists(hex.EncodeToString(v.EventHash), hex.EncodeToString(v.PubKey))
if err != nil {
p.metricService.IncVoteCollectorErr()
p.metricService.IncVoteCollectorErr(err)
logging.Logger.Errorf("vote collector ran into an error while checking if vote exists, err=%+v", err.Error())
continue
}
Expand All @@ -90,7 +90,7 @@ func (p *VoteCollector) collectVotes() error {

err = p.dataProvider.SaveVote(EntityToDto(v, uint64(0)))
if err != nil {
p.metricService.IncVoteCollectorErr()
p.metricService.IncVoteCollectorErr(err)
return err
}
logging.Logger.Infof("vote saved: %s", hex.EncodeToString(v.Signature))
Expand Down

0 comments on commit c447869

Please sign in to comment.