diff --git a/dot/parachain/dispute/comm.go b/dot/parachain/dispute/comm.go new file mode 100644 index 0000000000..dd11ef56a0 --- /dev/null +++ b/dot/parachain/dispute/comm.go @@ -0,0 +1,93 @@ +package dispute + +import ( + "context" + "fmt" + "github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer" + "github.com/ChainSafe/gossamer/dot/parachain/dispute/types" + parachainTypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "time" +) + +const timeout = 10 * time.Second + +func getBlockNumber(overseerChannel chan<- any, receipt parachainTypes.CandidateReceipt) (uint32, error) { + respCh := make(chan any, 1) + relayParent, err := receipt.Hash() + if err != nil { + return 0, fmt.Errorf("get hash: %w", err) + } + + message := overseer.ChainAPIMessage[overseer.BlockNumberRequest]{ + Message: overseer.BlockNumberRequest{Hash: relayParent}, + ResponseChannel: respCh, + } + result, err := call(overseerChannel, message, message.ResponseChannel) + if err != nil { + return 0, fmt.Errorf("send message: %w", err) + } + + blockNumber, ok := result.(uint32) + if !ok { + return 0, fmt.Errorf("unexpected response type: %T", result) + } + return blockNumber, nil +} + +func sendMessage(channel chan<- any, message any) error { + // Send with timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + select { + case channel <- message: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func call(channel chan<- any, message any, responseChan chan any) (any, error) { + // Send with timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + select { + case channel <- message: + case <-ctx.Done(): + return nil, ctx.Err() + } + + select { + case response := <-responseChan: + return response, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func sendResult(overseerChannel chan<- any, request ParticipationRequest, outcome types.ParticipationOutcomeType) { + participationOutcome, err := types.NewCustomParticipationOutcomeVDT(outcome) + if err != nil { + logger.Errorf( + "failed to create participation outcome: %s, error: %s", + outcome, + err, + ) + return + } + + statement := ParticipationStatement{ + Session: request.session, + CandidateHash: request.candidateHash, + CandidateReceipt: request.candidateReceipt, + Outcome: participationOutcome, + } + if err := sendMessage(overseerChannel, statement); err != nil { + logger.Errorf( + "failed to send participation statement: %s, error: %s", + statement, + err, + ) + } +} diff --git a/dot/parachain/dispute/coordinator.go b/dot/parachain/dispute/coordinator.go index fc57a000a3..c1d0cae6bb 100644 --- a/dot/parachain/dispute/coordinator.go +++ b/dot/parachain/dispute/coordinator.go @@ -18,21 +18,18 @@ const Window = 6 var logger = log.NewFromGlobal(log.AddContext("parachain", "disputes")) -// CoordinatorSubsystem is the dispute coordinator subsystem interface. -type CoordinatorSubsystem interface { - // Run runs the dispute coordinator subsystem. - Run(context overseer.Context) error -} - -// disputeCoordinator implements the CoordinatorSubsystem interface. -type disputeCoordinator struct { +// Coordinator implements the CoordinatorSubsystem interface. +type Coordinator struct { keystore keystore.Keystore store *overlayBackend runtime parachain.RuntimeInstance + + sender chan<- any + receiver <-chan any } -func (d *disputeCoordinator) Run(context overseer.Context) error { - initResult, err := d.initialize(context) +func (d *Coordinator) Run() error { + initResult, err := d.initialize() if err != nil { return fmt.Errorf("initialize dispute coordinator: %w", err) } @@ -43,7 +40,7 @@ func (d *disputeCoordinator) Run(context overseer.Context) error { Leaf: initResult.activatedLeaf, } - if err := initResult.initialized.Run(context, d.store.inner, &initData); err != nil { + if err := initResult.initialized.Run(d.sender, d.store.inner, &initData); err != nil { return fmt.Errorf("run initialized state: %w", err) } @@ -66,11 +63,11 @@ type initializeResult struct { initialized *Initialized } -func (d *disputeCoordinator) waitForFirstLeaf(context overseer.Context) (*overseer.ActivatedLeaf, error) { +func (d *Coordinator) waitForFirstLeaf() (*overseer.ActivatedLeaf, error) { // TODO: handle other messages for { select { - case overseerMessage := <-context.Receiver: + case overseerMessage := <-d.receiver: switch message := overseerMessage.(type) { case overseer.Signal[overseer.ActiveLeavesUpdate]: return message.Data.Activated, nil @@ -79,16 +76,16 @@ func (d *disputeCoordinator) waitForFirstLeaf(context overseer.Context) (*overse } } -func (d *disputeCoordinator) initialize(context overseer.Context) ( +func (d *Coordinator) initialize() ( *initializeResult, error, ) { - firstLeaf, err := d.waitForFirstLeaf(context) + firstLeaf, err := d.waitForFirstLeaf() if err != nil { return nil, fmt.Errorf("wait for first leaf: %w", err) } - startupData, err := d.handleStartup(context, firstLeaf) + startupData, err := d.handleStartup(firstLeaf) if err != nil { return nil, fmt.Errorf("handle startup: %w", err) } @@ -103,7 +100,7 @@ func (d *disputeCoordinator) initialize(context overseer.Context) ( participation: startupData.participation, votes: startupData.votes, activatedLeaf: firstLeaf, - initialized: NewInitializedState(context.Sender, + initialized: NewInitializedState(d.sender, d.runtime, startupData.spamSlots, &startupData.orderingProvider, @@ -113,7 +110,7 @@ func (d *disputeCoordinator) initialize(context overseer.Context) ( }, nil } -func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead *overseer.ActivatedLeaf) ( +func (d *Coordinator) handleStartup(initialHead *overseer.ActivatedLeaf) ( *startupResult, error, ) { @@ -151,7 +148,7 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead var participationRequests []ParticipationRequestWithPriority spamDisputes := make(map[unconfirmedKey]*treeset.Set) leafHash := initialHead.Hash - scraper, scrapedVotes, err := scraping.NewChainScraper(context.Sender, d.runtime, initialHead) + scraper, scrapedVotes, err := scraping.NewChainScraper(d.sender, d.runtime, initialHead) if err != nil { return nil, fmt.Errorf("new chain scraper: %w", err) } @@ -202,26 +199,24 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead spamDisputes[disputeKey] = treeset.NewWithIntComparator() } spamDisputes[disputeKey].Add(voteState.Votes.VotedIndices()) - } else { - if voteState.Own.VoteMissing() { - logger.Tracef("found valid dispute, with no vote from us on startup - participating. %s") - priority := ParticipationPriorityHigh - if !isIncluded { - priority = ParticipationPriorityBestEffort - } - - participationRequests = append(participationRequests, ParticipationRequestWithPriority{ - request: ParticipationRequest{ - candidateHash: dispute.Comparator.CandidateHash, - candidateReceipt: voteState.Votes.CandidateReceipt, - session: dispute.Comparator.SessionIndex, - }, - priority: priority, - }) - } else { - logger.Tracef("found valid dispute, with vote from us on startup - distributing. %s") - d.sendDisputeMessages(context, *env, voteState) + } else if voteState.Own.VoteMissing() { + logger.Tracef("found valid dispute, with no vote from us on startup - participating. %s") + priority := ParticipationPriorityHigh + if !isIncluded { + priority = ParticipationPriorityBestEffort } + + participationRequests = append(participationRequests, ParticipationRequestWithPriority{ + request: ParticipationRequest{ + candidateHash: dispute.Comparator.CandidateHash, + candidateReceipt: voteState.Votes.CandidateReceipt, + session: dispute.Comparator.SessionIndex, + }, + priority: priority, + }) + } else { + logger.Tracef("found valid dispute, with vote from us on startup - distributing. %s") + d.sendDisputeMessages(*env, voteState) } return true @@ -237,8 +232,7 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead }, nil } -func (d *disputeCoordinator) sendDisputeMessages( - context overseer.Context, +func (d *Coordinator) sendDisputeMessages( env types.CandidateEnvironment, voteState types.CandidateVoteState, ) { @@ -279,13 +273,13 @@ func (d *disputeCoordinator) sendDisputeMessages( continue } - if err := context.Sender.SendMessage(disputeMessage); err != nil { + if err := sendMessage(d.sender, disputeMessage); err != nil { logger.Errorf("send dispute message: %s", err) } } } -func NewDisputeCoordinator(path string) (CoordinatorSubsystem, error) { +func NewDisputeCoordinator(path string) (*Coordinator, error) { db, err := badger.Open(badger.DefaultOptions(path)) if err != nil { return nil, fmt.Errorf("open badger db: %w", err) @@ -294,7 +288,7 @@ func NewDisputeCoordinator(path string) (CoordinatorSubsystem, error) { dbBackend := NewDBBackend(db) backend := newOverlayBackend(dbBackend) - return &disputeCoordinator{ + return &Coordinator{ store: backend, }, nil } diff --git a/dot/parachain/dispute/coordinator_test.go b/dot/parachain/dispute/coordinator_test.go index 855a2c22f7..05c420134c 100644 --- a/dot/parachain/dispute/coordinator_test.go +++ b/dot/parachain/dispute/coordinator_test.go @@ -15,10 +15,6 @@ import ( "testing" ) -type VirtualOverseer struct { - sender *MockSender -} - type TestState struct { validators []keystore.KeyPair validatorPublic []parachaintypes.ValidatorID @@ -30,7 +26,7 @@ type TestState struct { lastBlock common.Hash knownSession *parachaintypes.SessionIndex db database.Database - sender *MockSender + overseer chan any runtime *MockRuntimeInstance } @@ -97,9 +93,6 @@ func newTestState(t *testing.T) *TestState { headers[lastBlock] = genesisHeader blockNumToHeader[0] = lastBlock - ctrl := gomock.NewController(t) - sender := NewMockSender(ctrl) - return &TestState{ validators: validators, validatorPublic: validatorPublic, @@ -111,7 +104,7 @@ func newTestState(t *testing.T) *TestState { lastBlock: lastBlock, knownSession: nil, db: db, - sender: sender, + overseer: make(chan any, 1), } } @@ -174,7 +167,7 @@ func (ts *TestState) activateLeafAtSession(t *testing.T, Number: uint32(blockNumber), Status: overseer.LeafStatusFresh, } - err := ts.sender.SendMessage(overseer.Signal[overseer.ActiveLeavesUpdate]{ + err := sendMessage(ts.overseer, overseer.Signal[overseer.ActiveLeavesUpdate]{ Data: overseer.ActiveLeavesUpdate{Activated: &activatedLeaf}, }) require.NoError(t, err) @@ -260,7 +253,7 @@ func (ts *TestState) mockResumeSyncWithEvents(t *testing.T, Number: uint32(i), Status: overseer.LeafStatusFresh, } - err := ts.sender.SendMessage(overseer.Signal[overseer.ActiveLeavesUpdate]{ + err := sendMessage(ts.overseer, overseer.Signal[overseer.ActiveLeavesUpdate]{ Data: overseer.ActiveLeavesUpdate{Activated: &activatedLeaf}, }) require.NoError(t, err) diff --git a/dot/parachain/dispute/initialized.go b/dot/parachain/dispute/initialized.go index 2af9981f93..39baca0836 100644 --- a/dot/parachain/dispute/initialized.go +++ b/dot/parachain/dispute/initialized.go @@ -55,9 +55,9 @@ func (m MaybeCandidateReceipt) Hash() (common.Hash, error) { return m.CandidateHash, nil } -func (i *Initialized) Run(context overseer.Context, backend DBBackend, initialData *InitialData) error { +func (i *Initialized) Run(overseerChannel chan<- any, backend DBBackend, initialData *InitialData) error { for { - if err := i.runUntilError(context, backend, initialData); err == nil { + if err := i.runUntilError(overseerChannel, backend, initialData); err == nil { logger.Info("received `Conclude` signal, exiting") return nil } else { @@ -66,16 +66,16 @@ func (i *Initialized) Run(context overseer.Context, backend DBBackend, initialDa } } -func (i *Initialized) runUntilError(context overseer.Context, backend DBBackend, initialData *InitialData) error { +func (i *Initialized) runUntilError(overseerChannel chan<- any, backend DBBackend, initialData *InitialData) error { if initialData != nil { for _, p := range initialData.Participation { - if err := i.Participation.Queue(context, p.request, p.priority); err != nil { + if err := i.Participation.Queue(overseerChannel, p.request, p.priority); err != nil { return fmt.Errorf("queue participation request: %w", err) } } overlayDB := newOverlayBackend(backend) - if err := i.ProcessChainImportBacklog(context, + if err := i.ProcessChainImportBacklog(overseerChannel, overlayDB, initialData.Votes, uint64(time.Now().Unix()), @@ -101,20 +101,20 @@ func (i *Initialized) runUntilError(context overseer.Context, backend DBBackend, switch message := msg.(type) { case overseer.Signal[overseer.ActiveLeavesUpdate]: logger.Tracef("OverseerSignal::ActiveLeavesUpdate") - if err := i.ProcessActiveLeavesUpdate(context, + if err := i.ProcessActiveLeavesUpdate(overseerChannel, overlayDB, message.Data, uint64(time.Now().Unix())); err != nil { return fmt.Errorf("process active leaves update: %w", err) } - case overseer.Signal[overseer.Block]: + case overseer.Signal[overseer.BlockFinalized]: logger.Tracef("OverseerSignal::BlockFinalised") i.Scraper.ProcessFinalisedBlock(message.Data.Number) default: var err error - confirmWrite, err = i.HandleIncoming(context, + confirmWrite, err = i.HandleIncoming(overseerChannel, overlayDB, message, uint64(time.Now().Unix()), @@ -139,18 +139,18 @@ func (i *Initialized) runUntilError(context overseer.Context, backend DBBackend, } func (i *Initialized) ProcessActiveLeavesUpdate( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, update overseer.ActiveLeavesUpdate, now uint64, ) error { logger.Tracef("Processing ActiveLeavesUpdate") - scrappedUpdates, err := i.Scraper.ProcessActiveLeavesUpdate(context.Sender, update) + scrappedUpdates, err := i.Scraper.ProcessActiveLeavesUpdate(overseerChannel, update) if err != nil { return fmt.Errorf("scraper: process active leaves update: %w", err) } - i.Participation.BumpPriority(context, scrappedUpdates.IncludedReceipts) + i.Participation.BumpPriority(overseerChannel, scrappedUpdates.IncludedReceipts) i.Participation.ProcessActiveLeavesUpdate(update) if update.Activated != nil { @@ -194,7 +194,7 @@ func (i *Initialized) ProcessActiveLeavesUpdate( logger.Tracef("will process %v onchain votes", len(scrappedUpdates.OnChainVotes)) - if err := i.ProcessChainImportBacklog(context, + if err := i.ProcessChainImportBacklog(overseerChannel, backend, scrappedUpdates.OnChainVotes, now, @@ -209,7 +209,7 @@ func (i *Initialized) ProcessActiveLeavesUpdate( } func (i *Initialized) ProcessChainImportBacklog( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, newVotes []parachainTypes.ScrapedOnChainVotes, now uint64, @@ -227,7 +227,7 @@ func (i *Initialized) ProcessChainImportBacklog( for k := 0; k < importRange; k++ { votes := chainImportBacklog.PopFront() - if err := i.ProcessOnChainVotes(context, backend, votes, now, blockHash); err != nil { + if err := i.ProcessOnChainVotes(overseerChannel, backend, votes, now, blockHash); err != nil { logger.Errorf("skipping scraping block due to error: %w", err) } } @@ -237,7 +237,7 @@ func (i *Initialized) ProcessChainImportBacklog( } func (i *Initialized) ProcessOnChainVotes( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, votes parachainTypes.ScrapedOnChainVotes, now uint64, @@ -358,7 +358,7 @@ func (i *Initialized) ProcessOnChainVotes( candidateReceipt := MaybeCandidateReceipt{ CandidateReceipt: &backingValidators.CandidateReceipt, } - if outcome, err := i.HandleImportStatements(context, + if outcome, err := i.HandleImportStatements(overseerChannel, backend, candidateReceipt, votes.Session, @@ -391,7 +391,7 @@ func (i *Initialized) ProcessOnChainVotes( candidateReceipt := MaybeCandidateReceipt{ CandidateHash: candidateHash, } - if outcome, err := i.HandleImportStatements(context, + if outcome, err := i.HandleImportStatements(overseerChannel, backend, candidateReceipt, votes.Session, @@ -417,7 +417,7 @@ func (i *Initialized) ProcessOnChainVotes( } func (i *Initialized) HandleIncoming( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, msg any, now uint64, @@ -443,7 +443,7 @@ func (i *Initialized) HandleIncoming( message.Data.CandidateHash, valid, ) - if err := i.IssueLocalStatement(context, + if err := i.IssueLocalStatement(overseerChannel, backend, message.Data.CandidateHash, message.Data.CandidateReceipt, @@ -458,7 +458,7 @@ func (i *Initialized) HandleIncoming( candidateReceipt := MaybeCandidateReceipt{ CandidateReceipt: &message.Data.CandidateReceipt, } - outcome, err := i.HandleImportStatements(context, + outcome, err := i.HandleImportStatements(overseerChannel, backend, candidateReceipt, message.Data.Session, @@ -470,8 +470,8 @@ func (i *Initialized) HandleIncoming( } report := func() error { - if message.Data.PendingConfirmation != nil { - if err := message.Data.PendingConfirmation.SendMessage(outcome); err != nil { + if message.ResponseChannel != nil { + if err := sendMessage(message.ResponseChannel, outcome); err != nil { return fmt.Errorf("confirm import statements: %w", err) } } @@ -491,7 +491,7 @@ func (i *Initialized) HandleIncoming( return nil, fmt.Errorf("get recent disputes: %w", err) } - if err := message.Data.Sender.SendMessage(recentDisputes); err != nil { + if err := sendMessage(message.ResponseChannel, recentDisputes); err != nil { return nil, fmt.Errorf("send recent disputes: %w", err) } case types.Message[types.ActiveDisputesMessage]: @@ -501,7 +501,7 @@ func (i *Initialized) HandleIncoming( return nil, fmt.Errorf("get active disputes: %w", err) } - if err := message.Data.Sender.SendMessage(activeDisputes); err != nil { + if err := sendMessage(message.ResponseChannel, activeDisputes); err != nil { return nil, fmt.Errorf("send active disputes: %w", err) } case types.Message[types.QueryCandidateVotesMessage]: @@ -524,12 +524,12 @@ func (i *Initialized) HandleIncoming( }) } - if err := message.Data.Sender.SendMessage(queryOutput); err != nil { + if err := sendMessage(message.ResponseChannel, queryOutput); err != nil { return nil, fmt.Errorf("send candidate votes: %w", err) } case types.Message[types.IssueLocalStatementMessage]: logger.Tracef("HandleIncoming::IssueLocalStatement") - if err := i.IssueLocalStatement(context, + if err := i.IssueLocalStatement(overseerChannel, backend, message.Data.CandidateHash, message.Data.CandidateReceipt, @@ -549,7 +549,7 @@ func (i *Initialized) HandleIncoming( return nil, fmt.Errorf("determine undisputed chain: %w", err) } - if err := message.Data.Tx.SendMessage(undisputedChain); err != nil { + if err := sendMessage(message.ResponseChannel, undisputedChain); err != nil { return nil, fmt.Errorf("send undisputed chain: %w", err) } default: @@ -560,7 +560,7 @@ func (i *Initialized) HandleIncoming( } func (i *Initialized) HandleImportStatements( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, maybeCandidateReceipt MaybeCandidateReceipt, session parachainTypes.SessionIndex, @@ -651,31 +651,28 @@ func (i *Initialized) HandleImportStatements( session, ) - // Use of unbounded channels justified because: - // 1. Only triggered twice per dispute. - // 2. Raising a dispute is costly (requires validation + recovery) by honest nodes, - // dishonest nodes are limited by spam slots. - // 3. Concluding a dispute is even more costly. - // Therefore, it is reasonable to expect a simple vote request to succeed way faster - // than disputes are raised. - // 4. We are waiting (and blocking the whole subsystem) on a response right after - - // therefore even with all else failing we will never have more than - // one message in flight at any given time. - responseChan := make(chan *overseer.ApprovalSignatureResponse, 1) - message := overseer.ApprovalVotingMessage{ - GetApprovalSignature: &overseer.GetApprovalSignatureForCandidate{ + responseChan := make(chan any, 1) + message := overseer.ApprovalVotingMessage[overseer.ApprovalSignatureForCandidate]{ + Message: overseer.ApprovalSignatureForCandidate{ CandidateHash: candidateHash, - ResponseChan: responseChan, }, + ResponseChan: responseChan, } - if err := context.Sender.SendUnboundedMessage(message); err != nil { + + // TODO: we need to send this to a prioritised channel + res, err := call(overseerChannel, message, message.ResponseChan) + if err != nil { logger.Warnf("failed to fetch approval signatures for candidate %s: %s", candidateHash, err, ) importResult = intermediateResult } else { - response := <-responseChan + response, ok := res.(*overseer.ApprovalSignatureResponse) + if !ok { + return InvalidImport, fmt.Errorf("invalid approval signature response") + } + if response.Error != nil { return InvalidImport, fmt.Errorf("approval signature response: %w", response.Error) } @@ -685,7 +682,6 @@ func (i *Initialized) HandleImportStatements( return InvalidImport, fmt.Errorf("import approval votes: %w", err) } - var ok bool importResult, ok = result.(*ImportResultHandler) if !ok { return InvalidImport, fmt.Errorf("invalid import result") @@ -778,7 +774,7 @@ func (i *Initialized) HandleImportStatements( candidateReceipt: newState.Votes.CandidateReceipt, session: session, } - if err := i.Participation.Queue(context, participationRequest, priority); err != nil { + if err := i.Participation.Queue(overseerChannel, participationRequest, priority); err != nil { logger.Errorf("failed to queue participation request: %s", err) } } else { @@ -843,7 +839,7 @@ func (i *Initialized) HandleImportStatements( return InvalidImport, fmt.Errorf("new dispute message: %w", err) } - if err := context.Sender.SendMessage(disputeMessage); err != nil { + if err := sendMessage(overseerChannel, disputeMessage); err != nil { return InvalidImport, fmt.Errorf("send dispute message: %w", err) } } @@ -925,7 +921,7 @@ func (i *Initialized) HandleImportStatements( message := overseer.ChainSelectionMessage{ RevertBlocks: &overseer.RevertBlocksRequest{Blocks: blocks}, } - if err := context.Sender.SendMessage(message); err != nil { + if err := sendMessage(overseerChannel, message); err != nil { return InvalidImport, fmt.Errorf("send revert blocks request: %w", err) } } else { @@ -950,7 +946,7 @@ func (i *Initialized) HandleImportStatements( } func (i *Initialized) IssueLocalStatement( - context overseer.Context, + overseerChannel chan<- any, backend OverlayBackend, candidateHash common.Hash, candidateReceipt parachainTypes.CandidateReceipt, @@ -1029,7 +1025,7 @@ func (i *Initialized) IssueLocalStatement( continue } - if err := context.Sender.SendMessage(disputeMessage); err != nil { + if err := sendMessage(overseerChannel, disputeMessage); err != nil { logger.Warnf("failed to send dispute message for validator index %d: %s", statement.ValidatorIndex, err, @@ -1040,7 +1036,7 @@ func (i *Initialized) IssueLocalStatement( // Do import if len(statements) > 0 { - if outcome, err := i.HandleImportStatements(context, + if outcome, err := i.HandleImportStatements(overseerChannel, backend, MaybeCandidateReceipt{ CandidateReceipt: &candidateReceipt, @@ -1121,7 +1117,7 @@ func (i *Initialized) determineUndisputedChain(backend OverlayBackend, } // NewInitializedState creates a new initialized state. -func NewInitializedState(sender overseer.Sender, +func NewInitializedState(overseerChannel chan<- any, runtime parachainRuntime.RuntimeInstance, spamSlots SpamSlots, scraper *scraping.ChainScraper, @@ -1136,7 +1132,7 @@ func NewInitializedState(sender overseer.Sender, GapsInCache: gapsInCache, ParticipationReceiver: make(chan any), ChainImportBacklog: deque.New[parachainTypes.ScrapedOnChainVotes](), - Participation: NewParticipation(sender, runtime), + Participation: NewParticipation(overseerChannel, runtime), } } diff --git a/dot/parachain/dispute/mocks_overseer_test.go b/dot/parachain/dispute/mocks_overseer_test.go deleted file mode 100644 index b5ac1231ac..0000000000 --- a/dot/parachain/dispute/mocks_overseer_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer (interfaces: Sender) - -// Package parachain is a generated GoMock package. -package dispute - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockSender is a mock of Sender interface. -type MockSender struct { - ctrl *gomock.Controller - recorder *MockSenderMockRecorder -} - -// MockSenderMockRecorder is the mock recorder for MockSender. -type MockSenderMockRecorder struct { - mock *MockSender -} - -// NewMockSender creates a new mock instance. -func NewMockSender(ctrl *gomock.Controller) *MockSender { - mock := &MockSender{ctrl: ctrl} - mock.recorder = &MockSenderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockSender) EXPECT() *MockSenderMockRecorder { - return m.recorder -} - -// Feed mocks base method. -func (m *MockSender) Feed(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Feed", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Feed indicates an expected call of Feed. -func (mr *MockSenderMockRecorder) Feed(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Feed", reflect.TypeOf((*MockSender)(nil).Feed), arg0) -} - -// SendMessage mocks base method. -func (m *MockSender) SendMessage(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMessage indicates an expected call of SendMessage. -func (mr *MockSenderMockRecorder) SendMessage(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MockSender)(nil).SendMessage), arg0) -} - -// SendUnboundedMessage mocks base method. -func (m *MockSender) SendUnboundedMessage(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendUnboundedMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendUnboundedMessage indicates an expected call of SendUnboundedMessage. -func (mr *MockSenderMockRecorder) SendUnboundedMessage(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendUnboundedMessage", reflect.TypeOf((*MockSender)(nil).SendUnboundedMessage), arg0) -} diff --git a/dot/parachain/dispute/overseer/context.go b/dot/parachain/dispute/overseer/context.go deleted file mode 100644 index cd65cd3feb..0000000000 --- a/dot/parachain/dispute/overseer/context.go +++ /dev/null @@ -1,20 +0,0 @@ -// TODO: This is just a temporary file to complete the participation module. The type definitions here are not complete. -// We need to remove this file once we have implemented the overseer. - -package overseer - -type Sender interface { - SendMessage(msg any) error - Feed(msg any) error - SendUnboundedMessage(msg any) error -} - -type Context struct { - Sender Sender - Receiver chan any -} - -type Signal[data any] struct { - Data data - Conclude bool -} diff --git a/dot/parachain/dispute/overseer/leaf.go b/dot/parachain/dispute/overseer/leaf.go deleted file mode 100644 index 409deb324f..0000000000 --- a/dot/parachain/dispute/overseer/leaf.go +++ /dev/null @@ -1,23 +0,0 @@ -// TODO: This is just a temporary file to complete the participation module. The type definitions here are not complete. -// We need to remove this file once we have implemented the leaf update interfaces - -package overseer - -import "github.com/ChainSafe/gossamer/lib/common" - -type LeafStatus uint - -const ( - LeafStatusFresh LeafStatus = iota - LeafStatusStale -) - -type ActivatedLeaf struct { - Hash common.Hash - Number uint32 - Status LeafStatus -} - -type ActiveLeavesUpdate struct { - Activated *ActivatedLeaf -} diff --git a/dot/parachain/dispute/overseer/message.go b/dot/parachain/dispute/overseer/message.go index f6a3685e9b..1984560355 100644 --- a/dot/parachain/dispute/overseer/message.go +++ b/dot/parachain/dispute/overseer/message.go @@ -49,7 +49,7 @@ type AvailabilityRecoveryMessage struct { CandidateReceipt parachainTypes.CandidateReceipt SessionIndex parachainTypes.SessionIndex GroupIndex *uint32 - ResponseChannel chan AvailabilityRecoveryResponse + ResponseChannel chan any } type PvfExecTimeoutKind uint32 @@ -63,7 +63,7 @@ type ValidateFromChainState struct { CandidateReceipt parachainTypes.CandidateReceipt PoV []byte PvfExecTimeoutKind PvfExecTimeoutKind - ResponseChannel chan ValidationResult + ResponseChannel chan any } type ValidValidationResult struct { @@ -105,25 +105,6 @@ type AncestorsRequest struct { K uint32 } -type ApprovalSignature struct { - ValidatorIndex parachainTypes.ValidatorIndex - ValidatorSignature parachainTypes.ValidatorSignature -} - -type ApprovalSignatureResponse struct { - Signature []ApprovalSignature - Error error -} - -type GetApprovalSignatureForCandidate struct { - CandidateHash common.Hash - ResponseChan chan *ApprovalSignatureResponse -} - -type ApprovalVotingMessage struct { - GetApprovalSignature *GetApprovalSignatureForCandidate -} - // Block represents a block type Block struct { Number uint32 @@ -145,3 +126,22 @@ type RevertBlocksRequest struct { type ChainSelectionMessage struct { RevertBlocks *RevertBlocksRequest } + +type ApprovalVotingMessage[message any] struct { + Message message + ResponseChan chan any +} + +type ApprovalSignature struct { + ValidatorIndex parachainTypes.ValidatorIndex + ValidatorSignature parachainTypes.ValidatorSignature +} + +type ApprovalSignatureResponse struct { + Signature []ApprovalSignature + Error error +} + +type ApprovalSignatureForCandidate struct { + CandidateHash common.Hash +} diff --git a/dot/parachain/dispute/overseer/signal.go b/dot/parachain/dispute/overseer/signal.go new file mode 100644 index 0000000000..73c181ce2d --- /dev/null +++ b/dot/parachain/dispute/overseer/signal.go @@ -0,0 +1,43 @@ +package overseer + +import "github.com/ChainSafe/gossamer/lib/common" + +// Signal represents a signal sent from overseer +type Signal[data any] struct { + Data data + Conclude bool +} + +// LeafStatus represents the status of an activated leaf +type LeafStatus uint + +const ( + // LeafStatusFresh A leaf is fresh when it's the first time the leaf has been encountered. + // Most leaves should be fresh. + LeafStatusFresh LeafStatus = iota + // LeafStatusStale A leaf is stale when it's encountered for a subsequent time. This will happen + // when the chain is reverted or the fork-choice rule abandons some chain. + LeafStatusStale +) + +// ActivatedLeaf represents an activated leaf +type ActivatedLeaf struct { + Hash common.Hash + Number uint32 + Status LeafStatus + // TODO: add more fields +} + +// ActiveLeavesUpdate Changes in the set of active leaves: the parachain heads which we care to work on. +// +// Note that the activated and deactivated fields indicate deltas, not complete sets. +// +// Subsystems should adjust their jobs to start and stop work on appropriate block hashes. +type ActiveLeavesUpdate struct { + Activated *ActivatedLeaf +} + +// BlockFinalized subsystem is informed of a finalized block by its block hash and number. +type BlockFinalized struct { + Block +} diff --git a/dot/parachain/dispute/participation.go b/dot/parachain/dispute/participation.go index 84b96920aa..ec591a6441 100644 --- a/dot/parachain/dispute/participation.go +++ b/dot/parachain/dispute/participation.go @@ -1,13 +1,10 @@ package dispute import ( - "context" "fmt" + parachain "github.com/ChainSafe/gossamer/dot/parachain/runtime" "sync" "sync/atomic" - "time" - - parachain "github.com/ChainSafe/gossamer/dot/parachain/runtime" "github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer" "github.com/ChainSafe/gossamer/dot/parachain/dispute/types" @@ -56,7 +53,7 @@ type ParticipationStatement struct { // Participation keeps track of the disputes we need to participate in. type Participation interface { // Queue a dispute for the node to participate in - Queue(context overseer.Context, request ParticipationRequest, priority ParticipationPriority) error + Queue(overseerChannel chan<- any, request ParticipationRequest, priority ParticipationPriority) error // Clear clears a participation request. This is called when we have the dispute result. Clear(candidateHash common.Hash) error @@ -65,7 +62,7 @@ type Participation interface { ProcessActiveLeavesUpdate(update overseer.ActiveLeavesUpdate) // BumpPriority bumps the priority for the given receipts - BumpPriority(ctx overseer.Context, receipts []parachainTypes.CandidateReceipt) + BumpPriority(overseerChannel chan<- any, receipts []parachainTypes.CandidateReceipt) } type block struct { @@ -79,17 +76,18 @@ type ParticipationHandler struct { workers atomic.Int32 queue Queue - sender overseer.Sender // TODO: revisit this once we have the overseer recentBlock *block runtime parachain.RuntimeInstance + overseer chan<- any + //TODO: metrics } const MaxParallelParticipation = 3 -func (p *ParticipationHandler) Queue(ctx overseer.Context, +func (p *ParticipationHandler) Queue(overseerChannel chan<- any, request ParticipationRequest, priority ParticipationPriority, ) error { @@ -103,7 +101,7 @@ func (p *ParticipationHandler) Queue(ctx overseer.Context, return nil } - blockNumber, err := getBlockNumber(ctx.Sender, request.candidateReceipt) + blockNumber, err := getBlockNumber(overseerChannel, request.candidateReceipt) if err != nil { return fmt.Errorf("get block number: %w", err) } @@ -155,9 +153,9 @@ func (p *ParticipationHandler) ProcessActiveLeavesUpdate(update overseer.ActiveL p.dequeueUntilCapacity(update.Activated.Hash) } -func (p *ParticipationHandler) BumpPriority(ctx overseer.Context, receipts []parachainTypes.CandidateReceipt) { +func (p *ParticipationHandler) BumpPriority(overseerChannel chan<- any, receipts []parachainTypes.CandidateReceipt) { for _, receipt := range receipts { - blockNumber, err := getBlockNumber(ctx.Sender, receipt) + blockNumber, err := getBlockNumber(overseerChannel, receipt) if err != nil { logger.Errorf( "failed to get block number. CommitmentsHash: %s, Error: %s", @@ -223,35 +221,34 @@ func (p *ParticipationHandler) forkParticipation(request *ParticipationRequest, } func (p *ParticipationHandler) participate(blockHash common.Hash, request ParticipationRequest) error { - // get available data from the sender - availableDataTx := make(chan overseer.AvailabilityRecoveryResponse, 1) - if err := p.sender.SendMessage(overseer.AvailabilityRecoveryMessage{ + // get available data from the overseer + respCh := make(chan any, 1) + message := overseer.AvailabilityRecoveryMessage{ CandidateReceipt: request.candidateReceipt, SessionIndex: request.session, GroupIndex: nil, - ResponseChannel: availableDataTx, - }); err != nil { + ResponseChannel: respCh, + } + res, err := call(p.overseer, message, message.ResponseChannel) + if err != nil { return fmt.Errorf("send availability recovery message: %w", err) } - recoverDataCtx, recoverDataCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer recoverDataCancel() - var availableData overseer.AvailabilityRecoveryResponse - select { - case <-recoverDataCtx.Done(): - return recoverDataCtx.Err() // Return the context error if timeout exceeded - case availableData = <-availableDataTx: - if availableData.Error != nil { - switch *availableData.Error { - case overseer.RecoveryErrorInvalid: - sendResult(p.sender, request, types.ParticipationOutcomeInvalid) - return fmt.Errorf("invalid available data: %s", availableData.Error.String()) - case overseer.RecoveryErrorUnavailable: - sendResult(p.sender, request, types.ParticipationOutcomeUnAvailable) - return fmt.Errorf("unavailable data: %s", availableData.Error.String()) - default: - return fmt.Errorf("unexpected recovery error: %d", availableData.Error) - } + availableData, ok := res.(overseer.AvailabilityRecoveryResponse) + if !ok { + return fmt.Errorf("unexpected response type: %T", res) + } + + if availableData.Error != nil { + switch *availableData.Error { + case overseer.RecoveryErrorInvalid: + sendResult(p.overseer, request, types.ParticipationOutcomeInvalid) + return fmt.Errorf("invalid available data: %s", availableData.Error.String()) + case overseer.RecoveryErrorUnavailable: + sendResult(p.overseer, request, types.ParticipationOutcomeUnAvailable) + return fmt.Errorf("unavailable data: %s", availableData.Error.String()) + default: + return fmt.Errorf("unexpected recovery error: %d", availableData.Error) } } @@ -259,7 +256,7 @@ func (p *ParticipationHandler) participate(blockHash common.Hash, request Partic blockHash, request.candidateReceipt.Descriptor.ValidationCodeHash) if err != nil || validationCode == nil { - sendResult(p.sender, request, types.ParticipationOutcomeError) + sendResult(p.overseer, request, types.ParticipationOutcomeError) return fmt.Errorf("failed to get validation code: %w", err) } @@ -268,109 +265,49 @@ func (p *ParticipationHandler) participate(blockHash common.Hash, request Partic "validation code is empty. CandidateHash: %s", request.candidateHash.String(), ) - sendResult(p.sender, request, types.ParticipationOutcomeError) + sendResult(p.overseer, request, types.ParticipationOutcomeError) return fmt.Errorf("validation code is empty") } - validateCtx, validateCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer validateCancel() - // validate the request and send the result - tx := make(chan overseer.ValidationResult, 1) - if err := p.sender.SendMessage(overseer.ValidateFromChainState{ + respChan := make(chan any, 1) + validateMessage := overseer.ValidateFromChainState{ CandidateReceipt: request.candidateReceipt, PoV: availableData.AvailableData.POV, PvfExecTimeoutKind: overseer.PvfExecTimeoutKindApproval, - ResponseChannel: tx, - }); err != nil { - sendResult(p.sender, request, types.ParticipationOutcomeError) + ResponseChannel: respChan, + } + res, err = call(p.overseer, validateMessage, validateMessage.ResponseChannel) + if err != nil { + sendResult(p.overseer, request, types.ParticipationOutcomeError) + } + result, ok := res.(overseer.ValidationResult) + if !ok { + sendResult(p.overseer, request, types.ParticipationOutcomeError) + return fmt.Errorf("unexpected response type: %T", res) } - select { - case <-validateCtx.Done(): - return validateCtx.Err() - case result := <-tx: - if result.Error != nil { - // validation failed - sendResult(p.sender, request, types.ParticipationOutcomeError) - return fmt.Errorf("validation failed: %s", result.Error) - } - - if !result.IsValid { - sendResult(p.sender, request, types.ParticipationOutcomeInvalid) - return fmt.Errorf("validation failed: %s", result.Error) - } - - sendResult(p.sender, request, types.ParticipationOutcomeValid) - return nil + if result.Error != nil { + // validation failed + sendResult(p.overseer, request, types.ParticipationOutcomeError) + return fmt.Errorf("validation failed: %s", result.Error) } + if !result.IsValid { + sendResult(p.overseer, request, types.ParticipationOutcomeInvalid) + return fmt.Errorf("validation failed: %s", result.Error) + } + + sendResult(p.overseer, request, types.ParticipationOutcomeValid) + return nil } var _ Participation = (*ParticipationHandler)(nil) -func NewParticipation(sender overseer.Sender, runtime parachain.RuntimeInstance) *ParticipationHandler { +func NewParticipation(overseer chan<- any, runtime parachain.RuntimeInstance) *ParticipationHandler { return &ParticipationHandler{ runningParticipation: sync.Map{}, queue: NewQueue(), - sender: sender, + overseer: overseer, runtime: runtime, } } - -func getBlockNumber(sender overseer.Sender, receipt parachainTypes.CandidateReceipt) (uint32, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - tx := make(chan any, 1) - relayParent, err := receipt.Hash() - if err != nil { - return 0, fmt.Errorf("get hash: %w", err) - } - - if err := sender.SendMessage(overseer.ChainAPIMessage[overseer.BlockNumberRequest]{ - Message: overseer.BlockNumberRequest{Hash: relayParent}, - ResponseChannel: tx, - }); err != nil { - return 0, fmt.Errorf("send message: %w", err) - } - - select { - case result := <-tx: - blockNumber, ok := result.(*uint32) - if !ok { - return 0, fmt.Errorf("unexpected response type: %T", result) - } - if blockNumber == nil { - return 0, fmt.Errorf("empty result") - } - return *blockNumber, nil - case <-ctx.Done(): - return 0, ctx.Err() - } -} - -func sendResult(sender overseer.Sender, request ParticipationRequest, outcome types.ParticipationOutcomeType) { - participationOutcome, err := types.NewCustomParticipationOutcomeVDT(outcome) - if err != nil { - logger.Errorf( - "failed to create participation outcome: %s, error: %s", - outcome, - err, - ) - return - } - - statement := ParticipationStatement{ - Session: request.session, - CandidateHash: request.candidateHash, - CandidateReceipt: request.candidateReceipt, - Outcome: participationOutcome, - } - if err := sender.Feed(statement); err != nil { - logger.Errorf( - "failed to send participation result: %s, error: %s", - statement, - err, - ) - } -} diff --git a/dot/parachain/dispute/participation_test.go b/dot/parachain/dispute/participation_test.go index 74efbacefa..3c9b99e4b8 100644 --- a/dot/parachain/dispute/participation_test.go +++ b/dot/parachain/dispute/participation_test.go @@ -1,6 +1,7 @@ package dispute import ( + "context" "fmt" "github.com/ChainSafe/gossamer/pkg/scale" "sync" @@ -134,18 +135,18 @@ func activateLeaf( return nil } -func participate(participation Participation, context overseer.Context) error { +func participate(participation Participation, overseerChannel chan any) error { candidateCommitments := dummyCandidateCommitments() commitmentsHash, err := candidateCommitments.Hash() if err != nil { panic(err) } - return participateWithCommitmentsHash(participation, context, commitmentsHash) + return participateWithCommitmentsHash(participation, overseerChannel, commitmentsHash) } func participateWithCommitmentsHash( participation Participation, - context overseer.Context, + overseerChannel chan any, commitmentsHash common.Hash, ) error { candidateReceipt, err := dummyCandidateReceiptBadSignature(common.Hash{}, &common.Hash{}) @@ -166,16 +167,16 @@ func participateWithCommitmentsHash( session: session, } - return participation.Queue(context, participationRequest, ParticipationPriorityBestEffort) + return participation.Queue(overseerChannel, participationRequest, ParticipationPriorityBestEffort) } func TestNewParticipation(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) - participation := NewParticipation(mockSender, mockRuntime) + participation := NewParticipation(mockOverseer, mockRuntime) require.NotNil(t, participation, "should not be nil") } @@ -186,49 +187,50 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - participationHandler := NewParticipation(mockSender, mockRuntime) - ctx := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) + err := activateLeaf(participationHandler, parachainTypes.BlockNumber(11)) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg any) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - response := overseer.RecoveryErrorUnavailable - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - Error: &response, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + response := overseer.RecoveryErrorUnavailable + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + Error: &response, + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.UnAvailableOutcome: + continue + default: + panic("unexpected outcome") + } + default: + t.Errorf("unexpected message type: %T", msg) + return + } } - default: - return fmt.Errorf("unknown message type") } - return nil - }).Times(1) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.UnAvailableOutcome: - return nil - default: - panic("unexpected outcome") - } - }) + }() - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) for i := 0; i < MaxParallelParticipation; i++ { - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) } @@ -241,24 +243,21 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) var wg sync.WaitGroup participationTest := func() { defer wg.Done() - participationHandler := NewParticipation(mockSender, mockRuntime) - ctx := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) for i := 0; i < MaxParallelParticipation; i++ { - err = participateWithCommitmentsHash(participationHandler, ctx, common.Hash{byte(i)}) + err = participateWithCommitmentsHash(participationHandler, mockOverseer, common.Hash{byte(i)}) require.NoError(t, err) } @@ -270,43 +269,42 @@ func TestParticipationHandler_Queue(t *testing.T) { requestHandler := func() { defer wg.Done() - // sendMessage is called 4 times for the first 3+1 requests - // sendMessage is called once for getBlockNumber request - // feed is called 4 times for the requests while sending the results - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - response := overseer.RecoveryErrorUnavailable - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - Error: &response, + counter := 0 + for { + select { + case msg := <-mockOverseer: + if counter == 9 { + panic("too many requests") + } + counter++ + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + response := overseer.RecoveryErrorUnavailable + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + Error: &response, + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.UnAvailableOutcome: + return + default: + panic("invalid outcome") + } + default: + panic("unknown message type") } - default: - return fmt.Errorf("unknown message type") - } - - return nil - }).Times(5) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.UnAvailableOutcome: - return nil - default: - panic("invalid outcome") } - }).Times(4) - - time.Sleep(2 * time.Second) + } } wg.Add(2) - go participationTest() go requestHandler() + go participationTest() wg.Wait() }) @@ -315,22 +313,17 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - var wg sync.WaitGroup - waitTx := make(chan bool, 100) + var wg sync.WaitGroup participationTest := func() { defer wg.Done() - - participationHandler := NewParticipation(mockSender, mockRuntime) - context := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) go func() { - err := participate(participationHandler, context) + err := participate(participationHandler, mockOverseer) require.NoError(t, err) }() @@ -340,50 +333,68 @@ func TestParticipationHandler_Queue(t *testing.T) { err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - - time.Sleep(2 * time.Second) } // Responds to messages from the test and verifies its behaviour requestHandler := func() { defer wg.Done() - - // If we receive `Number` request this implicitly proves that the participation is queued - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { + select { + case msg := <-mockOverseer: switch message := msg.(type) { case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: response := uint32(1) - message.ResponseChannel <- &response + message.ResponseChannel <- response + break + default: + panic("unknown message type") + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + select { + case _ = <-mockOverseer: + panic("should not receive any messages") + case <-ctx.Done(): + break + } + + // No activity so the participation is queued => unblock the test + waitTx <- true + + counter := 0 + select { + case msg := <-mockOverseer: + counter++ + switch message := msg.(type) { case overseer.AvailabilityRecoveryMessage: response := overseer.RecoveryErrorUnavailable message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ Error: &response, } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.UnAvailableOutcome: + return + default: + panic("unexpected outcome") + } default: - return fmt.Errorf("unknown message type") - } - - return nil - }).Times(2) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.UnAvailableOutcome: - return nil - default: - panic("unexpected outcome") + panic("unknown message type") } - }) + } - time.Sleep(5 * time.Second) - waitTx <- true + if counter == 3 { + return + } } wg.Add(2) - go participationTest() go requestHandler() + go participationTest() wg.Wait() }) @@ -392,45 +403,43 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - participationHandler := NewParticipation(mockSender, mockRuntime) - ctx := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - response := overseer.RecoveryErrorUnavailable - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - Error: &response, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + response := overseer.RecoveryErrorUnavailable + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + Error: &response, + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.UnAvailableOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") + } } - default: - return fmt.Errorf("unknown message type") } + }() - return nil - }).Times(1) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.UnAvailableOutcome: - return nil - default: - panic("unexpected outcome") - } - }) - - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) // sleep for a while to ensure we don't have any further results nor recovery requests @@ -442,53 +451,50 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - - participationHandler := NewParticipation(mockSender, mockRuntime) - context := overseer.Context{ - Sender: mockSender, - } - - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - availableData := overseer.AvailableData{ - POV: []byte{}, - ValidationData: overseer.PersistedValidationData{}, - } - - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - AvailableData: &availableData, - Error: nil, + participationHandler := NewParticipation(mockOverseer, mockRuntime) + + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + availableData := overseer.AvailableData{ + POV: []byte{}, + ValidationData: overseer.PersistedValidationData{}, + } + + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + AvailableData: &availableData, + Error: nil, + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.ErrorOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") + } } - default: - return fmt.Errorf("unknown message type") } - - return nil - }) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.ErrorOutcome: - return nil - default: - panic("unexpected outcome") - } - }) + }() mockRuntime.EXPECT().ParachainHostValidationCodeByHash(gomock.Any(), gomock.Any()). Return(nil, nil).Times(1) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - err = participate(participationHandler, context) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) // sleep for a while to ensure we don't have any further results nor recovery requests @@ -500,44 +506,45 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - participationHandler := NewParticipation(mockSender, mockRuntime) - context := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - err = participate(participationHandler, context) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.AvailabilityRecoveryMessage: - response := overseer.RecoveryErrorInvalid - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - Error: &response, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + response := overseer.RecoveryErrorInvalid + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + Error: &response, + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.InvalidOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") + } } - default: - return fmt.Errorf("unknown message type") - } - - return nil - }) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.InvalidOutcome: - return nil - default: - panic("unexpected outcome") } - }) + }() // sleep for a while to ensure we don't have any further results nor recovery requests time.Sleep(5 * time.Second) @@ -548,60 +555,58 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - - participationHandler := NewParticipation(mockSender, mockRuntime) - context := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - err = participate(participationHandler, context) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.ValidateFromChainState: - if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { - message.ResponseChannel <- overseer.ValidationResult{ - IsValid: false, - Error: nil, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.ValidateFromChainState: + if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { + message.ResponseChannel <- overseer.ValidationResult{ + IsValid: false, + Error: nil, + } + } + case overseer.AvailabilityRecoveryMessage: + availableData := overseer.AvailableData{ + POV: []byte{}, + ValidationData: overseer.PersistedValidationData{}, + } + + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + AvailableData: &availableData, + Error: nil, + } + + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.InvalidOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") } } - case overseer.AvailabilityRecoveryMessage: - availableData := overseer.AvailableData{ - POV: []byte{}, - ValidationData: overseer.PersistedValidationData{}, - } - - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - AvailableData: &availableData, - Error: nil, - } - - default: - return fmt.Errorf("unknown message type") } + }() - return nil - }).Times(2) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.InvalidOutcome: - return nil - default: - panic("unexpected outcome") - } - }) mockValidationCode := dummyValidationCode() mockRuntime.EXPECT().ParachainHostValidationCodeByHash(gomock.Any(), gomock.Any()). Return(&mockValidationCode, nil).Times(1) @@ -618,63 +623,64 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - participationHandler := NewParticipation(mockSender, mockRuntime) - ctx := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - availableData := overseer.AvailableData{ - POV: []byte{}, - ValidationData: overseer.PersistedValidationData{}, - } - - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - AvailableData: &availableData, - Error: nil, - } - case overseer.ValidateFromChainState: - if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { - message.ResponseChannel <- overseer.ValidationResult{ - IsValid: false, - Error: nil, - InvalidResult: &overseer.InvalidValidationResult{ - Reason: "commitments hash mismatch", - }, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + availableData := overseer.AvailableData{ + POV: []byte{}, + ValidationData: overseer.PersistedValidationData{}, + } + + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + AvailableData: &availableData, + Error: nil, + } + case overseer.ValidateFromChainState: + if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { + message.ResponseChannel <- overseer.ValidationResult{ + IsValid: false, + Error: nil, + InvalidResult: &overseer.InvalidValidationResult{ + Reason: "commitments hash mismatch", + }, + } + } else { + panic("unexpected message") + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.InvalidOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") } } - default: - return fmt.Errorf("unknown message type") } + }() - return nil - }).Times(2) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.InvalidOutcome: - return nil - default: - panic("unexpected outcome") - } - }) mockValidationCode := dummyValidationCode() mockRuntime.EXPECT().ParachainHostValidationCodeByHash(gomock.Any(), gomock.Any()). Return(&mockValidationCode, nil).Times(1) - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) // sleep for a while to ensure we don't have any further results nor recovery requests @@ -686,64 +692,65 @@ func TestParticipationHandler_Queue(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) mockRuntime := NewMockRuntimeInstance(ctrl) - participationHandler := NewParticipation(mockSender, mockRuntime) - ctx := overseer.Context{ - Sender: mockSender, - } + participationHandler := NewParticipation(mockOverseer, mockRuntime) err := activateLeaf(participationHandler, parachainTypes.BlockNumber(10)) require.NoError(t, err) - mockSender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg interface{}) error { - switch message := msg.(type) { - case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: - response := uint32(1) - message.ResponseChannel <- &response - case overseer.AvailabilityRecoveryMessage: - availableData := overseer.AvailableData{ - POV: []byte{}, - ValidationData: overseer.PersistedValidationData{}, - } - - message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ - AvailableData: &availableData, - Error: nil, - } - case overseer.ValidateFromChainState: - if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { - message.ResponseChannel <- overseer.ValidationResult{ - IsValid: true, - Error: nil, - ValidResult: &overseer.ValidValidationResult{ - CandidateCommitments: parachainTypes.CandidateCommitments{}, - PersistedValidationData: parachainTypes.PersistedValidationData{}, - }, + go func() { + for { + select { + case msg := <-mockOverseer: + switch message := msg.(type) { + case overseer.ChainAPIMessage[overseer.BlockNumberRequest]: + response := uint32(1) + message.ResponseChannel <- response + case overseer.AvailabilityRecoveryMessage: + availableData := overseer.AvailableData{ + POV: []byte{}, + ValidationData: overseer.PersistedValidationData{}, + } + + message.ResponseChannel <- overseer.AvailabilityRecoveryResponse{ + AvailableData: &availableData, + Error: nil, + } + case overseer.ValidateFromChainState: + if message.PvfExecTimeoutKind == overseer.PvfExecTimeoutKindApproval { + message.ResponseChannel <- overseer.ValidationResult{ + IsValid: true, + Error: nil, + ValidResult: &overseer.ValidValidationResult{ + CandidateCommitments: parachainTypes.CandidateCommitments{}, + PersistedValidationData: parachainTypes.PersistedValidationData{}, + }, + } + } else { + panic("unexpected message") + } + case ParticipationStatement: + outcome, err := message.Outcome.Value() + require.NoError(t, err) + switch outcome.(type) { + case disputeTypes.ValidOutcome: + continue + default: + panic("unexpected outcome") + } + default: + panic("unknown message type") } } - default: - return fmt.Errorf("unknown message type") } + }() - return nil - }).Times(2) - mockSender.EXPECT().Feed(gomock.Any()).DoAndReturn(func(msg interface{}) error { - statement := msg.(ParticipationStatement) - outcome, err := statement.Outcome.Value() - require.NoError(t, err) - switch outcome.(type) { - case disputeTypes.ValidOutcome: - return nil - default: - panic("unexpected outcome") - } - }) mockValidationCode := dummyValidationCode() mockRuntime.EXPECT().ParachainHostValidationCodeByHash(gomock.Any(), gomock.Any()). Return(&mockValidationCode, nil).Times(1) - err = participate(participationHandler, ctx) + err = participate(participationHandler, mockOverseer) require.NoError(t, err) // sleep for a while to ensure we don't have any further results nor recovery requests diff --git a/dot/parachain/dispute/scraping/comm.go b/dot/parachain/dispute/scraping/comm.go new file mode 100644 index 0000000000..7c22419b07 --- /dev/null +++ b/dot/parachain/dispute/scraping/comm.go @@ -0,0 +1,82 @@ +package scraping + +import ( + "context" + "fmt" + "github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer" + "github.com/ChainSafe/gossamer/lib/common" + "time" +) + +const timeout = 10 * time.Second + +// getFinalisedBlockNumber sends a message to the overseer to get the finalised block number. +func getFinalisedBlockNumber(overseerChannel chan<- any) (uint32, error) { + message := overseer.ChainAPIMessage[overseer.FinalizedBlockNumberRequest]{ + ResponseChannel: make(chan any, 1), + } + res, err := call(overseerChannel, message, message.ResponseChannel) + if err != nil { + return 0, fmt.Errorf("sending message to get finalised block number: %w", err) + } + + response, ok := res.(overseer.BlockNumberResponse) + if !ok { + return 0, fmt.Errorf("getting finalised block number: got unexpected response type %T", res) + } + + if response.Err != nil { + return 0, fmt.Errorf("getting finalised block number: %w", response.Err) + } + + return response.Number, nil +} + +// getBlockAncestors sends a message to the overseer to get the ancestors of a block. +func getBlockAncestors( + overseerChannel chan<- any, + head common.Hash, + numAncestors uint32, +) ([]common.Hash, error) { + respChan := make(chan any, 1) + message := overseer.ChainAPIMessage[overseer.AncestorsRequest]{ + Message: overseer.AncestorsRequest{ + Hash: head, + K: numAncestors, + }, + ResponseChannel: respChan, + } + res, err := call(overseerChannel, message, message.ResponseChannel) + if err != nil { + return nil, fmt.Errorf("sending message to get block ancestors: %w", err) + } + + response, ok := res.(overseer.AncestorsResponse) + if !ok { + return nil, fmt.Errorf("getting block ancestors: got unexpected response type %T", res) + } + if response.Error != nil { + return nil, fmt.Errorf("getting block ancestors: %w", response.Error) + } + + return response.Ancestors, nil +} + +func call(channel chan<- any, message any, responseChan chan any) (any, error) { + // Send with timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + select { + case channel <- message: + case <-ctx.Done(): + return nil, ctx.Err() + } + + select { + case response := <-responseChan: + return response, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} diff --git a/dot/parachain/dispute/scraping/mocks_overseer_test.go b/dot/parachain/dispute/scraping/mocks_overseer_test.go deleted file mode 100644 index 334c6403f8..0000000000 --- a/dot/parachain/dispute/scraping/mocks_overseer_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer (interfaces: Sender) - -// Package parachain is a generated GoMock package. -package scraping - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockSender is a mock of Sender interface. -type MockSender struct { - ctrl *gomock.Controller - recorder *MockSenderMockRecorder -} - -// MockSenderMockRecorder is the mock recorder for MockSender. -type MockSenderMockRecorder struct { - mock *MockSender -} - -// NewMockSender creates a new mock instance. -func NewMockSender(ctrl *gomock.Controller) *MockSender { - mock := &MockSender{ctrl: ctrl} - mock.recorder = &MockSenderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockSender) EXPECT() *MockSenderMockRecorder { - return m.recorder -} - -// Feed mocks base method. -func (m *MockSender) Feed(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Feed", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Feed indicates an expected call of Feed. -func (mr *MockSenderMockRecorder) Feed(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Feed", reflect.TypeOf((*MockSender)(nil).Feed), arg0) -} - -// SendMessage mocks base method. -func (m *MockSender) SendMessage(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMessage indicates an expected call of SendMessage. -func (mr *MockSenderMockRecorder) SendMessage(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MockSender)(nil).SendMessage), arg0) -} - -// SendUnboundedMessage mocks base method. -func (m *MockSender) SendUnboundedMessage(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendUnboundedMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendUnboundedMessage indicates an expected call of SendUnboundedMessage. -func (mr *MockSenderMockRecorder) SendUnboundedMessage(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendUnboundedMessage", reflect.TypeOf((*MockSender)(nil).SendUnboundedMessage), arg0) -} diff --git a/dot/parachain/dispute/scraping/scraping.go b/dot/parachain/dispute/scraping/scraping.go index 50edf7287a..39962b411f 100644 --- a/dot/parachain/dispute/scraping/scraping.go +++ b/dot/parachain/dispute/scraping/scraping.go @@ -2,7 +2,6 @@ package scraping import ( "fmt" - "github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer" "github.com/ChainSafe/gossamer/dot/parachain/dispute/types" parachain "github.com/ChainSafe/gossamer/dot/parachain/runtime" @@ -57,14 +56,14 @@ func (cs *ChainScraper) GetBlocksIncludingCandidate(candidateHash common.Hash) [ // ProcessActiveLeavesUpdate Process active leaves update func (cs *ChainScraper) ProcessActiveLeavesUpdate( - sender overseer.Sender, + overseerChannel chan<- any, update overseer.ActiveLeavesUpdate, ) (*parachainTypes.ScrapedUpdates, error) { if update.Activated == nil { return ¶chainTypes.ScrapedUpdates{}, nil } - ancestors, err := cs.GetRelevantBlockAncestors(sender, update.Activated.Hash, update.Activated.Number) + ancestors, err := cs.GetRelevantBlockAncestors(overseerChannel, update.Activated.Hash, update.Activated.Number) if err != nil { return nil, fmt.Errorf("getting relevant block ancestors: %w", err) } @@ -169,11 +168,11 @@ func (cs *ChainScraper) ProcessCandidateEvents( // GetRelevantBlockAncestors Get relevant block ancestors func (cs *ChainScraper) GetRelevantBlockAncestors( - sender overseer.Sender, + overseerChannel chan<- any, head common.Hash, headNumber uint32, ) ([]common.Hash, error) { - targetAncestor, err := getFinalisedBlockNumber(sender) + targetAncestor, err := getFinalisedBlockNumber(overseerChannel) if err != nil { return nil, fmt.Errorf("getting finalised block number: %w", err) } @@ -186,7 +185,7 @@ func (cs *ChainScraper) GetRelevantBlockAncestors( } for { - hashes, err := getBlockAncestors(sender, head, AncestryChunkSize) + hashes, err := getBlockAncestors(overseerChannel, head, AncestryChunkSize) if err != nil { return nil, fmt.Errorf("getting block ancestors: %w", err) } @@ -236,7 +235,7 @@ func (cs *ChainScraper) IsPotentialSpam(voteState types.CandidateVoteState, cand // NewChainScraper New chain scraper func NewChainScraper( - sender overseer.Sender, + overseerChannel chan<- any, runtime parachain.RuntimeInstance, initialHead *overseer.ActivatedLeaf, ) (*ChainScraper, *parachainTypes.ScrapedUpdates, error) { @@ -251,7 +250,7 @@ func NewChainScraper( update := overseer.ActiveLeavesUpdate{ Activated: initialHead, } - updates, err := chainScraper.ProcessActiveLeavesUpdate(sender, update) + updates, err := chainScraper.ProcessActiveLeavesUpdate(overseerChannel, update) if err != nil { return nil, nil, fmt.Errorf("processing active leaves update: %w", err) } @@ -259,60 +258,6 @@ func NewChainScraper( return chainScraper, updates, nil } -// getFinalisedBlockNumber sends a message to the overseer to get the finalised block number. -func getFinalisedBlockNumber(sender overseer.Sender) (uint32, error) { - tx := make(chan any, 1) - err := sender.SendMessage(overseer.ChainAPIMessage[overseer.FinalizedBlockNumberRequest]{ - ResponseChannel: tx, - }) - if err != nil { - return 0, fmt.Errorf("sending message to get finalised block number: %w", err) - } - - data := <-tx - response, ok := data.(overseer.BlockNumberResponse) - if !ok { - return 0, fmt.Errorf("getting finalised block number: got unexpected response type %T", data) - } - - if response.Err != nil { - return 0, fmt.Errorf("getting finalised block number: %w", response.Err) - } - - return response.Number, nil -} - -// getBlockAncestors sends a message to the overseer to get the ancestors of a block. -func getBlockAncestors( - sender overseer.Sender, - head common.Hash, - numAncestors uint32, -) ([]common.Hash, error) { - tx := make(chan any, 1) - message := overseer.AncestorsRequest{ - Hash: head, - K: numAncestors, - } - err := sender.SendMessage(overseer.ChainAPIMessage[overseer.AncestorsRequest]{ - Message: message, - ResponseChannel: tx, - }) - if err != nil { - return nil, fmt.Errorf("sending message to get block ancestors: %w", err) - } - - data := <-tx - response, ok := data.(overseer.AncestorsResponse) - if !ok { - return nil, fmt.Errorf("getting block ancestors: got unexpected response type %T", data) - } - if response.Error != nil { - return nil, fmt.Errorf("getting block ancestors: %w", response.Error) - } - - return response.Ancestors, nil -} - // saturatingSub returns the result of a - b, saturating at 0. func saturatingSub(a, b uint32) uint32 { if a > b { diff --git a/dot/parachain/dispute/scraping/scraping_test.go b/dot/parachain/dispute/scraping/scraping_test.go index 217c15c950..9f3f5329f7 100644 --- a/dot/parachain/dispute/scraping/scraping_test.go +++ b/dot/parachain/dispute/scraping/scraping_test.go @@ -104,7 +104,7 @@ func configureMockExpectations( func configureMockOverseer( t *testing.T, - sender *MockSender, + overseerChannel chan any, chain *[]common.Hash, messages expectedMessages, finalisedBlock uint32, @@ -113,50 +113,53 @@ func configureMockOverseer( finalisedBlockRequestCalls = 0 ancestorRequestCalls = 0 ) - sender.EXPECT().SendMessage(gomock.Any()).DoAndReturn(func(msg any) error { - switch request := msg.(type) { - case overseer.ChainAPIMessage[overseer.FinalizedBlockNumberRequest]: - require.Less(t, finalisedBlockRequestCalls, messages.finalisedBlockRequests) - result := finalisedBlock - if finalisedBlockRequestCalls == 0 { - result = 0 - } - finalisedBlockRequestCalls++ - response := overseer.BlockNumberResponse{ - Number: result, - Err: nil, - } - request.ResponseChannel <- response - case overseer.ChainAPIMessage[overseer.AncestorsRequest]: - require.Less(t, ancestorRequestCalls, messages.ancestorRequests) - ancestorRequestCalls++ - maybeBlockPosition := -1 - for idx, h := range *chain { - if h == request.Message.Hash { - maybeBlockPosition = idx - break + for { + select { + case msg := <-overseerChannel: + switch request := msg.(type) { + case overseer.ChainAPIMessage[overseer.FinalizedBlockNumberRequest]: + require.LessOrEqual(t, finalisedBlockRequestCalls, messages.finalisedBlockRequests) + result := finalisedBlock + if finalisedBlockRequestCalls == 0 { + result = 0 } - } + finalisedBlockRequestCalls++ - var ancestors []common.Hash - if maybeBlockPosition != -1 { - ancestors = make([]common.Hash, 0) - for i := maybeBlockPosition - 1; i >= 0 && i >= maybeBlockPosition-int(request.Message.K); i-- { - ancestors = append(ancestors, (*chain)[i]) + response := overseer.BlockNumberResponse{ + Number: result, + Err: nil, + } + request.ResponseChannel <- response + case overseer.ChainAPIMessage[overseer.AncestorsRequest]: + require.LessOrEqual(t, ancestorRequestCalls, messages.ancestorRequests) + ancestorRequestCalls++ + maybeBlockPosition := -1 + for idx, h := range *chain { + if h == request.Message.Hash { + maybeBlockPosition = idx + break + } + } + + var ancestors []common.Hash + if maybeBlockPosition != -1 { + ancestors = make([]common.Hash, 0) + for i := maybeBlockPosition - 1; i >= 0 && i >= maybeBlockPosition-int(request.Message.K); i-- { + ancestors = append(ancestors, (*chain)[i]) + } } - } - response := overseer.AncestorsResponse{ - Ancestors: ancestors, - Error: nil, + response := overseer.AncestorsResponse{ + Ancestors: ancestors, + Error: nil, + } + request.ResponseChannel <- response + default: + t.Errorf("unexpected message type: %T", request) } - request.ResponseChannel <- response - default: - return fmt.Errorf("unknown request type") } - return nil - }).Times(messages.finalisedBlockRequests + messages.ancestorRequests) + } } func mockBackedCandidateEvent(blockHash common.Hash) (*scale.VaryingDataTypeSlice, error) { @@ -323,7 +326,7 @@ func configureMockRuntime( func newTestState( t *testing.T, - sender *MockSender, + overseerChannel chan any, runtime *MockRuntimeInstance, messages expectedMessages, calls expectedRuntimeCalls, @@ -331,10 +334,10 @@ func newTestState( eventGenerator func(blockHash common.Hash, chain *[]common.Hash) (*scale.VaryingDataTypeSlice, error), ) (*ChainScraper, *[]common.Hash) { chain := []common.Hash{getBlockNumberHash(0), getBlockNumberHash(1)} - configureMockOverseer(t, sender, &chain, messages, finalisedBlock) + go configureMockOverseer(t, overseerChannel, &chain, messages, finalisedBlock) configureMockRuntime(runtime, &chain, calls, eventGenerator) - scraper, _, err := NewChainScraper(sender, runtime, dummyActivatedLeaf(1)) + scraper, _, err := NewChainScraper(overseerChannel, runtime, dummyActivatedLeaf(1)) require.NoError(t, err) return scraper, &chain } @@ -346,7 +349,7 @@ func TestChainScraper(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) candidate1, err := dummyCandidateReceipt(getBlockNumberHash(1)).Hash() require.NoError(t, err) @@ -358,7 +361,7 @@ func TestChainScraper(t *testing.T) { messages, calls := configureMockExpectations([]int{expectedAncestryLength}) scraper, chain := newTestState(t, - mockSender, + mockOverseer, mockRuntime, messages, calls, @@ -374,7 +377,7 @@ func TestChainScraper(t *testing.T) { nextLeaf := getNextLeaf(t, chain) nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err = scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err = scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) require.True(t, scraper.IsCandidateIncluded(candidate2)) require.True(t, scraper.IsCandidateBacked(candidate2)) @@ -387,19 +390,19 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(0) expectedAncestryLength := int(BlocksToSkip - finalisedBlock) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) - scraper, chain := newTestState(t, mockSender, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) + scraper, chain := newTestState(t, mockOverseer, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) var nextLeaf *overseer.ActivatedLeaf for i := 0; i < BlocksToSkip; i++ { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) nextBlockNumber := len(*chain) @@ -417,25 +420,25 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(0) messages, calls := configureMockExpectations(BlocksToSkip) - scraper, chain := newTestState(t, mockSender, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) + scraper, chain := newTestState(t, mockOverseer, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) var nextLeaf *overseer.ActivatedLeaf for i := 0; i < BlocksToSkip[0]; i++ { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) for i := 0; i < BlocksToSkip[1]; i++ { nextLeaf = getNextLeaf(t, chain) } nextUpdate = overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err = scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err = scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) }) @@ -446,12 +449,12 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(17) expectedAncestryLength := int(BlocksToSkip - (finalisedBlock - DisputeCandidateLifetimeAfterFinalization)) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) - scraper, chain := newTestState(t, mockSender, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) + scraper, chain := newTestState(t, mockOverseer, mockRuntime, messages, calls, finalisedBlock, mockCandidateEvents) var nextLeaf *overseer.ActivatedLeaf // 1 because `TestState` starts at leaf 1. @@ -459,7 +462,7 @@ func TestChainScraper(t *testing.T) { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) }) @@ -472,13 +475,13 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(1) expectedAncestryLength := BlocksToSkip - int(finalisedBlock) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) scraper, chain := newTestState(t, - mockSender, + mockOverseer, mockRuntime, messages, calls, @@ -499,7 +502,7 @@ func TestChainScraper(t *testing.T) { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) finalisedBlockNumber := TargetBlockNumber + DisputeCandidateLifetimeAfterFinalization @@ -522,13 +525,13 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(1) expectedAncestryLength := BlocksToSkip - int(finalisedBlock) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) scraper, chain := newTestState(t, - mockSender, + mockOverseer, mockRuntime, messages, calls, @@ -550,7 +553,7 @@ func TestChainScraper(t *testing.T) { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) finalisedBlock++ @@ -579,12 +582,12 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(1) expectedAncestryLength := BlocksToSkip - int(finalisedBlock) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) - scraper, chain := newTestState(t, mockSender, + scraper, chain := newTestState(t, mockOverseer, mockRuntime, messages, calls, @@ -610,7 +613,7 @@ func TestChainScraper(t *testing.T) { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) // Finalize blocks to enforce pruning of scraped events. @@ -642,12 +645,12 @@ func TestChainScraper(t *testing.T) { ctrl := gomock.NewController(t) mockRuntime := NewMockRuntimeInstance(ctrl) - mockSender := NewMockSender(ctrl) + mockOverseer := make(chan any) finalisedBlock := uint32(1) expectedAncestryLength := BlocksToSkip - int(finalisedBlock) messages, calls := configureMockExpectations([]int{expectedAncestryLength}) - scraper, chain := newTestState(t, mockSender, + scraper, chain := newTestState(t, mockOverseer, mockRuntime, messages, calls, @@ -673,7 +676,7 @@ func TestChainScraper(t *testing.T) { nextLeaf = getNextLeaf(t, chain) } nextUpdate := overseer.ActiveLeavesUpdate{Activated: nextLeaf} - _, err := scraper.ProcessActiveLeavesUpdate(mockSender, nextUpdate) + _, err := scraper.ProcessActiveLeavesUpdate(mockOverseer, nextUpdate) require.NoError(t, err) candidateHash, err := dummyCandidateReceipt(getBlockNumberHash(testTarget1)).Hash() diff --git a/dot/parachain/dispute/types/message.go b/dot/parachain/dispute/types/message.go index 7183642e14..91d8372156 100644 --- a/dot/parachain/dispute/types/message.go +++ b/dot/parachain/dispute/types/message.go @@ -232,21 +232,16 @@ func NewDisputeMessage( // ImportStatementsMessage import statements by validators about a candidate type ImportStatementsMessage struct { - CandidateReceipt parachainTypes.CandidateReceipt - Session parachainTypes.SessionIndex - Statements []Statement - PendingConfirmation overseer.Sender + CandidateReceipt parachainTypes.CandidateReceipt + Session parachainTypes.SessionIndex + Statements []Statement } // RecentDisputesMessage message to request recent disputes -type RecentDisputesMessage struct { - Sender overseer.Sender -} +type RecentDisputesMessage struct{} // ActiveDisputesMessage message to request active disputes -type ActiveDisputesMessage struct { - Sender overseer.Sender -} +type ActiveDisputesMessage struct{} // CandidateVotesMessage message to request candidate votes type CandidateVotesMessage struct { @@ -256,7 +251,6 @@ type CandidateVotesMessage struct { // QueryCandidateVotesMessage message to request candidate votes type QueryCandidateVotesMessage struct { - Sender overseer.Sender Queries []CandidateVotesMessage } @@ -286,7 +280,6 @@ type BlockDescription struct { type DetermineUndisputedChainMessage struct { Base overseer.Block BlockDescriptions []BlockDescription - Tx overseer.Sender } // Message messages to be handled in this subsystem.