diff --git a/dot/parachain/availability-store/availability_store.go b/dot/parachain/availability-store/availability_store.go index 9637844288..bd3dba7f00 100644 --- a/dot/parachain/availability-store/availability_store.go +++ b/dot/parachain/availability-store/availability_store.go @@ -178,7 +178,6 @@ func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, OverseerToSubsyst av.wg.Add(2) go av.processMessages() - go av.ProcessOverseerSignals() return nil } @@ -235,6 +234,14 @@ func (av *AvailabilityStoreSubsystem) processMessages() { logger.Errorf("failed to handle store available data: %w", err) } + case parachaintypes.ActiveLeavesUpdateSignal: + av.ProcessActiveLeavesUpdateSignal() + + case parachaintypes.BlockFinalizedSignal: + av.ProcessBlockFinalizedSignal() + + default: + logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error()) } case <-av.ctx.Done(): @@ -248,8 +255,11 @@ func (av *AvailabilityStoreSubsystem) processMessages() { } } -func (av *AvailabilityStoreSubsystem) ProcessOverseerSignals() { - av.wg.Done() +func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal() { + // TODO: #3630 +} + +func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal() { // TODO: #3630 } diff --git a/dot/parachain/backing/candidate_backing.go b/dot/parachain/backing/candidate_backing.go index 9086eeddf9..c95fd5102b 100644 --- a/dot/parachain/backing/candidate_backing.go +++ b/dot/parachain/backing/candidate_backing.go @@ -80,7 +80,6 @@ func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan an cb.wg.Add(2) go cb.processMessages() - go cb.ProcessOverseerSignals() return nil } @@ -89,8 +88,11 @@ func (*CandidateBacking) Name() parachaintypes.SubSystemName { return parachaintypes.CandidateBacking } -func (cb *CandidateBacking) ProcessOverseerSignals() { - cb.wg.Done() +func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal() { + // TODO #3644 +} + +func (cb *CandidateBacking) ProcessBlockFinalizedSignal() { // TODO #3644 } @@ -101,8 +103,6 @@ func (cb *CandidateBacking) processMessages() { // process these received messages by referencing // https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741 switch msg.(type) { - case ActiveLeavesUpdate: - cb.handleActiveLeavesUpdate() case GetBackedCandidates: cb.handleGetBackedCandidates() case CanSecond: @@ -111,8 +111,13 @@ func (cb *CandidateBacking) processMessages() { cb.handleSecond() case Statement: cb.handleStatement() + case parachaintypes.ActiveLeavesUpdateSignal: + cb.ProcessActiveLeavesUpdateSignal() + case parachaintypes.BlockFinalizedSignal: + cb.ProcessBlockFinalizedSignal() default: - logger.Error("unknown message type") + logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error()) + return } case <-cb.ctx.Done(): @@ -125,10 +130,6 @@ func (cb *CandidateBacking) processMessages() { } } -func (cb *CandidateBacking) handleActiveLeavesUpdate() { - // TODO: Implement this #3503 -} - func (cb *CandidateBacking) handleGetBackedCandidates() { // TODO: Implement this #3504 } diff --git a/dot/parachain/collator-protocol/validator_side.go b/dot/parachain/collator-protocol/validator_side.go index b1f0fa69e4..d1feb167f8 100644 --- a/dot/parachain/collator-protocol/validator_side.go +++ b/dot/parachain/collator-protocol/validator_side.go @@ -29,7 +29,6 @@ const ( var ( ErrUnexpectedMessageOnCollationProtocol = errors.New("unexpected message on collation protocol") ErrUnknownPeer = errors.New("unknown peer") - ErrUnknownOverseerMessage = errors.New("unknown overseer message type") ErrNotExpectedOnValidatorSide = errors.New("message is not expected on the validator side of the protocol") ErrCollationNotInView = errors.New("collation is not in our view") ErrPeerIDNotFoundForCollator = errors.New("peer id not found for collator") @@ -96,7 +95,11 @@ func (CollatorProtocolValidatorSide) Name() parachaintypes.SubSystemName { return parachaintypes.CollationProtocol } -func (cpvs CollatorProtocolValidatorSide) ProcessOverseerSignals() { +func (cpvs CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal() { + // NOTE: nothing to do here +} + +func (cpvs CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal() { // NOTE: nothing to do here } @@ -577,8 +580,14 @@ func (cpvs CollatorProtocolValidatorSide) processMessage(msg any) error { Value: peerset.ReportBadCollatorValue, Reason: peerset.ReportBadCollatorReason, }, peerID) + + case parachaintypes.ActiveLeavesUpdateSignal: + cpvs.ProcessActiveLeavesUpdateSignal() + case parachaintypes.BlockFinalizedSignal: + cpvs.ProcessBlockFinalizedSignal() + default: - return ErrUnknownOverseerMessage + return parachaintypes.ErrUnknownOverseerMessage } return nil diff --git a/dot/parachain/overseer/overseer.go b/dot/parachain/overseer/overseer.go index 54cfe87f76..ea7b32ab76 100644 --- a/dot/parachain/overseer/overseer.go +++ b/dot/parachain/overseer/overseer.go @@ -182,8 +182,8 @@ func (o *Overseer) handleBlockEvents() { } */ - activeLeavesUpdate := ActiveLeavesUpdateSignal{ - Activated: &ActivatedLeaf{ + activeLeavesUpdate := parachaintypes.ActiveLeavesUpdateSignal{ + Activated: ¶chaintypes.ActivatedLeaf{ Hash: imported.Header.Hash(), Number: uint32(imported.Header.Number), }, @@ -202,7 +202,7 @@ func (o *Overseer) handleBlockEvents() { } } - o.broadcast(BlockFinalizedSignal{ + o.broadcast(parachaintypes.BlockFinalizedSignal{ Hash: finalised.Header.Hash(), BlockNumber: uint32(finalised.Header.Number), }) @@ -212,7 +212,7 @@ func (o *Overseer) handleBlockEvents() { // Our peers will be informed about our finalized block the next time we // activating/deactivating some leaf. if len(deactivated) > 0 { - o.broadcast(ActiveLeavesUpdateSignal{ + o.broadcast(parachaintypes.ActiveLeavesUpdateSignal{ Deactivated: deactivated, }) } @@ -248,7 +248,7 @@ func (o *Overseer) Stop() error { } // sendActiveLeavesUpdate sends an ActiveLeavesUpdate to the subsystem -func (o *Overseer) sendActiveLeavesUpdate(update ActiveLeavesUpdateSignal, subsystem Subsystem) { +func (o *Overseer) sendActiveLeavesUpdate(update parachaintypes.ActiveLeavesUpdateSignal, subsystem Subsystem) { o.subsystems[subsystem] <- update } diff --git a/dot/parachain/overseer/overseer_test.go b/dot/parachain/overseer/overseer_test.go index 6e322a6606..8ff1b5cd72 100644 --- a/dot/parachain/overseer/overseer_test.go +++ b/dot/parachain/overseer/overseer_test.go @@ -48,8 +48,12 @@ func (s *TestSubsystem) Run(ctx context.Context, OverseerToSubSystem chan any, S } } -func (s *TestSubsystem) ProcessOverseerSignals() { - fmt.Printf("%s ProcessOverseerSignals\n", s.name) +func (s *TestSubsystem) ProcessActiveLeavesUpdateSignal() { + fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) +} + +func (s *TestSubsystem) ProcessBlockFinalizedSignal() { + fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) } func (s *TestSubsystem) String() parachaintypes.SubSystemName { @@ -90,11 +94,11 @@ func TestStart2SubsytemsActivate1(t *testing.T) { }() time.Sleep(1000 * time.Millisecond) - activedLeaf := ActivatedLeaf{ + activedLeaf := parachaintypes.ActivatedLeaf{ Hash: [32]byte{1}, Number: 1, } - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1) + overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1) // let subsystems run for a bit time.Sleep(4000 * time.Millisecond) @@ -134,18 +138,18 @@ func TestStart2SubsytemsActivate2Different(t *testing.T) { close(done) }() - activedLeaf1 := ActivatedLeaf{ + activedLeaf1 := parachaintypes.ActivatedLeaf{ Hash: [32]byte{1}, Number: 1, } - activedLeaf2 := ActivatedLeaf{ + activedLeaf2 := parachaintypes.ActivatedLeaf{ Hash: [32]byte{2}, Number: 2, } time.Sleep(250 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf1}, subSystem1) + overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf1}, subSystem1) time.Sleep(400 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf2}, subSystem2) + overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf2}, subSystem2) // let subsystems run for a bit time.Sleep(3000 * time.Millisecond) @@ -185,14 +189,14 @@ func TestStart2SubsytemsActivate2Same(t *testing.T) { close(done) }() - activedLeaf := ActivatedLeaf{ + activedLeaf := parachaintypes.ActivatedLeaf{ Hash: [32]byte{1}, Number: 1, } time.Sleep(300 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1) + overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem1) time.Sleep(400 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem2) + overseer.sendActiveLeavesUpdate(parachaintypes.ActiveLeavesUpdateSignal{Activated: &activedLeaf}, subSystem2) // let subsystems run for a bit time.Sleep(2000 * time.Millisecond) @@ -237,12 +241,12 @@ func TestHandleBlockEvents(t *testing.T) { continue } - _, ok := msg.(BlockFinalizedSignal) + _, ok := msg.(parachaintypes.BlockFinalizedSignal) if ok { finalizedCounter.Add(1) } - _, ok = msg.(ActiveLeavesUpdateSignal) + _, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal) if ok { importedCounter.Add(1) } @@ -251,12 +255,12 @@ func TestHandleBlockEvents(t *testing.T) { continue } - _, ok := msg.(BlockFinalizedSignal) + _, ok := msg.(parachaintypes.BlockFinalizedSignal) if ok { finalizedCounter.Add(1) } - _, ok = msg.(ActiveLeavesUpdateSignal) + _, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal) if ok { importedCounter.Add(1) } diff --git a/dot/parachain/overseer/types.go b/dot/parachain/overseer/types.go index 7a1aadf427..3e2d1d6150 100644 --- a/dot/parachain/overseer/types.go +++ b/dot/parachain/overseer/types.go @@ -7,35 +7,14 @@ import ( "context" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" - "github.com/ChainSafe/gossamer/lib/common" ) -// ActivatedLeaf is a parachain head which we care to work on. -type ActivatedLeaf struct { - Hash common.Hash - Number uint32 -} - -// ActiveLeavesUpdateSignal changes in the set of active leaves: the parachain heads which we care to work on. -// -// note: activated field indicates deltas, not complete sets. -type ActiveLeavesUpdateSignal struct { - Activated *ActivatedLeaf - // Relay chain block hashes no longer of interest. - Deactivated []common.Hash -} - -// BlockFinalized signal is used to inform subsystems of a finalized block. -type BlockFinalizedSignal struct { - Hash common.Hash - BlockNumber uint32 -} - // Subsystem is an interface for subsystems to be registered with the overseer. type Subsystem interface { // Run runs the subsystem. Run(ctx context.Context, OverseerToSubSystem chan any, SubSystemToOverseer chan any) error Name() parachaintypes.SubSystemName - ProcessOverseerSignals() + ProcessActiveLeavesUpdateSignal() + ProcessBlockFinalizedSignal() Stop() } diff --git a/dot/parachain/types/overseer_signals.go b/dot/parachain/types/overseer_signals.go new file mode 100644 index 0000000000..2c5d143570 --- /dev/null +++ b/dot/parachain/types/overseer_signals.go @@ -0,0 +1,30 @@ +package parachaintypes + +import ( + "errors" + + "github.com/ChainSafe/gossamer/lib/common" +) + +var ErrUnknownOverseerMessage = errors.New("unknown overseer message type") + +// ActivatedLeaf is a parachain head which we care to work on. +type ActivatedLeaf struct { + Hash common.Hash + Number uint32 +} + +// ActiveLeavesUpdateSignal changes in the set of active leaves: the parachain heads which we care to work on. +// +// note: activated field indicates deltas, not complete sets. +type ActiveLeavesUpdateSignal struct { + Activated *ActivatedLeaf + // Relay chain block hashes no longer of interest. + Deactivated []common.Hash +} + +// BlockFinalized signal is used to inform subsystems of a finalized block. +type BlockFinalizedSignal struct { + Hash common.Hash + BlockNumber uint32 +}