Skip to content

Commit

Permalink
fix(dot/epoch): Resume node with correct previous next epoch data a…
Browse files Browse the repository at this point in the history
…nd `config data` (#4105)
  • Loading branch information
ramiroJCB authored Oct 1, 2024
1 parent dbe6858 commit eb0b5f0
Show file tree
Hide file tree
Showing 5 changed files with 515 additions and 25 deletions.
203 changes: 193 additions & 10 deletions dot/state/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -28,11 +30,13 @@ var (
)

var (
epochPrefix = "epoch"
currentEpochKey = []byte("current")
epochDataPrefix = []byte("epochinfo")
configDataPrefix = []byte("configinfo")
skipToKey = []byte("skipto")
epochPrefix = "epoch"
currentEpochKey = []byte("current")
epochDataPrefix = []byte("epochinfo")
configDataPrefix = []byte("configinfo")
skipToKey = []byte("skipto")
nextEpochDataPrefix = []byte("nextepochdata")
nextConfigDataPrefix = []byte("nextconfigdata")
)

func epochDataKey(epoch uint64) []byte {
Expand All @@ -47,6 +51,16 @@ func configDataKey(epoch uint64) []byte {
return append(configDataPrefix, buf...)
}

func nextEpochDataKey(epoch uint64, hash common.Hash) []byte {
partialKey := fmt.Sprintf("%d:%s", epoch, hash.String())
return append(nextEpochDataPrefix, []byte(partialKey)...)
}

func nextConfigDataKey(epoch uint64, hash common.Hash) []byte {
partialKey := fmt.Sprintf("%d:%s", epoch, hash.String())
return append(nextConfigDataPrefix, []byte(partialKey)...)
}

// GenesisEpochDescriptor is the informations provided by calling
// the genesis WASM runtime exported function `BabeAPIConfiguration`
type GenesisEpochDescriptor struct {
Expand All @@ -56,7 +70,7 @@ type GenesisEpochDescriptor struct {

// EpochState tracks information related to each epoch
type EpochState struct {
db GetterPutterNewBatcher
db database.Table
baseState *BaseState
blockState *BlockState
epochLength uint64 // measured in slots
Expand Down Expand Up @@ -127,16 +141,26 @@ func NewEpochState(db database.Database, blockState *BlockState,
if err != nil {
return nil, err
}
epochTable := database.NewTable(db, epochPrefix)
nextEpochData, err := restoreMapFromDisk[types.NextEpochData](epochTable, nextEpochDataPrefix)
if err != nil {
return nil, err
}

nextConfigData, err := restoreMapFromDisk[types.NextConfigDataV1](epochTable, nextConfigDataPrefix)
if err != nil {
return nil, err
}

return &EpochState{
baseState: baseState,
blockState: blockState,
db: database.NewTable(db, epochPrefix),
db: epochTable,
epochLength: genesisConfig.EpochLength,
slotDuration: genesisConfig.SlotDuration,
skipToEpoch: skipToEpoch,
nextEpochData: make(nextEpochMap[types.NextEpochData]),
nextConfigData: make(nextEpochMap[types.NextConfigDataV1]),
nextEpochData: nextEpochData,
nextConfigData: nextConfigData,
genesisEpochDescriptor: &GenesisEpochDescriptor{
EpochData: &types.EpochDataRaw{
Authorities: genesisConfig.GenesisAuthorities,
Expand All @@ -151,6 +175,72 @@ func NewEpochState(db database.Database, blockState *BlockState,
}, nil
}

// restoreMapFromDisk retrieves the next epoch and config data maps from the database
func restoreMapFromDisk[T types.NextConfigDataV1 | types.NextEpochData](db database.Table, prefix []byte) (
nextEpochMap[T], error) {

resMap := make(nextEpochMap[T])
iter, err := db.NewPrefixIterator(prefix)
if err != nil {
return resMap, err
}

defer iter.Release()

for iter.First(); iter.Valid(); iter.Next() {
mapValue, epoch, fork, err := getNextEpochOrConfigData[T](iter, prefix)

if err != nil {
return resMap, err
}

if _, ok := resMap[epoch]; !ok {
resMap[epoch] = make(map[common.Hash]T)
}

resMap[epoch][fork] = *mapValue
}

if err = iter.Close(); err != nil {
return resMap, err
}
return resMap, nil
}

// getNextEpochOrConfigData retrieves the next epoch or config data from the iterator
func getNextEpochOrConfigData[T types.NextConfigDataV1 | types.NextEpochData](iter database.Iterator, prefix []byte) (
*T, uint64, common.Hash, error) {
nextData := new(T)
key := string(iter.Key())
value := iter.Value()

keyWithoutPrefix := strings.Split(key, string(prefix))[1]

// Split the key into epoch and fork
parts := strings.Split(keyWithoutPrefix, ":")
if len(parts) != 2 {
return nil, 0, common.Hash{}, fmt.Errorf("invalid key format: %s", key)
}
epoch, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return nil, 0, common.Hash{}, err
}

var fork common.Hash
part1, err := common.HexToBytes(parts[1])
if err != nil {
return nil, 0, common.Hash{}, fmt.Errorf("while converting bytes to hash: %w", err)
}

copy(fork[:], part1)

if err = scale.Unmarshal(value, nextData); err != nil {
return nil, 0, common.Hash{}, err
}

return nextData, epoch, fork, nil
}

// GetEpochLength returns the length of an epoch in slots
func (s *EpochState) GetEpochLength() uint64 {
return s.epochLength
Expand Down Expand Up @@ -187,7 +277,7 @@ func (s *EpochState) GetEpochForBlock(header *types.Header) (uint64, error) {
// actually the epoch number for block number #1 is epoch 0,
// epochs start from 0 and are incremented (almost, given that epochs might be skipped)
// sequentially 0...1...2, so the block number #1 belongs to epoch 0
if header.Number == 1 {
if header.Number == 0 || header.Number == 1 {
return 0, nil
}

Expand Down Expand Up @@ -596,6 +686,11 @@ func (s *EpochState) HandleBABEDigest(header *types.Header, digest types.BabeCon

nextEpoch := currEpoch + 1
s.storeBABENextEpochData(nextEpoch, headerHash, val)

if err = s.setBABENextEpochDataInDB(nextEpoch, headerHash, val); err != nil {
return fmt.Errorf("setting next epoch data in db: %w", err)
}

logger.Debugf("stored BABENextEpochData data: %v for hash: %s to epoch: %d", digest, headerHash, nextEpoch)
return nil

Expand All @@ -616,6 +711,11 @@ func (s *EpochState) HandleBABEDigest(header *types.Header, digest types.BabeCon
}
nextEpoch := currEpoch + 1
s.storeBABENextConfigData(nextEpoch, headerHash, nextConfigData)

if err := s.setBABENextConfigData(nextEpoch, headerHash, nextConfigData); err != nil {
return fmt.Errorf("setting next config data in db: %w", err)
}

logger.Debugf("stored BABENextConfigData data: %v for hash: %s to epoch: %d", digest, headerHash, nextEpoch)
return nil
default:
Expand Down Expand Up @@ -823,6 +923,19 @@ func (s *EpochState) storeBABENextEpochData(epoch uint64, hash common.Hash, next
s.nextEpochData[epoch][hash] = nextEpochData
}

// setBABENextEpochDataInDB stores the types.NextEpochData under epoch and hash keys
func (s *EpochState) setBABENextEpochDataInDB(epoch uint64, forkHash common.Hash,
nextEpochData types.NextEpochData) error {
encodedEpochData, err := scale.Marshal(nextEpochData)
if err != nil {
return err
}

key := nextEpochDataKey(epoch, forkHash)

return s.db.Put(key, encodedEpochData)
}

// StoreBABENextConfigData stores the types.NextConfigData under epoch and hash keys
func (s *EpochState) storeBABENextConfigData(epoch uint64, hash common.Hash, nextConfigData types.NextConfigDataV1) {
s.nextConfigDataLock.Lock()
Expand All @@ -837,6 +950,18 @@ func (s *EpochState) storeBABENextConfigData(epoch uint64, hash common.Hash, nex
s.nextConfigData[epoch][hash] = nextConfigData
}

// setBABENextConfigData stores the types.NextConfigData under epoch and hash keys
func (s *EpochState) setBABENextConfigData(epoch uint64,
forkHash common.Hash, nextConfigData types.NextConfigDataV1) error {
encodedConfigData, err := scale.Marshal(nextConfigData)
if err != nil {
return err
}

key := nextConfigDataKey(epoch, forkHash)
return s.db.Put(key, encodedConfigData)
}

// FinalizeBABENextEpochData stores the right types.NextEpochData by
// getting the set of hashes from the received epoch and for each hash
// check if the header is in the database then it's been finalized and
Expand Down Expand Up @@ -888,12 +1013,66 @@ func (s *EpochState) FinalizeBABENextEpochData(finalizedHeader *types.Header) er
for e := range s.nextEpochData {
if e <= nextEpoch {
delete(s.nextEpochData, e)
// remove the epoch data from the database
if err = deleteDataFromDisk[types.NextEpochData](s.db, e, nextEpochDataPrefix); err != nil {
return fmt.Errorf("cannot delete next epoch data from the database: %w", err)
}
}
}

return nil
}

// deleteDataFromDisk is a generic function that deletes all the nextEpochData or nextConfigData
// for a given epoch from the database
func deleteDataFromDisk[T types.NextEpochData | types.NextConfigDataV1](
db database.Table, epoch uint64, prefix []byte) error {
keysToDelete, err := getDataKeysFromDisk[T](db, prefix, epoch)
if err != nil {
return fmt.Errorf("cannot get next config data keys from disk: %w", err)
}
batch := db.NewBatch()
for _, key := range keysToDelete {
err = batch.Del([]byte(key))
if err != nil {
return fmt.Errorf("cannot delete next config data from the database: %w", err)
}
}

if err := batch.Flush(); err != nil {
return fmt.Errorf("cannot flush deletion batch: %w", err)
}

return nil
}

// getDataKeysFromDisk is a generic function that returns all the nextEpochData or nextConfigData keys
// for a given epoch from the database
func getDataKeysFromDisk[T types.NextEpochData | types.NextConfigDataV1](
db database.Table, prefix []byte, currentEpoch uint64) (
[]string, error) {

var dataKeys []string
currentEpochPrefix := fmt.Sprintf("%s%d", prefix, currentEpoch)

iter, err := db.NewPrefixIterator([]byte(currentEpochPrefix))
if err != nil {
return dataKeys, err
}

defer iter.Release()

for iter.First(); iter.Valid(); iter.Next() {
key := string(iter.Key())
index := strings.Index(key, epochPrefix)
secondPart := key[index+len(epochPrefix):]
dataKeys = append(dataKeys, secondPart)

}

return dataKeys, nil
}

// FinalizeBABENextConfigData stores the right types.NextConfigData by
// getting the set of hashes from the received epoch and for each hash
// check if the header is in the database then it's been finalized and
Expand Down Expand Up @@ -950,6 +1129,10 @@ func (s *EpochState) FinalizeBABENextConfigData(finalizedHeader *types.Header) e
for e := range s.nextConfigData {
if e <= nextEpoch {
delete(s.nextConfigData, e)
// remove the config data from the database
if err = deleteDataFromDisk[types.NextConfigDataV1](s.db, e, nextConfigDataPrefix); err != nil {
return fmt.Errorf("cannot delete next config data from the database: %w", err)
}
}
}

Expand Down
Loading

0 comments on commit eb0b5f0

Please sign in to comment.