Skip to content

Commit

Permalink
Feat: finished chain framework
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Aug 21, 2023
1 parent 1510216 commit ef013c5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 62 deletions.
50 changes: 32 additions & 18 deletions pkg/protocol/chainmanagerv1/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ const (

// WarpSyncOffset defines how many slots a commitment needs to be behind the latest commitment to be requested by
// the warp sync process.
WarpSyncOffset = 2
WarpSyncOffset = 1
)

type Chain struct {
forkingPoint reactive.Variable[*CommitmentMetadata]

commitments *shrinkingmap.ShrinkingMap[iotago.SlotIndex, *CommitmentMetadata]

latestCommitmentIndex reactive.Variable[iotago.SlotIndex]
Expand All @@ -34,14 +36,17 @@ type Chain struct {
warpSyncThreshold reactive.Variable[iotago.SlotIndex]
}

func NewChain() *Chain {
func NewChain(forkingPoint *CommitmentMetadata) *Chain {
c := &Chain{
forkingPoint: reactive.NewVariable[*CommitmentMetadata](),
commitments: shrinkingmap.New[iotago.SlotIndex, *CommitmentMetadata](),
evicted: reactive.NewEvent(),
latestCommitmentIndex: reactive.NewVariable[iotago.SlotIndex](),
latestVerifiedCommitmentIndex: reactive.NewVariable[iotago.SlotIndex](),
}

c.forkingPoint.Set(forkingPoint)

c.syncThreshold = reactive.NewDerivedVariable[iotago.SlotIndex](func(latestVerifiedCommitmentIndex iotago.SlotIndex) iotago.SlotIndex {
return latestVerifiedCommitmentIndex + 1 + SyncWindow
}, c.latestVerifiedCommitmentIndex)
Expand All @@ -53,22 +58,6 @@ func NewChain() *Chain {
return c
}

func (c *Chain) RegisterCommitment(commitment *CommitmentMetadata) {
c.latestCommitmentIndex.Compute(func(latestCommitmentIndex iotago.SlotIndex) iotago.SlotIndex {
c.commitments.Set(commitment.Index(), commitment)

return lo.Cond(latestCommitmentIndex > commitment.Index(), latestCommitmentIndex, commitment.Index())
})
}

func (c *Chain) UnregisterCommitment(commitment *CommitmentMetadata) {
c.latestCommitmentIndex.Compute(func(latestCommitmentIndex iotago.SlotIndex) iotago.SlotIndex {
c.commitments.Delete(commitment.Index())

return lo.Cond(commitment.Index() < latestCommitmentIndex, commitment.Index()-1, latestCommitmentIndex)
})
}

func (c *Chain) LatestVerifiedCommitmentIndex() reactive.Variable[iotago.SlotIndex] {
return c.latestVerifiedCommitmentIndex
}
Expand All @@ -80,3 +69,28 @@ func (c *Chain) SyncThreshold() reactive.Variable[iotago.SlotIndex] {
func (c *Chain) WarpSyncThreshold() reactive.Variable[iotago.SlotIndex] {
return c.warpSyncThreshold
}

func (c *Chain) registerCommitment(commitment *CommitmentMetadata) {
c.latestCommitmentIndex.Compute(func(latestCommitmentIndex iotago.SlotIndex) iotago.SlotIndex {
c.commitments.Set(commitment.Index(), commitment)

return lo.Cond(latestCommitmentIndex > commitment.Index(), latestCommitmentIndex, commitment.Index())
})

unregisterCommitment := reactive.NewEvent()
unsubscribe := commitment.Chain().OnUpdate(func(_, newValue *Chain) {
if newValue != c {
unregisterCommitment.Trigger()
}
})

unregisterCommitment.OnTrigger(func() {
go unsubscribe()

c.latestCommitmentIndex.Compute(func(latestCommitmentIndex iotago.SlotIndex) iotago.SlotIndex {
c.commitments.Delete(commitment.Index())

return lo.Cond(commitment.Index() < latestCommitmentIndex, commitment.Index()-1, latestCommitmentIndex)
})
})
}
7 changes: 5 additions & 2 deletions pkg/protocol/chainmanagerv1/chainmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type ChainManager struct {

func NewChainManager() *ChainManager {
return &ChainManager{
rootChain: NewChain(),
rootCommitment: reactive.NewVariable[*CommitmentMetadata](),
commitmentCreated: event.New1[*CommitmentMetadata](),
cachedCommitments: shrinkingmap.New[iotago.CommitmentID, *promise.Promise[*CommitmentMetadata]](),
Expand All @@ -55,14 +54,18 @@ func (c *ChainManager) SetRootCommitment(commitment *model.Commitment) (commitme
}

commitmentMetadata = NewCommitmentMetadata(commitment)
commitmentMetadata.Chain().Set(c.rootChain)
commitmentMetadata.Solid().Trigger()
commitmentMetadata.Verified().Trigger()
commitmentMetadata.BelowSyncThreshold().Trigger()
commitmentMetadata.BelowWarpSyncThreshold().Trigger()
commitmentMetadata.BelowLatestVerifiedCommitment().Trigger()
commitmentMetadata.Evicted().Trigger()

if c.rootChain == nil {
c.rootChain = NewChain(commitmentMetadata)
}
commitmentMetadata.Chain().Set(c.rootChain)

return commitmentMetadata
})

Expand Down
2 changes: 2 additions & 0 deletions pkg/protocol/chainmanagerv1/chainmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ func TestChainManager(t *testing.T) {

require.True(t, commitment3Metadata.AboveLatestVerifiedCommitment().Get())
require.True(t, commitment3Metadata.BelowSyncThreshold().Get())

require.Equal(t, iotago.SlotIndex(3), commitment3Metadata.Chain().Get().latestCommitmentIndex.Get())
}
82 changes: 43 additions & 39 deletions pkg/protocol/chainmanagerv1/commitment_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,7 @@ func NewCommitmentMetadata(commitment *model.Commitment) *CommitmentMetadata {
chainSuccessor: reactive.NewVariable[*CommitmentMetadata](),
}

c.chain.OnUpdate(func(oldChain, newChain *Chain) {
if oldChain != nil {
oldChain.UnregisterCommitment(c)
}

newChain.RegisterCommitment(c)
})
c.chain.OnUpdate(func(_, chain *Chain) { chain.registerCommitment(c) })

c.directlyAboveLatestVerifiedCommitment = reactive.NewDerivedVariable2(func(parentVerified, verified bool) bool {
return parentVerified && !verified
Expand Down Expand Up @@ -79,69 +73,79 @@ func (c *CommitmentMetadata) RegisterParent(parent *CommitmentMetadata) {
// triggerEventIfIndexBelowThreshold triggers the given event if the commitment's index is below the given
// threshold. We only monitor the threshold after the corresponding parent event was triggered (to minimize
// the amount of elements that listen to updates of the same property).
triggerEventIfIndexBelowThreshold := func(eventFunc func(*CommitmentMetadata) reactive.Event, thresholdFunc func(*Chain) reactive.Variable[iotago.SlotIndex]) {
triggerEventIfIndexBelowThreshold := func(eventFunc func(*CommitmentMetadata) reactive.Event, thresholdFunc func(*Chain) reactive.Variable[iotago.SlotIndex], id string) {
eventFunc(parent).OnTrigger(func() {
unsubscribe := thresholdFunc(c.chain.Get()).OnUpdate(func(_, latestVerifiedCommitmentIndex iotago.SlotIndex) {
if c.Index() < latestVerifiedCommitmentIndex {
eventFunc(c).Trigger()
}
})

eventFunc(c).OnTrigger(unsubscribe)
eventFunc(c).OnTrigger(func() { go unsubscribe() })
})
}

triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowLatestVerifiedCommitment, (*Chain).LatestVerifiedCommitmentIndex)
triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowSyncThreshold, (*Chain).SyncThreshold)
triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowWarpSyncThreshold, (*Chain).WarpSyncThreshold)
triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowLatestVerifiedCommitment, (*Chain).LatestVerifiedCommitmentIndex, "BelowLatestVerifiedCommitment")
triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowSyncThreshold, (*Chain).SyncThreshold, "BelowSyncThreshold")
triggerEventIfIndexBelowThreshold((*CommitmentMetadata).BelowWarpSyncThreshold, (*Chain).WarpSyncThreshold, "BelowWarpSyncThreshold")
}

func (c *CommitmentMetadata) RegisterChild(newChild *CommitmentMetadata, onSuccessorUpdated func(*CommitmentMetadata, *CommitmentMetadata)) {
c.chainSuccessor.Compute(func(currentSuccessor *CommitmentMetadata) *CommitmentMetadata {
return lo.Cond(currentSuccessor != nil, currentSuccessor, newChild)
})

unsubscribe := c.chainSuccessor.OnUpdate(onSuccessorUpdated)

c.evicted.OnTrigger(unsubscribe)
// unsubscribe the handler on eviction to prevent memory leaks
c.evicted.OnTrigger(c.chainSuccessor.OnUpdate(onSuccessorUpdated))
}

// inheritChain returns a function that implements the chain inheritance rules.
//
// It must be called whenever the successor of the parent changes as we spawn a new chain for each child that is not the
// direct successor of a parent, and we inherit its chain otherwise.
func (c *CommitmentMetadata) inheritChain(parent *CommitmentMetadata) func(*CommitmentMetadata, *CommitmentMetadata) {
var (
spawnedChain *Chain
unsubscribe func()
)
var spawnedChain *Chain

spawnChain := func() {
if spawnedChain == nil {
spawnedChain = NewChain(c)

c.chain.Set(spawnedChain)
}
}

evictSpawnedChain := func() {
if spawnedChain != nil {
spawnedChain.evicted.Trigger()
spawnedChain = nil
}
}

var unsubscribe func()

subscribeToParentChain := func() {
if unsubscribe == nil {
unsubscribe = parent.chain.OnUpdate(func(_, chain *Chain) { c.chain.Set(chain) })
}
}

unsubscribeFromParentChain := func() {
if unsubscribe != nil {
unsubscribe()
unsubscribe = nil
}
}

return func(_, successor *CommitmentMetadata) {
switch successor {
case nil:
panic("successor must never be changed back to nil")
case c:
if spawnedChain != nil {
spawnedChain.evicted.Trigger()
spawnedChain = nil
}

if unsubscribe == nil {
unsubscribe = parent.chain.OnUpdate(func(_, chain *Chain) {
c.chain.Set(chain)
})
}
evictSpawnedChain()
subscribeToParentChain()
default:
if unsubscribe != nil {
unsubscribe()
unsubscribe = nil
}

if spawnedChain == nil {
spawnedChain = NewChain()

c.chain.Set(spawnedChain)
}
unsubscribeFromParentChain()
spawnChain()
}
}
}
6 changes: 3 additions & 3 deletions pkg/protocol/chainmanagerv1/commitment_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (
func TestCommitment(t *testing.T) {
testAPI := tpkg.TestAPI

rootChain := NewChain()

rootCommitment := model.NewEmptyCommitment(testAPI)
rootCommitmentMetadata := NewCommitmentMetadata(rootCommitment)
rootCommitmentMetadata.Chain().Set(rootChain)
rootCommitmentMetadata.Solid().Trigger()
rootCommitmentMetadata.Verified().Trigger()
rootCommitmentMetadata.BelowSyncThreshold().Trigger()
rootCommitmentMetadata.BelowWarpSyncThreshold().Trigger()
rootCommitmentMetadata.BelowLatestVerifiedCommitment().Trigger()

rootChain := NewChain(rootCommitmentMetadata)
rootCommitmentMetadata.Chain().Set(rootChain)

commitment1, err := model.CommitmentFromCommitment(iotago.NewCommitment(1, rootCommitment.Index()+1, rootCommitment.ID(), rootCommitmentMetadata.RootsID(), 1, 1), testAPI)
require.NoError(t, err)

Expand Down

0 comments on commit ef013c5

Please sign in to comment.