From 73bc4aa5f0fd4ffedc826ec04ba7d7f2624846df Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 19 Nov 2024 20:05:04 +0530 Subject: [PATCH 1/2] block validator: allow large distance between outgoing validations --- staker/block_validator.go | 104 ++++++++++++++++++++++++++++++-------- 1 file changed, 83 insertions(+), 21 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 0a1a38ba17..32aeac6808 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -37,18 +37,19 @@ import ( ) var ( - validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) - validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) - validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) - validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample()) - validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample()) - validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample()) - validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample()) - validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample()) - validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) - validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) - validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) - validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) + validatorPendingValidationsGauge = metrics.NewRegisteredGauge("arb/validator/validations/pending", nil) + validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil) + validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil) + validatorProfileWaitToRecordHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_record", nil, metrics.NewBoundedHistogramSample()) + validatorProfileRecordingHist = metrics.NewRegisteredHistogram("arb/validator/profile/recording", nil, metrics.NewBoundedHistogramSample()) + validatorProfileWaitToLaunchHist = metrics.NewRegisteredHistogram("arb/validator/profile/wait_to_launch", nil, metrics.NewBoundedHistogramSample()) + validatorProfileLaunchingHist = metrics.NewRegisteredHistogram("arb/validator/profile/launching", nil, metrics.NewBoundedHistogramSample()) + validatorProfileRunningHist = metrics.NewRegisteredHistogram("arb/validator/profile/running", nil, metrics.NewBoundedHistogramSample()) + validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil) + validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil) + validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil) + validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil) + validatorMsgCountLastValidationSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_last_validation_sent", nil) ) type BlockValidator struct { @@ -77,10 +78,11 @@ type BlockValidator struct { // can be read (atomic.Load) by anyone holding reorg-read // written (atomic.Set) by appropriate thread or (any way) holding reorg-write - createdA atomic.Uint64 - recordSentA atomic.Uint64 - validatedA atomic.Uint64 - validations containers.SyncMap[arbutil.MessageIndex, *validationStatus] + createdA atomic.Uint64 + recordSentA atomic.Uint64 + validatedA atomic.Uint64 + lastValidationSentA atomic.Uint64 + validations containers.SyncMap[arbutil.MessageIndex, *validationStatus] config BlockValidatorConfigFetcher @@ -114,6 +116,7 @@ type BlockValidatorConfig struct { ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` RecordingIterLimit uint64 `koanf:"recording-iter-limit"` + ValidationSentLimit uint64 `koanf:"validation-sent-limit"` ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` BatchCacheLimit uint32 `koanf:"batch-cache-limit"` CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload @@ -188,6 +191,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator") f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)") f.Uint64(prefix+".recording-iter-limit", DefaultBlockValidatorConfig.RecordingIterLimit, "limit on block recordings sent per iteration") + f.Uint64(prefix+".validation-sent-limit", DefaultBlockValidatorConfig.ValidationSentLimit, "limit on block validations to keep in validation sent state") f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error") BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) @@ -215,6 +219,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ BlockInputsFilePath: "./target/validation_inputs", MemoryFreeLimit: "default", RecordingIterLimit: 20, + ValidationSentLimit: 1024, } var TestBlockValidatorConfig = BlockValidatorConfig{ @@ -227,6 +232,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ BatchCacheLimit: 20, PrerecordedBlocks: uint64(2 * runtime.NumCPU()), RecordingIterLimit: 20, + ValidationSentLimit: 1024, CurrentModuleRoot: "latest", PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, @@ -364,6 +370,10 @@ func (v *BlockValidator) validated() arbutil.MessageIndex { return atomicLoadPos(&v.validatedA) } +func (v *BlockValidator) lastValidationSent() arbutil.MessageIndex { + return atomicLoadPos(&v.lastValidationSentA) +} + func (v *BlockValidator) Validated(t *testing.T) arbutil.MessageIndex { return v.validated() } @@ -572,7 +582,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e v.reorgMutex.RLock() defer v.reorgMutex.RUnlock() pos := v.created() - if pos > v.validated()+arbutil.MessageIndex(v.config().ForwardBlocks) { + if pos > v.recordSent()+arbutil.MessageIndex(v.config().ForwardBlocks) { log.Trace("create validation entry: nothing to do", "pos", pos, "validated", v.validated()) return false, nil } @@ -699,10 +709,10 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro v.reorgMutex.RLock() pos := v.recordSent() created := v.created() - validated := v.validated() + validationSent := v.lastValidationSent() v.reorgMutex.RUnlock() - recordUntil := validated + arbutil.MessageIndex(v.config().PrerecordedBlocks) - 1 + recordUntil := validationSent + arbutil.MessageIndex(v.config().PrerecordedBlocks) - 1 if recordUntil > created-1 { recordUntil = created - 1 } @@ -800,7 +810,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa v.reorgMutex.RLock() defer v.reorgMutex.RUnlock() - wasmRoots := v.GetModuleRootsToValidate() pos := v.validated() - 1 // to reverse the first +1 in the loop validationsLoop: for { @@ -866,7 +875,38 @@ validationsLoop: nonBlockingTrigger(v.testingProgressMadeChan) } log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash) - continue + } + } +} + +// return val: +// *MessageIndex - pointer to bad entry if there is one (requires reorg) +func (v *BlockValidator) sendValidations(ctx context.Context) (*arbutil.MessageIndex, error) { + v.reorgMutex.RLock() + defer v.reorgMutex.RUnlock() + + wasmRoots := v.GetModuleRootsToValidate() + pos := v.lastValidationSent() + sendValidationUntil := v.validated() + arbutil.MessageIndex(v.config().ValidationSentLimit) + for pos <= sendValidationUntil { + if ctx.Err() != nil { + return nil, ctx.Err() + } + v.reorgMutex.RUnlock() + v.reorgMutex.RLock() + if pos >= v.recordSent() { + log.Trace("advanceValidations: nothing to validate", "pos", pos) + return nil, nil + } + validationStatus, found := v.validations.Load(pos) + if !found { + return nil, fmt.Errorf("not found entry for pos %d", pos) + } + currentStatus := validationStatus.getStatus() + if currentStatus == RecordFailed { + // retry + log.Warn("Recording for validation failed, retrying..", "pos", pos) + return &pos, nil } for _, moduleRoot := range wasmRoots { spawner := v.chosenValidator[moduleRoot] @@ -931,7 +971,11 @@ validationsLoop: nonBlockingTrigger(v.progressValidationsChan) }) } + pos += 1 + atomicStorePos(&v.lastValidationSentA, pos, validatorMsgCountLastValidationSentGauge) + log.Trace("validation sent", "pos", pos) } + return nil, nil } func (v *BlockValidator) iterativeValidationProgress(ctx context.Context, ignored struct{}) time.Duration { @@ -948,6 +992,20 @@ func (v *BlockValidator) iterativeValidationProgress(ctx context.Context, ignore return v.config().ValidationPoll } +func (v *BlockValidator) iterativeValidationSentProgress(ctx context.Context, ignored struct{}) time.Duration { + reorg, err := v.sendValidations(ctx) + if err != nil { + log.Error("error trying to send validation node", "err", err) + } else if reorg != nil { + err := v.Reorg(ctx, *reorg) + if err != nil { + log.Error("error trying to reorg validation", "pos", *reorg-1, "err", err) + v.possiblyFatal(err) + } + } + return v.config().ValidationPoll +} + var ErrValidationCanceled = errors.New("validation of block cancelled") func (v *BlockValidator) writeLastValidated(gs validator.GoGlobalState, wasmRoots []common.Hash) error { @@ -1354,6 +1412,10 @@ func (v *BlockValidator) LaunchWorkthreadsWhenCaughtUp(ctx context.Context) { if err != nil { v.possiblyFatal(err) } + err = stopwaiter.CallIterativelyWith[struct{}](&v.StopWaiterSafe, v.iterativeValidationSentProgress, v.progressValidationsChan) + if err != nil { + v.possiblyFatal(err) + } err = stopwaiter.CallIterativelyWith[struct{}](&v.StopWaiterSafe, v.iterativeValidationProgress, v.progressValidationsChan) if err != nil { v.possiblyFatal(err) From 13ae07956e1080751d07f14f2bf32630d123cee7 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 19 Nov 2024 21:06:18 +0530 Subject: [PATCH 2/2] minor fixes --- staker/block_validator.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/staker/block_validator.go b/staker/block_validator.go index 32aeac6808..50ae290a23 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -1117,6 +1117,10 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt if v.recordSentA.Load() < countUint64 { v.recordSentA.Store(countUint64) } + + if v.lastValidationSentA.Load() < countUint64 { + v.lastValidationSentA.Store(countUint64) + } // #nosec G115 v.validatedA.Store(countUint64) v.valLoopPos = count @@ -1185,6 +1189,11 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex) if v.recordSentA.Load() > countUint64 { v.recordSentA.Store(countUint64) } + if v.lastValidationSentA.Load() > countUint64 { + v.lastValidationSentA.Store(countUint64) + // #nosec G115 + validatorMsgCountLastValidationSentGauge.Update(int64(countUint64)) + } if v.validatedA.Load() > countUint64 { v.validatedA.Store(countUint64) // #nosec G115 @@ -1379,6 +1388,7 @@ func (v *BlockValidator) checkValidatedGSCaughtUp() (bool, error) { atomicStorePos(&v.createdA, count, validatorMsgCountCreatedGauge) atomicStorePos(&v.recordSentA, count, validatorMsgCountRecordSentGauge) atomicStorePos(&v.validatedA, count, validatorMsgCountValidatedGauge) + atomicStorePos(&v.lastValidationSentA, count, validatorMsgCountLastValidationSentGauge) // #nosec G115 validatorMsgCountValidatedGauge.Update(int64(count)) v.chainCaughtUp = true