From ccc4fba52e7d96a0c6712f73c2d37e09a322465a Mon Sep 17 00:00:00 2001 From: muXxer Date: Wed, 20 Mar 2024 11:35:25 +0100 Subject: [PATCH] Cleanup the syncmanager --- .../trivialsyncmanager/syncmanager.go | 299 ++++++++++-------- pkg/tests/protocol_engine_switching_test.go | 6 +- pkg/tests/protocol_eviction_test.go | 2 +- 3 files changed, 171 insertions(+), 136 deletions(-) diff --git a/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go b/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go index 3fcb06165..fa862d499 100644 --- a/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go +++ b/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go @@ -19,38 +19,58 @@ type ( isBootstrappedFunc func(e *engine.Engine) bool ) +func WithSyncThreshold(threshold time.Duration) options.Option[SyncManager] { + return func(s *SyncManager) { + s.optsSyncThreshold = threshold + } +} + +func WithIsBootstrappedFunc(isBootstrapped isBootstrappedFunc) options.Option[SyncManager] { + return func(s *SyncManager) { + s.optsIsBootstrappedFunc = isBootstrapped + } +} + +func WithBootstrappedThreshold(threshold time.Duration) options.Option[SyncManager] { + return func(s *SyncManager) { + s.optsBootstrappedThreshold = threshold + } +} + type SyncManager struct { - events *syncmanager.Events - engine *engine.Engine - syncThreshold time.Duration + events *syncmanager.Events + engine *engine.Engine + + // options + optsSyncThreshold time.Duration + optsIsBootstrappedFunc isBootstrappedFunc + optsBootstrappedThreshold time.Duration + + // state + isBootstrappedLock syncutils.RWMutex + isBootstrapped bool + + isSyncedLock syncutils.RWMutex + isSynced bool + + isFinalizationDelayedLock syncutils.RWMutex + isFinalizationDelayed bool - lastAcceptedBlockSlot iotago.SlotIndex lastAcceptedBlockSlotLock syncutils.RWMutex + lastAcceptedBlockSlot iotago.SlotIndex - lastConfirmedBlockSlot iotago.SlotIndex lastConfirmedBlockSlotLock syncutils.RWMutex + lastConfirmedBlockSlot iotago.SlotIndex - latestCommitment *model.Commitment latestCommitmentLock syncutils.RWMutex + latestCommitment *model.Commitment - latestFinalizedSlot iotago.SlotIndex latestFinalizedSlotLock syncutils.RWMutex + latestFinalizedSlot iotago.SlotIndex + lastPrunedEpochLock syncutils.RWMutex lastPrunedEpoch iotago.EpochIndex hasPruned bool - lastPrunedEpochLock syncutils.RWMutex - - isSynced bool - isSyncedLock syncutils.RWMutex - - isFinalizationDelayed bool - isFinalizationDelayedLock syncutils.RWMutex - - isBootstrapped bool - isBootstrappedLock syncutils.RWMutex - - optsIsBootstrappedFunc isBootstrappedFunc - optsBootstrappedThreshold time.Duration module.Module } @@ -74,14 +94,15 @@ func NewProvider(opts ...options.Option[SyncManager]) module.Provider[*engine.En }, asyncOpt) e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { + var bootstrapChanged bool if !s.IsBootstrapped() { - s.updateBootstrappedStatus() + bootstrapChanged = s.updateBootstrappedStatus() } syncChanged := s.updateSyncStatus() commitmentChanged := s.updateLatestCommitment(commitment) - if syncChanged || commitmentChanged { + if bootstrapChanged || syncChanged || commitmentChanged { s.triggerUpdate() } }, asyncOpt) @@ -106,21 +127,23 @@ func NewProvider(opts ...options.Option[SyncManager]) module.Provider[*engine.En func New(subModule module.Module, e *engine.Engine, latestCommitment *model.Commitment, finalizedSlot iotago.SlotIndex, opts ...options.Option[SyncManager]) *SyncManager { return module.InitSimpleLifecycle(options.Apply(&SyncManager{ - Module: subModule, - events: syncmanager.NewEvents(), - engine: e, - syncThreshold: 10 * time.Second, + Module: subModule, + events: syncmanager.NewEvents(), + engine: e, + + optsSyncThreshold: 10 * time.Second, + optsIsBootstrappedFunc: nil, + optsBootstrappedThreshold: 10 * time.Second, + + isBootstrapped: false, + isSynced: false, + isFinalizationDelayed: true, lastAcceptedBlockSlot: latestCommitment.Slot(), lastConfirmedBlockSlot: latestCommitment.Slot(), latestCommitment: latestCommitment, latestFinalizedSlot: finalizedSlot, lastPrunedEpoch: 0, hasPruned: false, - isSynced: false, - isFinalizationDelayed: true, - isBootstrapped: false, - - optsBootstrappedThreshold: 10 * time.Second, }, opts, func(s *SyncManager) { s.updatePrunedEpoch(s.engine.Storage.LastPrunedEpoch()) @@ -170,109 +193,59 @@ func (s *SyncManager) SyncStatus() *syncmanager.SyncStatus { // Reset resets the component to a clean state as if it was created at the last commitment. func (s *SyncManager) Reset() { + s.isSyncedLock.Lock() + s.isFinalizationDelayedLock.Lock() s.lastAcceptedBlockSlotLock.Lock() s.lastConfirmedBlockSlotLock.Lock() s.latestCommitmentLock.RLock() - s.isSyncedLock.Lock() - s.isFinalizationDelayedLock.Lock() - defer s.lastAcceptedBlockSlotLock.Unlock() - defer s.lastConfirmedBlockSlotLock.Unlock() - defer s.latestCommitmentLock.RUnlock() - defer s.isSyncedLock.Unlock() - defer s.isFinalizationDelayedLock.Unlock() - s.lastAcceptedBlockSlot = s.latestCommitment.Slot() - s.lastConfirmedBlockSlot = s.latestCommitment.Slot() + defer func() { + s.isSyncedLock.Unlock() + s.isFinalizationDelayedLock.Unlock() + s.lastAcceptedBlockSlotLock.Unlock() + s.lastConfirmedBlockSlotLock.Unlock() + s.latestCommitmentLock.RUnlock() + }() + // Mark the synced flag as false, // because we clear the latest accepted blocks and return the whole state to the last committed slot. s.isSynced = false s.isFinalizationDelayed = true + s.lastAcceptedBlockSlot = s.latestCommitment.Slot() + s.lastConfirmedBlockSlot = s.latestCommitment.Slot() } -func (s *SyncManager) updateLastAcceptedBlock(id iotago.BlockID) (changed bool) { - s.lastAcceptedBlockSlotLock.Lock() - defer s.lastAcceptedBlockSlotLock.Unlock() - - if id.Slot() > s.lastAcceptedBlockSlot { - s.lastAcceptedBlockSlot = id.Slot() - return true - } - - return false -} - -func (s *SyncManager) updateLastConfirmedBlock(id iotago.BlockID) (changed bool) { - s.lastConfirmedBlockSlotLock.Lock() - defer s.lastConfirmedBlockSlotLock.Unlock() - - if id.Slot() > s.lastConfirmedBlockSlot { - s.lastConfirmedBlockSlot = id.Slot() - return true - } - - return false -} - -func (s *SyncManager) updateLatestCommitment(commitment *model.Commitment) (changed bool) { - s.latestCommitmentLock.Lock() - - if s.latestCommitment != commitment { - s.latestCommitment = commitment - s.latestCommitmentLock.Unlock() - - s.setIsFinalizationDelayed(s.LatestFinalizedSlot(), commitment.Slot()) - - return true - } - s.latestCommitmentLock.Unlock() - - return false +func (s *SyncManager) triggerUpdate() { + s.events.UpdatedStatus.Trigger(s.SyncStatus()) } -func (s *SyncManager) updateBootstrappedStatus() { +func (s *SyncManager) updateBootstrappedStatus() (changed bool) { s.isBootstrappedLock.Lock() defer s.isBootstrappedLock.Unlock() if !s.isBootstrapped && s.optsIsBootstrappedFunc(s.engine) { s.isBootstrapped = true - } -} - -func (s *SyncManager) updateSyncStatus() (changed bool) { - s.isSyncedLock.Lock() - defer s.isSyncedLock.Unlock() - if s.isSynced != (s.isBootstrapped && time.Since(s.engine.Clock.Accepted().RelativeTime()) < s.syncThreshold) { - s.isSynced = !s.isSynced return true } return false } -func (s *SyncManager) updateFinalizedSlot(slot iotago.SlotIndex) (changed bool) { - s.latestFinalizedSlotLock.Lock() - - if s.latestFinalizedSlot != slot { - s.latestFinalizedSlot = slot - s.latestFinalizedSlotLock.Unlock() - - s.setIsFinalizationDelayed(slot, s.LatestCommitment().Slot()) - - return true - } - s.latestFinalizedSlotLock.Unlock() +func (s *SyncManager) IsBootstrapped() bool { + s.isBootstrappedLock.RLock() + defer s.isBootstrappedLock.RUnlock() - return false + return s.isBootstrapped } -func (s *SyncManager) updatePrunedEpoch(epoch iotago.EpochIndex, hasPruned bool) (changed bool) { - s.lastPrunedEpochLock.Lock() - defer s.lastPrunedEpochLock.Unlock() +func (s *SyncManager) updateSyncStatus() (changed bool) { + s.isSyncedLock.Lock() + defer s.isSyncedLock.Unlock() - if s.lastPrunedEpoch != epoch { - s.lastPrunedEpoch = epoch - s.hasPruned = hasPruned + isSynced := s.isBootstrapped && time.Since(s.engine.Clock.Accepted().RelativeTime()) < s.optsSyncThreshold + if s.isSynced != isSynced { + s.isSynced = isSynced return true } @@ -280,13 +253,6 @@ func (s *SyncManager) updatePrunedEpoch(epoch iotago.EpochIndex, hasPruned bool) return false } -func (s *SyncManager) IsBootstrapped() bool { - s.isBootstrappedLock.RLock() - defer s.isBootstrappedLock.RUnlock() - - return s.isBootstrapped -} - func (s *SyncManager) IsNodeSynced() bool { s.isSyncedLock.RLock() defer s.isSyncedLock.RUnlock() @@ -294,16 +260,23 @@ func (s *SyncManager) IsNodeSynced() bool { return s.isSynced } -func (s *SyncManager) setIsFinalizationDelayed(latestFinalizedSlot iotago.SlotIndex, latestCommitmentSlot iotago.SlotIndex) { +func (s *SyncManager) updateIsFinalizationDelayed(latestFinalizedSlot iotago.SlotIndex, latestCommitmentSlot iotago.SlotIndex) (changed bool) { s.isFinalizationDelayedLock.Lock() defer s.isFinalizationDelayedLock.Unlock() if latestCommitmentSlot < latestFinalizedSlot { // This should never happen, but if it does, we don't want to panic. - return + return false + } + + isFinalizationDelayed := latestCommitmentSlot-latestFinalizedSlot > s.engine.CommittedAPI().ProtocolParameters().MaxCommittableAge() + if s.isFinalizationDelayed != isFinalizationDelayed { + s.isFinalizationDelayed = isFinalizationDelayed + + return true } - s.isFinalizationDelayed = latestCommitmentSlot-latestFinalizedSlot > s.engine.CommittedAPI().ProtocolParameters().MaxCommittableAge() + return false } func (s *SyncManager) IsFinalizationDelayed() bool { @@ -313,6 +286,19 @@ func (s *SyncManager) IsFinalizationDelayed() bool { return s.isFinalizationDelayed } +func (s *SyncManager) updateLastAcceptedBlock(lastAcceptedBlockID iotago.BlockID) (changed bool) { + s.lastAcceptedBlockSlotLock.Lock() + defer s.lastAcceptedBlockSlotLock.Unlock() + + if lastAcceptedBlockID.Slot() > s.lastAcceptedBlockSlot { + s.lastAcceptedBlockSlot = lastAcceptedBlockID.Slot() + + return true + } + + return false +} + func (s *SyncManager) LastAcceptedBlockSlot() iotago.SlotIndex { s.lastAcceptedBlockSlotLock.RLock() defer s.lastAcceptedBlockSlotLock.RUnlock() @@ -320,6 +306,19 @@ func (s *SyncManager) LastAcceptedBlockSlot() iotago.SlotIndex { return s.lastAcceptedBlockSlot } +func (s *SyncManager) updateLastConfirmedBlock(lastConfirmedBlockID iotago.BlockID) (changed bool) { + s.lastConfirmedBlockSlotLock.Lock() + defer s.lastConfirmedBlockSlotLock.Unlock() + + if lastConfirmedBlockID.Slot() > s.lastConfirmedBlockSlot { + s.lastConfirmedBlockSlot = lastConfirmedBlockID.Slot() + + return true + } + + return false +} + func (s *SyncManager) LastConfirmedBlockSlot() iotago.SlotIndex { s.lastConfirmedBlockSlotLock.RLock() defer s.lastConfirmedBlockSlotLock.RUnlock() @@ -327,6 +326,25 @@ func (s *SyncManager) LastConfirmedBlockSlot() iotago.SlotIndex { return s.lastConfirmedBlockSlot } +func (s *SyncManager) updateLatestCommitment(commitment *model.Commitment) (changed bool) { + s.latestCommitmentLock.Lock() + + if s.latestCommitment != commitment { + s.latestCommitment = commitment + + // we need to unlock the lock before calling updateIsFinalizationDelayed, + // otherwise it might deadlock if isFinalizationDelayedLock is Rlocked in SyncStatus(). + s.latestCommitmentLock.Unlock() + + s.updateIsFinalizationDelayed(s.LatestFinalizedSlot(), commitment.Slot()) + + return true + } + s.latestCommitmentLock.Unlock() + + return false +} + func (s *SyncManager) LatestCommitment() *model.Commitment { s.latestCommitmentLock.RLock() defer s.latestCommitmentLock.RUnlock() @@ -334,6 +352,25 @@ func (s *SyncManager) LatestCommitment() *model.Commitment { return s.latestCommitment } +func (s *SyncManager) updateFinalizedSlot(slot iotago.SlotIndex) (changed bool) { + s.latestFinalizedSlotLock.Lock() + + if s.latestFinalizedSlot != slot { + s.latestFinalizedSlot = slot + + // we need to unlock the lock before calling updateIsFinalizationDelayed, + // otherwise it might deadlock if isFinalizationDelayedLock is Rlocked in SyncStatus(). + s.latestFinalizedSlotLock.Unlock() + + s.updateIsFinalizationDelayed(slot, s.LatestCommitment().Slot()) + + return true + } + s.latestFinalizedSlotLock.Unlock() + + return false +} + func (s *SyncManager) LatestFinalizedSlot() iotago.SlotIndex { s.latestFinalizedSlotLock.RLock() defer s.latestFinalizedSlotLock.RUnlock() @@ -341,25 +378,23 @@ func (s *SyncManager) LatestFinalizedSlot() iotago.SlotIndex { return s.latestFinalizedSlot } -func (s *SyncManager) LastPrunedEpoch() (iotago.EpochIndex, bool) { - s.lastPrunedEpochLock.RLock() - defer s.lastPrunedEpochLock.RUnlock() - - return s.lastPrunedEpoch, s.hasPruned -} +func (s *SyncManager) updatePrunedEpoch(epoch iotago.EpochIndex, hasPruned bool) (changed bool) { + s.lastPrunedEpochLock.Lock() + defer s.lastPrunedEpochLock.Unlock() -func (s *SyncManager) triggerUpdate() { - s.events.UpdatedStatus.Trigger(s.SyncStatus()) -} + if s.lastPrunedEpoch != epoch || s.hasPruned != hasPruned { + s.lastPrunedEpoch = epoch + s.hasPruned = hasPruned -func WithBootstrappedThreshold(threshold time.Duration) options.Option[SyncManager] { - return func(s *SyncManager) { - s.optsBootstrappedThreshold = threshold + return true } + + return false } -func WithBootstrappedFunc(isBootstrapped func(*engine.Engine) bool) options.Option[SyncManager] { - return func(s *SyncManager) { - s.optsIsBootstrappedFunc = isBootstrapped - } +func (s *SyncManager) LastPrunedEpoch() (iotago.EpochIndex, bool) { + s.lastPrunedEpochLock.RLock() + defer s.lastPrunedEpochLock.RUnlock() + + return s.lastPrunedEpoch, s.hasPruned } diff --git a/pkg/tests/protocol_engine_switching_test.go b/pkg/tests/protocol_engine_switching_test.go index b98155a88..0b9912dbf 100644 --- a/pkg/tests/protocol_engine_switching_test.go +++ b/pkg/tests/protocol_engine_switching_test.go @@ -363,7 +363,7 @@ func TestProtocol_EngineSwitching(t *testing.T) { ), protocol.WithSyncManagerProvider( trivialsyncmanager.NewProvider( - trivialsyncmanager.WithBootstrappedFunc(func(e *engine.Engine) bool { + trivialsyncmanager.WithIsBootstrappedFunc(func(e *engine.Engine) bool { return e.SyncManager.LatestCommitment().Slot() >= expectedCommittedSlotAfterPartitionMerge && e.Notarization.IsBootstrapped() }), ), @@ -723,7 +723,7 @@ func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) { ), protocol.WithSyncManagerProvider( trivialsyncmanager.NewProvider( - trivialsyncmanager.WithBootstrappedFunc(func(e *engine.Engine) bool { + trivialsyncmanager.WithIsBootstrappedFunc(func(e *engine.Engine) bool { return e.SyncManager.LatestCommitment().Slot() >= expectedCommittedSlotAfterPartitionMerge && e.Notarization.IsBootstrapped() }), ), @@ -1058,7 +1058,7 @@ func TestProtocol_EngineSwitching_Tie(t *testing.T) { protocol.WithSyncManagerProvider( trivialsyncmanager.NewProvider( - trivialsyncmanager.WithBootstrappedFunc(func(e *engine.Engine) bool { + trivialsyncmanager.WithIsBootstrappedFunc(func(e *engine.Engine) bool { return e.Storage.Settings().LatestCommitment().Slot() >= expectedCommittedSlotAfterPartitionMerge && e.Notarization.IsBootstrapped() }), ), diff --git a/pkg/tests/protocol_eviction_test.go b/pkg/tests/protocol_eviction_test.go index 17eb69a13..c933918de 100644 --- a/pkg/tests/protocol_eviction_test.go +++ b/pkg/tests/protocol_eviction_test.go @@ -93,7 +93,7 @@ func TestProtocol_Eviction(t *testing.T) { protocol.WithSyncManagerProvider( trivialsyncmanager.NewProvider( - trivialsyncmanager.WithBootstrappedFunc(func(e *engine.Engine) bool { + trivialsyncmanager.WithIsBootstrappedFunc(func(e *engine.Engine) bool { return e.Notarization.IsBootstrapped() }), ),