diff --git a/dot/parachain/availability-store/availability_store.go b/dot/parachain/availability-store/availability_store.go index d19e31f2307..0cb7f703159 100644 --- a/dot/parachain/availability-store/availability_store.go +++ b/dot/parachain/availability-store/availability_store.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/internal/database" @@ -28,6 +29,10 @@ const ( // AvailabilityStoreSubsystem is the struct that holds subsystem data for the availability store type AvailabilityStoreSubsystem struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + SubSystemToOverseer chan<- any OverseerToSubSystem <-chan any availabilityStore AvailabilityStore @@ -170,7 +175,10 @@ func uint32ToBytes(value uint32) []byte { // Run runs the availability store subsystem func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, OverseerToSubsystem chan any, SubsystemToOverseer chan any) error { - av.processMessages() + + av.wg.Add(1) + go av.processMessages() + return nil } @@ -180,53 +188,81 @@ func (*AvailabilityStoreSubsystem) Name() parachaintypes.SubSystemName { } func (av *AvailabilityStoreSubsystem) processMessages() { - for msg := range av.OverseerToSubSystem { - logger.Debugf("received message %v", msg) - switch msg := msg.(type) { - case QueryAvailableData: - err := av.handleQueryAvailableData(msg) - if err != nil { - logger.Errorf("failed to handle available data: %w", err) - } - case QueryDataAvailability: - err := av.handleQueryDataAvailability(msg) - if err != nil { - logger.Errorf("failed to handle query data availability: %w", err) - } - case QueryChunk: - err := av.handleQueryChunk(msg) - if err != nil { - logger.Errorf("failed to handle query chunk: %w", err) - } - case QueryChunkSize: - err := av.handleQueryChunkSize(msg) - if err != nil { - logger.Errorf("failed to handle query chunk size: %w", err) - } - case QueryAllChunks: - err := av.handleQueryAllChunks(msg) - if err != nil { - logger.Errorf("failed to handle query all chunks: %w", err) - } - case QueryChunkAvailability: - err := av.handleQueryChunkAvailability(msg) - if err != nil { - logger.Errorf("failed to handle query chunk availability: %w", err) - } - case StoreChunk: - err := av.handleStoreChunk(msg) - if err != nil { - logger.Errorf("failed to handle store chunk: %w", err) + for { + select { + case msg := <-av.OverseerToSubSystem: + logger.Debugf("received message %v", msg) + switch msg := msg.(type) { + case QueryAvailableData: + err := av.handleQueryAvailableData(msg) + if err != nil { + logger.Errorf("failed to handle available data: %w", err) + } + case QueryDataAvailability: + err := av.handleQueryDataAvailability(msg) + if err != nil { + logger.Errorf("failed to handle query data availability: %w", err) + } + case QueryChunk: + err := av.handleQueryChunk(msg) + if err != nil { + logger.Errorf("failed to handle query chunk: %w", err) + } + case QueryChunkSize: + err := av.handleQueryChunkSize(msg) + if err != nil { + logger.Errorf("failed to handle query chunk size: %w", err) + } + case QueryAllChunks: + err := av.handleQueryAllChunks(msg) + if err != nil { + logger.Errorf("failed to handle query all chunks: %w", err) + } + case QueryChunkAvailability: + err := av.handleQueryChunkAvailability(msg) + if err != nil { + logger.Errorf("failed to handle query chunk availability: %w", err) + } + case StoreChunk: + err := av.handleStoreChunk(msg) + if err != nil { + logger.Errorf("failed to handle store chunk: %w", err) + } + case StoreAvailableData: + err := av.handleStoreAvailableData(msg) + if err != nil { + logger.Errorf("failed to handle store available data: %w", err) + } + + case parachaintypes.ActiveLeavesUpdateSignal: + av.ProcessActiveLeavesUpdateSignal() + + case parachaintypes.BlockFinalizedSignal: + av.ProcessBlockFinalizedSignal() + + default: + logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error()) } - case StoreAvailableData: - err := av.handleStoreAvailableData(msg) - if err != nil { - logger.Errorf("failed to handle store available data: %w", err) + + case <-av.ctx.Done(): + if err := av.ctx.Err(); err != nil { + logger.Errorf("ctx error: %v\n", err) } + av.wg.Done() + return } + } } +func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal() { + // TODO: #3630 +} + +func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal() { + // TODO: #3630 +} + func (av *AvailabilityStoreSubsystem) handleQueryAvailableData(msg QueryAvailableData) error { result, err := av.availabilityStore.loadAvailableData(msg.CandidateHash) if err != nil { @@ -333,3 +369,8 @@ func (av *AvailabilityStoreSubsystem) handleStoreAvailableData(msg StoreAvailabl msg.Sender <- err // TODO: determine how to replicate Rust's Result type return nil } + +func (av *AvailabilityStoreSubsystem) Stop() { + av.cancel() + av.wg.Wait() +} diff --git a/dot/parachain/availability-store/availability_store_test.go b/dot/parachain/availability-store/availability_store_test.go index 385ad5bb087..f0b85625cf1 100644 --- a/dot/parachain/availability-store/availability_store_test.go +++ b/dot/parachain/availability-store/availability_store_test.go @@ -1,3 +1,6 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package availabilitystore import ( diff --git a/dot/parachain/backing/candidate_backing.go b/dot/parachain/backing/candidate_backing.go index f18dcb67f1b..dcf5e4febd6 100644 --- a/dot/parachain/backing/candidate_backing.go +++ b/dot/parachain/backing/candidate_backing.go @@ -5,6 +5,7 @@ package backing import ( "context" + "sync" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/internal/log" @@ -14,6 +15,10 @@ import ( var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-candidate-backing")) type CandidateBacking struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + SubSystemToOverseer chan<- any OverseerToSubSystem <-chan any } @@ -72,7 +77,10 @@ func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan an // other backing related overseer message. // This would become more clear after we complete processMessages function. It would give us clarity // if we need background_validation_rx or background_validation_tx, as done in rust. - cb.processMessages() + + cb.wg.Add(1) + go cb.processMessages() + return nil } @@ -80,31 +88,48 @@ func (*CandidateBacking) Name() parachaintypes.SubSystemName { return parachaintypes.CandidateBacking } +func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal() { + // TODO #3644 +} + +func (cb *CandidateBacking) ProcessBlockFinalizedSignal() { + // TODO #3644 +} + func (cb *CandidateBacking) processMessages() { - for msg := range cb.OverseerToSubSystem { - // process these received messages by referencing - // https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741 - switch msg.(type) { - case ActiveLeavesUpdate: - cb.handleActiveLeavesUpdate() - case GetBackedCandidates: - cb.handleGetBackedCandidates() - case CanSecond: - cb.handleCanSecond() - case Second: - cb.handleSecond() - case Statement: - cb.handleStatement() - default: - logger.Error("unknown message type") + for { + select { + case msg := <-cb.OverseerToSubSystem: + // process these received messages by referencing + // https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741 + switch msg.(type) { + case GetBackedCandidates: + cb.handleGetBackedCandidates() + case CanSecond: + cb.handleCanSecond() + case Second: + cb.handleSecond() + case Statement: + cb.handleStatement() + case parachaintypes.ActiveLeavesUpdateSignal: + cb.ProcessActiveLeavesUpdateSignal() + case parachaintypes.BlockFinalizedSignal: + cb.ProcessBlockFinalizedSignal() + default: + logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error()) + return + } + + case <-cb.ctx.Done(): + if err := cb.ctx.Err(); err != nil { + logger.Errorf("ctx error: %v\n", err) + } + cb.wg.Done() + return } } } -func (cb *CandidateBacking) handleActiveLeavesUpdate() { - // TODO: Implement this #3503 -} - func (cb *CandidateBacking) handleGetBackedCandidates() { // TODO: Implement this #3504 } @@ -121,6 +146,11 @@ func (cb *CandidateBacking) handleStatement() { // TODO: Implement this #3507 } +func (cb *CandidateBacking) Stop() { + cb.cancel() + cb.wg.Wait() +} + // SignedFullStatementWithPVD represents a signed full statement along with associated Persisted Validation Data (PVD). type SignedFullStatementWithPVD struct { SignedFullStatement parachaintypes.UncheckedSignedFullStatement diff --git a/dot/parachain/collator-protocol/validator_side.go b/dot/parachain/collator-protocol/validator_side.go index a577314a1b5..288b2bd83c7 100644 --- a/dot/parachain/collator-protocol/validator_side.go +++ b/dot/parachain/collator-protocol/validator_side.go @@ -29,7 +29,6 @@ const ( var ( ErrUnexpectedMessageOnCollationProtocol = errors.New("unexpected message on collation protocol") ErrUnknownPeer = errors.New("unknown peer") - ErrUnknownOverseerMessage = errors.New("unknown overseer message type") ErrNotExpectedOnValidatorSide = errors.New("message is not expected on the validator side of the protocol") ErrCollationNotInView = errors.New("collation is not in our view") ErrPeerIDNotFoundForCollator = errors.New("peer id not found for collator") @@ -83,6 +82,11 @@ func (cpvs CollatorProtocolValidatorSide) Run( cpvs.fetchedCollations = append(cpvs.fetchedCollations, *collation) } + case <-cpvs.ctx.Done(): + if err := cpvs.ctx.Err(); err != nil { + logger.Errorf("ctx error: %v\n", err) + } + return nil } } } @@ -91,6 +95,18 @@ func (CollatorProtocolValidatorSide) Name() parachaintypes.SubSystemName { return parachaintypes.CollationProtocol } +func (cpvs CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal() { + // NOTE: nothing to do here +} + +func (cpvs CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal() { + // NOTE: nothing to do here +} + +func (cpvs CollatorProtocolValidatorSide) Stop() { + cpvs.cancel() +} + // requestCollation requests a collation from the network. // This function will // - check for duplicate requests @@ -291,6 +307,9 @@ type CollationEvent struct { } type CollatorProtocolValidatorSide struct { + ctx context.Context + cancel context.CancelFunc + net Network SubSystemToOverseer chan<- any @@ -561,8 +580,14 @@ func (cpvs CollatorProtocolValidatorSide) processMessage(msg any) error { Value: peerset.ReportBadCollatorValue, Reason: peerset.ReportBadCollatorReason, }, peerID) + + case parachaintypes.ActiveLeavesUpdateSignal: + cpvs.ProcessActiveLeavesUpdateSignal() + case parachaintypes.BlockFinalizedSignal: + cpvs.ProcessBlockFinalizedSignal() + default: - return ErrUnknownOverseerMessage + return parachaintypes.ErrUnknownOverseerMessage } return nil diff --git a/dot/parachain/overseer/mocks_generate_test.go b/dot/parachain/overseer/mocks_generate_test.go new file mode 100644 index 00000000000..38ea06d27e0 --- /dev/null +++ b/dot/parachain/overseer/mocks_generate_test.go @@ -0,0 +1,6 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package overseer + +//go:generate mockgen -destination=mocks_test.go -package=$GOPACKAGE . BlockState diff --git a/dot/parachain/overseer/mocks_test.go b/dot/parachain/overseer/mocks_test.go new file mode 100644 index 00000000000..186a581e01c --- /dev/null +++ b/dot/parachain/overseer/mocks_test.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ChainSafe/gossamer/dot/parachain/overseer (interfaces: BlockState) + +// Package overseer is a generated GoMock package. +package overseer + +import ( + reflect "reflect" + + types "github.com/ChainSafe/gossamer/dot/types" + gomock "go.uber.org/mock/gomock" +) + +// MockBlockState is a mock of BlockState interface. +type MockBlockState struct { + ctrl *gomock.Controller + recorder *MockBlockStateMockRecorder +} + +// MockBlockStateMockRecorder is the mock recorder for MockBlockState. +type MockBlockStateMockRecorder struct { + mock *MockBlockState +} + +// NewMockBlockState creates a new mock instance. +func NewMockBlockState(ctrl *gomock.Controller) *MockBlockState { + mock := &MockBlockState{ctrl: ctrl} + mock.recorder = &MockBlockStateMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBlockState) EXPECT() *MockBlockStateMockRecorder { + return m.recorder +} + +// FreeFinalisedNotifierChannel mocks base method. +func (m *MockBlockState) FreeFinalisedNotifierChannel(arg0 chan *types.FinalisationInfo) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "FreeFinalisedNotifierChannel", arg0) +} + +// FreeFinalisedNotifierChannel indicates an expected call of FreeFinalisedNotifierChannel. +func (mr *MockBlockStateMockRecorder) FreeFinalisedNotifierChannel(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FreeFinalisedNotifierChannel", reflect.TypeOf((*MockBlockState)(nil).FreeFinalisedNotifierChannel), arg0) +} + +// FreeImportedBlockNotifierChannel mocks base method. +func (m *MockBlockState) FreeImportedBlockNotifierChannel(arg0 chan *types.Block) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "FreeImportedBlockNotifierChannel", arg0) +} + +// FreeImportedBlockNotifierChannel indicates an expected call of FreeImportedBlockNotifierChannel. +func (mr *MockBlockStateMockRecorder) FreeImportedBlockNotifierChannel(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FreeImportedBlockNotifierChannel", reflect.TypeOf((*MockBlockState)(nil).FreeImportedBlockNotifierChannel), arg0) +} + +// GetFinalisedNotifierChannel mocks base method. +func (m *MockBlockState) GetFinalisedNotifierChannel() chan *types.FinalisationInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFinalisedNotifierChannel") + ret0, _ := ret[0].(chan *types.FinalisationInfo) + return ret0 +} + +// GetFinalisedNotifierChannel indicates an expected call of GetFinalisedNotifierChannel. +func (mr *MockBlockStateMockRecorder) GetFinalisedNotifierChannel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFinalisedNotifierChannel", reflect.TypeOf((*MockBlockState)(nil).GetFinalisedNotifierChannel)) +} + +// GetImportedBlockNotifierChannel mocks base method. +func (m *MockBlockState) GetImportedBlockNotifierChannel() chan *types.Block { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetImportedBlockNotifierChannel") + ret0, _ := ret[0].(chan *types.Block) + return ret0 +} + +// GetImportedBlockNotifierChannel indicates an expected call of GetImportedBlockNotifierChannel. +func (mr *MockBlockStateMockRecorder) GetImportedBlockNotifierChannel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetImportedBlockNotifierChannel", reflect.TypeOf((*MockBlockState)(nil).GetImportedBlockNotifierChannel)) +} diff --git a/dot/parachain/overseer/overseer.go b/dot/parachain/overseer/overseer.go index dd112c6868f..9d247270285 100644 --- a/dot/parachain/overseer/overseer.go +++ b/dot/parachain/overseer/overseer.go @@ -12,6 +12,8 @@ import ( availability_store "github.com/ChainSafe/gossamer/dot/parachain/availability-store" "github.com/ChainSafe/gossamer/dot/parachain/backing" collatorprotocol "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol" + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/internal/log" @@ -22,22 +24,41 @@ var ( ) type Overseer struct { - ctx context.Context - cancel context.CancelFunc - errChan chan error // channel for overseer to send errors to service that started it + ctx context.Context + cancel context.CancelFunc + errChan chan error // channel for overseer to send errors to service that started it + + blockState BlockState + activeLeaves map[common.Hash]uint32 + + // block notification channels + imported chan *types.Block + finalised chan *types.FinalisationInfo + SubsystemsToOverseer chan any subsystems map[Subsystem]chan any // map[Subsystem]OverseerToSubSystem channel nameToSubsystem map[parachaintypes.SubSystemName]Subsystem wg sync.WaitGroup } -func NewOverseer() *Overseer { +// BlockState interface for block state methods +type BlockState interface { + GetImportedBlockNotifierChannel() chan *types.Block + FreeImportedBlockNotifierChannel(ch chan *types.Block) + GetFinalisedNotifierChannel() chan *types.FinalisationInfo + FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo) +} + +func NewOverseer(blockState BlockState) *Overseer { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + return &Overseer{ ctx: ctx, cancel: cancel, errChan: make(chan error), + blockState: blockState, + activeLeaves: make(map[common.Hash]uint32), SubsystemsToOverseer: make(chan any), subsystems: make(map[Subsystem]chan any), nameToSubsystem: make(map[parachaintypes.SubSystemName]Subsystem), @@ -55,6 +76,13 @@ func (o *Overseer) RegisterSubsystem(subsystem Subsystem) chan any { } func (o *Overseer) Start() error { + + imported := o.blockState.GetImportedBlockNotifierChannel() + finalised := o.blockState.GetFinalisedNotifierChannel() + + o.imported = imported + o.finalised = finalised + // start subsystems for subsystem, overseerToSubSystem := range o.subsystems { o.wg.Add(1) @@ -68,10 +96,10 @@ func (o *Overseer) Start() error { }(subsystem, overseerToSubSystem) } - o.wg.Add(1) + o.wg.Add(2) go o.processMessages() + go o.handleBlockEvents() - // TODO: add logic to start listening for Block Imported events and Finalisation events return nil } @@ -109,15 +137,101 @@ func (o *Overseer) processMessages() { if err := o.ctx.Err(); err != nil { logger.Errorf("ctx error: %v\n", err) } - logger.Info("overseer stopping") + o.wg.Done() return } } } +func (o *Overseer) handleBlockEvents() { + for { + select { + case <-o.ctx.Done(): + if err := o.ctx.Err(); err != nil { + logger.Errorf("ctx error: %v\n", err) + } + o.wg.Done() + return + case imported := <-o.imported: + blockNumber, ok := o.activeLeaves[imported.Header.Hash()] + if ok { + if blockNumber != uint32(imported.Header.Number) { + panic("block number mismatch") + } + return + } + + o.activeLeaves[imported.Header.Hash()] = uint32(imported.Header.Number) + delete(o.activeLeaves, imported.Header.ParentHash) + + // TODO: + /* + - Add active leaf only if given head supports parachain consensus. + - You do that by checking the parachain host runtime api version. + - If the parachain host runtime api version is at least 1, then the parachain consensus is supported. + + #[async_trait::async_trait] + impl HeadSupportsParachains for Arc + where + Client: RuntimeApiSubsystemClient + Sync + Send, + { + async fn head_supports_parachains(&self, head: &Hash) -> bool { + // Check that the `ParachainHost` runtime api is at least with version 1 present on chain. + self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1 + } + } + + */ + activeLeavesUpdate := parachaintypes.ActiveLeavesUpdateSignal{ + Activated: ¶chaintypes.ActivatedLeaf{ + Hash: imported.Header.Hash(), + Number: uint32(imported.Header.Number), + }, + Deactivated: []common.Hash{imported.Header.ParentHash}, + } + + o.broadcast(activeLeavesUpdate) + + case finalised := <-o.finalised: + deactivated := make([]common.Hash, 0) + + for hash, blockNumber := range o.activeLeaves { + if blockNumber <= uint32(finalised.Header.Number) && hash != finalised.Header.Hash() { + deactivated = append(deactivated, hash) + delete(o.activeLeaves, hash) + } + } + + o.broadcast(parachaintypes.BlockFinalizedSignal{ + Hash: finalised.Header.Hash(), + BlockNumber: uint32(finalised.Header.Number), + }) + + // If there are no leaves being deactivated, we don't need to send an update. + // + // Our peers will be informed about our finalized block the next time we + // activating/deactivating some leaf. + if len(deactivated) > 0 { + o.broadcast(parachaintypes.ActiveLeavesUpdateSignal{ + Deactivated: deactivated, + }) + } + } + } +} + +func (o *Overseer) broadcast(msg any) { + for _, overseerToSubSystem := range o.subsystems { + overseerToSubSystem <- msg + } +} + func (o *Overseer) Stop() error { o.cancel() + o.blockState.FreeImportedBlockNotifierChannel(o.imported) + o.blockState.FreeFinalisedNotifierChannel(o.finalised) + // close the errorChan to unblock any listeners on the errChan close(o.errChan) @@ -133,11 +247,6 @@ func (o *Overseer) Stop() error { return nil } -// sendActiveLeavesUpdate sends an ActiveLeavesUpdate to the subsystem -func (o *Overseer) sendActiveLeavesUpdate(update ActiveLeavesUpdate, subsystem Subsystem) { - o.subsystems[subsystem] <- update -} - func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) (timeouted bool) { c := make(chan struct{}) go func() { diff --git a/dot/parachain/overseer/overseer_test.go b/dot/parachain/overseer/overseer_test.go index 81ccb745d04..7bfc0f3cce2 100644 --- a/dot/parachain/overseer/overseer_test.go +++ b/dot/parachain/overseer/overseer_test.go @@ -7,11 +7,14 @@ import ( "context" "fmt" "math/rand" + "sync/atomic" "testing" "time" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + types "github.com/ChainSafe/gossamer/dot/types" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" ) type TestSubsystem struct { @@ -45,56 +48,35 @@ func (s *TestSubsystem) Run(ctx context.Context, OverseerToSubSystem chan any, S } } -func (s *TestSubsystem) String() parachaintypes.SubSystemName { - return parachaintypes.SubSystemName(s.name) +func (s *TestSubsystem) ProcessActiveLeavesUpdateSignal() { + fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) } -func TestStart2SubsytemsActivate1(t *testing.T) { - overseer := NewOverseer() - require.NotNil(t, overseer) - - subSystem1 := &TestSubsystem{name: "subSystem1"} - subSystem2 := &TestSubsystem{name: "subSystem2"} - - overseerToSubSystem1 := overseer.RegisterSubsystem(subSystem1) - overseerToSubSystem2 := overseer.RegisterSubsystem(subSystem2) +func (s *TestSubsystem) ProcessBlockFinalizedSignal() { + fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) +} - go func() { - <-overseerToSubSystem1 - <-overseerToSubSystem2 - }() +func (s *TestSubsystem) String() parachaintypes.SubSystemName { + return parachaintypes.SubSystemName(s.name) +} - err := overseer.Start() - require.NoError(t, err) +func (s *TestSubsystem) Stop() {} - done := make(chan struct{}) - // listen for errors from overseer - go func() { - for errC := range overseer.errChan { - fmt.Printf("overseer start error: %v\n", errC) - } - close(done) - }() +func TestHandleBlockEvents(t *testing.T) { + ctrl := gomock.NewController(t) - time.Sleep(1000 * time.Millisecond) - activedLeaf := ActivatedLeaf{ - Hash: [32]byte{1}, - Number: 1, - } - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdate{Activated: activedLeaf}, subSystem1) + blockState := NewMockBlockState(ctrl) - // let subsystems run for a bit - time.Sleep(4000 * time.Millisecond) + finalizedNotifierChan := make(chan *types.FinalisationInfo) + importedBlockNotiferChan := make(chan *types.Block) - err = overseer.Stop() - require.NoError(t, err) + blockState.EXPECT().GetFinalisedNotifierChannel().Return(finalizedNotifierChan) + blockState.EXPECT().GetImportedBlockNotifierChannel().Return(importedBlockNotiferChan) + blockState.EXPECT().FreeFinalisedNotifierChannel(finalizedNotifierChan) + blockState.EXPECT().FreeImportedBlockNotifierChannel(importedBlockNotiferChan) - fmt.Printf("overseer stopped\n") - <-done -} + overseer := NewOverseer(blockState) -func TestStart2SubsytemsActivate2Different(t *testing.T) { - overseer := NewOverseer() require.NotNil(t, overseer) subSystem1 := &TestSubsystem{name: "subSystem1"} @@ -103,82 +85,55 @@ func TestStart2SubsytemsActivate2Different(t *testing.T) { overseerToSubSystem1 := overseer.RegisterSubsystem(subSystem1) overseerToSubSystem2 := overseer.RegisterSubsystem(subSystem2) - go func() { - <-overseerToSubSystem1 - <-overseerToSubSystem2 - }() + var finalizedCounter atomic.Int32 + var importedCounter atomic.Int32 - err := overseer.Start() - require.NoError(t, err) - done := make(chan struct{}) go func() { - for errC := range overseer.errChan { - fmt.Printf("overseer start error: %v\n", errC) - } - close(done) - }() - - activedLeaf1 := ActivatedLeaf{ - Hash: [32]byte{1}, - Number: 1, - } - activedLeaf2 := ActivatedLeaf{ - Hash: [32]byte{2}, - Number: 2, - } - time.Sleep(250 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdate{Activated: activedLeaf1}, subSystem1) - time.Sleep(400 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdate{Activated: activedLeaf2}, subSystem2) - // let subsystems run for a bit - time.Sleep(3000 * time.Millisecond) - - err = overseer.Stop() - require.NoError(t, err) - - fmt.Printf("overseer stopped\n") - <-done -} - -func TestStart2SubsytemsActivate2Same(t *testing.T) { - overseer := NewOverseer() - require.NotNil(t, overseer) - - subSystem1 := &TestSubsystem{name: "subSystem1"} - subSystem2 := &TestSubsystem{name: "subSystem2"} - - overseerToSubSystem1 := overseer.RegisterSubsystem(subSystem1) - overseerToSubSystem2 := overseer.RegisterSubsystem(subSystem2) + for { + select { + case msg := <-overseerToSubSystem1: + if msg == nil { + continue + } + + _, ok := msg.(parachaintypes.BlockFinalizedSignal) + if ok { + finalizedCounter.Add(1) + } + + _, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal) + if ok { + importedCounter.Add(1) + } + case msg := <-overseerToSubSystem2: + if msg == nil { + continue + } + + _, ok := msg.(parachaintypes.BlockFinalizedSignal) + if ok { + finalizedCounter.Add(1) + } + + _, ok = msg.(parachaintypes.ActiveLeavesUpdateSignal) + if ok { + importedCounter.Add(1) + } + } - go func() { - <-overseerToSubSystem1 - <-overseerToSubSystem2 + } }() err := overseer.Start() require.NoError(t, err) - done := make(chan struct{}) - go func() { - for errC := range overseer.errChan { - fmt.Printf("overseer start error: %v\n", errC) - } - close(done) - }() + finalizedNotifierChan <- &types.FinalisationInfo{} + importedBlockNotiferChan <- &types.Block{} - activedLeaf := ActivatedLeaf{ - Hash: [32]byte{1}, - Number: 1, - } - time.Sleep(300 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdate{Activated: activedLeaf}, subSystem1) - time.Sleep(400 * time.Millisecond) - overseer.sendActiveLeavesUpdate(ActiveLeavesUpdate{Activated: activedLeaf}, subSystem2) - // let subsystems run for a bit - time.Sleep(2000 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) err = overseer.Stop() require.NoError(t, err) - fmt.Printf("overseer stopped\n") - <-done + require.Equal(t, int32(2), finalizedCounter.Load()) + require.Equal(t, int32(2), importedCounter.Load()) } diff --git a/dot/parachain/overseer/types.go b/dot/parachain/overseer/types.go index ccc3fdb0fb1..3e2d1d61504 100644 --- a/dot/parachain/overseer/types.go +++ b/dot/parachain/overseer/types.go @@ -7,25 +7,14 @@ import ( "context" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" - "github.com/ChainSafe/gossamer/lib/common" ) -// ActivatedLeaf is a parachain head which we care to work on. -type ActivatedLeaf struct { - Hash common.Hash - Number uint32 -} - -// ActiveLeavesUpdate changes in the set of active leaves: the parachain heads which we care to work on. -// -// note: activated field indicates deltas, not complete sets. -type ActiveLeavesUpdate struct { - Activated ActivatedLeaf -} - // Subsystem is an interface for subsystems to be registered with the overseer. type Subsystem interface { // Run runs the subsystem. Run(ctx context.Context, OverseerToSubSystem chan any, SubSystemToOverseer chan any) error Name() parachaintypes.SubSystemName + ProcessActiveLeavesUpdateSignal() + ProcessBlockFinalizedSignal() + Stop() } diff --git a/dot/parachain/service.go b/dot/parachain/service.go index 338c0f5cb9f..9740979b711 100644 --- a/dot/parachain/service.go +++ b/dot/parachain/service.go @@ -34,7 +34,7 @@ type Service struct { var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain")) func NewService(net Network, forkID string, st *state.Service) (*Service, error) { - overseer := overseer.NewOverseer() + overseer := overseer.NewOverseer(st.Block) genesisHash := st.Block.GenesisHash() availabilityStore, err := availability_store.Register(overseer.SubsystemsToOverseer, st) @@ -109,7 +109,7 @@ func (Service) Stop() error { // main loop of parachain service func (s Service) run() { - overseer := overseer.NewOverseer() + overseer := s.overseer candidateBacking := backing.New(overseer.SubsystemsToOverseer) candidateBacking.OverseerToSubSystem = overseer.RegisterSubsystem(candidateBacking) diff --git a/dot/parachain/types/overseer_signals.go b/dot/parachain/types/overseer_signals.go new file mode 100644 index 00000000000..2c5d1435709 --- /dev/null +++ b/dot/parachain/types/overseer_signals.go @@ -0,0 +1,30 @@ +package parachaintypes + +import ( + "errors" + + "github.com/ChainSafe/gossamer/lib/common" +) + +var ErrUnknownOverseerMessage = errors.New("unknown overseer message type") + +// ActivatedLeaf is a parachain head which we care to work on. +type ActivatedLeaf struct { + Hash common.Hash + Number uint32 +} + +// ActiveLeavesUpdateSignal changes in the set of active leaves: the parachain heads which we care to work on. +// +// note: activated field indicates deltas, not complete sets. +type ActiveLeavesUpdateSignal struct { + Activated *ActivatedLeaf + // Relay chain block hashes no longer of interest. + Deactivated []common.Hash +} + +// BlockFinalized signal is used to inform subsystems of a finalized block. +type BlockFinalizedSignal struct { + Hash common.Hash + BlockNumber uint32 +}