diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go
index 8e48e1613..3ec93212b 100644
--- a/pkg/protocol/chain.go
+++ b/pkg/protocol/chain.go
@@ -48,9 +48,8 @@ type Chain struct {
 	// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
 	WarpSyncMode reactive.Variable[bool]
 
-	// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
-	// network slot minus the max committable age.
-	WarpSyncThreshold reactive.Variable[iotago.SlotIndex]
+	// LatestSyncedSlot contains the slot of the latest commitment of this chain for which all blocks were booked.
+	LatestSyncedSlot reactive.Variable[iotago.SlotIndex]
 
 	// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
 	// sync mode. It is derived from the latest network slot minus two times the max committable age.
@@ -93,7 +92,7 @@ func newChain(chains *Chains) *Chain {
 		AttestedWeight:      reactive.NewVariable[uint64](),
 		VerifiedWeight:      reactive.NewVariable[uint64](),
 		WarpSyncMode:        reactive.NewVariable[bool]().Init(true),
-		WarpSyncThreshold:   reactive.NewVariable[iotago.SlotIndex](),
+		LatestSyncedSlot:    reactive.NewVariable[iotago.SlotIndex](),
 		OutOfSyncThreshold:  reactive.NewVariable[iotago.SlotIndex](),
 		RequestAttestations: reactive.NewVariable[bool](),
 		StartEngine:         reactive.NewVariable[bool](),
@@ -187,13 +186,14 @@ func (c *Chain) initLogger() (shutdown func()) {
 	return lo.Batch(
 		c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
-		c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
+		c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
 		c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
 		c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
 		c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
 		c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
 		c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
 		c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
+		c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
 		c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
 		c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
 		c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
@@ -212,39 +212,23 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
 		c.deriveLatestAttestedWeight(),
 		c.deriveWarpSyncMode(),
 
-		c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) {
-			return c.deriveParentChain(forkingPoint)
-		}),
-
-		c.ParentChain.WithNonEmptyValue(func(parentChain *Chain) (shutdown func()) {
-			return parentChain.deriveChildChains(c)
-		}),
-
-		c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
-			return lo.Batch(
-				c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
-				c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
-			)
-		}),
+		c.ForkingPoint.WithValue(c.deriveParentChain),
+		c.ParentChain.WithNonEmptyValue(lo.Bind(c, (*Chain).deriveChildChains)),
+		c.Engine.WithNonEmptyValue(c.deriveOutOfSyncThreshold),
 	)
 }
 
 // deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
 func (c *Chain) deriveWarpSyncMode() func() {
-	return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
-		// if we have no latest produced commitment, then the engine is not yet initialized and warp sync is disabled
-		if latestProducedCommitment == nil {
-			return false
-		}
-
-		// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
+	return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestSyncedSlot iotago.SlotIndex, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
+		// if warp sync mode is enabled, keep it enabled until we have synced all slots
 		if warpSyncMode {
-			return latestProducedCommitment.ID().Slot() < warpSyncThreshold
+			return latestSyncedSlot < latestSeenSlot
 		}
 
 		// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
-		return latestProducedCommitment.ID().Slot() < outOfSyncThreshold
-	}, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
+		return latestSyncedSlot < outOfSyncThreshold
+	}, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
 }
 
 // deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
@@ -311,26 +295,14 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {
 
 // deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2
 // times the max committable age or 0 if this would cause an overflow to the negative numbers).
-func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
+func (c *Chain) deriveOutOfSyncThreshold(engineInstance *engine.Engine) func() {
 	return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
 		if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
 			return latestSeenSlot - outOfSyncOffset
 		}
 
 		return 0
-	}, latestSeenSlot))
-}
-
-// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max
-// committable age or 0 if this would cause an overflow to the negative numbers).
-func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
-	return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
-		if warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); warpSyncOffset < latestSeenSlot {
-			return latestSeenSlot - warpSyncOffset
-		}
-
-		return 0
-	}, latestSeenSlot))
+	}, c.chains.LatestSeenSlot))
 }
 
 // addCommitment adds the given commitment to this chain.
@@ -342,6 +314,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
 	return lo.Batch(
 		newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
 		newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
+		newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),
 	)
 }
diff --git a/pkg/protocol/chains.go b/pkg/protocol/chains.go
index 2f0051148..d41c4ebab 100644
--- a/pkg/protocol/chains.go
+++ b/pkg/protocol/chains.go
@@ -31,7 +31,7 @@ type Chains struct {
 	// HeaviestVerifiedCandidate contains the candidate chain with the heaviest verified weight.
 	HeaviestVerifiedCandidate reactive.Variable[*Chain]
 
-	// LatestSeenSlot contains the latest slot that was seen by any of the chains.
+	// LatestSeenSlot contains the slot of the latest commitment of any received block.
 	LatestSeenSlot reactive.Variable[iotago.SlotIndex]
 
 	// protocol contains a reference to the Protocol instance that this component belongs to.
@@ -158,7 +158,7 @@ func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
 			}),
 
 			protocol.Network.OnBlockReceived(func(block *model.Block, src peer.ID) {
-				c.LatestSeenSlot.Set(mainEngine.LatestAPI().TimeProvider().SlotFromTime(block.ProtocolBlock().Header.IssuingTime))
+				c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
 			}),
 		)
 	})
diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go
index 88f454b6c..6c2bb4584 100644
--- a/pkg/protocol/commitment.go
+++ b/pkg/protocol/commitment.go
@@ -56,6 +56,12 @@ type Commitment struct {
 	// IsAttested contains a flag indicating if we have received attestations for this Commitment.
 	IsAttested reactive.Event
 
+	// IsSynced contains a flag that indicates if a Commitment was fully downloaded and processed.
+	IsSynced reactive.Event
+
+	// IsCommittable contains a flag that indicates if a Commitment is ready to be committed by the warp sync process.
+	IsCommittable reactive.Event
+
 	// IsVerified contains a flag indicating if this Commitment is verified (we produced this Commitment ourselves by
 	// booking all the contained blocks and transactions).
 	IsVerified reactive.Event
@@ -65,7 +71,7 @@ type Commitment struct {
 	IsAboveLatestVerifiedCommitment reactive.Variable[bool]
 
 	// ReplayDroppedBlocks contains a flag indicating if we should replay the blocks that were dropped while the
-	//Commitment was pending.
+	// Commitment was pending.
 	ReplayDroppedBlocks reactive.Variable[bool]
 
 	// IsEvicted contains a flag indicating if this Commitment was evicted from the Protocol.
@@ -94,6 +100,8 @@ func newCommitment(commitments *Commitments, model *model.Commitment) *Commitmen
 		CumulativeAttestedWeight:        reactive.NewVariable[uint64](),
 		IsRoot:                          reactive.NewEvent(),
 		IsAttested:                      reactive.NewEvent(),
+		IsSynced:                        reactive.NewEvent(),
+		IsCommittable:                   reactive.NewEvent(),
 		IsVerified:                      reactive.NewEvent(),
 		IsAboveLatestVerifiedCommitment: reactive.NewVariable[bool](),
 		ReplayDroppedBlocks:             reactive.NewVariable[bool](),
@@ -135,6 +143,8 @@ func (c *Commitment) initLogger() (shutdown func()) {
 		c.CumulativeAttestedWeight.LogUpdates(c, log.LevelTrace, "CumulativeAttestedWeight"),
 		c.IsRoot.LogUpdates(c, log.LevelTrace, "IsRoot"),
 		c.IsAttested.LogUpdates(c, log.LevelTrace, "IsAttested"),
+		c.IsSynced.LogUpdates(c, log.LevelTrace, "IsSynced"),
+		c.IsCommittable.LogUpdates(c, log.LevelTrace, "IsCommittable"),
 		c.IsVerified.LogUpdates(c, log.LevelTrace, "IsVerified"),
 		c.ReplayDroppedBlocks.LogUpdates(c, log.LevelTrace, "ReplayDroppedBlocks"),
 		c.IsEvicted.LogUpdates(c, log.LevelTrace, "IsEvicted"),
@@ -149,8 +159,9 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
 		// mark commitments that are marked as root as verified
 		c.IsVerified.InheritFrom(c.IsRoot),
 
-		// mark commitments that are marked as verified as attested
+		// mark commitments that are marked as verified as attested and synced
 		c.IsAttested.InheritFrom(c.IsVerified),
+		c.IsSynced.InheritFrom(c.IsVerified),
 
 		c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
 			// the weight can be fixed as a one time operation (as it only relies on static information from the parent
@@ -167,7 +178,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
 			c.Chain.WithNonEmptyValue(func(chain *Chain) func() {
 				return lo.Batch(
 					c.deriveRequestAttestations(chain, parent),
-					c.deriveWarpSyncBlocks(chain, parent),
+
+					// only start requesting blocks once the engine is ready
+					chain.WithInitializedEngine(func(_ *engine.Engine) (shutdown func()) {
+						return c.deriveWarpSyncBlocks(chain, parent)
+					}),
 				)
 			}),
 		)
@@ -259,11 +274,11 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment)
 }
 
 // deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting
-// warp sync, and we are the directly above the latest verified Commitment.
+// warp sync, and we are directly above the latest commitment that is synced (has downloaded everything).
 func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() {
-	return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsVerified bool, isVerified bool) bool {
-		return engineInstance != nil && warpSync && parentIsVerified && !isVerified
-	}, chain.Engine, chain.WarpSyncMode, parent.IsVerified, c.IsVerified))
+	return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, warpSyncMode bool, parentIsSynced bool, isSynced bool) bool {
+		return warpSyncMode && parentIsSynced && !isSynced
+	}, chain.WarpSyncMode, parent.IsSynced, c.IsSynced))
 }
 
 // deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an
@@ -278,7 +293,7 @@ func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() {
 // the parent is on the target Chain.
 func (c *Commitment) forceChain(targetChain *Chain) {
 	if currentChain := c.Chain.Get(); currentChain != targetChain {
-		if parent := c.Parent.Get(); parent.Chain.Get() == targetChain {
+		if parent := c.Parent.Get(); parent != nil && parent.Chain.Get() == targetChain {
 			parent.MainChild.Set(c)
 		}
 	}
diff --git a/pkg/protocol/commitment_verifier.go b/pkg/protocol/commitment_verifier.go
index ab600fbc9..3ee2b4cc8 100644
--- a/pkg/protocol/commitment_verifier.go
+++ b/pkg/protocol/commitment_verifier.go
@@ -5,6 +5,7 @@ import (
 	"github.com/iotaledger/hive.go/ds"
 	"github.com/iotaledger/hive.go/ierrors"
 	"github.com/iotaledger/hive.go/kvstore/mapdb"
+	"github.com/iotaledger/hive.go/runtime/syncutils"
 	"github.com/iotaledger/iota-core/pkg/model"
 	"github.com/iotaledger/iota-core/pkg/protocol/engine"
 	"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
@@ -22,6 +23,9 @@ type CommitmentVerifier struct {
 	// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
 	// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
 	validatorAccountsData map[iotago.AccountID]*accounts.AccountData
+
+	// mutex is used to synchronize access to validatorAccountsData and epoch.
+	mutex syncutils.RWMutex
 }
 
 func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
@@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
 	// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
 	// In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data
 	// of the validators for the epoch of the last common commitment before the fork).
+	c.mutex.Lock()
 	apiForSlot := c.engine.APIForSlot(commitment.Slot())
 	commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
 	if commitmentEpoch > c.epoch {
@@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
 			}
 		}
 	}
+	c.mutex.Unlock()
 
 	// 3. Verify attestations.
 	blockIDs, seatCount, err := c.verifyAttestations(attestations)
@@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
 }
 
 func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+
 	visitedIdentities := ds.NewSet[iotago.AccountID]()
 	var blockIDs iotago.BlockIDs
 	var seatCount uint64
diff --git a/pkg/protocol/engine/blocks/block.go b/pkg/protocol/engine/blocks/block.go
index 0dc76a6db..e1263a460 100644
--- a/pkg/protocol/engine/blocks/block.go
+++ b/pkg/protocol/engine/blocks/block.go
@@ -49,7 +49,7 @@ type Block struct {
 	dropped bool
 
 	// Notarization
-	notarized reactive.Variable[bool]
+	notarized reactive.Event
 
 	mutex syncutils.RWMutex
 
@@ -88,7 +88,7 @@ func NewBlock(data *model.Block) *Block {
 		booked:           reactive.NewVariable[bool](),
 		accepted:         reactive.NewVariable[bool](),
 		weightPropagated: reactive.NewVariable[bool](),
-		notarized:        reactive.NewVariable[bool](),
+		notarized:        reactive.NewEvent(),
 		workScore:        data.WorkScore(),
 	}
 }
@@ -112,7 +112,7 @@ func NewRootBlock(blockID iotago.BlockID, commitmentID iotago.CommitmentID, issu
 		preAccepted:      true,
 		accepted:         reactive.NewVariable[bool](),
 		weightPropagated: reactive.NewVariable[bool](),
-		notarized:        reactive.NewVariable[bool](),
+		notarized:        reactive.NewEvent(),
 		scheduled:        true,
 	}
 
@@ -140,7 +140,7 @@ func NewMissingBlock(blockID iotago.BlockID) *Block {
 		booked:           reactive.NewVariable[bool](),
 		accepted:         reactive.NewVariable[bool](),
 		weightPropagated: reactive.NewVariable[bool](),
-		notarized:        reactive.NewVariable[bool](),
+		notarized:        reactive.NewEvent(),
 	}
 }
 
@@ -622,7 +622,7 @@ func (b *Block) SetWeightPropagated() (wasUpdated bool) {
 	return !b.weightPropagated.Set(true)
 }
 
-func (b *Block) Notarized() reactive.Variable[bool] {
+func (b *Block) Notarized() reactive.Event {
 	return b.notarized
 }
 
@@ -631,7 +631,7 @@ func (b *Block) IsNotarized() (isBooked bool) {
 }
 
 func (b *Block) SetNotarized() (wasUpdated bool) {
-	return !b.notarized.Set(true)
+	return b.notarized.Trigger()
 }
 
 func (b *Block) String() string {
diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go
index 7bef7c575..436ef0dcd 100644
--- a/pkg/protocol/protocol.go
+++ b/pkg/protocol/protocol.go
@@ -89,7 +89,7 @@ func New(logger log.Logger, workers *workerpool.Group, networkEndpoint network.E
 
 		p.Constructed.Trigger()
 
-		p.waitMainEngineInitialized()
+		p.waitInitialized()
 	})
 }
 
@@ -204,14 +204,14 @@ func (p *Protocol) initNetwork() (shutdown func()) {
 	)
 }
 
-// waitMainEngineInitialized waits until the main engine is initialized.
-func (p *Protocol) waitMainEngineInitialized() {
+// waitInitialized waits until the main engine is initialized (published its root commitment).
+func (p *Protocol) waitInitialized() {
 	var waitInitialized sync.WaitGroup
 
 	waitInitialized.Add(1)
-	p.Engines.Main.OnUpdateOnce(func(_ *engine.Engine, engine *engine.Engine) {
-		engine.Initialized.OnTrigger(waitInitialized.Done)
-	})
+	p.Commitments.Root.OnUpdateOnce(func(_ *Commitment, _ *Commitment) {
+		waitInitialized.Done()
+	}, func(_ *Commitment, rootCommitment *Commitment) bool { return rootCommitment != nil })
 
 	waitInitialized.Wait()
 }
diff --git a/pkg/protocol/protocol_attestations.go b/pkg/protocol/protocol_attestations.go
index 819813758..f2c4c3153 100644
--- a/pkg/protocol/protocol_attestations.go
+++ b/pkg/protocol/protocol_attestations.go
@@ -30,7 +30,7 @@ type AttestationsProtocol struct {
 	// commitmentVerifiers contains the commitment verifiers that are used to verify received attestations.
 	commitmentVerifiers *shrinkingmap.ShrinkingMap[iotago.CommitmentID, *CommitmentVerifier]
 
-	// Logger embeds a logger that can be used to log messages emitted by this chain.
+	// Logger embeds a logger that can be used to log messages emitted by this component.
 	log.Logger
 }
diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go
index b18983f3b..3afd74807 100644
--- a/pkg/protocol/protocol_blocks.go
+++ b/pkg/protocol/protocol_blocks.go
@@ -53,21 +53,12 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol {
 			})
 		})
 
-		protocol.Chains.WithElements(func(chain *Chain) func() {
-			return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
-				return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook
-			})
-		})
-
-		protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) {
-			if engine != nil {
-				unsubscribeOnEngineChange(func() (unsubscribe func()) {
-					return lo.Batch(
-						engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
-						engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
-					)
-				})
-			}
+		protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
+			return lo.Batch(
+				engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook,
+				engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
+				engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
+			)
 		})
 	})
diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go
index 6a5c4daf9..e42f0c6e8 100644
--- a/pkg/protocol/protocol_warp_sync.go
+++ b/pkg/protocol/protocol_warp_sync.go
@@ -8,11 +8,13 @@ import (
 	"github.com/iotaledger/hive.go/ads"
 	"github.com/iotaledger/hive.go/core/eventticker"
 	"github.com/iotaledger/hive.go/ds"
+	"github.com/iotaledger/hive.go/ds/reactive"
 	"github.com/iotaledger/hive.go/ierrors"
 	"github.com/iotaledger/hive.go/kvstore/mapdb"
 	"github.com/iotaledger/hive.go/lo"
 	"github.com/iotaledger/hive.go/log"
 	"github.com/iotaledger/hive.go/runtime/workerpool"
+	"github.com/iotaledger/iota-core/pkg/protocol/engine"
 	iotago "github.com/iotaledger/iota.go/v4"
 	"github.com/iotaledger/iota.go/v4/merklehasher"
 )
@@ -44,7 +46,16 @@ func newWarpSyncProtocol(protocol *Protocol) *WarpSyncProtocol {
 	c.ticker.Events.Tick.Hook(c.SendRequest)
 
 	protocol.Constructed.OnTrigger(func() {
-		c.protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
+		protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
+			return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncModeEnabled bool) {
+				if warpSyncModeEnabled {
+					engine.Workers.WaitChildren()
+					engine.Reset()
+				}
+			})
+		})
+
+		protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
 			return commitment.WarpSyncBlocks.OnUpdate(func(_ bool, warpSyncBlocks bool) {
 				if warpSyncBlocks {
 					c.ticker.StartTicker(commitment.ID())
@@ -156,81 +167,102 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo
 		return blocksToWarpSync
 	}
 
-	// make sure the engine is clean and requires a warp-sync before we start processing the blocks
-	if targetEngine.Workers.WaitChildren(); targetEngine.Storage.Settings().LatestCommitment().ID().Slot() > commitmentID.Slot() {
-		return blocksToWarpSync
-	}
-	targetEngine.Reset()
-
 	// Once all blocks are booked we
 	// 1. Mark all transactions as accepted
 	// 2. Mark all blocks as accepted
 	// 3. Force commitment of the slot
-	var bookedBlocks atomic.Uint32
-	var notarizedBlocks atomic.Uint32
-
-	forceCommitmentFunc := func() {
-		// 3. Force commitment of the slot
-		producedCommitment, err := targetEngine.Notarization.ForceCommit(commitmentID.Slot())
-		if err != nil {
-			w.protocol.LogError("failed to force commitment", "commitmentID", commitmentID, "err", err)
-
-			return
-		}
-
-		// 4. Verify that the produced commitment is the same as the initially requested one
-		if producedCommitment.ID() != commitmentID {
-			w.protocol.LogError("commitment does not match", "expectedCommitmentID", commitmentID, "producedCommitmentID", producedCommitment.ID())
-
-			return
-		}
-	}
-
-	if totalBlocks == 0 {
-		forceCommitmentFunc()
-
-		return blocksToWarpSync
-	}
-
-	blockBookedFunc := func(_ bool, _ bool) {
-		if bookedBlocks.Add(1) != totalBlocks {
-			return
-		}
-
-		// 1. Mark all transactions as accepted
-		for _, transactionID := range transactionIDs {
-			targetEngine.Ledger.SpendDAG().SetAccepted(transactionID)
-		}
-
-		// 2. Mark all blocks as accepted
-		for _, blockIDs := range blockIDsBySlotCommitment {
-			for _, blockID := range blockIDs {
-				block, exists := targetEngine.BlockCache.Block(blockID)
-				if !exists { // this should never happen as we just booked these blocks in this slot.
-					continue
-				}
-
-				targetEngine.BlockGadget.SetAccepted(block)
-
-				block.Notarized().OnUpdate(func(_ bool, _ bool) {
-					// Wait for all blocks to be notarized before forcing the commitment of the slot.
-					if notarizedBlocks.Add(1) != totalBlocks {
-						return
-					}
-
-					forceCommitmentFunc()
-				})
-			}
-		}
-	}
+	commitmentFunc := func() {
+		if !chain.WarpSyncMode.Get() {
+			return
+		}
+
+		// 0. Prepare data flow
+		var (
+			notarizedBlocksCount uint64
+			allBlocksNotarized   = reactive.NewEvent()
+		)
+
+		// 1. Mark all transactions as accepted
+		for _, transactionID := range transactionIDs {
+			targetEngine.Ledger.SpendDAG().SetAccepted(transactionID)
+		}
+
+		// 2. Mark all blocks as accepted and wait for them to be notarized
+		if totalBlocks == 0 {
+			allBlocksNotarized.Trigger()
+		} else {
+			for _, blockIDs := range blockIDsBySlotCommitment {
+				for _, blockID := range blockIDs {
+					block, exists := targetEngine.BlockCache.Block(blockID)
+					if !exists { // this should never happen as we just booked these blocks in this slot.
+						continue
+					}
+
+					targetEngine.BlockGadget.SetAccepted(block)
+
+					block.Notarized().OnTrigger(func() {
+						if atomic.AddUint64(&notarizedBlocksCount, 1) == uint64(totalBlocks) {
+							allBlocksNotarized.Trigger()
+						}
+					})
+				}
+			}
+		}
+
+		allBlocksNotarized.OnTrigger(func() {
+			// This needs to happen in a separate worker since the block-notarized trigger fires while the lock in
+			// the notarization component is still held.
+			w.workerPool.Submit(func() {
+				// 3. Force commitment of the slot
+				producedCommitment, err := targetEngine.Notarization.ForceCommit(commitmentID.Slot())
+				if err != nil {
+					w.protocol.LogError("failed to force commitment", "commitmentID", commitmentID, "err", err)
+
+					return
+				}
+
+				// 4. Verify that the produced commitment is the same as the initially requested one
+				if producedCommitment.ID() != commitmentID {
+					w.protocol.LogError("commitment does not match", "expectedCommitmentID", commitmentID, "producedCommitmentID", producedCommitment.ID())
+
+					return
+				}
+			})
+		})
+	}
+
+	// Once all blocks are fully booked, we can mark the commitment that is MinCommittableAge older than this
+	// commitment as committable.
+	commitment.IsSynced.OnUpdateOnce(func(_ bool, _ bool) {
+		// update the flag in a worker since it can potentially cause a commit
+		w.workerPool.Submit(func() {
+			if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge()); exists {
+				committableCommitment.IsCommittable.Set(true)
+			}
+		})
+	})
+
+	// force commit one by one and wait for the parent to be verified before we commit the next one
+	commitment.Parent.WithNonEmptyValue(func(parent *Commitment) (teardown func()) {
+		return parent.IsVerified.WithNonEmptyValue(func(_ bool) (teardown func()) {
+			return commitment.IsCommittable.OnTrigger(commitmentFunc)
+		})
+	})
+
+	if totalBlocks == 0 {
+		// mark empty slots as committable and synced
+		commitment.IsCommittable.Set(true)
+		commitment.IsSynced.Set(true)
+
+		return blocksToWarpSync
+	}
 
+	var bookedBlocks atomic.Uint32
 	blocksToWarpSync = ds.NewSet[iotago.BlockID]()
-	for slotCommitmentID, blockIDs := range blockIDsBySlotCommitment {
+	for _, blockIDs := range blockIDsBySlotCommitment {
 		for _, blockID := range blockIDs {
 			blocksToWarpSync.Add(blockID)
 
-			w.LogError("requesting block", "blockID", blockID)
-
 			block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID)
 			if block == nil {
 				w.protocol.LogError("failed to request block", "blockID", blockID)
@@ -238,12 +270,15 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo
 				continue
 			}
 
-			// We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without
-			// blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the
-			// block cache and thus if not root blocks no block in the next slot can become solid.
-			targetEngine.EvictionState.AddRootBlock(block.ID(), slotCommitmentID)
+			// We need to make sure that all blocks are fully booked and their weight propagated before we can
+			// move the window forward, so that confirmation and finalization are propagated correctly.
+			block.WeightPropagated().OnUpdate(func(_ bool, _ bool) {
+				if bookedBlocks.Add(1) != totalBlocks {
+					return
+				}
 
-			block.Booked().OnUpdate(blockBookedFunc)
+				commitment.IsSynced.Set(true)
+			})
 		}
 	}
diff --git a/pkg/tests/protocol_engine_switching_test.go b/pkg/tests/protocol_engine_switching_test.go
index a6ab346f3..046ffbd99 100644
--- a/pkg/tests/protocol_engine_switching_test.go
+++ b/pkg/tests/protocol_engine_switching_test.go
@@ -377,7 +377,7 @@ func TestProtocol_EngineSwitching(t *testing.T) {
 
 func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) {
 	ts := testsuite.NewTestSuite(t,
-		testsuite.WithWaitFor(30*time.Second),
+		testsuite.WithWaitFor(15*time.Second),
 
 		testsuite.WithProtocolParametersOptions(
 			iotago.WithTimeProviderOptions(
@@ -402,7 +402,7 @@ func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) {
 	node2 := ts.AddValidatorNode("node2")
 	node3 := ts.AddValidatorNode("node3")
 
-	const expectedCommittedSlotAfterPartitionMerge = 19
+	const expectedCommittedSlotAfterPartitionMerge = 18
 	nodesP1 := []*mock.Node{node0, node1, node2}
 	nodesP2 := []*mock.Node{node3}
 
@@ -605,6 +605,9 @@ func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) {
 		// Here we need to let enough time pass for the nodes to sync up the candidate engines and switch them
 		ts.AssertMainEngineSwitchedCount(1, nodesP2...)
 
+		// Make sure that enough activity messages are issued so that a block in slot 21 gets accepted and triggers commitment of slot 18.
+		time.Sleep(3 * time.Second)
+
 		ctxP1Cancel()
 		wg.Wait()
 	}
@@ -613,12 +616,10 @@ func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) {
 	// Those nodes should also have all the blocks from the target fork P1 and should not have blocks from P2.
 	// This is to make sure that the storage was copied correctly during engine switching.
 	ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...)
-	ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), true, ts.Nodes()...) // not all blocks of slot 19 are available on node3 (buffer issue?)
+	ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), true, ts.Nodes()...)
 	ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), false, ts.Nodes()...)
 
-	ts.AssertNodeState(ts.Nodes(),
-		testsuite.WithEqualStoredCommitmentAtIndex(expectedCommittedSlotAfterPartitionMerge),
-	)
+	ts.AssertEqualStoredCommitmentAtIndex(expectedCommittedSlotAfterPartitionMerge, ts.Nodes()...)
 
 	// Assert committee in epoch 1.
 	ts.AssertSybilProtectionCandidates(0, ts.AccountsOfNodes("node1", "node2"), ts.Nodes()...)
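A note on the data flow introduced in ProcessResponse above: each block's Notarized event bumps a shared counter, and only the callback that observes the final count fires the force-commit step, exactly once and on a separate worker. The following stand-alone Go sketch restates that counting-barrier pattern with stdlib primitives only; the notarizationBarrier type and all of its names are illustrative and not part of the iota-core or hive.go APIs.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// notarizationBarrier mimics the allBlocksNotarized event above: it runs
// onAllNotarized exactly once, after markNotarized was called total times.
type notarizationBarrier struct {
	total          uint64
	notarized      atomic.Uint64
	onAllNotarized func()
	once           sync.Once
}

// markNotarized is the per-block callback; only the caller that performs the
// final increment triggers the downstream action.
func (b *notarizationBarrier) markNotarized() {
	if b.notarized.Add(1) == b.total {
		b.once.Do(b.onAllNotarized)
	}
}

func main() {
	barrier := &notarizationBarrier{
		total:          3,
		onAllNotarized: func() { fmt.Println("force commitment of the slot") },
	}

	// simulate three blocks being notarized concurrently
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			barrier.markNotarized()
		}()
	}
	wg.Wait()
}

Hooking the completion check into each callback instead of polling means the final increment runs on whatever goroutine delivered the last notarization, which is exactly why the real code above defers ForceCommit to w.workerPool: the trigger can fire while the notarization lock is still held.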
diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go
index ab8d94abd..32394616a 100644
--- a/pkg/testsuite/mock/node.go
+++ b/pkg/testsuite/mock/node.go
@@ -225,11 +225,11 @@ func (n *Node) hookLogging(failOnBlockFiltered bool) {
 	})
 }
 
-func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine, engineName string) {
+func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine) {
 	events := instance.Events
 
 	events.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] BlockDAG.BlockAttached: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("BlockDAG.BlockAttached", "block", block.ID())
 
 		n.mutex.Lock()
 		defer n.mutex.Unlock()
@@ -237,78 +237,80 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi
 	})
 
 	events.BlockDAG.BlockSolid.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] BlockDAG.BlockSolid: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("BlockDAG.BlockSolid", "block", block.ID())
 	})
 
 	events.BlockDAG.BlockInvalid.Hook(func(block *blocks.Block, err error) {
-		fmt.Printf("%s > [%s] BlockDAG.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err)
+		instance.LogTrace("BlockDAG.BlockInvalid", "block", block.ID(), "err", err)
 	})
 
 	events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] BlockDAG.BlockMissing: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("BlockDAG.BlockMissing", "block", block.ID())
 	})
 
 	events.BlockDAG.MissingBlockAttached.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] BlockDAG.MissingBlockAttached: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("BlockDAG.MissingBlockAttached", "block", block.ID())
 	})
 
 	events.SeatManager.BlockProcessed.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] SybilProtection.BlockProcessed: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("SeatManager.BlockProcessed", "block", block.ID())
 	})
 
 	events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Booker.BlockBooked: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("Booker.BlockBooked", "block", block.ID())
 	})
 
 	events.Booker.BlockInvalid.Hook(func(block *blocks.Block, err error) {
-		fmt.Printf("%s > [%s] Booker.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err.Error())
+		instance.LogTrace("Booker.BlockInvalid", "block", block.ID(), "err", err)
 	})
 
 	events.Booker.TransactionInvalid.Hook(func(metadata mempool.TransactionMetadata, err error) {
-		fmt.Printf("%s > [%s] Booker.TransactionInvalid: %s - %s\n", n.Name, engineName, metadata.ID(), err.Error())
+		instance.LogTrace("Booker.TransactionInvalid", "tx", metadata.ID(), "err", err)
 	})
 
 	events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Scheduler.BlockScheduled: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("Scheduler.BlockScheduled", "block", block.ID())
 	})
 
 	events.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Scheduler.BlockEnqueued: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("Scheduler.BlockEnqueued", "block", block.ID())
 	})
 
 	events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Scheduler.BlockSkipped: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("Scheduler.BlockSkipped", "block", block.ID())
 	})
 
 	events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, err error) {
-		fmt.Printf("%s > [%s] Scheduler.BlockDropped: %s - %s\n", n.Name, engineName, block.ID(), err.Error())
+		instance.LogTrace("Scheduler.BlockDropped", "block", block.ID(), "err", err)
 	})
 
 	events.Clock.AcceptedTimeUpdated.Hook(func(newTime time.Time) {
-		fmt.Printf("%s > [%s] Clock.AcceptedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime))
+		instance.LogTrace("Clock.AcceptedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime))
 	})
 
 	events.Clock.ConfirmedTimeUpdated.Hook(func(newTime time.Time) {
-		fmt.Printf("%s > [%s] Clock.ConfirmedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime))
+		instance.LogTrace("Clock.ConfirmedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime))
 	})
 
 	events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) {
-		fmt.Printf("%s > [%s] PreSolidFilter.BlockPreAllowed: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("PreSolidFilter.BlockPreAllowed", "block", block.ID())
 	})
 
 	events.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
-		fmt.Printf("%s > [%s] PreSolidFilter.BlockPreFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error())
+		instance.LogTrace("PreSolidFilter.BlockPreFiltered", "block", event.Block.ID(), "err", event.Reason)
+
 		if failOnBlockFiltered {
 			n.Testing.Fatal("no blocks should be prefiltered")
 		}
 	})
 
 	events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] PostSolidFilter.BlockAllowed: %s\n", n.Name, engineName, block.ID())
+		instance.LogTrace("PostSolidFilter.BlockAllowed", "block", block.ID())
 	})
 
 	events.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) {
-		fmt.Printf("%s > [%s] PostSolidFilter.BlockFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error())
+		instance.LogTrace("PostSolidFilter.BlockFiltered", "block", event.Block.ID(), "err", event.Reason)
+
 		if failOnBlockFiltered {
 			n.Testing.Fatal("no blocks should be filtered")
 		}
@@ -319,11 +321,11 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi
 	})
 
 	events.BlockRequester.Tick.Hook(func(blockID iotago.BlockID) {
-		fmt.Printf("%s > [%s] BlockRequester.Tick: %s\n", n.Name, engineName, blockID)
+		instance.LogTrace("BlockRequester.Tick", "block", blockID)
 	})
 
 	events.BlockProcessed.Hook(func(blockID iotago.BlockID) {
-		fmt.Printf("%s > [%s] Engine.BlockProcessed: %s\n", n.Name, engineName, blockID)
+		instance.LogTrace("BlockProcessed", "block", blockID)
 	})
 
 	events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
@@ -350,117 +352,116 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi
 			require.NoError(n.Testing, err)
 		}
 
-		fmt.Printf("%s > [%s] NotarizationManager.SlotCommitted: %s %s Accepted Blocks: %s\n %s\n Attestations: %s\n", n.Name, engineName, details.Commitment.ID(), details.Commitment, acceptedBlocks, roots, attestationBlockIDs)
+		instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks", acceptedBlocks, "roots", roots, "attestations", attestationBlockIDs)
 	})
 
 	events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
-		fmt.Printf("%s > [%s] NotarizationManager.LatestCommitmentUpdated: %s\n", n.Name, engineName, commitment.ID())
+		instance.LogTrace("NotarizationManager.LatestCommitmentUpdated", "commitment", commitment.ID())
 	})
 
 	events.BlockGadget.BlockPreAccepted.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreAccepted: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID)
+		instance.LogTrace("BlockGadget.BlockPreAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID)
 	})
 
 	events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockAccepted: %s @ slot %s committing to %s\n", n.Name, engineName, block.ID(), block.ID().Slot(), block.ProtocolBlock().Header.SlotCommitmentID)
+		instance.LogTrace("BlockGadget.BlockAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID)
 	})
 
 	events.BlockGadget.BlockPreConfirmed.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID)
+		instance.LogTrace("BlockGadget.BlockPreConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID)
 	})
 
 	events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
-		fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID)
+		instance.LogTrace("BlockGadget.BlockConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID)
 	})
 
 	events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
-		fmt.Printf("%s > [%s] Consensus.SlotGadget.SlotFinalized: %s\n", n.Name, engineName, slot)
+		instance.LogTrace("SlotGadget.SlotFinalized", "slot", slot)
 	})
 
 	events.SeatManager.OnlineCommitteeSeatAdded.Hook(func(seat account.SeatIndex, accountID iotago.AccountID) {
-		fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatAdded: %d - %s\n", n.Name, engineName, seat, accountID)
+		instance.LogTrace("SybilProtection.OnlineCommitteeSeatAdded", "seat", seat, "accountID", accountID)
 	})
 
 	events.SeatManager.OnlineCommitteeSeatRemoved.Hook(func(seat account.SeatIndex) {
-		fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatRemoved: %d\n", n.Name, engineName, seat)
+		instance.LogTrace("SybilProtection.OnlineCommitteeSeatRemoved", "seat", seat)
 	})
 
 	events.SybilProtection.CommitteeSelected.Hook(func(committee *account.Accounts, epoch iotago.EpochIndex) {
-		fmt.Printf("%s > [%s] SybilProtection.CommitteeSelected: epoch %d - %s\n", n.Name, engineName, epoch, committee.IDs())
+		instance.LogTrace("SybilProtection.CommitteeSelected", "epoch", epoch, "committee", committee.IDs())
 	})
 
-	events.SpendDAG.SpenderCreated.Hook(func(spenderID iotago.TransactionID) {
-		fmt.Printf("%s > [%s] SpendDAG.SpendCreated: %s\n", n.Name, engineName, spenderID)
+	events.SpendDAG.SpenderCreated.Hook(func(conflictID iotago.TransactionID) {
+		instance.LogTrace("SpendDAG.SpenderCreated", "conflictID", conflictID)
 	})
 
-	events.SpendDAG.SpenderEvicted.Hook(func(spenderID iotago.TransactionID) {
-		fmt.Printf("%s > [%s] SpendDAG.SpendEvicted: %s\n", n.Name, engineName, spenderID)
+	events.SpendDAG.SpenderEvicted.Hook(func(conflictID iotago.TransactionID) {
+		instance.LogTrace("SpendDAG.SpenderEvicted", "conflictID", conflictID)
 	})
-	events.SpendDAG.SpenderRejected.Hook(func(spenderID iotago.TransactionID) {
-		fmt.Printf("%s > [%s] SpendDAG.SpendRejected: %s\n", n.Name, engineName, spenderID)
+
+	events.SpendDAG.SpenderRejected.Hook(func(conflictID iotago.TransactionID) {
+ instance.LogTrace("SpendDAG.SpenderRejected", "conflictID", conflictID) }) - events.SpendDAG.SpenderAccepted.Hook(func(spenderID iotago.TransactionID) { - fmt.Printf("%s > [%s] SpendDAG.SpendAccepted: %s\n", n.Name, engineName, spenderID) + events.SpendDAG.SpenderAccepted.Hook(func(conflictID iotago.TransactionID) { + instance.LogTrace("SpendDAG.SpenderAccepted", "conflictID", conflictID) }) instance.Ledger.MemPool().OnSignedTransactionAttached( func(signedTransactionMetadata mempool.SignedTransactionMetadata) { signedTransactionMetadata.OnSignaturesInvalid(func(err error) { - fmt.Printf("%s > [%s] MemPool.SignedTransactionSignaturesInvalid(%s): %s\n", n.Name, engineName, err, signedTransactionMetadata.ID()) + instance.LogTrace("MemPool.SignedTransactionSignaturesInvalid", "tx", signedTransactionMetadata.ID(), "err", err) }) }, ) instance.Ledger.OnTransactionAttached(func(transactionMetadata mempool.TransactionMetadata) { - fmt.Printf("%s > [%s] Ledger.TransactionAttached: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("Ledger.TransactionAttached", "tx", transactionMetadata.ID()) transactionMetadata.OnSolid(func() { - fmt.Printf("%s > [%s] MemPool.TransactionSolid: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionSolid", "tx", transactionMetadata.ID()) }) transactionMetadata.OnExecuted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionExecuted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionExecuted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnBooked(func() { - fmt.Printf("%s > [%s] MemPool.TransactionBooked: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionBooked", "tx", transactionMetadata.ID()) }) transactionMetadata.OnConflicting(func() { - fmt.Printf("%s > [%s] MemPool.TransactionConflicting: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionConflicting", "tx", transactionMetadata.ID()) }) transactionMetadata.OnAccepted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionAccepted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionAccepted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnRejected(func() { - fmt.Printf("%s > [%s] MemPool.TransactionRejected: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionRejected", "tx", transactionMetadata.ID()) }) transactionMetadata.OnInvalid(func(err error) { - fmt.Printf("%s > [%s] MemPool.TransactionInvalid(%s): %s\n", n.Name, engineName, err, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionInvalid", "tx", transactionMetadata.ID(), "err", err) }) transactionMetadata.OnOrphanedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionOrphanedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionOrphanedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnCommittedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionCommittedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionCommittedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnPending(func() { - fmt.Printf("%s > [%s] MemPool.TransactionPending: %s\n", n.Name, engineName, transactionMetadata.ID()) + 
instance.LogTrace("MemPool.TransactionPending", "tx", transactionMetadata.ID()) }) }) } func (n *Node) attachEngineLogs(failOnBlockFiltered bool, instance *engine.Engine) { - engineName := fmt.Sprintf("%s - %s", lo.Cond(n.Protocol.Engines.Main.Get() != instance, "Candidate", "Main"), instance.Name()[:8]) - - n.attachEngineLogsWithName(failOnBlockFiltered, instance, engineName) + n.attachEngineLogsWithName(failOnBlockFiltered, instance) } func (n *Node) Wait() {