Skip to content

Commit

Permalink
Merge pull request #568 from iotaledger/fix/warpSync
Browse files Browse the repository at this point in the history
Fix: WarpSync
  • Loading branch information
karimodm authored Dec 4, 2023
2 parents 610eb3d + ac6a993 commit edf541f
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 197 deletions.
59 changes: 16 additions & 43 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type Chain struct {
// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]

// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
// network slot minus the max committable age.
WarpSyncThreshold reactive.Variable[iotago.SlotIndex]
// LatestSyncedSlot contains the latest commitment of this chain for which all blocks were booked.
LatestSyncedSlot reactive.Variable[iotago.SlotIndex]

// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
Expand Down Expand Up @@ -93,7 +92,7 @@ func newChain(chains *Chains) *Chain {
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool]().Init(true),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
LatestSyncedSlot: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Expand Down Expand Up @@ -187,13 +186,14 @@ func (c *Chain) initLogger() (shutdown func()) {

return lo.Batch(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
Expand All @@ -212,39 +212,23 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),

c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) {
return c.deriveParentChain(forkingPoint)
}),

c.ParentChain.WithNonEmptyValue(func(parentChain *Chain) (shutdown func()) {
return parentChain.deriveChildChains(c)
}),

c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return lo.Batch(
c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
)
}),
c.ForkingPoint.WithValue(c.deriveParentChain),
c.ParentChain.WithNonEmptyValue(lo.Bind(c, (*Chain).deriveChildChains)),
c.Engine.WithNonEmptyValue(c.deriveOutOfSyncThreshold),
)
}

// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
func (c *Chain) deriveWarpSyncMode() func() {
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// if we have no latest produced commitment, then the engine is not yet initialized and warp sync is disabled
if latestProducedCommitment == nil {
return false
}

// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestSyncedSlot iotago.SlotIndex, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// if warp sync mode is enabled, keep it enabled until we have synced all slots
if warpSyncMode {
return latestProducedCommitment.ID().Slot() < warpSyncThreshold
return latestSyncedSlot < latestSeenSlot
}

// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
return latestProducedCommitment.ID().Slot() < outOfSyncThreshold
}, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
return latestSyncedSlot < outOfSyncThreshold
}, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}

// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
Expand Down Expand Up @@ -311,26 +295,14 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {

// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2
// times the max committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
func (c *Chain) deriveOutOfSyncThreshold(engineInstance *engine.Engine) func() {
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
return latestSeenSlot - outOfSyncOffset
}

return 0
}, latestSeenSlot))
}

// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max
// committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); warpSyncOffset < latestSeenSlot {
return latestSeenSlot - warpSyncOffset
}

return 0
}, latestSeenSlot))
}, c.chains.LatestSeenSlot))
}

// addCommitment adds the given commitment to this chain.
Expand All @@ -342,6 +314,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Chains struct {
// HeaviestVerifiedCandidate contains the candidate chain with the heaviest verified weight.
HeaviestVerifiedCandidate reactive.Variable[*Chain]

// LatestSeenSlot contains the latest slot that was seen by any of the chains.
// LatestSeenSlot contains the slot of the latest commitment of any received block.
LatestSeenSlot reactive.Variable[iotago.SlotIndex]

// protocol contains a reference to the Protocol instance that this component belongs to.
Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
}),

protocol.Network.OnBlockReceived(func(block *model.Block, src peer.ID) {
c.LatestSeenSlot.Set(mainEngine.LatestAPI().TimeProvider().SlotFromTime(block.ProtocolBlock().Header.IssuingTime))
c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
}),
)
})
Expand Down
31 changes: 23 additions & 8 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ type Commitment struct {
// IsAttested contains a flag indicating if we have received attestations for this Commitment.
IsAttested reactive.Event

// IsSynced contains a flag that indicates if a Commitment was fully downloaded and processed.
IsSynced reactive.Event

// IsCommittable contains a flag that indicates if a Commitment is ready to be committed by the warp sync process.
IsCommittable reactive.Event

// IsVerified contains a flag indicating if this Commitment is verified (we produced this Commitment ourselves by
// booking all the contained blocks and transactions).
IsVerified reactive.Event
Expand All @@ -65,7 +71,7 @@ type Commitment struct {
IsAboveLatestVerifiedCommitment reactive.Variable[bool]

// ReplayDroppedBlocks contains a flag indicating if we should replay the blocks that were dropped while the
//Commitment was pending.
// Commitment was pending.
ReplayDroppedBlocks reactive.Variable[bool]

// IsEvicted contains a flag indicating if this Commitment was evicted from the Protocol.
Expand Down Expand Up @@ -94,6 +100,8 @@ func newCommitment(commitments *Commitments, model *model.Commitment) *Commitmen
CumulativeAttestedWeight: reactive.NewVariable[uint64](),
IsRoot: reactive.NewEvent(),
IsAttested: reactive.NewEvent(),
IsSynced: reactive.NewEvent(),
IsCommittable: reactive.NewEvent(),
IsVerified: reactive.NewEvent(),
IsAboveLatestVerifiedCommitment: reactive.NewVariable[bool](),
ReplayDroppedBlocks: reactive.NewVariable[bool](),
Expand Down Expand Up @@ -135,6 +143,8 @@ func (c *Commitment) initLogger() (shutdown func()) {
c.CumulativeAttestedWeight.LogUpdates(c, log.LevelTrace, "CumulativeAttestedWeight"),
c.IsRoot.LogUpdates(c, log.LevelTrace, "IsRoot"),
c.IsAttested.LogUpdates(c, log.LevelTrace, "IsAttested"),
c.IsSynced.LogUpdates(c, log.LevelTrace, "IsSynced"),
c.IsCommittable.LogUpdates(c, log.LevelTrace, "IsCommittable"),
c.IsVerified.LogUpdates(c, log.LevelTrace, "IsVerified"),
c.ReplayDroppedBlocks.LogUpdates(c, log.LevelTrace, "ReplayDroppedBlocks"),
c.IsEvicted.LogUpdates(c, log.LevelTrace, "IsEvicted"),
Expand All @@ -149,8 +159,9 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
// mark commitments that are marked as root as verified
c.IsVerified.InheritFrom(c.IsRoot),

// mark commitments that are marked as verified as attested
// mark commitments that are marked as verified as attested and synced
c.IsAttested.InheritFrom(c.IsVerified),
c.IsSynced.InheritFrom(c.IsVerified),

c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
// the weight can be fixed as a one time operation (as it only relies on static information from the parent
Expand All @@ -167,7 +178,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
c.Chain.WithNonEmptyValue(func(chain *Chain) func() {
return lo.Batch(
c.deriveRequestAttestations(chain, parent),
c.deriveWarpSyncBlocks(chain, parent),

// only start requesting blocks once the engine is ready
chain.WithInitializedEngine(func(_ *engine.Engine) (shutdown func()) {
return c.deriveWarpSyncBlocks(chain, parent)
}),
)
}),
)
Expand Down Expand Up @@ -259,11 +274,11 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment)
}

// deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting
// warp sync, and we are the directly above the latest verified Commitment.
// warp sync, and we are the directly above the latest commitment that is synced (has downloaded everything).
func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() {
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsVerified bool, isVerified bool) bool {
return engineInstance != nil && warpSync && parentIsVerified && !isVerified
}, chain.Engine, chain.WarpSyncMode, parent.IsVerified, c.IsVerified))
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, warpSyncMode bool, parentIsSynced bool, isSynced bool) bool {
return warpSyncMode && parentIsSynced && !isSynced
}, chain.WarpSyncMode, parent.IsSynced, c.IsSynced))
}

// deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an
Expand All @@ -278,7 +293,7 @@ func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() {
// the parent is on the target Chain.
func (c *Commitment) forceChain(targetChain *Chain) {
if currentChain := c.Chain.Get(); currentChain != targetChain {
if parent := c.Parent.Get(); parent.Chain.Get() == targetChain {
if parent := c.Parent.Get(); parent != nil && parent.Chain.Get() == targetChain {
parent.MainChild.Set(c)
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/protocol/commitment_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
Expand All @@ -22,6 +23,9 @@ type CommitmentVerifier struct {
// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
validatorAccountsData map[iotago.AccountID]*accounts.AccountData

// mutex is used to synchronize access to validatorAccountsData and epoch.
mutex syncutils.RWMutex
}

func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
Expand Down Expand Up @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
// In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data
// of the validators for the epoch of the last common commitment before the fork).
c.mutex.Lock()
apiForSlot := c.engine.APIForSlot(commitment.Slot())
commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
if commitmentEpoch > c.epoch {
Expand All @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}
}
}
c.mutex.Unlock()

// 3. Verify attestations.
blockIDs, seatCount, err := c.verifyAttestations(attestations)
Expand All @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}

func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()

visitedIdentities := ds.NewSet[iotago.AccountID]()
var blockIDs iotago.BlockIDs
var seatCount uint64
Expand Down
12 changes: 6 additions & 6 deletions pkg/protocol/engine/blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Block struct {
dropped bool

// Notarization
notarized reactive.Variable[bool]
notarized reactive.Event

mutex syncutils.RWMutex

Expand Down Expand Up @@ -88,7 +88,7 @@ func NewBlock(data *model.Block) *Block {
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
workScore: data.WorkScore(),
}
}
Expand All @@ -112,7 +112,7 @@ func NewRootBlock(blockID iotago.BlockID, commitmentID iotago.CommitmentID, issu
preAccepted: true,
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
scheduled: true,
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func NewMissingBlock(blockID iotago.BlockID) *Block {
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
}
}

Expand Down Expand Up @@ -622,7 +622,7 @@ func (b *Block) SetWeightPropagated() (wasUpdated bool) {
return !b.weightPropagated.Set(true)
}

func (b *Block) Notarized() reactive.Variable[bool] {
func (b *Block) Notarized() reactive.Event {
return b.notarized
}

Expand All @@ -631,7 +631,7 @@ func (b *Block) IsNotarized() (isBooked bool) {
}

func (b *Block) SetNotarized() (wasUpdated bool) {
return !b.notarized.Set(true)
return b.notarized.Trigger()
}

func (b *Block) String() string {
Expand Down
12 changes: 6 additions & 6 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(logger log.Logger, workers *workerpool.Group, networkEndpoint network.E

p.Constructed.Trigger()

p.waitMainEngineInitialized()
p.waitInitialized()
})
}

Expand Down Expand Up @@ -204,14 +204,14 @@ func (p *Protocol) initNetwork() (shutdown func()) {
)
}

// waitMainEngineInitialized waits until the main engine is initialized.
func (p *Protocol) waitMainEngineInitialized() {
// waitInitialized waits until the main engine is initialized (published its root commitment).
func (p *Protocol) waitInitialized() {
var waitInitialized sync.WaitGroup

waitInitialized.Add(1)
p.Engines.Main.OnUpdateOnce(func(_ *engine.Engine, engine *engine.Engine) {
engine.Initialized.OnTrigger(waitInitialized.Done)
})
p.Commitments.Root.OnUpdateOnce(func(_ *Commitment, _ *Commitment) {
waitInitialized.Done()
}, func(_ *Commitment, rootCommitment *Commitment) bool { return rootCommitment != nil })

waitInitialized.Wait()
}
2 changes: 1 addition & 1 deletion pkg/protocol/protocol_attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AttestationsProtocol struct {
// commitmentVerifiers contains the commitment verifiers that are used to verify received attestations.
commitmentVerifiers *shrinkingmap.ShrinkingMap[iotago.CommitmentID, *CommitmentVerifier]

// Logger embeds a logger that can be used to log messages emitted by this chain.
// Logger embeds a logger that can be used to log messages emitted by this component.
log.Logger
}

Expand Down
21 changes: 6 additions & 15 deletions pkg/protocol/protocol_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,12 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol {
})
})

protocol.Chains.WithElements(func(chain *Chain) func() {
return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook
})
})

protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) {
if engine != nil {
unsubscribeOnEngineChange(func() (unsubscribe func()) {
return lo.Batch(
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
)
})
}
protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
return lo.Batch(
engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook,
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
)
})
})

Expand Down
Loading

0 comments on commit edf541f

Please sign in to comment.