Skip to content

Commit

Permalink
merge ProcessOverseerSignals with processMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya committed Jan 8, 2024
1 parent 6ec97d4 commit 97a2df1
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 59 deletions.
16 changes: 13 additions & 3 deletions dot/parachain/availability-store/availability_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, OverseerToSubsyst

av.wg.Add(2)
go av.processMessages()
go av.ProcessOverseerSignals()

return nil
}
Expand Down Expand Up @@ -235,6 +234,14 @@ func (av *AvailabilityStoreSubsystem) processMessages() {
logger.Errorf("failed to handle store available data: %w", err)
}

case parachaintypes.ActiveLeavesUpdateSignal:
av.ProcessActiveLeavesUpdateSignal()

case parachaintypes.BlockFinalizedSignal:
av.ProcessBlockFinalizedSignal()

default:
logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error())
}

case <-av.ctx.Done():
Expand All @@ -248,8 +255,11 @@ func (av *AvailabilityStoreSubsystem) processMessages() {
}
}

func (av *AvailabilityStoreSubsystem) ProcessOverseerSignals() {
av.wg.Done()
func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal() {
// TODO: #3630
}

func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal() {
// TODO: #3630
}

Expand Down
21 changes: 11 additions & 10 deletions dot/parachain/backing/candidate_backing.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan an

cb.wg.Add(2)
go cb.processMessages()
go cb.ProcessOverseerSignals()

return nil
}
Expand All @@ -89,8 +88,11 @@ func (*CandidateBacking) Name() parachaintypes.SubSystemName {
return parachaintypes.CandidateBacking
}

func (cb *CandidateBacking) ProcessOverseerSignals() {
cb.wg.Done()
func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal() {
// TODO #3644
}

func (cb *CandidateBacking) ProcessBlockFinalizedSignal() {
// TODO #3644
}

Expand All @@ -101,8 +103,6 @@ func (cb *CandidateBacking) processMessages() {
// process these received messages by referencing
// https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741
switch msg.(type) {
case ActiveLeavesUpdate:
cb.handleActiveLeavesUpdate()
case GetBackedCandidates:
cb.handleGetBackedCandidates()
case CanSecond:
Expand All @@ -111,8 +111,13 @@ func (cb *CandidateBacking) processMessages() {
cb.handleSecond()
case Statement:
cb.handleStatement()
case parachaintypes.ActiveLeavesUpdateSignal:
cb.ProcessActiveLeavesUpdateSignal()
case parachaintypes.BlockFinalizedSignal:
cb.ProcessBlockFinalizedSignal()
default:
logger.Error("unknown message type")
logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error())
return
}

case <-cb.ctx.Done():
Expand All @@ -125,10 +130,6 @@ func (cb *CandidateBacking) processMessages() {
}
}

func (cb *CandidateBacking) handleActiveLeavesUpdate() {
// TODO: Implement this #3503
}

func (cb *CandidateBacking) handleGetBackedCandidates() {
// TODO: Implement this #3504
}
Expand Down
15 changes: 12 additions & 3 deletions dot/parachain/collator-protocol/validator_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const (
var (
ErrUnexpectedMessageOnCollationProtocol = errors.New("unexpected message on collation protocol")
ErrUnknownPeer = errors.New("unknown peer")
ErrUnknownOverseerMessage = errors.New("unknown overseer message type")
ErrNotExpectedOnValidatorSide = errors.New("message is not expected on the validator side of the protocol")
ErrCollationNotInView = errors.New("collation is not in our view")
ErrPeerIDNotFoundForCollator = errors.New("peer id not found for collator")
Expand Down Expand Up @@ -96,7 +95,11 @@ func (CollatorProtocolValidatorSide) Name() parachaintypes.SubSystemName {
return parachaintypes.CollationProtocol
}

func (cpvs CollatorProtocolValidatorSide) ProcessOverseerSignals() {
func (cpvs CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal() {
// NOTE: nothing to do here
}

func (cpvs CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal() {
// NOTE: nothing to do here
}

Expand Down Expand Up @@ -577,8 +580,14 @@ func (cpvs CollatorProtocolValidatorSide) processMessage(msg any) error {
Value: peerset.ReportBadCollatorValue,
Reason: peerset.ReportBadCollatorReason,
}, peerID)

case parachaintypes.ActiveLeavesUpdateSignal:
cpvs.ProcessActiveLeavesUpdateSignal()
case parachaintypes.BlockFinalizedSignal:
cpvs.ProcessBlockFinalizedSignal()

default:
return ErrUnknownOverseerMessage
return parachaintypes.ErrUnknownOverseerMessage
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions dot/parachain/overseer/overseer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ func (o *Overseer) handleBlockEvents() {
}
*/
activeLeavesUpdate := ActiveLeavesUpdateSignal{
Activated: &ActivatedLeaf{
activeLeavesUpdate := parachaintypes.ActiveLeavesUpdateSignal{
Activated: &parachaintypes.ActivatedLeaf{
Hash: imported.Header.Hash(),
Number: uint32(imported.Header.Number),
},
Expand All @@ -202,7 +202,7 @@ func (o *Overseer) handleBlockEvents() {
}
}

o.broadcast(BlockFinalizedSignal{
o.broadcast(parachaintypes.BlockFinalizedSignal{
Hash: finalised.Header.Hash(),
BlockNumber: uint32(finalised.Header.Number),
})
Expand All @@ -212,7 +212,7 @@ func (o *Overseer) handleBlockEvents() {
// Our peers will be informed about our finalized block the next time we
// activating/deactivating some leaf.
if len(deactivated) > 0 {
o.broadcast(ActiveLeavesUpdateSignal{
o.broadcast(parachaintypes.ActiveLeavesUpdateSignal{
Deactivated: deactivated,
})
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func (o *Overseer) Stop() error {
}

// sendActiveLeavesUpdate sends an ActiveLeavesUpdate to the subsystem
func (o *Overseer) sendActiveLeavesUpdate(update ActiveLeavesUpdateSignal, subsystem Subsystem) {
func (o *Overseer) sendActiveLeavesUpdate(update parachaintypes.ActiveLeavesUpdateSignal, subsystem Subsystem) {
o.subsystems[subsystem] <- update
}

Expand Down
34 changes: 19 additions & 15 deletions dot/parachain/overseer/overseer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ func (s *TestSubsystem) Run(ctx context.Context, OverseerToSubSystem chan any, S
}
}

func (s *TestSubsystem) ProcessOverseerSignals() {
fmt.Printf("%s ProcessOverseerSignals\n", s.name)
func (s *TestSubsystem) ProcessActiveLeavesUpdateSignal() {
fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name)
}

func (s *TestSubsystem) ProcessBlockFinalizedSignal() {
fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name)
}

func (s *TestSubsystem) String() parachaintypes.SubSystemName {
Expand Down Expand Up @@ -90,11 +94,11 @@ func TestStart2SubsytemsActivate1(t *testing.T) {
}()

time.Sleep(1000 * time.Millisecond)
activedLeaf := ActivatedLeaf{
activedLeaf := parachaintypes.ActivatedLeaf{
Hash: [32]byte{1},
Number: 1,
}
overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1)
overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1)

// let subsystems run for a bit
time.Sleep(4000 * time.Millisecond)
Expand Down Expand Up @@ -134,18 +138,18 @@ func TestStart2SubsytemsActivate2Different(t *testing.T) {
close(done)
}()

activedLeaf1 := ActivatedLeaf{
activedLeaf1 := parachaintypes.ActivatedLeaf{
Hash: [32]byte{1},
Number: 1,
}
activedLeaf2 := ActivatedLeaf{
activedLeaf2 := parachaintypes.ActivatedLeaf{
Hash: [32]byte{2},
Number: 2,
}
time.Sleep(250 * time.Millisecond)
overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf1}, subSystem1)
overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf1}, subSystem1)
time.Sleep(400 * time.Millisecond)
overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf2}, subSystem2)
overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf2}, subSystem2)
// let subsystems run for a bit
time.Sleep(3000 * time.Millisecond)

Expand Down Expand Up @@ -185,14 +189,14 @@ func TestStart2SubsytemsActivate2Same(t *testing.T) {
close(done)
}()

activedLeaf := ActivatedLeaf{
activedLeaf := parachaintypes.ActivatedLeaf{
Hash: [32]byte{1},
Number: 1,
}
time.Sleep(300 * time.Millisecond)
overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1)
overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1)
time.Sleep(400 * time.Millisecond)
overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem2)
overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem2)
// let subsystems run for a bit
time.Sleep(2000 * time.Millisecond)

Expand Down Expand Up @@ -237,12 +241,12 @@ func TestHandleBlockEvents(t *testing.T) {
continue
}

_, ok := msg.(BlockFinalizedSignal)
_, ok := msg.(parachaintypes.BlockFinalizedSignal)
if ok {
finalizedCounter.Add(1)
}

_, ok = msg.(ActiveLeavesUpdateSignal)
_, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal)
if ok {
importedCounter.Add(1)
}
Expand All @@ -251,12 +255,12 @@ func TestHandleBlockEvents(t *testing.T) {
continue
}

_, ok := msg.(BlockFinalizedSignal)
_, ok := msg.(parachaintypes.BlockFinalizedSignal)
if ok {
finalizedCounter.Add(1)
}

_, ok = msg.(ActiveLeavesUpdateSignal)
_, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal)
if ok {
importedCounter.Add(1)
}
Expand Down
25 changes: 2 additions & 23 deletions dot/parachain/overseer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,14 @@ import (
"context"

parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/lib/common"
)

// ActivatedLeaf is a parachain head which we care to work on.
type ActivatedLeaf struct {
Hash common.Hash
Number uint32
}

// ActiveLeavesUpdateSignal changes in the set of active leaves: the parachain heads which we care to work on.
//
// note: activated field indicates deltas, not complete sets.
type ActiveLeavesUpdateSignal struct {
Activated *ActivatedLeaf
// Relay chain block hashes no longer of interest.
Deactivated []common.Hash
}

// BlockFinalized signal is used to inform subsystems of a finalized block.
type BlockFinalizedSignal struct {
Hash common.Hash
BlockNumber uint32
}

// Subsystem is an interface for subsystems to be registered with the overseer.
type Subsystem interface {
// Run runs the subsystem.
Run(ctx context.Context, OverseerToSubSystem chan any, SubSystemToOverseer chan any) error
Name() parachaintypes.SubSystemName
ProcessOverseerSignals()
ProcessActiveLeavesUpdateSignal()
ProcessBlockFinalizedSignal()
Stop()
}
30 changes: 30 additions & 0 deletions dot/parachain/types/overseer_signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package parachaintypes

import (
"errors"

"github.com/ChainSafe/gossamer/lib/common"
)

var ErrUnknownOverseerMessage = errors.New("unknown overseer message type")

// ActivatedLeaf is a parachain head which we care to work on.
type ActivatedLeaf struct {
Hash common.Hash
Number uint32
}

// ActiveLeavesUpdateSignal changes in the set of active leaves: the parachain heads which we care to work on.
//
// note: activated field indicates deltas, not complete sets.
type ActiveLeavesUpdateSignal struct {
Activated *ActivatedLeaf
// Relay chain block hashes no longer of interest.
Deactivated []common.Hash
}

// BlockFinalized signal is used to inform subsystems of a finalized block.
type BlockFinalizedSignal struct {
Hash common.Hash
BlockNumber uint32
}

0 comments on commit 97a2df1

Please sign in to comment.