Skip to content

Commit

Permalink
fix history sync failure due to batches (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm authored Aug 10, 2021
1 parent 0b18781 commit faa4071
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
16 changes: 11 additions & 5 deletions ibft/sync/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,19 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui
foundSeqs[msg.Message.SeqNumber] = msg
}

msgCount := len(res.SignedMessages)
// validate and save
for i := start; i <= endSeq; i++ {
msg, found := foundSeqs[i]
if !found {
failCount++
latestError = errors.Errorf("returned decided by range messages miss sequence number %d", i)
s.logger.Debug("decided by range messages miss sequence number",
zap.Uint64("seq", i), zap.Int("msgCount", msgCount))
break
}
// counting all the messages that were visited
msgCount--
// if msg is invalid, break and try again with an updated start seq
if s.validateDecidedMsgF(msg) != nil {
start = msg.Message.SeqNumber
Expand All @@ -252,10 +257,7 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui
}

// set highest
if highestSaved == nil {
highestSaved = msg
}
if highestSaved.Message.SeqNumber < msg.Message.SeqNumber {
if highestSaved == nil || highestSaved.Message.SeqNumber < msg.Message.SeqNumber {
highestSaved = msg
}

Expand All @@ -264,7 +266,11 @@ func (s *HistorySync) fetchValidateAndSaveInstances(fromPeer string, startSeq ui
if msg.Message.SeqNumber == endSeq {
done = true
}
// if the current messages batch was processed -> break loop and start the next batch
if msgCount == 0 {
break
}
}
s.logger.Info(fmt.Sprintf("fetched and saved instances up to sequence number %d", endSeq))
s.logger.Info(fmt.Sprintf("fetched and saved instances up to sequence number %d", start - 1))
}
}
19 changes: 11 additions & 8 deletions validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ControllerOptions struct {
// IController interface
type IController interface {
ListenToEth1Events(cn pubsub.SubjectChannel)
StartValidators() map[string]*Validator
StartValidators()
GetValidatorsPubKeys() [][]byte
GetValidatorsIndices() []spec.ValidatorIndex
GetValidator(pubKey string) (*Validator, bool)
Expand Down Expand Up @@ -109,10 +109,15 @@ func (c *controller) setupValidators() map[string]*Validator {
} else {
c.logger.Info("starting validators setup...")
}
res := make(map[string]*Validator)
for _, validatorShare := range validatorsShare {
pubKey := validatorShare.PublicKey.SerializeToHexStr()
if _, ok := c.validatorsMap[pubKey]; ok {
c.logger.Debug("validator was initialized already..",
zap.String("pubKey", validatorShare.PublicKey.SerializeToHexStr()))
continue
}
printValidatorShare(c.logger, validatorShare)
res[validatorShare.PublicKey.SerializeToHexStr()] = New(Options{
c.validatorsMap[pubKey] = New(Options{
Context: c.context,
SignatureCollectionTimeout: c.signatureCollectionTimeout,
Logger: c.logger,
Expand All @@ -122,21 +127,19 @@ func (c *controller) setupValidators() map[string]*Validator {
Beacon: c.beacon,
}, c.db)
}
c.validatorsMap = res
c.logger.Info("setup validators done successfully", zap.Int("count", len(res)))
return res
c.logger.Info("setup validators done successfully", zap.Int("count", len(c.validatorsMap)))
return c.validatorsMap
}

// StartValidators functions (queue streaming, msgQueue listen, etc)
func (c *controller) StartValidators() map[string]*Validator {
func (c *controller) StartValidators() {
validators := c.setupValidators()
for _, v := range validators {
if err := v.Start(); err != nil {
c.logger.Error("failed to start validator", zap.Error(err))
continue
}
}
return validators
}

// GetValidatorsPubKeys returns a list of all the validators public keys
Expand Down

0 comments on commit faa4071

Please sign in to comment.