Skip to content

Commit

Permalink
feat(dot/sync): include block origin and skip extra validation on `in…
Browse files Browse the repository at this point in the history
…itialSync` (#3392)
  • Loading branch information
EclesioMeloJunior authored Nov 29, 2023
1 parent a9c1f8f commit 8e1650e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 55 deletions.
76 changes: 51 additions & 25 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
tip
)

type blockOrigin byte

const (
networkInitialSync blockOrigin = iota
networkBroadcast
)

func (s chainSyncState) String() string {
switch s {
case bootstrap:
Expand Down Expand Up @@ -126,6 +133,7 @@ type chainSync struct {
telemetry Telemetry
badBlocks []string
requestMaker network.RequestMaker
waitPeersDuration time.Duration
}

type chainSyncConfig struct {
Expand All @@ -142,6 +150,7 @@ type chainSyncConfig struct {
blockImportHandler BlockImportHandler
telemetry Telemetry
badBlocks []string
waitPeersDuration time.Duration
}

func newChainSync(cfg chainSyncConfig) *chainSync {
Expand All @@ -166,26 +175,40 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
workerPool: newSyncWorkerPool(cfg.net, cfg.requestMaker),
badBlocks: cfg.badBlocks,
requestMaker: cfg.requestMaker,
waitPeersDuration: cfg.waitPeersDuration,
}
}

func (cs *chainSync) start() {
// since the default status from sync mode is syncMode(tip)
isSyncedGauge.Set(1)
func (cs *chainSync) waitEnoughPeersAndTarget() {
waitPeersTimer := time.NewTimer(cs.waitPeersDuration)

// wait until we have a minimal workers in the sync worker pool
for {
cs.workerPool.useConnectedPeers()
_, err := cs.getTarget()

totalAvailable := cs.workerPool.totalWorkers()
if totalAvailable >= uint(cs.minPeers) {
break
if totalAvailable >= uint(cs.minPeers) && err == nil {
return
}

// TODO: https://github.com/ChainSafe/gossamer/issues/3402
time.Sleep(time.Second)
select {
case <-waitPeersTimer.C:
waitPeersTimer.Reset(cs.waitPeersDuration)
case <-cs.stopCh:
return
}
}
}

func (cs *chainSync) start() {
// since the default status from sync mode is syncMode(tip)
isSyncedGauge.Set(1)

cs.wg.Add(1)
go cs.pendingBlocks.run(cs.finalisedCh, cs.stopCh, &cs.wg)

// wait until we have a minimal workers in the sync worker pool
cs.waitEnoughPeersAndTarget()
}

func (cs *chainSync) stop() error {
Expand Down Expand Up @@ -265,7 +288,7 @@ func (cs *chainSync) bootstrapSync() {

if isFarFromTarget {
cs.workerPool.useConnectedPeers()
err = cs.requestMaxBlocksFrom(bestBlockHeader)
err = cs.requestMaxBlocksFrom(bestBlockHeader, networkInitialSync)
if err != nil {
logger.Errorf("requesting max blocks from best block header: %s", err)
}
Expand Down Expand Up @@ -424,7 +447,7 @@ func (cs *chainSync) requestChainBlocks(announcedHeader, bestBlockHeader *types.

resultsQueue := make(chan *syncTaskResult)
cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue)
err := cs.handleWorkersResults(resultsQueue, startAtBlock, totalBlocks)
err := cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, totalBlocks)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}
Expand Down Expand Up @@ -462,7 +485,7 @@ func (cs *chainSync) requestForkBlocks(bestBlockHeader, highestFinalizedHeader,
resultsQueue := make(chan *syncTaskResult)
cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue)

err = cs.handleWorkersResults(resultsQueue, startAtBlock, gapLength)
err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, gapLength)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}
Expand Down Expand Up @@ -490,7 +513,7 @@ func (cs *chainSync) requestPendingBlocks(highestFinalizedHeader *types.Header)
}

if parentExists {
err := cs.handleReadyBlock(pendingBlock.toBlockData())
err := cs.handleReadyBlock(pendingBlock.toBlockData(), networkBroadcast)
if err != nil {
return fmt.Errorf("handling ready block: %w", err)
}
Expand All @@ -515,7 +538,7 @@ func (cs *chainSync) requestPendingBlocks(highestFinalizedHeader *types.Header)
// TODO: we should handle the requests concurrently
// a way of achieve that is by constructing a new `handleWorkersResults` for
// handling only tip sync requests
err = cs.handleWorkersResults(resultsQueue, startAtBlock, *descendingGapRequest.Max)
err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, *descendingGapRequest.Max)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}
Expand All @@ -524,7 +547,7 @@ func (cs *chainSync) requestPendingBlocks(highestFinalizedHeader *types.Header)
return nil
}

func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header) error {
func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin blockOrigin) error { //nolint:unparam
startRequestAt := bestBlockHeader.Number + 1

// targetBlockNumber is the virtual target we will request, however
Expand All @@ -551,7 +574,7 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header) error {
}

resultsQueue := cs.workerPool.submitRequests(requests)
err = cs.handleWorkersResults(resultsQueue, startRequestAt, expectedAmountOfBlocks)
err = cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}
Expand Down Expand Up @@ -589,7 +612,7 @@ func (cs *chainSync) getTarget() (uint, error) {
// in the queue and wait for it to completes
// TODO: handle only justification requests
func (cs *chainSync) handleWorkersResults(
workersResults chan *syncTaskResult, startAtBlock uint, expectedSyncedBlocks uint32) error {
workersResults chan *syncTaskResult, origin blockOrigin, startAtBlock uint, expectedSyncedBlocks uint32) error {

startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -739,15 +762,15 @@ taskResultLoop:
// response was validated! place into ready block queue
for _, bd := range syncingChain {
// block is ready to be processed!
if err := cs.handleReadyBlock(bd); err != nil {
if err := cs.handleReadyBlock(bd, origin); err != nil {
return fmt.Errorf("while handling ready block: %w", err)
}
}

return nil
}

func (cs *chainSync) handleReadyBlock(bd *types.BlockData) error {
func (cs *chainSync) handleReadyBlock(bd *types.BlockData, origin blockOrigin) error {
// if header was not requested, get it from the pending set
// if we're expecting headers, validate should ensure we have a header
if bd.Header == nil {
Expand Down Expand Up @@ -779,7 +802,7 @@ func (cs *chainSync) handleReadyBlock(bd *types.BlockData) error {
bd.Header = block.header
}

err := cs.processBlockData(*bd)
err := cs.processBlockData(*bd, origin)
if err != nil {
// depending on the error, we might want to save this block for later
logger.Errorf("block data processing for block with hash %s failed: %s", bd.Hash, err)
Expand All @@ -793,7 +816,7 @@ func (cs *chainSync) handleReadyBlock(bd *types.BlockData) error {
// processBlockData processes the BlockData from a BlockResponse and
// returns the index of the last BlockData it handled on success,
// or the index of the block data that errored on failure.
func (cs *chainSync) processBlockData(blockData types.BlockData) error {
func (cs *chainSync) processBlockData(blockData types.BlockData, origin blockOrigin) error {
// while in bootstrap mode we don't need to broadcast block announcements
announceImportedBlock := cs.getSyncMode() == tip
var blockDataJustification []byte
Expand All @@ -808,7 +831,7 @@ func (cs *chainSync) processBlockData(blockData types.BlockData) error {
}

if blockData.Body != nil {
err = cs.processBlockDataWithHeaderAndBody(blockData, announceImportedBlock)
err = cs.processBlockDataWithHeaderAndBody(blockData, origin, announceImportedBlock)
if err != nil {
return fmt.Errorf("processing block data with header and body: %w", err)
}
Expand Down Expand Up @@ -842,10 +865,13 @@ func (cs *chainSync) verifyJustification(headerHash common.Hash, justification [
}

func (cs *chainSync) processBlockDataWithHeaderAndBody(blockData types.BlockData,
announceImportedBlock bool) (err error) {
err = cs.babeVerifier.VerifyBlock(blockData.Header)
if err != nil {
return fmt.Errorf("babe verifying block: %w", err)
origin blockOrigin, announceImportedBlock bool) (err error) {

if origin != networkInitialSync {
err = cs.babeVerifier.VerifyBlock(blockData.Header)
if err != nil {
return fmt.Errorf("babe verifying block: %w", err)
}
}

cs.handleBody(blockData.Body)
Expand Down
Loading

0 comments on commit 8e1650e

Please sign in to comment.