Skip to content

Commit

Permalink
feat(dot/parachain): added overseer signals (#3638)
Browse files Browse the repository at this point in the history
- Added ActiveLeavesUpdate and BlockFinalized overseer signals.
- Connected parachain service with a block state. Using this block state
  parachain service would be notified when a new relay chain block gets
imported or finalized.
- On receiving these signals overseer would update its active leaves and
  broadcast an ActiveLeavesUpdate or/and BlockFinalized signal to all
subsystems
- Added `ProcessActiveLeavesUpdateSignal` and `ProcessBlockFinalizedSignal` method to the subsystem interface, all
  subsystems will have to implement this method based on their
requirements to react to these signals
  • Loading branch information
kishansagathiya authored and timwu20 committed Jun 15, 2024
1 parent 6bbdb40 commit 19f1348
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 199 deletions.
125 changes: 83 additions & 42 deletions dot/parachain/availability-store/availability_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"

parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/internal/database"
Expand All @@ -28,6 +29,10 @@ const (

// AvailabilityStoreSubsystem is the struct that holds subsystem data for the availability store
type AvailabilityStoreSubsystem struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

SubSystemToOverseer chan<- any
OverseerToSubSystem <-chan any
availabilityStore AvailabilityStore
Expand Down Expand Up @@ -170,7 +175,10 @@ func uint32ToBytes(value uint32) []byte {
// Run runs the availability store subsystem
func (av *AvailabilityStoreSubsystem) Run(ctx context.Context, OverseerToSubsystem chan any,
SubsystemToOverseer chan any) error {
av.processMessages()

av.wg.Add(1)
go av.processMessages()

return nil
}

Expand All @@ -180,53 +188,81 @@ func (*AvailabilityStoreSubsystem) Name() parachaintypes.SubSystemName {
}

func (av *AvailabilityStoreSubsystem) processMessages() {
for msg := range av.OverseerToSubSystem {
logger.Debugf("received message %v", msg)
switch msg := msg.(type) {
case QueryAvailableData:
err := av.handleQueryAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle available data: %w", err)
}
case QueryDataAvailability:
err := av.handleQueryDataAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query data availability: %w", err)
}
case QueryChunk:
err := av.handleQueryChunk(msg)
if err != nil {
logger.Errorf("failed to handle query chunk: %w", err)
}
case QueryChunkSize:
err := av.handleQueryChunkSize(msg)
if err != nil {
logger.Errorf("failed to handle query chunk size: %w", err)
}
case QueryAllChunks:
err := av.handleQueryAllChunks(msg)
if err != nil {
logger.Errorf("failed to handle query all chunks: %w", err)
}
case QueryChunkAvailability:
err := av.handleQueryChunkAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query chunk availability: %w", err)
}
case StoreChunk:
err := av.handleStoreChunk(msg)
if err != nil {
logger.Errorf("failed to handle store chunk: %w", err)
for {
select {
case msg := <-av.OverseerToSubSystem:
logger.Debugf("received message %v", msg)
switch msg := msg.(type) {
case QueryAvailableData:
err := av.handleQueryAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle available data: %w", err)
}
case QueryDataAvailability:
err := av.handleQueryDataAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query data availability: %w", err)
}
case QueryChunk:
err := av.handleQueryChunk(msg)
if err != nil {
logger.Errorf("failed to handle query chunk: %w", err)
}
case QueryChunkSize:
err := av.handleQueryChunkSize(msg)
if err != nil {
logger.Errorf("failed to handle query chunk size: %w", err)
}
case QueryAllChunks:
err := av.handleQueryAllChunks(msg)
if err != nil {
logger.Errorf("failed to handle query all chunks: %w", err)
}
case QueryChunkAvailability:
err := av.handleQueryChunkAvailability(msg)
if err != nil {
logger.Errorf("failed to handle query chunk availability: %w", err)
}
case StoreChunk:
err := av.handleStoreChunk(msg)
if err != nil {
logger.Errorf("failed to handle store chunk: %w", err)
}
case StoreAvailableData:
err := av.handleStoreAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle store available data: %w", err)
}

case parachaintypes.ActiveLeavesUpdateSignal:
av.ProcessActiveLeavesUpdateSignal()

case parachaintypes.BlockFinalizedSignal:
av.ProcessBlockFinalizedSignal()

default:
logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error())
}
case StoreAvailableData:
err := av.handleStoreAvailableData(msg)
if err != nil {
logger.Errorf("failed to handle store available data: %w", err)

case <-av.ctx.Done():
if err := av.ctx.Err(); err != nil {
logger.Errorf("ctx error: %v\n", err)
}
av.wg.Done()
return
}

}
}

func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal() {
// TODO: #3630
}

func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal() {
// TODO: #3630
}

func (av *AvailabilityStoreSubsystem) handleQueryAvailableData(msg QueryAvailableData) error {
result, err := av.availabilityStore.loadAvailableData(msg.CandidateHash)
if err != nil {
Expand Down Expand Up @@ -333,3 +369,8 @@ func (av *AvailabilityStoreSubsystem) handleStoreAvailableData(msg StoreAvailabl
msg.Sender <- err // TODO: determine how to replicate Rust's Result type
return nil
}

func (av *AvailabilityStoreSubsystem) Stop() {
av.cancel()
av.wg.Wait()
}
3 changes: 3 additions & 0 deletions dot/parachain/availability-store/availability_store_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2023 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package availabilitystore

import (
Expand Down
72 changes: 51 additions & 21 deletions dot/parachain/backing/candidate_backing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package backing

import (
"context"
"sync"

parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/internal/log"
Expand All @@ -14,6 +15,10 @@ import (
var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-candidate-backing"))

type CandidateBacking struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

SubSystemToOverseer chan<- any
OverseerToSubSystem <-chan any
}
Expand Down Expand Up @@ -72,39 +77,59 @@ func (cb *CandidateBacking) Run(ctx context.Context, overseerToSubSystem chan an
// other backing related overseer message.
// This would become more clear after we complete processMessages function. It would give us clarity
// if we need background_validation_rx or background_validation_tx, as done in rust.
cb.processMessages()

cb.wg.Add(1)
go cb.processMessages()

return nil
}

func (*CandidateBacking) Name() parachaintypes.SubSystemName {
return parachaintypes.CandidateBacking
}

func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal() {
// TODO #3644
}

func (cb *CandidateBacking) ProcessBlockFinalizedSignal() {
// TODO #3644
}

func (cb *CandidateBacking) processMessages() {
for msg := range cb.OverseerToSubSystem {
// process these received messages by referencing
// https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741
switch msg.(type) {
case ActiveLeavesUpdate:
cb.handleActiveLeavesUpdate()
case GetBackedCandidates:
cb.handleGetBackedCandidates()
case CanSecond:
cb.handleCanSecond()
case Second:
cb.handleSecond()
case Statement:
cb.handleStatement()
default:
logger.Error("unknown message type")
for {
select {
case msg := <-cb.OverseerToSubSystem:
// process these received messages by referencing
// https://github.com/paritytech/polkadot-sdk/blob/769bdd3ff33a291cbc70a800a3830638467e42a2/polkadot/node/core/backing/src/lib.rs#L741
switch msg.(type) {
case GetBackedCandidates:
cb.handleGetBackedCandidates()
case CanSecond:
cb.handleCanSecond()
case Second:
cb.handleSecond()
case Statement:
cb.handleStatement()
case parachaintypes.ActiveLeavesUpdateSignal:
cb.ProcessActiveLeavesUpdateSignal()
case parachaintypes.BlockFinalizedSignal:
cb.ProcessBlockFinalizedSignal()
default:
logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error())
return
}

case <-cb.ctx.Done():
if err := cb.ctx.Err(); err != nil {
logger.Errorf("ctx error: %v\n", err)
}
cb.wg.Done()
return
}
}
}

func (cb *CandidateBacking) handleActiveLeavesUpdate() {
// TODO: Implement this #3503
}

func (cb *CandidateBacking) handleGetBackedCandidates() {
// TODO: Implement this #3504
}
Expand All @@ -121,6 +146,11 @@ func (cb *CandidateBacking) handleStatement() {
// TODO: Implement this #3507
}

func (cb *CandidateBacking) Stop() {
cb.cancel()
cb.wg.Wait()
}

// SignedFullStatementWithPVD represents a signed full statement along with associated Persisted Validation Data (PVD).
type SignedFullStatementWithPVD struct {
SignedFullStatement parachaintypes.UncheckedSignedFullStatement
Expand Down
29 changes: 27 additions & 2 deletions dot/parachain/collator-protocol/validator_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const (
var (
ErrUnexpectedMessageOnCollationProtocol = errors.New("unexpected message on collation protocol")
ErrUnknownPeer = errors.New("unknown peer")
ErrUnknownOverseerMessage = errors.New("unknown overseer message type")
ErrNotExpectedOnValidatorSide = errors.New("message is not expected on the validator side of the protocol")
ErrCollationNotInView = errors.New("collation is not in our view")
ErrPeerIDNotFoundForCollator = errors.New("peer id not found for collator")
Expand Down Expand Up @@ -83,6 +82,11 @@ func (cpvs CollatorProtocolValidatorSide) Run(
cpvs.fetchedCollations = append(cpvs.fetchedCollations, *collation)
}

case <-cpvs.ctx.Done():
if err := cpvs.ctx.Err(); err != nil {
logger.Errorf("ctx error: %v\n", err)
}
return nil
}
}
}
Expand All @@ -91,6 +95,18 @@ func (CollatorProtocolValidatorSide) Name() parachaintypes.SubSystemName {
return parachaintypes.CollationProtocol
}

func (cpvs CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal() {
// NOTE: nothing to do here
}

func (cpvs CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal() {
// NOTE: nothing to do here
}

func (cpvs CollatorProtocolValidatorSide) Stop() {
cpvs.cancel()
}

// requestCollation requests a collation from the network.
// This function will
// - check for duplicate requests
Expand Down Expand Up @@ -291,6 +307,9 @@ type CollationEvent struct {
}

type CollatorProtocolValidatorSide struct {
ctx context.Context
cancel context.CancelFunc

net Network

SubSystemToOverseer chan<- any
Expand Down Expand Up @@ -561,8 +580,14 @@ func (cpvs CollatorProtocolValidatorSide) processMessage(msg any) error {
Value: peerset.ReportBadCollatorValue,
Reason: peerset.ReportBadCollatorReason,
}, peerID)

case parachaintypes.ActiveLeavesUpdateSignal:
cpvs.ProcessActiveLeavesUpdateSignal()
case parachaintypes.BlockFinalizedSignal:
cpvs.ProcessBlockFinalizedSignal()

default:
return ErrUnknownOverseerMessage
return parachaintypes.ErrUnknownOverseerMessage
}

return nil
Expand Down
6 changes: 6 additions & 0 deletions dot/parachain/overseer/mocks_generate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright 2023 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package overseer

//go:generate mockgen -destination=mocks_test.go -package=$GOPACKAGE . BlockState
Loading

0 comments on commit 19f1348

Please sign in to comment.