Skip to content

Commit

Permalink
Merge pull request #818 from iotaledger/feat/back-reverse
Browse files Browse the repository at this point in the history
Fix: Eviction order in protocol to use lo.BatchReverse
  • Loading branch information
alexsporn authored Mar 8, 2024
2 parents 7125440 + ad01117 commit b03ce6a
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pkg/protocol/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newAttestations(protocol *Protocol) *Attestations {
}

protocol.Constructed.OnTrigger(func() {
shutdown := lo.Batch(
shutdown := lo.BatchReverse(
a.initCommitmentVerifiers(),
a.initRequester(),
)
Expand Down Expand Up @@ -76,7 +76,7 @@ func (a *Attestations) initCommitmentVerifiers() func() {

// initRequester initializes the ticker that is used to send commitment requests.
func (a *Attestations) initRequester() (shutdown func()) {
unsubscribeFromTicker := lo.Batch(
unsubscribeFromTicker := lo.BatchReverse(
a.protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
return commitment.RequestAttestations.WithNonEmptyValue(func(_ bool) (teardown func()) {
if commitment.CumulativeWeight.Get() == 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newBlocks(protocol *Protocol) *Blocks {

//nolint:revive
protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook,
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
Expand Down
6 changes: 3 additions & 3 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newChain(chains *Chains) *Chain {
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
}

shutdown := lo.Batch(
shutdown := lo.BatchReverse(
c.initLogger(),
c.initDerivedProperties(),
)
Expand Down Expand Up @@ -191,7 +191,7 @@ func (c *Chain) LatestEngine() *engine.Engine {
func (c *Chain) initLogger() (shutdown func()) {
c.Logger = c.chains.NewChildLogger("", true)

return lo.Batch(
return lo.BatchReverse(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
Expand Down Expand Up @@ -318,7 +318,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {

c.LatestCommitment.Set(newCommitment)

return lo.Batch(
return lo.BatchReverse(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),
Expand Down
14 changes: 7 additions & 7 deletions pkg/protocol/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newChains(protocol *Protocol) *Chains {
})
})

shutdown := lo.Batch(
shutdown := lo.BatchReverse(
c.initLogger(protocol.NewChildLogger("Chains")),
c.initChainSwitching(),

Expand All @@ -91,7 +91,7 @@ func newChains(protocol *Protocol) *Chains {
func attachEngineLogs(instance *engine.Engine) func() {
events := instance.Events

return lo.Batch(
return lo.BatchReverse(
events.BlockDAG.BlockAppended.Hook(func(block *blocks.Block) {
instance.LogTrace("BlockDAG.BlockAppended", "block", block.ID())
}).Unhook,
Expand Down Expand Up @@ -304,7 +304,7 @@ func (c *Chains) WithInitializedEngines(callback func(chain *Chain, engine *engi
func (c *Chains) initLogger(logger log.Logger) (shutdown func()) {
c.Logger = logger

return lo.Batch(
return lo.BatchReverse(
c.Main.LogUpdates(c, log.LevelTrace, "Main", (*Chain).LogName),
c.HeaviestClaimedCandidate.LogUpdates(c, log.LevelTrace, "HeaviestClaimedCandidate", (*Chain).LogName),
c.HeaviestAttestedCandidate.LogUpdates(c, log.LevelTrace, "HeaviestAttestedCandidate", (*Chain).LogName),
Expand All @@ -321,7 +321,7 @@ func (c *Chains) initChainSwitching() (shutdown func()) {

c.Main.Set(mainChain)

return lo.Batch(
return lo.BatchReverse(
c.HeaviestClaimedCandidate.WithNonEmptyValue(func(heaviestClaimedCandidate *Chain) (shutdown func()) {
return heaviestClaimedCandidate.RequestAttestations.ToggleValue(true)
}),
Expand Down Expand Up @@ -364,7 +364,7 @@ func (c *Chains) trackHeaviestCandidates(chain *Chain) (teardown func()) {
func (c *Chains) updateMeasuredSlot(latestSeenSlot iotago.SlotIndex) (teardown func()) {
measuredSlot := latestSeenSlot - chainSwitchingMeasurementOffset

return lo.Batch(
return lo.BatchReverse(
c.HeaviestClaimedCandidate.measureAt(measuredSlot),
c.HeaviestAttestedCandidate.measureAt(measuredSlot),
c.HeaviestVerifiedCandidate.measureAt(measuredSlot),
Expand All @@ -375,7 +375,7 @@ func (c *Chains) updateMeasuredSlot(latestSeenSlot iotago.SlotIndex) (teardown f
func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
//nolint:revive
return protocol.Engines.Main.WithNonEmptyValue(func(mainEngine *engine.Engine) (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
c.WithInitializedEngines(func(_ *Chain, engine *engine.Engine) (shutdown func()) {
return engine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) {
c.LatestSeenSlot.Set(latestCommitment.Slot())
Expand Down Expand Up @@ -483,7 +483,7 @@ func (c *ChainsCandidate) measureAt(slot iotago.SlotIndex) (teardown func()) {
})

// return all teardown functions
return lo.Batch(append(teardownMonitoringFunctions, teardownUpdates)...)
return lo.BatchReverse(append(teardownMonitoringFunctions, teardownUpdates)...)
})
})
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func newCommitment(commitments *Commitments, model *model.Commitment) *Commitmen
commitments: commitments,
}

shutdown := lo.Batch(
shutdown := lo.BatchReverse(
c.initLogger(),
c.initDerivedProperties(),
)
Expand Down Expand Up @@ -206,7 +206,7 @@ func (c *Commitment) Less(other *Commitment) bool {
func (c *Commitment) initLogger() (shutdown func()) {
c.Logger = c.commitments.NewChildLogger(fmt.Sprintf("Slot%d.", c.Slot()), true)

return lo.Batch(
return lo.BatchReverse(
c.Parent.LogUpdates(c, log.LevelTrace, "Parent", (*Commitment).LogName),
c.MainChild.LogUpdates(c, log.LevelTrace, "MainChild", (*Commitment).LogName),
c.Chain.LogUpdates(c, log.LevelTrace, "Chain", (*Chain).LogName),
Expand All @@ -231,7 +231,7 @@ func (c *Commitment) initLogger() (shutdown func()) {

// initDerivedProperties initializes the behavior of this Commitment by setting up the relations between its properties.
func (c *Commitment) initDerivedProperties() (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
// mark commitments that are marked as root as verified
c.IsVerified.InheritFrom(c.IsRoot),

Expand All @@ -256,7 +256,7 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
c.deriveIsAboveLatestVerifiedCommitment(parent),

c.Chain.WithNonEmptyValue(func(chain *Chain) func() {
return lo.Batch(
return lo.BatchReverse(
c.deriveRequestAttestations(chain, parent),

// only start requesting blocks once the engine is ready
Expand All @@ -269,7 +269,7 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
}),

c.Chain.WithNonEmptyValue(func(chain *Chain) func() {
return lo.Batch(
return lo.BatchReverse(
chain.addCommitment(c),

c.deriveReplayDroppedBlocks(chain),
Expand Down
6 changes: 3 additions & 3 deletions pkg/protocol/commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newCommitments(protocol *Protocol) *Commitments {
requester: eventticker.New[iotago.SlotIndex, iotago.CommitmentID](protocol.Options.CommitmentRequesterOptions...),
}

shutdown := lo.Batch(
shutdown := lo.BatchReverse(
c.initLogger(),
c.initEngineCommitmentSynchronization(),
c.initRequester(),
Expand Down Expand Up @@ -101,7 +101,7 @@ func (c *Commitments) API(commitmentID iotago.CommitmentID) (commitmentAPI *engi
func (c *Commitments) initLogger() (shutdown func()) {
c.Logger = c.protocol.NewChildLogger("Commitments")

return lo.Batch(
return lo.BatchReverse(
c.Root.LogUpdates(c, log.LevelTrace, "Root", (*Commitment).LogName),

c.Logger.UnsubscribeFromParentLogger,
Expand All @@ -111,7 +111,7 @@ func (c *Commitments) initLogger() (shutdown func()) {
// initEngineCommitmentSynchronization initializes the synchronization of commitments that are published by the engines.
func (c *Commitments) initEngineCommitmentSynchronization() func() {
return c.protocol.Constructed.WithNonEmptyValue(func(_ bool) (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
// advance the root commitment of the main chain
c.protocol.Chains.Main.WithNonEmptyValue(func(mainChain *Chain) (shutdown func()) {
return mainChain.WithInitializedEngine(func(mainEngine *engine.Engine) (shutdown func()) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/engines.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newEngines(protocol *Protocol) *Engines {
}

protocol.Constructed.OnTrigger(func() {
shutdown := lo.Batch(
shutdown := lo.BatchReverse(
e.initLogger(protocol.NewChildLogger("Engines")),

e.syncMainEngineFromMainChain(),
Expand All @@ -78,7 +78,7 @@ func newEngines(protocol *Protocol) *Engines {
func (e *Engines) initLogger(logger log.Logger) (shutdown func()) {
e.Logger = logger

return lo.Batch(
return lo.BatchReverse(
e.Main.LogUpdates(e, log.LevelTrace, "Main", (*engine.Engine).LogName),

logger.UnsubscribeFromParentLogger,
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func New(logger log.Logger, workers *workerpool.Group, networkEndpoint network.E
shutdownSubComponents := p.initSubcomponents(networkEndpoint)

p.Initialized.OnTrigger(func() {
shutdown := lo.Batch(
shutdown := lo.BatchReverse(
p.initEviction(),
p.initGlobalEventsRedirection(),
p.initNetwork(),
Expand Down Expand Up @@ -188,7 +188,7 @@ func (p *Protocol) initGlobalEventsRedirection() (shutdown func()) {

// initNetwork initializes the network of the protocol and returns a function that shuts it down.
func (p *Protocol) initNetwork() (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
p.Network.OnError(func(err error, peer peer.ID) { p.LogError("network error", "peer", peer, "error", err) }),
p.Network.OnBlockReceived(p.Blocks.ProcessResponse),
p.Network.OnBlockRequestReceived(p.Blocks.ProcessRequest),
Expand Down

0 comments on commit b03ce6a

Please sign in to comment.