Skip to content
This repository was archived by the owner on Jan 24, 2025. It is now read-only.

Commit

Permalink
Merge branch 'feat/warpsync' of github.com:iotaledger/iota-core into …
Browse files Browse the repository at this point in the history
…feat/reactive-chainmanager
  • Loading branch information
hmoog committed Aug 21, 2023
2 parents fd9d4b9 + 6695551 commit d54ab5f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
52 changes: 30 additions & 22 deletions pkg/protocol/block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/iotaledger/hive.go/ds/types"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/core/buffer"
"github.com/iotaledger/iota-core/pkg/model"
Expand Down Expand Up @@ -44,25 +46,26 @@ type BlockDispatcher struct {

// shutdownEvent is a reactive event that is triggered when the BlockDispatcher instance is stopped.
shutdownEvent reactive.Event

// optWarpSyncWindowSize is the optional warp sync window size.
optWarpSyncWindowSize iotago.SlotIndex
}

// NewBlockDispatcher creates a new BlockDispatcher instance.
func NewBlockDispatcher(protocol *Protocol) *BlockDispatcher {
b := &BlockDispatcher{
func NewBlockDispatcher(protocol *Protocol, opts ...options.Option[BlockDispatcher]) *BlockDispatcher {
return options.Apply(&BlockDispatcher{
protocol: protocol,
dispatchWorkers: protocol.Workers.CreatePool("BlockDispatcher.Dispatch"),
warpSyncWorkers: protocol.Workers.CreatePool("BlockDispatcher.WarpSync", 1),
unsolidCommitmentBlocks: buffer.NewUnsolidCommitmentBuffer[*types.Tuple[*model.Block, network.PeerID]](20, 100),
pendingWarpSyncRequests: eventticker.New[iotago.SlotIndex, iotago.CommitmentID](eventticker.RetryInterval[iotago.SlotIndex, iotago.CommitmentID](WarpSyncRetryInterval)),
processedWarpSyncRequests: ds.NewSet[iotago.CommitmentID](),
shutdownEvent: reactive.NewEvent(),
}

protocol.HookConstructed(b.initEngineMonitoring)
protocol.HookInitialized(b.initNetworkConnection)
protocol.HookStopped(b.shutdown)

return b
}, opts, func(b *BlockDispatcher) {
protocol.HookConstructed(b.initEngineMonitoring)
protocol.HookInitialized(b.initNetworkConnection)
protocol.HookStopped(b.shutdown)
})
}

// Dispatch dispatches the given block to the correct engine instance.
Expand Down Expand Up @@ -107,7 +110,11 @@ func (b *BlockDispatcher) initEngineMonitoring() {
}, b.dispatchWorkers)

b.runTask(func() {
b.warpSyncIfNecessary(b.targetEngine(chainCommitment), chainCommitment)
// warpsync only if the observed commitment is at least two commitments ahead.
targetEngine := b.targetEngine(chainCommitment)
if targetEngine != nil && chainCommitment.Commitment().Index() > targetEngine.Storage.Settings().LatestCommitment().Index()+1 {
b.warpSync(targetEngine, chainCommitment)
}
}, b.warpSyncWorkers)
})
})
Expand Down Expand Up @@ -238,24 +245,23 @@ func (b *BlockDispatcher) inWarpSyncRange(engine *engine.Engine, block *model.Bl
latestCommitmentIndex := engine.Storage.Settings().LatestCommitment().Index()
maxCommittableAge := engine.APIForSlot(slotCommitmentID.Index()).ProtocolParameters().MaxCommittableAge()

return aboveWarpSyncThreshold(block.ID().Index(), latestCommitmentIndex, maxCommittableAge) && slotCommitmentID.Index() > latestCommitmentIndex
return block.ID().Index() > latestCommitmentIndex+maxCommittableAge
}

// warpSyncIfNecessary checks if a warp sync is necessary and starts the process if that is the case.
func (b *BlockDispatcher) warpSyncIfNecessary(e *engine.Engine, chainCommitment *chainmanager.ChainCommitment) {
// warpSync triggers warp sync from the latest committed slot up to the warpsync window.
func (b *BlockDispatcher) warpSync(e *engine.Engine, chainCommitment *chainmanager.ChainCommitment) {
if e == nil || chainCommitment == nil {
return
}

chain := chainCommitment.Chain()
maxCommittableAge := e.APIForSlot(chainCommitment.Commitment().Index()).ProtocolParameters().MaxCommittableAge()
warpSyncWindowSize := lo.Cond(maxCommittableAge > b.optWarpSyncWindowSize, maxCommittableAge, b.optWarpSyncWindowSize)
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Index()

if aboveWarpSyncThreshold(chainCommitment.Commitment().Index(), latestCommitmentIndex, maxCommittableAge) {
for slotToWarpSync := latestCommitmentIndex + 1; slotToWarpSync <= latestCommitmentIndex+maxCommittableAge; slotToWarpSync++ {
if commitmentToSync := chain.Commitment(slotToWarpSync); commitmentToSync != nil && !b.processedWarpSyncRequests.Has(commitmentToSync.ID()) {
b.pendingWarpSyncRequests.StartTicker(commitmentToSync.ID())
}
for slotToWarpSync := latestCommitmentIndex + 1; slotToWarpSync <= latestCommitmentIndex+warpSyncWindowSize; slotToWarpSync++ {
if commitmentToSync := chain.Commitment(slotToWarpSync); commitmentToSync != nil && !b.processedWarpSyncRequests.Has(commitmentToSync.ID()) {
b.pendingWarpSyncRequests.StartTicker(commitmentToSync.ID())
}
}
}
Expand Down Expand Up @@ -291,8 +297,7 @@ func (b *BlockDispatcher) monitorLatestEngineCommitment(engineInstance *engine.E
engineInstance.HookStopped(engineInstance.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
if chainCommitment, exists := b.protocol.ChainManager.Commitment(commitment.ID()); exists {
b.processedWarpSyncRequests.Delete(commitment.ID())

b.warpSyncIfNecessary(engineInstance, chainCommitment)
b.warpSync(engineInstance, chainCommitment)
}
}).Unhook)
}
Expand Down Expand Up @@ -329,8 +334,11 @@ func (b *BlockDispatcher) runTask(task func(), pool *workerpool.WorkerPool) {
})
}

func aboveWarpSyncThreshold(slot iotago.SlotIndex, latestCommitmentIndex iotago.SlotIndex, maxCommittableAge iotago.SlotIndex) bool {
return slot > latestCommitmentIndex+2*maxCommittableAge
// WithWarpSyncWindowSize is an option for the BlockDispatcher that allows to set the warp sync window size.
func WithWarpSyncWindowSize(size iotago.SlotIndex) options.Option[BlockDispatcher] {
return func(b *BlockDispatcher) {
b.optWarpSyncWindowSize = size
}
}

// WarpSyncRetryInterval is the interval in which a warp sync request is retried.
Expand Down
4 changes: 0 additions & 4 deletions pkg/protocol/protocol_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func (p *Protocol) runNetworkProtocol() {
}
}, event.WithWorkerPool(wpBlocks))

p.Events.Engine.BlockRequester.Tick.Hook(func(blockID iotago.BlockID) {
p.networkProtocol.RequestBlock(blockID)
}, event.WithWorkerPool(wpBlocks))

// Blocks are gossiped when they are scheduled or skipped.
p.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
p.networkProtocol.SendBlock(block.ModelBlock())
Expand Down

0 comments on commit d54ab5f

Please sign in to comment.