diff --git a/dot/sync/fullsync.go b/dot/sync/fullsync.go
index 79db8b4a29..862e74327e 100644
--- a/dot/sync/fullsync.go
+++ b/dot/sync/fullsync.go
@@ -15,6 +15,7 @@ import (
 	"github.com/ChainSafe/gossamer/dot/peerset"
 	"github.com/ChainSafe/gossamer/dot/types"
 	"github.com/ChainSafe/gossamer/internal/database"
+	"github.com/ChainSafe/gossamer/lib/common"
 	"github.com/libp2p/go-libp2p/core/peer"
 )
 
@@ -157,28 +158,33 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
 		return false, nil, nil, fmt.Errorf("getting highest finalized header")
 	}
 
-	readyBlocks := make([][]*types.BlockData, 0, len(validResp))
+	readyBlocks := make([]*Fragment, 0, len(validResp))
 	for _, reqRespData := range validResp {
+		responseFragment := NewFragment(reqRespData.responseData)
+
 		// if Gossamer requested the header, then the response data should contains
 		// the full blocks to be imported. If Gossamer didn't request the header,
 		// then the response should only contain the missing parts that will complete
 		// the unreadyBlocks and then with the blocks completed we should be able to import them
 		if reqRespData.req.RequestField(messages.RequestedDataHeader) {
-			updatedFragment, ok := f.unreadyBlocks.updateDisjointFragments(reqRespData.responseData)
+			updatedFragment, ok := f.unreadyBlocks.updateDisjointFragments(responseFragment)
 			if ok {
-				validBlocks := validBlocksUnderFragment(highestFinalized.Number, updatedFragment)
-				if len(validBlocks) > 0 {
-					readyBlocks = append(readyBlocks, validBlocks)
+				validFragment := updatedFragment.Filter(func(bd *types.BlockData) bool {
+					return bd.Header.Number > highestFinalized.Number
+				})
+
+				if validFragment.Len() > 0 {
+					readyBlocks = append(readyBlocks, validFragment)
 				}
 			} else {
-				readyBlocks = append(readyBlocks, reqRespData.responseData)
+				readyBlocks = append(readyBlocks, responseFragment)
 			}
 
 			continue
 		}
 
-		completedBlocks := f.unreadyBlocks.updateIncompleteBlocks(reqRespData.responseData)
-		readyBlocks = append(readyBlocks, completedBlocks)
+		completedBlocks := f.unreadyBlocks.updateIncompleteBlocks(responseFragment)
+		readyBlocks = append(readyBlocks, completedBlocks...)
 	}
 
 	// disjoint fragments are pieces of the chain that could not be imported right now
@@ -186,17 +192,17 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
 	sortFragmentsOfChain(readyBlocks)
 	orderedFragments := mergeFragmentsOfChain(readyBlocks)
 
-	nextBlocksToImport := make([]*types.BlockData, 0)
-	disjointFragments := make([][]*types.BlockData, 0)
+	nextBlocksToImport := new(Fragment)
+	disjointFragments := make([]*Fragment, 0)
 
 	for _, fragment := range orderedFragments {
-		ok, err := f.blockState.HasHeader(fragment[0].Header.ParentHash)
+		ok, err := f.blockState.HasHeader(fragment.First().Header.ParentHash)
 		if err != nil && !errors.Is(err, database.ErrNotFound) {
 			return false, nil, nil, fmt.Errorf("checking block parent header: %w", err)
 		}
 
 		if ok {
-			nextBlocksToImport = append(nextBlocksToImport, fragment...)
+			nextBlocksToImport = nextBlocksToImport.Concat(fragment)
 			continue
 		}
 
@@ -204,8 +210,8 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
 	}
 
 	// this loop goal is to import ready blocks as well as update the highestFinalized header
-	for len(nextBlocksToImport) > 0 || len(disjointFragments) > 0 {
-		for _, blockToImport := range nextBlocksToImport {
+	for nextBlocksToImport.Len() > 0 || len(disjointFragments) > 0 {
+		for blockToImport := range nextBlocksToImport.Iter() {
 			imported, err := f.blockImporter.importBlock(blockToImport, networkInitialSync)
 			if err != nil {
 				return false, nil, nil, fmt.Errorf("while handling ready block: %w", err)
@@ -216,7 +222,7 @@ func (f *FullSyncStrategy) Process(results []*SyncTaskResult) (
 			}
 		}
 
-		nextBlocksToImport = make([]*types.BlockData, 0)
+		nextBlocksToImport = new(Fragment)
 		highestFinalized, err = f.blockState.GetHighestFinalisedHeader()
 		if err != nil {
 			return false, nil, nil, fmt.Errorf("getting highest finalized header")
 		}
@@ -226,45 +232,54 @@
 		// given that fragment contains chains and these chains contains blocks
 		// check if the first block in the chain contains a parent known by us
 		for _, fragment := range disjointFragments {
-			validFragment := validBlocksUnderFragment(highestFinalized.Number, fragment)
-			if len(validFragment) == 0 {
+			validFragment := fragment.Filter(func(bd *types.BlockData) bool {
+				return bd.Header.Number > highestFinalized.Number
+			})
+
+			if validFragment.Len() == 0 {
 				continue
 			}
 
-			ok, err := f.blockState.HasHeader(validFragment[0].Header.ParentHash)
+			ok, err := f.blockState.HasHeader(validFragment.First().Header.ParentHash)
 			if err != nil && !errors.Is(err, database.ErrNotFound) {
 				return false, nil, nil, err
 			}
 
 			if !ok {
+				firstFragmentBlock := validFragment.First()
 				// if the parent of this valid fragment is behind our latest finalized number
 				// then we can discard the whole fragment since it is a invalid fork
-				if (validFragment[0].Header.Number - 1) <= highestFinalized.Number {
+				if (firstFragmentBlock.Header.Number - 1) <= highestFinalized.Number {
 					continue
 				}
 
 				logger.Infof("starting an acestor search from %s parent of #%d (%s)",
-					validFragment[0].Header.ParentHash,
-					validFragment[0].Header.Number,
-					validFragment[0].Header.Hash(),
+					firstFragmentBlock.Header.ParentHash,
+					firstFragmentBlock.Header.Number,
+					firstFragmentBlock.Header.Hash(),
				)
 
				f.unreadyBlocks.newDisjointFragment(validFragment)
				request := messages.NewBlockRequest(
-					*messages.NewFromBlock(validFragment[0].Header.ParentHash),
+					*messages.NewFromBlock(firstFragmentBlock.Header.ParentHash),
					messages.MaxBlocksInResponse,
					messages.BootstrapRequestData,
					messages.Descending)
				f.requestQueue.PushBack(request)
			} else {
				// inserting them in the queue to be processed after the main chain
-				nextBlocksToImport = append(nextBlocksToImport, validFragment...)
+				nextBlocksToImport = nextBlocksToImport.Concat(validFragment)
			}
		}
 
		disjointFragments = nil
	}
 
-	f.unreadyBlocks.removeIrrelevantFragments(highestFinalized.Number)
+	// update unready blocks based on the highest finalized block
+	f.unreadyBlocks.pruneDisjointFragments(LowerThanOrEq(highestFinalized.Number))
+	f.unreadyBlocks.removeIncompleteBlocks(func(_ common.Hash, value *types.BlockData) bool {
+		return value.Header.Number <= highestFinalized.Number
+	})
+
 	return false, repChanges, peersToIgnore, nil
 }
 
@@ -486,19 +501,24 @@ resultLoop:
 // note that we have fragments with single blocks, fragments with fork (in case of 8)
 // after sorting these fragments we end up with:
 // [ {1, 2, 3, 4, 5} {6, 7, 8, 9, 10} {8} {11, 12, 13, 14, 15, 16} {17} ]
-func sortFragmentsOfChain(fragments [][]*types.BlockData) {
+func sortFragmentsOfChain(fragments []*Fragment) {
 	if len(fragments) == 0 {
 		return
 	}
 
-	slices.SortFunc(fragments, func(a, b []*types.BlockData) int {
-		if a[0].Header.Number < b[0].Header.Number {
-			return -1
-		}
-		if a[0].Header.Number == b[0].Header.Number {
-			return 0
+	slices.SortFunc(fragments, func(fragA, fragB *Fragment) int {
+		if fragA.First() != nil && fragB.First() != nil {
+			switch {
+			case fragA.First().Header.Number < fragB.First().Header.Number:
+				return -1
+			case fragA.First().Header.Number == fragB.First().Header.Number:
+				return 0
+			default:
+				return 1
+			}
 		}
-		return 1
+
+		return 0
 	})
 }
 
@@ -510,20 +530,21 @@ func sortFragmentsOfChain(fragments [][]*types.BlockData) {
 // [ {1, 2, 3, 4, 5} {6, 7, 8, 9, 10} {8} {11, 12, 13, 14, 15, 16} {17} ]
 // merge will transform it to the following slice:
 // [ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17} {8} ]
-func mergeFragmentsOfChain(fragments [][]*types.BlockData) [][]*types.BlockData {
+func mergeFragmentsOfChain(fragments []*Fragment) []*Fragment {
 	if len(fragments) == 0 {
 		return nil
 	}
 
-	mergedFragments := [][]*types.BlockData{fragments[0]}
+	mergedFragments := []*Fragment{fragments[0]}
+
 	for i := 1; i < len(fragments); i++ {
 		lastMergedFragment := mergedFragments[len(mergedFragments)-1]
 		currentFragment := fragments[i]
-		lastBlock := lastMergedFragment[len(lastMergedFragment)-1]
+		lastBlock := lastMergedFragment.Last()
 
-		if lastBlock.IsParent(currentFragment[0]) {
-			mergedFragments[len(mergedFragments)-1] = append(lastMergedFragment, currentFragment...)
+		if lastBlock.IsParent(currentFragment.First()) {
+			mergedFragments[len(mergedFragments)-1] = lastMergedFragment.Concat(currentFragment)
 		} else {
 			mergedFragments = append(mergedFragments, currentFragment)
 		}
 	}
 
@@ -532,23 +553,6 @@ func mergeFragmentsOfChain(fragments [][]*types.BlockData) [][]*types.BlockData
 	return mergedFragments
 }
 
-// validBlocksUnderFragment ignore all blocks prior to the given last finalized number
-func validBlocksUnderFragment(highestFinalizedNumber uint, fragmentBlocks []*types.BlockData) []*types.BlockData {
-	startFragmentFrom := -1
-	for idx, block := range fragmentBlocks {
-		if block.Header.Number > highestFinalizedNumber {
-			startFragmentFrom = idx
-			break
-		}
-	}
-
-	if startFragmentFrom < 0 {
-		return nil
-	}
-
-	return fragmentBlocks[startFragmentFrom:]
-}
-
 // validateResponseFields checks that the expected fields are in the block data
 func validateResponseFields(req *messages.BlockRequestMessage, blocks []*types.BlockData) error {
 	for _, bd := range blocks {
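
For reference, here is a minimal sketch of how the Filter predicate used in Process above takes over the job of the removed validBlocksUnderFragment helper. It is not part of the patch: the function name exampleFilterReadyBlocks, the block numbers and the finalized number 100 are hypothetical, and the snippet assumes it lives in the sync package so it can see the Fragment type added by this change.

	package sync

	import (
		"fmt"

		"github.com/ChainSafe/gossamer/dot/types"
	)

	// Example only: block numbers and the finalized number (100) are hypothetical.
	func exampleFilterReadyBlocks() {
		fragment := NewFragment([]*types.BlockData{
			{Header: &types.Header{Number: 99}},
			{Header: &types.Header{Number: 100}},
			{Header: &types.Header{Number: 101}},
			{Header: &types.Header{Number: 102}},
		})

		// Same shape as the calls in Process: keep only blocks above the
		// highest finalized number.
		validFragment := fragment.Filter(func(bd *types.BlockData) bool {
			return bd.Header.Number > 100
		})

		// For an ascending fragment this matches what the removed
		// validBlocksUnderFragment(100, ...) helper used to return.
		fmt.Println(validFragment.Len())                 // 2
		fmt.Println(validFragment.First().Header.Number) // 101
		fmt.Println(validFragment.Last().Header.Number)  // 102
	}
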
diff --git a/dot/sync/fullsync_test.go b/dot/sync/fullsync_test.go
index 0c9bbd4122..04536f5125 100644
--- a/dot/sync/fullsync_test.go
+++ b/dot/sync/fullsync_test.go
@@ -254,8 +254,8 @@ func TestFullSyncProcess(t *testing.T) {
 		require.Equal(t, fs.requestQueue.Len(), 1)
 		require.Len(t, fs.unreadyBlocks.incompleteBlocks, 0)
 		require.Len(t, fs.unreadyBlocks.disjointFragments, 1)
-		require.Equal(t, fs.unreadyBlocks.disjointFragments[0], sndTaskBlockResponse.BlockData)
-		require.Equal(t, len(fs.unreadyBlocks.disjointFragments[0]), len(sndTaskBlockResponse.BlockData))
+		require.Equal(t, fs.unreadyBlocks.disjointFragments[0], NewFragment(sndTaskBlockResponse.BlockData))
+		require.Equal(t, fs.unreadyBlocks.disjointFragments[0].Len(), len(sndTaskBlockResponse.BlockData))
 
 		expectedAncestorRequest := messages.NewBlockRequest(
 			*messages.NewFromBlock(sndTaskBlockResponse.BlockData[0].Header.ParentHash),
diff --git a/dot/sync/unready_blocks.go b/dot/sync/unready_blocks.go
index 58f477ff52..d08d69ee16 100644
--- a/dot/sync/unready_blocks.go
+++ b/dot/sync/unready_blocks.go
@@ -4,6 +4,7 @@
 package sync
 
 import (
+	"iter"
 	"maps"
 	"slices"
 	"sync"
@@ -12,16 +13,90 @@ import (
 	"github.com/ChainSafe/gossamer/lib/common"
 )
 
+type Fragment struct {
+	chain []*types.BlockData
+}
+
+func NewFragment(chain []*types.BlockData) *Fragment {
+	return &Fragment{chain}
+}
+
+// Filter returns a new fragment with the blocks that satisfy the predicate p
+func (f *Fragment) Filter(p func(*types.BlockData) bool) *Fragment {
+	filtered := make([]*types.BlockData, 0, len(f.chain))
+	for _, bd := range f.chain {
+		if p(bd) {
+			filtered = append(filtered, bd)
+		}
+	}
+	return NewFragment(filtered)
+}
+
+// Find returns the first occurrence of a types.BlockData that
+// satisfies the predicate p
+func (f *Fragment) Find(p func(*types.BlockData) bool) *types.BlockData {
+	for _, bd := range f.chain {
+		if p(bd) {
+			return bd
+		}
+	}
+
+	return nil
+}
+
+// First returns the first block in the fragment, or nil if the fragment is empty
+func (f *Fragment) First() *types.BlockData {
+	if len(f.chain) > 0 {
+		return f.chain[0]
+	}
+
+	return nil
+}
+
+// Last returns the last block in the fragment, or nil if the fragment is empty
+func (f *Fragment) Last() *types.BlockData {
+	if len(f.chain) > 0 {
+		return f.chain[len(f.chain)-1]
+	}
+
+	return nil
+}
+
+// Len returns the number of blocks in the fragment
+func (f *Fragment) Len() int {
+	return len(f.chain)
+}
+
+// Iter returns an iterator over the blocks in the fragment,
+// enabling the caller to use the range keyword on a Fragment instance
+func (f *Fragment) Iter() iter.Seq[*types.BlockData] {
+	return func(yield func(*types.BlockData) bool) {
+		for _, bd := range f.chain {
+			if !yield(bd) {
+				return
+			}
+		}
+	}
+}
+
+// Concat returns a new fragment containing the concatenation
+// of this fragment and the fragment given as argument
+func (f *Fragment) Concat(snd *Fragment) *Fragment {
+	return &Fragment{
+		chain: slices.Concat(f.chain, snd.chain),
+	}
+}
+
 type unreadyBlocks struct {
 	mtx               sync.RWMutex
 	incompleteBlocks  map[common.Hash]*types.BlockData
-	disjointFragments [][]*types.BlockData
+	disjointFragments []*Fragment
 }
 
 func newUnreadyBlocks() *unreadyBlocks {
 	return &unreadyBlocks{
 		incompleteBlocks:  make(map[common.Hash]*types.BlockData),
-		disjointFragments: make([][]*types.BlockData, 0),
+		disjointFragments: make([]*Fragment, 0),
 	}
 }
 
@@ -36,34 +111,32 @@ func (u *unreadyBlocks) newIncompleteBlock(blockHeader *types.Header) {
 	}
 }
 
-func (u *unreadyBlocks) newDisjointFragment(frag []*types.BlockData) {
+func (u *unreadyBlocks) newDisjointFragment(frag *Fragment) {
 	u.mtx.Lock()
 	defer u.mtx.Unlock()
 	u.disjointFragments = append(u.disjointFragments, frag)
 }
 
 // updateDisjointFragments given a set of blocks check if it
-// connects to a disjoint fragment, if so we remove the fragment from the
-// disjoint set and return the fragment concatenated with the chain argument
-func (u *unreadyBlocks) updateDisjointFragments(chain []*types.BlockData) ([]*types.BlockData, bool) {
+// connects to a disjoint fragment and, if so, returns the merged fragment
+func (u *unreadyBlocks) updateDisjointFragments(chain *Fragment) (*Fragment, bool) {
 	u.mtx.Lock()
 	defer u.mtx.Unlock()
 
-	indexToChange := -1
 	for idx, disjointChain := range u.disjointFragments {
-		lastBlockArriving := chain[len(chain)-1]
-		firstDisjointBlock := disjointChain[0]
+		var outFragment *Fragment
+		if chain.Last().IsParent(disjointChain.First()) {
+			outFragment = chain.Concat(disjointChain)
+		}
 
-		if lastBlockArriving.IsParent(firstDisjointBlock) {
-			indexToChange = idx
-			break
+		if disjointChain.Last().IsParent(chain.First()) {
+			outFragment = disjointChain.Concat(chain)
 		}
-	}
 
-	if indexToChange >= 0 {
-		disjointChain := u.disjointFragments[indexToChange]
-		u.disjointFragments = append(u.disjointFragments[:indexToChange], u.disjointFragments[indexToChange+1:]...)
-		return append(chain, disjointChain...), true
+		if outFragment != nil {
+			u.disjointFragments = slices.Delete(u.disjointFragments, idx, idx+1)
+			return outFragment, true
+		}
 	}
 
 	return nil, false
 }
 
@@ -72,12 +145,13 @@ func (u *unreadyBlocks) updateDisjointFragments(chain []*types.BlockData) ([]*ty
 // updateIncompleteBlocks given a set of blocks check if they can fullfil
 // incomplete blocks, the blocks that can be completed will be removed from
 // the incompleteBlocks map and returned
-func (u *unreadyBlocks) updateIncompleteBlocks(chain []*types.BlockData) []*types.BlockData {
+func (u *unreadyBlocks) updateIncompleteBlocks(chain *Fragment) []*Fragment {
 	u.mtx.Lock()
 	defer u.mtx.Unlock()
 
-	completeBlocks := make([]*types.BlockData, 0)
-	for _, blockData := range chain {
+	completeBlocks := make([]*Fragment, 0)
+
+	for blockData := range chain.Iter() {
 		incomplete, ok := u.incompleteBlocks[blockData.Hash]
 		if !ok {
 			continue
@@ -87,7 +161,7 @@ func (u *unreadyBlocks) updateIncompleteBlocks(chain []*types.BlockData) []*type
 		incomplete.Justification = blockData.Justification
 
 		delete(u.incompleteBlocks, blockData.Hash)
-		completeBlocks = append(completeBlocks, incomplete)
+		completeBlocks = append(completeBlocks, NewFragment([]*types.BlockData{incomplete}))
 	}
 
 	return completeBlocks
@@ -102,26 +176,17 @@ func (u *unreadyBlocks) isIncomplete(blockHash common.Hash) bool {
 }
 
 // inDisjointFragment iterate through the disjoint fragments and
-// check if the block hash an number already exists in one of them
+// check if the block hash and number already exist in one of them
 func (u *unreadyBlocks) inDisjointFragment(blockHash common.Hash, blockNumber uint) bool {
 	u.mtx.RLock()
 	defer u.mtx.RUnlock()
 
 	for _, frag := range u.disjointFragments {
-		target := &types.BlockData{Header: &types.Header{Number: blockNumber}}
-		idx, found := slices.BinarySearchFunc(frag, target,
-			func(a, b *types.BlockData) int {
-				switch {
-				case a.Header.Number == b.Header.Number:
-					return 0
-				case a.Header.Number < b.Header.Number:
-					return -1
-				default:
-					return 1
-				}
-			})
-
-		if found && frag[idx].Hash == blockHash {
+		bd := frag.Find(func(bd *types.BlockData) bool {
+			return bd.Header.Number == blockNumber && bd.Hash == blockHash
+		})
+
+		if bd != nil {
 			return true
 		}
 	}
 
@@ -129,35 +194,28 @@ func (u *unreadyBlocks) inDisjointFragment(blockHash common.Hash, blockNumber ui
 	return false
 }
 
-// removeIrrelevantFragments checks if there is blocks in the fragments that can be pruned
-// given the finalised block number
-func (u *unreadyBlocks) removeIrrelevantFragments(finalisedNumber uint) {
+func (u *unreadyBlocks) removeIncompleteBlocks(del func(key common.Hash, value *types.BlockData) bool) {
 	u.mtx.Lock()
 	defer u.mtx.Unlock()
 
-	maps.DeleteFunc(u.incompleteBlocks, func(_ common.Hash, value *types.BlockData) bool {
-		return value.Header.Number <= finalisedNumber
-	})
-
-	fragmentIdx := 0
-	for _, fragment := range u.disjointFragments {
-		// the fragments are sorted in ascending order
-		// starting from the latest item and going backwards
-		// we have a higher chance to find the idx that has
-		// a block with number lower or equal the finalised one
-		idx := len(fragment) - 1
-		for ; idx >= 0; idx-- {
-			if fragment[idx].Header.Number <= finalisedNumber {
-				break
-			}
-		}
+	maps.DeleteFunc(u.incompleteBlocks, del)
 }
 
-		updatedFragment := fragment[idx+1:]
-		if len(updatedFragment) != 0 {
-			u.disjointFragments[fragmentIdx] = updatedFragment
-			fragmentIdx++
-		}
-	}
+// pruneDisjointFragments will iterate over the disjoint fragments and check if
+// they can be removed based on the del param
+func (u *unreadyBlocks) pruneDisjointFragments(del func(*Fragment) bool) {
+	u.mtx.Lock()
+	defer u.mtx.Unlock()
 
-	u.disjointFragments = u.disjointFragments[:fragmentIdx]
+	u.disjointFragments = slices.DeleteFunc(u.disjointFragments, del)
+}
+
+// LowerThanOrEq returns a predicate that is true when the fragment contains
+// a block whose number is lower than or equal to the given block number
+func LowerThanOrEq(blockNumber uint) func(*Fragment) bool {
+	return func(f *Fragment) bool {
+		return f.Find(func(bd *types.BlockData) bool {
+			return bd.Header.Number <= blockNumber
+		}) != nil
+	}
 }
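
For reference, a minimal usage sketch of the Fragment API introduced above. It is not part of the patch: the function name exampleFragmentAPI and the block numbers are hypothetical, and it assumes the Go 1.23 range-over-function support that the range over Iter() in Process already relies on.

	package sync

	import (
		"fmt"

		"github.com/ChainSafe/gossamer/dot/types"
	)

	// Example only: block numbers are hypothetical.
	func exampleFragmentAPI() {
		first := NewFragment([]*types.BlockData{
			{Header: &types.Header{Number: 1}},
			{Header: &types.Header{Number: 2}},
		})
		second := NewFragment([]*types.BlockData{
			{Header: &types.Header{Number: 3}},
		})

		// Concat builds a new fragment and leaves both receivers untouched.
		chain := first.Concat(second)
		fmt.Println(chain.Len())                 // 3
		fmt.Println(chain.First().Header.Number) // 1
		fmt.Println(chain.Last().Header.Number)  // 3

		// Iter returns an iter.Seq, so a fragment can be ranged over directly.
		for bd := range chain.Iter() {
			fmt.Println(bd.Header.Number)
		}

		// Find returns the first block matching the predicate, or nil.
		if bd := chain.Find(func(bd *types.BlockData) bool { return bd.Header.Number == 2 }); bd != nil {
			fmt.Println("found block", bd.Header.Number)
		}
	}
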
diff --git a/dot/sync/unready_blocks_test.go b/dot/sync/unready_blocks_test.go
index b15019c362..1f313f4d90 100644
--- a/dot/sync/unready_blocks_test.go
+++ b/dot/sync/unready_blocks_test.go
@@ -13,38 +13,32 @@ import (
 func TestUnreadyBlocks_removeIrrelevantFragments(t *testing.T) {
 	t.Run("removing_all_disjoint_fragment", func(t *testing.T) {
 		ub := newUnreadyBlocks()
-		ub.disjointFragments = [][]*types.BlockData{
-			{
-				{
-					Header: &types.Header{
-						Number: 100,
-					},
-				},
-			},
-			{
+		ub.disjointFragments = []*Fragment{
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 99,
 					},
 				},
-			},
-			{
+			}),
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 92,
 					},
 				},
-			},
+			}),
 		}
-		ub.removeIrrelevantFragments(100)
+
+		ub.pruneDisjointFragments(LowerThanOrEq(100))
 		require.Empty(t, ub.disjointFragments)
 	})
 
 	t.Run("removing_irrelevant_fragments", func(t *testing.T) {
 		ub := newUnreadyBlocks()
-		ub.disjointFragments = [][]*types.BlockData{
+		ub.disjointFragments = []*Fragment{
 			// first fragment
-			{
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 192,
@@ -62,10 +56,10 @@ func TestUnreadyBlocks_removeIrrelevantFragments(t *testing.T) {
 						Number: 190,
 					},
 				},
-			},
+			}),
 
 			// second fragment
-			{
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 253,
@@ -83,10 +77,10 @@ func TestUnreadyBlocks_removeIrrelevantFragments(t *testing.T) {
 						Number: 255,
 					},
 				},
-			},
+			}),
 
 			// third fragment
-			{
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 1022,
@@ -104,30 +98,16 @@ func TestUnreadyBlocks_removeIrrelevantFragments(t *testing.T) {
 						Number: 1024,
 					},
 				},
-			},
+			}),
 		}
 
 		// the first fragment should be removed
 		// the second fragment should have only 2 items
 		// the third frament shold not be affected
-		ub.removeIrrelevantFragments(253)
-		require.Len(t, ub.disjointFragments, 2)
-
-		expectedSecondFrag := []*types.BlockData{
-			{
-				Header: &types.Header{
-					Number: 254,
-				},
-			},
-
-			{
-				Header: &types.Header{
-					Number: 255,
-				},
-			},
-		}
+		ub.pruneDisjointFragments(LowerThanOrEq(253))
+		require.Len(t, ub.disjointFragments, 1)
 
-		expectedThirdFragment := []*types.BlockData{
+		expectedThirdFragment := NewFragment([]*types.BlockData{
 			{
 				Header: &types.Header{
 					Number: 1022,
@@ -145,37 +125,37 @@ func TestUnreadyBlocks_removeIrrelevantFragments(t *testing.T) {
 					Number: 1024,
 				},
 			},
-		}
-		require.Equal(t, ub.disjointFragments[0], expectedSecondFrag)
-		require.Equal(t, ub.disjointFragments[1], expectedThirdFragment)
+		})
+
+		require.Equal(t, ub.disjointFragments[0], expectedThirdFragment)
 	})
 
 	t.Run("keep_all_fragments", func(t *testing.T) {
 		ub := newUnreadyBlocks()
-		ub.disjointFragments = [][]*types.BlockData{
-			{
+		ub.disjointFragments = []*Fragment{
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 101,
 					},
 				},
-			},
-			{
+			}),
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 103,
 					},
 				},
-			},
-			{
+			}),
+			NewFragment([]*types.BlockData{
 				{
 					Header: &types.Header{
 						Number: 104,
 					},
 				},
-			},
+			}),
 		}
-		ub.removeIrrelevantFragments(100)
+		ub.pruneDisjointFragments(LowerThanOrEq(100))
 		require.Len(t, ub.disjointFragments, 3)
 	})
 }
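
For reference, a minimal sketch of the pruning flow the tests above exercise. It is not part of the patch: the function name examplePruneUnreadyBlocks, the block numbers and the finalized number 253 are hypothetical.

	package sync

	import (
		"github.com/ChainSafe/gossamer/dot/types"
		"github.com/ChainSafe/gossamer/lib/common"
	)

	// Example only: block numbers and the finalized number (253) are hypothetical.
	func examplePruneUnreadyBlocks() {
		finalized := uint(253)

		ub := newUnreadyBlocks()

		// An incomplete block at or below the finalized number is dropped.
		header := &types.Header{Number: 200}
		ub.incompleteBlocks[header.Hash()] = &types.BlockData{Hash: header.Hash(), Header: header}

		ub.disjointFragments = []*Fragment{
			// contains a block <= 253, so the whole fragment is removed
			NewFragment([]*types.BlockData{
				{Header: &types.Header{Number: 253}},
				{Header: &types.Header{Number: 254}},
			}),
			// every block is above 253, so the fragment is kept
			NewFragment([]*types.BlockData{
				{Header: &types.Header{Number: 300}},
			}),
		}

		// The same two calls Process now makes once highestFinalized is known.
		ub.pruneDisjointFragments(LowerThanOrEq(finalized))
		ub.removeIncompleteBlocks(func(_ common.Hash, value *types.BlockData) bool {
			return value.Header.Number <= finalized
		})

		// ub.disjointFragments now holds only the {300} fragment and
		// ub.incompleteBlocks is empty.
	}

Unlike the removed removeIrrelevantFragments, which trimmed a fragment down to its blocks above the finalized number, LowerThanOrEq discards a fragment as soon as any of its blocks is at or below that number, which is why the second fragment in the removing_irrelevant_fragments test is now removed entirely instead of being shortened to two blocks.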