chore(dot/sync): add Fragment struct for chain of blocks (#4274)

Co-authored-by: Haiko Schol <[email protected]>
EclesioMeloJunior and haikoschol authored Dec 2, 2024
1 parent f0d0159 commit 82433af
Showing 4 changed files with 209 additions and 167 deletions.
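Note: only two of the four changed files are expanded in this excerpt; the file that actually defines the new Fragment struct is not shown. As a rough sketch only, the snippet below reconstructs what the Fragment API used in fullsync.go might look like, inferred purely from its call sites (NewFragment, First, Last, Len, Iter, Filter, Concat); the committed field names, signatures, and behaviour may differ.

package sync

import (
	"iter"
	"slices"

	"github.com/ChainSafe/gossamer/dot/types"
)

// Fragment is assumed to wrap an ordered chain of block data.
type Fragment struct {
	chain []*types.BlockData
}

// NewFragment wraps a slice of block data in a Fragment.
func NewFragment(chain []*types.BlockData) *Fragment {
	return &Fragment{chain: chain}
}

// Len returns the number of blocks in the fragment.
func (f *Fragment) Len() int { return len(f.chain) }

// First returns the first block in the fragment, or nil if it is empty.
func (f *Fragment) First() *types.BlockData {
	if len(f.chain) == 0 {
		return nil
	}
	return f.chain[0]
}

// Last returns the last block in the fragment, or nil if it is empty.
func (f *Fragment) Last() *types.BlockData {
	if len(f.chain) == 0 {
		return nil
	}
	return f.chain[len(f.chain)-1]
}

// Filter returns a new Fragment containing only the blocks that satisfy p.
func (f *Fragment) Filter(p func(*types.BlockData) bool) *Fragment {
	filtered := make([]*types.BlockData, 0, len(f.chain))
	for _, bd := range f.chain {
		if p(bd) {
			filtered = append(filtered, bd)
		}
	}
	return &Fragment{chain: filtered}
}

// Concat returns a new Fragment with other's blocks appended after f's.
func (f *Fragment) Concat(other *Fragment) *Fragment {
	return &Fragment{chain: append(slices.Clone(f.chain), other.chain...)}
}

// Iter yields the blocks in order (assuming Go 1.23 range-over-func support).
func (f *Fragment) Iter() iter.Seq[*types.BlockData] {
	return func(yield func(*types.BlockData) bool) {
		for _, bd := range f.chain {
			if !yield(bd) {
				return
			}
		}
	}
}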
114 changes: 59 additions & 55 deletions dot/sync/fullsync.go
@@ -15,6 +15,7 @@ import (
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/database"
"github.com/ChainSafe/gossamer/lib/common"

"github.com/libp2p/go-libp2p/core/peer"
)
@@ -157,55 +158,60 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
return false, nil, nil, fmt.Errorf("getting highest finalized header")
}

readyBlocks := make([][]*types.BlockData, 0, len(validResp))
readyBlocks := make([]*Fragment, 0, len(validResp))
for _, reqRespData := range validResp {
responseFragment := NewFragment(reqRespData.responseData)

// if Gossamer requested the header, then the response data should contain
// the full blocks to be imported. If Gossamer didn't request the header,
// then the response should only contain the missing parts that will complete
// the unreadyBlocks; once those blocks are complete we should be able to import them
if reqRespData.req.RequestField(messages.RequestedDataHeader) {
updatedFragment, ok := f.unreadyBlocks.updateDisjointFragments(reqRespData.responseData)
updatedFragment, ok := f.unreadyBlocks.updateDisjointFragments(responseFragment)
if ok {
validBlocks := validBlocksUnderFragment(highestFinalized.Number, updatedFragment)
if len(validBlocks) > 0 {
readyBlocks = append(readyBlocks, validBlocks)
validFragment := updatedFragment.Filter(func(bd *types.BlockData) bool {
return bd.Header.Number > highestFinalized.Number
})

if validFragment.Len() > 0 {
readyBlocks = append(readyBlocks, validFragment)
}
} else {
readyBlocks = append(readyBlocks, reqRespData.responseData)
readyBlocks = append(readyBlocks, responseFragment)
}

continue
}

completedBlocks := f.unreadyBlocks.updateIncompleteBlocks(reqRespData.responseData)
readyBlocks = append(readyBlocks, completedBlocks)
completedBlocks := f.unreadyBlocks.updateIncompleteBlocks(responseFragment)
readyBlocks = append(readyBlocks, completedBlocks...)
}

// disjoint fragments are pieces of the chain that could not be imported right now
// because they are blocks too far ahead or blocks that belong to forks
sortFragmentsOfChain(readyBlocks)
orderedFragments := mergeFragmentsOfChain(readyBlocks)

nextBlocksToImport := make([]*types.BlockData, 0)
disjointFragments := make([][]*types.BlockData, 0)
nextBlocksToImport := new(Fragment)
disjointFragments := make([]*Fragment, 0)

for _, fragment := range orderedFragments {
ok, err := f.blockState.HasHeader(fragment[0].Header.ParentHash)
ok, err := f.blockState.HasHeader(fragment.First().Header.ParentHash)
if err != nil && !errors.Is(err, database.ErrNotFound) {
return false, nil, nil, fmt.Errorf("checking block parent header: %w", err)
}

if ok {
nextBlocksToImport = append(nextBlocksToImport, fragment...)
nextBlocksToImport = nextBlocksToImport.Concat(fragment)
continue
}

disjointFragments = append(disjointFragments, fragment)
}

// the goal of this loop is to import ready blocks as well as to update the highestFinalized header
for len(nextBlocksToImport) > 0 || len(disjointFragments) > 0 {
for _, blockToImport := range nextBlocksToImport {
for nextBlocksToImport.Len() > 0 || len(disjointFragments) > 0 {
for blockToImport := range nextBlocksToImport.Iter() {
imported, err := f.blockImporter.importBlock(blockToImport, networkInitialSync)
if err != nil {
return false, nil, nil, fmt.Errorf("while handling ready block: %w", err)
@@ -216,7 +222,7 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
}
}

nextBlocksToImport = make([]*types.BlockData, 0)
nextBlocksToImport = new(Fragment)
highestFinalized, err = f.blockState.GetHighestFinalisedHeader()
if err != nil {
return false, nil, nil, fmt.Errorf("getting highest finalized header")
@@ -226,45 +232,54 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
// given that fragments contain chains and these chains contain blocks,
// check if the first block in the chain has a parent known by us
for _, fragment := range disjointFragments {
validFragment := validBlocksUnderFragment(highestFinalized.Number, fragment)
if len(validFragment) == 0 {
validFragment := fragment.Filter(func(bd *types.BlockData) bool {
return bd.Header.Number > highestFinalized.Number
})

if validFragment.Len() == 0 {
continue
}

ok, err := f.blockState.HasHeader(validFragment[0].Header.ParentHash)
ok, err := f.blockState.HasHeader(validFragment.First().Header.ParentHash)
if err != nil && !errors.Is(err, database.ErrNotFound) {
return false, nil, nil, err
}

if !ok {
firstFragmentBlock := validFragment.First()
// if the parent of this valid fragment is behind our latest finalized number
// then we can discard the whole fragment since it is an invalid fork
if (validFragment[0].Header.Number - 1) <= highestFinalized.Number {
if (firstFragmentBlock.Header.Number - 1) <= highestFinalized.Number {
continue
}

logger.Infof("starting an acestor search from %s parent of #%d (%s)",
validFragment[0].Header.ParentHash,
validFragment[0].Header.Number,
validFragment[0].Header.Hash(),
firstFragmentBlock.Header.ParentHash,
firstFragmentBlock.Header.Number,
firstFragmentBlock.Header.Hash(),
)

f.unreadyBlocks.newDisjointFragment(validFragment)
request := messages.NewBlockRequest(
*messages.NewFromBlock(validFragment[0].Header.ParentHash),
*messages.NewFromBlock(firstFragmentBlock.Header.ParentHash),
messages.MaxBlocksInResponse,
messages.BootstrapRequestData, messages.Descending)
f.requestQueue.PushBack(request)
} else {
// inserting them in the queue to be processed after the main chain
nextBlocksToImport = append(nextBlocksToImport, validFragment...)
nextBlocksToImport = nextBlocksToImport.Concat(validFragment)
}
}

disjointFragments = nil
}

f.unreadyBlocks.removeIrrelevantFragments(highestFinalized.Number)
// update unready blocks based on the highest finalized block
f.unreadyBlocks.pruneDisjointFragments(LowerThanOrEq(highestFinalized.Number))
f.unreadyBlocks.removeIncompleteBlocks(func(_ common.Hash, value *types.BlockData) bool {
return value.Header.Number <= highestFinalized.Number
})

return false, repChanges, peersToIgnore, nil
}
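The call to removeIrrelevantFragments was replaced just above by predicate-based pruning. The predicate helpers themselves (LowerThanOrEq, pruneDisjointFragments, removeIncompleteBlocks) live in files not expanded in this excerpt; the sketch below, continuing the hypothetical Fragment API above, only guesses at their shape from the call sites, and the committed names, signatures, and semantics may differ.

// LowerThanOrEq is assumed to build a predicate that matches fragments whose
// blocks all sit at or below the given finalized block number.
func LowerThanOrEq(number uint) func(*Fragment) bool {
	return func(f *Fragment) bool {
		last := f.Last()
		return last != nil && last.Header.Number <= number
	}
}

// pruneDisjointFragments presumably drops every stored fragment that matches
// the predicate; a free-standing equivalent over a slice could look like this.
func pruneFragments(fragments []*Fragment, shouldPrune func(*Fragment) bool) []*Fragment {
	kept := make([]*Fragment, 0, len(fragments))
	for _, frag := range fragments {
		if !shouldPrune(frag) {
			kept = append(kept, frag)
		}
	}
	return kept
}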

@@ -486,19 +501,24 @@ resultLoop:
// note that we have fragments with single blocks, and fragments with a fork (in the case of 8)
// after sorting these fragments we end up with:
// [ {1, 2, 3, 4, 5} {6, 7, 8, 9, 10} {8} {11, 12, 13, 14, 15, 16} {17} ]
func sortFragmentsOfChain(fragments [][]*types.BlockData) {
func sortFragmentsOfChain(fragments []*Fragment) {
if len(fragments) == 0 {
return
}

slices.SortFunc(fragments, func(a, b []*types.BlockData) int {
if a[0].Header.Number < b[0].Header.Number {
return -1
}
if a[0].Header.Number == b[0].Header.Number {
return 0
slices.SortFunc(fragments, func(fragA, fragB *Fragment) int {
if fragA.First() != nil && fragB.First() != nil {
switch {
case fragA.First().Header.Number < fragB.First().Header.Number:
return -1
case fragA.First().Header.Number == fragB.First().Header.Number:
return 0
default:
return 1
}
}
return 1

return 0
})
}

@@ -510,20 +530,21 @@ func sortFragmentsOfChain(fragments [][]*types.BlockData) {
// [ {1, 2, 3, 4, 5} {6, 7, 8, 9, 10} {8} {11, 12, 13, 14, 15, 16} {17} ]
// merge will transform it to the following slice:
// [ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17} {8} ]
func mergeFragmentsOfChain(fragments [][]*types.BlockData) [][]*types.BlockData {
func mergeFragmentsOfChain(fragments []*Fragment) []*Fragment {
if len(fragments) == 0 {
return nil
}

mergedFragments := [][]*types.BlockData{fragments[0]}
mergedFragments := []*Fragment{fragments[0]}

for i := 1; i < len(fragments); i++ {
lastMergedFragment := mergedFragments[len(mergedFragments)-1]
currentFragment := fragments[i]

lastBlock := lastMergedFragment[len(lastMergedFragment)-1]
lastBlock := lastMergedFragment.Last()

if lastBlock.IsParent(currentFragment[0]) {
mergedFragments[len(mergedFragments)-1] = append(lastMergedFragment, currentFragment...)
if lastBlock.IsParent(currentFragment.First()) {
mergedFragments[len(mergedFragments)-1] = lastMergedFragment.Concat(currentFragment)
} else {
mergedFragments = append(mergedFragments, currentFragment)
}
@@ -532,23 +553,6 @@ func mergeFragmentsOfChain(fragments [][]*types.BlockData) [][]*types.BlockData
return mergedFragments
}

// validBlocksUnderFragment ignores all blocks prior to the given last finalized number
func validBlocksUnderFragment(highestFinalizedNumber uint, fragmentBlocks []*types.BlockData) []*types.BlockData {
startFragmentFrom := -1
for idx, block := range fragmentBlocks {
if block.Header.Number > highestFinalizedNumber {
startFragmentFrom = idx
break
}
}

if startFragmentFrom < 0 {
return nil
}

return fragmentBlocks[startFragmentFrom:]
}

// validateResponseFields checks that the expected fields are in the block data
func validateResponseFields(req *messages.BlockRequestMessage, blocks []*types.BlockData) error {
for _, bd := range blocks {
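To illustrate how sortFragmentsOfChain and mergeFragmentsOfChain are meant to compose over the new Fragment type, here is a small usage sketch (not part of the commit) assumed to live in the same package; it only exercises ordering, since a meaningful merge needs headers with real parent-hash links.

package sync

import "github.com/ChainSafe/gossamer/dot/types"

// sketchSortFragments is a hypothetical helper (not committed code) showing
// how fragments are ordered before merging. It builds bare headers carrying
// only block numbers, which is all sortFragmentsOfChain inspects.
func sketchSortFragments() []*Fragment {
	mk := func(numbers ...uint) *Fragment {
		blocks := make([]*types.BlockData, 0, len(numbers))
		for _, n := range numbers {
			blocks = append(blocks, &types.BlockData{Header: &types.Header{Number: n}})
		}
		return NewFragment(blocks)
	}

	fragments := []*Fragment{
		mk(6, 7, 8, 9, 10),
		mk(17),
		mk(1, 2, 3, 4, 5),
		mk(11, 12, 13, 14, 15, 16),
		mk(8), // a lone fork block
	}

	// after sorting, fragments are ordered by their first block number:
	// [ {1..5} {6..10} {8} {11..16} {17} ]
	sortFragmentsOfChain(fragments)

	// mergeFragmentsOfChain(fragments) would then join neighbouring fragments
	// whose first block is, by parent hash, the child of the previous
	// fragment's last block; that needs fully linked headers, which these
	// bare headers do not have, so the merge step is left out of this sketch.
	return fragments
}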
4 changes: 2 additions & 2 deletions dot/sync/fullsync_test.go
@@ -254,8 +254,8 @@ func TestFullSyncProcess(t *testing.T) {
require.Equal(t, fs.requestQueue.Len(), 1)
require.Len(t, fs.unreadyBlocks.incompleteBlocks, 0)
require.Len(t, fs.unreadyBlocks.disjointFragments, 1)
require.Equal(t, fs.unreadyBlocks.disjointFragments[0], sndTaskBlockResponse.BlockData)
require.Equal(t, len(fs.unreadyBlocks.disjointFragments[0]), len(sndTaskBlockResponse.BlockData))
require.Equal(t, fs.unreadyBlocks.disjointFragments[0], NewFragment(sndTaskBlockResponse.BlockData))
require.Equal(t, fs.unreadyBlocks.disjointFragments[0].Len(), len(sndTaskBlockResponse.BlockData))

expectedAncestorRequest := messages.NewBlockRequest(
*messages.NewFromBlock(sndTaskBlockResponse.BlockData[0].Header.ParentHash),