From faa40718bb34df3a4404ffe40445b947bf4d5e80 Mon Sep 17 00:00:00 2001 From: amir-blox <83904651+amir-blox@users.noreply.github.com> Date: Tue, 10 Aug 2021 19:37:02 +0300 Subject: [PATCH] fix history sync failure due to batches (#245) --- ibft/sync/history.go | 16 +++++++++++----- validator/controller.go | 19 +++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/ibft/sync/history.go b/ibft/sync/history.go index f67b708280..e0d1c65ff6 100644 --- a/ibft/sync/history.go +++ b/ibft/sync/history.go @@ -232,14 +232,19 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui foundSeqs[msg.Message.SeqNumber] = msg } + msgCount := len(res.SignedMessages) // validate and save for i := start; i <= endSeq; i++ { msg, found := foundSeqs[i] if !found { failCount++ latestError = errors.Errorf("returned decided by range messages miss sequence number %d", i) + s.logger.Debug("decided by range messages miss sequence number", + zap.Uint64("seq", i), zap.Int("msgCount", msgCount)) break } + // counting all the messages that were visited + msgCount-- // if msg is invalid, break and try again with an updated start seq if s.validateDecidedMsgF(msg) != nil { start = msg.Message.SeqNumber @@ -252,10 +257,7 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui } // set highest - if highestSaved == nil { - highestSaved = msg - } - if highestSaved.Message.SeqNumber < msg.Message.SeqNumber { + if highestSaved == nil || highestSaved.Message.SeqNumber < msg.Message.SeqNumber { highestSaved = msg } @@ -264,7 +266,11 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui if msg.Message.SeqNumber == endSeq { done = true } + // if the current messages batch was processed -> break loop and start the next batch + if msgCount == 0 { + break + } } - s.logger.Info(fmt.Sprintf("fetched and saved instances up to sequence number %d", endSeq)) + s.logger.Info(fmt.Sprintf("fetched and saved instances up to sequence number %d", start - 1)) } } diff --git a/validator/controller.go b/validator/controller.go index fa71d22e8b..953e50e46e 100644 --- a/validator/controller.go +++ b/validator/controller.go @@ -35,7 +35,7 @@ type ControllerOptions struct { // IController interface type IController interface { ListenToEth1Events(cn pubsub.SubjectChannel) - StartValidators() map[string]*Validator + StartValidators() GetValidatorsPubKeys() [][]byte GetValidatorsIndices() []spec.ValidatorIndex GetValidator(pubKey string) (*Validator, bool) @@ -109,10 +109,15 @@ func (c *controller) setupValidators() map[string]*Validator { } else { c.logger.Info("starting validators setup...") } - res := make(map[string]*Validator) for _, validatorShare := range validatorsShare { + pubKey := validatorShare.PublicKey.SerializeToHexStr() + if _, ok := c.validatorsMap[pubKey]; ok { + c.logger.Debug("validator was initialized already..", + zap.String("pubKey", validatorShare.PublicKey.SerializeToHexStr())) + continue + } printValidatorShare(c.logger, validatorShare) - res[validatorShare.PublicKey.SerializeToHexStr()] = New(Options{ + c.validatorsMap[pubKey] = New(Options{ Context: c.context, SignatureCollectionTimeout: c.signatureCollectionTimeout, Logger: c.logger, @@ -122,13 +127,12 @@ func (c *controller) setupValidators() map[string]*Validator { Beacon: c.beacon, }, c.db) } - c.validatorsMap = res - c.logger.Info("setup validators done successfully", zap.Int("count", len(res))) - return res + c.logger.Info("setup validators done successfully", zap.Int("count", len(c.validatorsMap))) + return c.validatorsMap } // StartValidators functions (queue streaming, msgQueue listen, etc) -func (c *controller) StartValidators() map[string]*Validator { +func (c *controller) StartValidators() { validators := c.setupValidators() for _, v := range validators { if err := v.Start(); err != nil { @@ -136,7 +140,6 @@ func (c *controller) StartValidators() map[string]*Validator { continue } } - return validators } // GetValidatorsPubKeys returns a list of all the validators public keys