From 48ff5b3d05800f49116ab188fe311d366b548b82 Mon Sep 17 00:00:00 2001 From: randyahx <62416962+randyahx@users.noreply.github.com> Date: Wed, 1 Nov 2023 16:17:13 +0800 Subject: [PATCH] feat: add sp metrics for internal/external sp (#101) --- config/config.go | 5 ++++ config/config.json | 11 +++++++++ executor/executor.go | 3 +-- metrics/metrics.go | 39 ++++++++++++++++++++---------- submitter/tx_submitter.go | 2 ++ verifier/const.go | 1 + verifier/hash_verifier.go | 50 +++++++++++++++++++++++++++++++++------ 7 files changed, 90 insertions(+), 21 deletions(-) diff --git a/config/config.go b/config/config.go index e42c113..171da7b 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type Config struct { AlertConfig AlertConfig `json:"alert_config"` DBConfig DBConfig `json:"db_config"` MetricsConfig MetricsConfig `json:"metrics_config"` + SPConfig SPConfig `json:"sp_config"` } type GreenfieldConfig struct { @@ -125,6 +126,10 @@ func (cfg *DBConfig) Validate() { } } +type SPConfig struct { + InternalSPEndpoints []string `json:"internal_sp_endpoints"` +} + type MetricsConfig struct { Port uint16 `json:"port"` } diff --git a/config/config.json b/config/config.json index 9343702..1c37bc8 100644 --- a/config/config.json +++ b/config/config.json @@ -39,6 +39,17 @@ "max_open_conns": 40, "debug_mode": true }, + "sp_config": { + "internal_sp_endpoints": [ + "https://greenfield-sp.bnbchain.org", + "https://greenfield-sp.nodereal.io", + "https://greenfield-sp.ninicoin.io", + "https://greenfield-sp.defibit.io", + "https://greenfield-sp.nariox.org", + "https://greenfield-sp.lumibot.org", + "https://greenfield-sp.voltbot.io" + ] + }, "alert_config": { "interval": 300, "identity": "your_identity", diff --git a/executor/executor.go b/executor/executor.go index 6412bd7..5ad7a87 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -301,7 +301,7 @@ func (e *Executor) QueryChallengeSlashCoolingOffPeriod() (uint64, error) { client := e.clients.GetClient() params, err := client.ChallengeParams(context.Background(), &challengetypes.QueryParamsRequest{}) if err != nil { - logging.Logger.Errorf("query challenge params failed, err=%+v", err.Error()) + logging.Logger.Errorf("query slash cooling off period failed, err=%+v", err.Error()) return 0, err } logging.Logger.Infof("challenge slash cooling off period: %d", params.Params.SlashCoolingOffPeriod) @@ -345,7 +345,6 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) { logging.Logger.Errorf("executor failed to query storage provider %s, err=%+v", address, err.Error()) return "", err } - logging.Logger.Infof("response res.endpoint %s", res.Endpoint) return res.Endpoint, nil } diff --git a/metrics/metrics.go b/metrics/metrics.go index f660fd2..88a362e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -24,7 +24,8 @@ const ( MetricVerifiedChallengeSuccess = "challenge_success" MetricHeartbeatEvents = "heartbeat_events" MetricHashVerifierErr = "hash_verifier_error_count" - MetricSpAPIErr = "hash_verifier_sp_api_error" + MetricInternalSpAPIErr = "hash_verifier_internal_sp_api_error" + MetricExternalSpAPIErr = "hash_verifier_external_sp_api_error" MetricHashVerifierDuration = "hash_verifier_duration" // Vote Broadcaster @@ -125,17 +126,24 @@ func NewMetricService(config *config.Config) *MetricService { hashVerifierErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ Name: MetricHashVerifierErr, - Help: "Hash verifier error count", + Help: "Verifier error count", }) ms[MetricHashVerifierErr] = hashVerifierErrCountMetric prometheus.MustRegister(hashVerifierErrCountMetric) - hashVerifierSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ - Name: MetricSpAPIErr, - Help: "Hash verifier SP API error count", + hashVerifierInternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricInternalSpAPIErr, + Help: "Internal sp error count", }) - ms[MetricSpAPIErr] = hashVerifierSpApiErrCountMetric - prometheus.MustRegister(hashVerifierSpApiErrCountMetric) + ms[MetricInternalSpAPIErr] = hashVerifierInternalSpApiErrCountMetric + prometheus.MustRegister(hashVerifierInternalSpApiErrCountMetric) + + hashVerifierExternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricExternalSpAPIErr, + Help: "External sp error count", + }) + ms[MetricExternalSpAPIErr] = hashVerifierExternalSpApiErrCountMetric + prometheus.MustRegister(hashVerifierExternalSpApiErrCountMetric) // Broadcaster broadcasterErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ @@ -281,15 +289,22 @@ func (m *MetricService) IncHeartbeatEvents() { func (m *MetricService) IncHashVerifierErr(err error) { if err != nil { logging.Logger.Errorf("verifier error count increased, %s", err.Error()) + m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc() + } +} + +func (m *MetricService) IncHashVerifierInternalSpApiErr(err error) { + if err != nil { + logging.Logger.Errorf("verifier internal sp error count increased, %s", err.Error()) + m.MetricsMap[MetricInternalSpAPIErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc() } -func (m *MetricService) IncHashVerifierSpApiErr(err error) { +func (m *MetricService) IncHashVerifierExternalSpApiErr(err error) { if err != nil { - logging.Logger.Errorf("verifier sp api error count increased, %s", err.Error()) + logging.Logger.Errorf("verifier external sp error count increased, %s", err.Error()) + m.MetricsMap[MetricExternalSpAPIErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricSpAPIErr].(prometheus.Counter).Inc() } // Broadcaster @@ -346,8 +361,8 @@ func (m *MetricService) SetSubmitterDuration(duration time.Duration) { func (m *MetricService) IncSubmitterErr(err error) { if err != nil { logging.Logger.Errorf("submitter error count increased, %s", err.Error()) + m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc() } // Attest Monitor diff --git a/submitter/tx_submitter.go b/submitter/tx_submitter.go index ae113b1..f178ef8 100644 --- a/submitter/tx_submitter.go +++ b/submitter/tx_submitter.go @@ -206,6 +206,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd } return err } + s.metricService.IncSubmitterErr(err) } else { logging.Logger.Errorf("submitter failed for challengeId: %d, attempts: %d", event.ChallengeId, submittedAttempts) } @@ -217,6 +218,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd err = s.DataProvider.UpdateEventStatus(event.ChallengeId, model.Submitted) if err != nil { logging.Logger.Errorf("submitter succeeded in attesting but failed to update database, err=%+v", err.Error()) + s.metricService.IncSubmitterErr(err) continue } diff --git a/verifier/const.go b/verifier/const.go index 25db383..1fbe6db 100644 --- a/verifier/const.go +++ b/verifier/const.go @@ -3,3 +3,4 @@ package verifier import "time" var VerifyHashLoopInterval = 2 * time.Second +var UpdateDeduplicationInterval = 24 * time.Hour diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index bcd5ea5..38ac367 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -59,6 +59,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid } func (v *Verifier) VerifyHashLoop() { + go v.updateDeduplicationIntervalLoop() for { err := v.verifyHash() if err != nil { @@ -68,6 +69,7 @@ func (v *Verifier) VerifyHashLoop() { time.Sleep(VerifyHashLoopInterval) } } + func (v *Verifier) verifyHash() error { // Read unprocessed event from db with lowest challengeId currentHeight, err := v.executor.GetCachedBlockHeight() @@ -194,7 +196,7 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { return err } chainRootHash := checksums[event.RedundancyIndex+1] - logging.Logger.Infof("chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId) + logging.Logger.Infof("fetched chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId) // Call sp for challenge result challengeRes := &types.ChallengeResult{} @@ -207,14 +209,19 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { return challengeResErr }, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr) if challengeResErr != nil { - v.metricService.IncHashVerifierSpApiErr(err) + if v.isInternalSP(endpoint) { + v.metricService.IncHashVerifierInternalSpApiErr(challengeResErr) + logging.Logger.Infof("challenge succeeded due to internal sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr) + } else { + v.metricService.IncHashVerifierExternalSpApiErr(challengeResErr) + logging.Logger.Infof("challenge succeeded due to external sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr) + } err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched) if err != nil { v.metricService.IncHashVerifierErr(err) logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId) } v.metricService.IncVerifiedChallenges() - v.metricService.IncChallengeSuccess() return err } @@ -233,9 +240,8 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { spChecksums = append(spChecksums, checksum) } originalSpRootHash := hash.GenerateChecksum(bytes.Join(spChecksums, []byte(""))) - logging.Logger.Infof("SpRootHash before replacing: %s for challengeId: %d", hex.EncodeToString(originalSpRootHash), event.ChallengeId) spRootHash := v.computeRootHash(event.SegmentIndex, pieceData, spChecksums) - logging.Logger.Infof("SpRootHash after replacing: %s for challengeId: %d", hex.EncodeToString(spRootHash), event.ChallengeId) + logging.Logger.Infof("hash verification for challengeId: %d, Fetched Original SpRootHash: %s, Locally Computed SpRootHash: %s, Fetched ChainRootHash: %s", event.ChallengeId, hex.EncodeToString(originalSpRootHash), hex.EncodeToString(spRootHash), hex.EncodeToString(chainRootHash)) // Update database after comparing err = v.compareHashAndUpdate(event.ChallengeId, chainRootHash, spRootHash) if err != nil { @@ -267,9 +273,14 @@ func (v *Verifier) preCheck(event *model.Event, currentHeight uint64) error { if heartbeatInterval == 0 { panic("heartbeat interval should not zero, potential bug") } - if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > v.deduplicationInterval { + + v.mtx.Lock() + deduplicationInterval := v.deduplicationInterval + v.mtx.Unlock() + + if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > deduplicationInterval { found, err := v.dataProvider.IsEventExistsBetween(event.ObjectId, event.SpOperatorAddress, - event.ChallengeId-v.deduplicationInterval, event.ChallengeId-1) + event.ChallengeId-deduplicationInterval, event.ChallengeId-1) if err != nil { logging.Logger.Errorf("verifier failed to retrieve information for event %d, err=%+v", event.ChallengeId, err.Error()) return err @@ -298,6 +309,7 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte return err } // update metrics if no err + logging.Logger.Infof("challenge failed for challengeId: %d, hash matched", challengeId) v.metricService.IncVerifiedChallenges() v.metricService.IncChallengeFailed() return err @@ -307,7 +319,31 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte return err } // update metrics if no err + logging.Logger.Infof("challenge succeeded for challengeId: %d, hash mismatched", challengeId) v.metricService.IncVerifiedChallenges() v.metricService.IncChallengeSuccess() return err } + +func (v *Verifier) isInternalSP(spEndpoint string) bool { + for _, internalEndpoint := range v.config.SPConfig.InternalSPEndpoints { + if strings.Contains(spEndpoint, internalEndpoint) { + return true + } + } + return false +} + +func (v *Verifier) updateDeduplicationIntervalLoop() { + ticker := time.NewTicker(UpdateDeduplicationInterval) + for range ticker.C { + updatedDeduplicationInterval, err := v.executor.QueryChallengeSlashCoolingOffPeriod() + if err != nil { + logging.Logger.Errorf("error updating deduplication interval, err=%s", err.Error()) + return + } + v.mtx.Lock() + v.deduplicationInterval = updatedDeduplicationInterval + v.mtx.Unlock() + } +}