Skip to content

Commit

Permalink
Merge pull request #439 from iotaledger/fix/engine-shutdown-races
Browse files Browse the repository at this point in the history
Fix Engine shutdown races
  • Loading branch information
karimodm authored Oct 19, 2023
2 parents e919cf8 + 6b41e5d commit 1fbcc48
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/protocol/block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (b *BlockDispatcher) Dispatch(block *model.Block, src peer.ID) error {

matchingEngineFound := false
for _, engine := range []*engine.Engine{b.protocol.MainEngineInstance(), b.protocol.CandidateEngineInstance()} {
if engine != nil && (engine.ChainID() == slotCommitment.Chain().ForkingPoint.ID() || engine.BlockRequester.HasTicker(block.ID())) {
if engine != nil && !engine.WasShutdown() && (engine.ChainID() == slotCommitment.Chain().ForkingPoint.ID() || engine.BlockRequester.HasTicker(block.ID())) {
if b.inSyncWindow(engine, block) {
engine.ProcessBlockFromPeer(block, src)
} else {
Expand Down
5 changes: 3 additions & 2 deletions pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type BlockDAG struct {

func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engine, blockdag.BlockDAG] {
return module.Provide(func(e *engine.Engine) blockdag.BlockDAG {

b := New(e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...)

e.HookConstructed(func() {
Expand Down Expand Up @@ -78,7 +77,9 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) {
block.ForEachParent(func(parent iotago.Parent) {
parentBlock, exists := b.blockCache.Block(parent.ID)
if !exists {
panic("cannot setup block without existing parent")
b.errorHandler(ierrors.Errorf("failed to setup block %s, parent %s is missing", block.ID(), parent.ID))

return
}

parentBlock.Solid().OnUpdateOnce(func(_, _ bool) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/protocol/engine/clock/blocktime/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Clock struct {
// confirmedTime contains a notion of time that is anchored to the latest confirmed block.
confirmedTime *RelativeTime

workerPool *workerpool.WorkerPool

syncutils.RWMutex

// Module embeds the required methods of the module.Interface.
Expand All @@ -36,6 +38,7 @@ func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine,
return options.Apply(&Clock{
acceptedTime: NewRelativeTime(),
confirmedTime: NewRelativeTime(),
workerPool: e.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)),
}, opts, func(c *Clock) {
e.HookConstructed(func() {
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot()
Expand All @@ -49,7 +52,7 @@ func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine,
e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated)
e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated)

asyncOpt := event.WithWorkerPool(e.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1)))
asyncOpt := event.WithWorkerPool(c.workerPool)
c.HookStopped(lo.Batch(
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
c.advanceAccepted(block.IssuingTime())
Expand Down Expand Up @@ -96,6 +99,7 @@ func (c *Clock) Snapshot() *clock.Snapshot {
}

func (c *Clock) Shutdown() {
c.workerPool.Shutdown()
c.TriggerStopped()
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func New(
}

func (e *Engine) Shutdown() {
if !e.WasStopped() {
e.TriggerStopped()
if !e.WasShutdown() {
e.TriggerShutdown()

e.BlockRequester.Shutdown()
e.Attestations.Shutdown()
Expand All @@ -237,8 +237,10 @@ func (e *Engine) Shutdown() {
e.CommitmentFilter.Shutdown()
e.Scheduler.Shutdown()
e.Retainer.Shutdown()
e.Storage.Shutdown()
e.Workers.Shutdown()
e.Storage.Shutdown()

e.TriggerStopped()
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/retainer/retainer/retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ type Retainer struct {

stakersResponses *shrinkingmap.ShrinkingMap[uint32, []*apimodels.ValidatorResponse]

workers *workerpool.Group
workerPool *workerpool.WorkerPool

module.Module
}

func New(workers *workerpool.Group, retainerFunc RetainerFunc, latestCommittedSlotFunc LatestCommittedSlotFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *Retainer {
func New(workersGroup *workerpool.Group, retainerFunc RetainerFunc, latestCommittedSlotFunc LatestCommittedSlotFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *Retainer {
return &Retainer{
workers: workers,
workerPool: workersGroup.CreatePool("Retainer", workerpool.WithWorkerCount(1)),
store: retainerFunc,
stakersResponses: shrinkingmap.New[uint32, []*apimodels.ValidatorResponse](),
latestCommittedSlotFunc: latestCommittedSlotFunc,
Expand All @@ -58,7 +58,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.Retainer] {
e.Storage.Settings().LatestFinalizedSlot,
e.ErrorHandler("retainer"))

asyncOpt := event.WithWorkerPool(r.workers.CreatePool("Retainer", workerpool.WithWorkerCount(1)))
asyncOpt := event.WithWorkerPool(r.workerPool)

e.Events.BlockDAG.BlockAttached.Hook(func(b *blocks.Block) {
if err := r.onBlockAttached(b.ID()); err != nil {
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.Retainer] {
}

func (r *Retainer) Shutdown() {
r.workers.Shutdown()
r.workerPool.Shutdown()
}

func (r *Retainer) BlockMetadata(blockID iotago.BlockID) (*retainer.BlockMetadata, error) {
Expand Down
2 changes: 1 addition & 1 deletion tools/genesis-snapshot/presets/presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ var Feature = []options.Option[snapshotcreator.Options]{
iotago.NewV3ProtocolParameters(
iotago.WithNetworkOptions("feature", "rms"),
iotago.WithSupplyOptions(10_000_000_000, 100, 1, 10, 100, 100, 100),
iotago.WithTimeProviderOptions(1697406181, 10, 13),
iotago.WithTimeProviderOptions(1697631694, 10, 13),
iotago.WithLivenessOptions(30, 30, 10, 20, 30),
// increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate
iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1000, 100),
Expand Down

0 comments on commit 1fbcc48

Please sign in to comment.