Skip to content

Commit

Permalink
Fix: fixed some SCHTUFF
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Oct 29, 2023
1 parent 9f1f6bf commit e148b38
Showing 1 changed file with 53 additions and 29 deletions.
82 changes: 53 additions & 29 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func NewChain(protocol *Protocol) *Chain {
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestVerifiedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSync: reactive.NewVariable[bool]().Init(true),
NetworkClockSlot: reactive.NewVariable[iotago.SlotIndex](),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
Expand All @@ -65,14 +67,10 @@ func NewChain(protocol *Protocol) *Chain {
SpawnedEngine: reactive.NewVariable[*engine.Engine](),
}

c.LatestAttestedCommitment.OnUpdateWithContext(func(_, latestAttestedCommitment *Commitment, withinContext func(subscriptionFactory func() (unsubscribe func()))) {
withinContext(func() (unsubscribe func()) {
return latestAttestedCommitment.CumulativeAttestedWeight.OnUpdate(func(_, newValue uint64) { c.AttestedWeight.Set(newValue) })
})
})

c.ClaimedWeight = reactive.NewDerivedVariable((*Commitment).cumulativeWeight, c.LatestCommitment)
c.VerifiedWeight = reactive.NewDerivedVariable((*Commitment).cumulativeWeight, c.LatestVerifiedCommitment)
c.initClaimedWeight()
c.initAttestedWeight()
c.initVerifiedWeight()
c.initWarpSync()

c.Engine = reactive.NewDerivedVariable2(func(spawnedEngine, parentEngine *engine.Engine) *engine.Engine {
if spawnedEngine != nil {
Expand Down Expand Up @@ -163,34 +161,60 @@ func NewChain(protocol *Protocol) *Chain {
c.SpawnedEngine.LogUpdates(entityLogger, log.LevelDebug, "SpawnedEngine", (*engine.Engine).LogName)
})

warpSyncTogglePool := workerpool.New("WarpSync toggle", workerpool.WithWorkerCount(1))
return c
}

var unsubscribe func()
c.WarpSync.OnUpdate(func(_, warpSync bool) {
if unsubscribe != nil {
unsubscribe()
func (c *Chain) initClaimedWeight() {
c.ClaimedWeight.InheritFrom(reactive.NewDerivedVariable(func(c *Commitment) uint64 {
if c == nil {
return 0
}

if warpSync {
c.LogDebug("warp-sync enabled")

warpSyncTogglePool.Submit(func() {
unsubscribe = c.WarpSync.InheritFrom(reactive.NewDerivedVariable2(func(latestVerifiedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex) bool {
return latestVerifiedCommitment != nil && latestVerifiedCommitment.ID().Slot() < warpSyncThreshold
}, c.LatestVerifiedCommitment, c.WarpSyncThreshold))
})
} else {
c.LogDebug("warp-sync disabled")
return c.CumulativeWeight()
}, c.LatestCommitment))
}

warpSyncTogglePool.Submit(func() {
unsubscribe = c.WarpSync.InheritFrom(reactive.NewDerivedVariable2(func(latestVerifiedCommitment *Commitment, outOfSyncThreshold iotago.SlotIndex) bool {
return latestVerifiedCommitment != nil && latestVerifiedCommitment.ID().Slot() < outOfSyncThreshold
}, c.LatestVerifiedCommitment, c.OutOfSyncThreshold))
})
func (c *Chain) initAttestedWeight() {
c.LatestAttestedCommitment.OnUpdateWithContext(func(_, latestAttestedCommitment *Commitment, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) {
setupInheritance := func() func() {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
}

unsubscribeOnUpdate(setupInheritance)
})
}

return c
func (c *Chain) initVerifiedWeight() {
c.VerifiedWeight.InheritFrom(reactive.NewDerivedVariable(func(c *Commitment) uint64 {
if c == nil {
return 0
}

return c.CumulativeWeight()
}, c.LatestVerifiedCommitment))
}

func (c *Chain) initWarpSync() {
enableWarpSyncIfNecessary := func() (unsubscribe func()) {
return c.WarpSync.InheritFrom(reactive.NewDerivedVariable2(func(latestVerifiedCommitment *Commitment, outOfSyncThreshold iotago.SlotIndex) bool {
return latestVerifiedCommitment != nil && latestVerifiedCommitment.ID().Slot() < outOfSyncThreshold
}, c.LatestVerifiedCommitment, c.OutOfSyncThreshold))
}

disableWarpSyncIfNecessary := func() (unsubscribe func()) {
return c.WarpSync.InheritFrom(reactive.NewDerivedVariable2(func(latestVerifiedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex) bool {
return latestVerifiedCommitment != nil && latestVerifiedCommitment.ID().Slot() < warpSyncThreshold
}, c.LatestVerifiedCommitment, c.WarpSyncThreshold))
}

warpSyncTogglePool := workerpool.New("WarpSync toggle", workerpool.WithWorkerCount(1)).Start()
c.IsEvicted.OnTrigger(func() { warpSyncTogglePool.Shutdown() })

c.WarpSync.OnUpdateWithContext(func(_, warpSync bool, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) {
if !c.IsEvicted.Get() {
warpSyncTogglePool.Submit(func() { unsubscribeOnUpdate(lo.Cond(warpSync, disableWarpSyncIfNecessary, enableWarpSyncIfNecessary)) })
}
})
}

func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) {
Expand Down

0 comments on commit e148b38

Please sign in to comment.