Skip to content

Commit

Permalink
use channels
Browse files Browse the repository at this point in the history
  • Loading branch information
kanishkatn committed Nov 23, 2023
1 parent 643b672 commit f709d78
Show file tree
Hide file tree
Showing 16 changed files with 812 additions and 921 deletions.
93 changes: 93 additions & 0 deletions dot/parachain/dispute/comm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package dispute

import (
"context"
"fmt"
"github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer"
"github.com/ChainSafe/gossamer/dot/parachain/dispute/types"
parachainTypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"time"
)

const timeout = 10 * time.Second

func getBlockNumber(overseerChannel chan<- any, receipt parachainTypes.CandidateReceipt) (uint32, error) {
respCh := make(chan any, 1)
relayParent, err := receipt.Hash()
if err != nil {
return 0, fmt.Errorf("get hash: %w", err)
}

message := overseer.ChainAPIMessage[overseer.BlockNumberRequest]{
Message: overseer.BlockNumberRequest{Hash: relayParent},
ResponseChannel: respCh,
}
result, err := call(overseerChannel, message, message.ResponseChannel)
if err != nil {
return 0, fmt.Errorf("send message: %w", err)
}

blockNumber, ok := result.(uint32)
if !ok {
return 0, fmt.Errorf("unexpected response type: %T", result)
}
return blockNumber, nil
}

func sendMessage(channel chan<- any, message any) error {
// Send with timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case channel <- message:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func call(channel chan<- any, message any, responseChan chan any) (any, error) {
// Send with timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case channel <- message:
case <-ctx.Done():
return nil, ctx.Err()
}

select {
case response := <-responseChan:
return response, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func sendResult(overseerChannel chan<- any, request ParticipationRequest, outcome types.ParticipationOutcomeType) {
participationOutcome, err := types.NewCustomParticipationOutcomeVDT(outcome)
if err != nil {
logger.Errorf(
"failed to create participation outcome: %s, error: %s",
outcome,
err,
)
return
}

statement := ParticipationStatement{
Session: request.session,
CandidateHash: request.candidateHash,
CandidateReceipt: request.candidateReceipt,
Outcome: participationOutcome,
}
if err := sendMessage(overseerChannel, statement); err != nil {
logger.Errorf(
"failed to send participation statement: %s, error: %s",
statement,
err,
)
}
}
80 changes: 37 additions & 43 deletions dot/parachain/dispute/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@ const Window = 6

var logger = log.NewFromGlobal(log.AddContext("parachain", "disputes"))

// CoordinatorSubsystem is the dispute coordinator subsystem interface.
type CoordinatorSubsystem interface {
// Run runs the dispute coordinator subsystem.
Run(context overseer.Context) error
}

// disputeCoordinator implements the CoordinatorSubsystem interface.
type disputeCoordinator struct {
// Coordinator implements the CoordinatorSubsystem interface.
type Coordinator struct {
keystore keystore.Keystore
store *overlayBackend
runtime parachain.RuntimeInstance

sender chan<- any
receiver <-chan any
}

func (d *disputeCoordinator) Run(context overseer.Context) error {
initResult, err := d.initialize(context)
func (d *Coordinator) Run() error {
initResult, err := d.initialize()
if err != nil {
return fmt.Errorf("initialize dispute coordinator: %w", err)
}
Expand All @@ -43,7 +40,7 @@ func (d *disputeCoordinator) Run(context overseer.Context) error {
Leaf: initResult.activatedLeaf,
}

if err := initResult.initialized.Run(context, d.store.inner, &initData); err != nil {
if err := initResult.initialized.Run(d.sender, d.store.inner, &initData); err != nil {
return fmt.Errorf("run initialized state: %w", err)
}

Expand All @@ -66,11 +63,11 @@ type initializeResult struct {
initialized *Initialized
}

func (d *disputeCoordinator) waitForFirstLeaf(context overseer.Context) (*overseer.ActivatedLeaf, error) {
func (d *Coordinator) waitForFirstLeaf() (*overseer.ActivatedLeaf, error) {
// TODO: handle other messages
for {
select {
case overseerMessage := <-context.Receiver:
case overseerMessage := <-d.receiver:
switch message := overseerMessage.(type) {
case overseer.Signal[overseer.ActiveLeavesUpdate]:
return message.Data.Activated, nil
Expand All @@ -79,16 +76,16 @@ func (d *disputeCoordinator) waitForFirstLeaf(context overseer.Context) (*overse
}
}

func (d *disputeCoordinator) initialize(context overseer.Context) (
func (d *Coordinator) initialize() (
*initializeResult,
error,
) {
firstLeaf, err := d.waitForFirstLeaf(context)
firstLeaf, err := d.waitForFirstLeaf()
if err != nil {
return nil, fmt.Errorf("wait for first leaf: %w", err)
}

startupData, err := d.handleStartup(context, firstLeaf)
startupData, err := d.handleStartup(firstLeaf)
if err != nil {
return nil, fmt.Errorf("handle startup: %w", err)
}
Expand All @@ -103,7 +100,7 @@ func (d *disputeCoordinator) initialize(context overseer.Context) (
participation: startupData.participation,
votes: startupData.votes,
activatedLeaf: firstLeaf,
initialized: NewInitializedState(context.Sender,
initialized: NewInitializedState(d.sender,
d.runtime,
startupData.spamSlots,
&startupData.orderingProvider,
Expand All @@ -113,7 +110,7 @@ func (d *disputeCoordinator) initialize(context overseer.Context) (
}, nil
}

func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead *overseer.ActivatedLeaf) (
func (d *Coordinator) handleStartup(initialHead *overseer.ActivatedLeaf) (
*startupResult,
error,
) {
Expand Down Expand Up @@ -151,7 +148,7 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead
var participationRequests []ParticipationRequestWithPriority
spamDisputes := make(map[unconfirmedKey]*treeset.Set)
leafHash := initialHead.Hash
scraper, scrapedVotes, err := scraping.NewChainScraper(context.Sender, d.runtime, initialHead)
scraper, scrapedVotes, err := scraping.NewChainScraper(d.sender, d.runtime, initialHead)
if err != nil {
return nil, fmt.Errorf("new chain scraper: %w", err)
}
Expand Down Expand Up @@ -202,26 +199,24 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead
spamDisputes[disputeKey] = treeset.NewWithIntComparator()
}
spamDisputes[disputeKey].Add(voteState.Votes.VotedIndices())
} else {
if voteState.Own.VoteMissing() {
logger.Tracef("found valid dispute, with no vote from us on startup - participating. %s")
priority := ParticipationPriorityHigh
if !isIncluded {
priority = ParticipationPriorityBestEffort
}

participationRequests = append(participationRequests, ParticipationRequestWithPriority{
request: ParticipationRequest{
candidateHash: dispute.Comparator.CandidateHash,
candidateReceipt: voteState.Votes.CandidateReceipt,
session: dispute.Comparator.SessionIndex,
},
priority: priority,
})
} else {
logger.Tracef("found valid dispute, with vote from us on startup - distributing. %s")
d.sendDisputeMessages(context, *env, voteState)
} else if voteState.Own.VoteMissing() {
logger.Tracef("found valid dispute, with no vote from us on startup - participating. %s")
priority := ParticipationPriorityHigh
if !isIncluded {
priority = ParticipationPriorityBestEffort
}

participationRequests = append(participationRequests, ParticipationRequestWithPriority{
request: ParticipationRequest{
candidateHash: dispute.Comparator.CandidateHash,
candidateReceipt: voteState.Votes.CandidateReceipt,
session: dispute.Comparator.SessionIndex,
},
priority: priority,
})
} else {
logger.Tracef("found valid dispute, with vote from us on startup - distributing. %s")
d.sendDisputeMessages(*env, voteState)
}

return true
Expand All @@ -237,8 +232,7 @@ func (d *disputeCoordinator) handleStartup(context overseer.Context, initialHead
}, nil
}

func (d *disputeCoordinator) sendDisputeMessages(
context overseer.Context,
func (d *Coordinator) sendDisputeMessages(
env types.CandidateEnvironment,
voteState types.CandidateVoteState,
) {
Expand Down Expand Up @@ -279,13 +273,13 @@ func (d *disputeCoordinator) sendDisputeMessages(
continue
}

if err := context.Sender.SendMessage(disputeMessage); err != nil {
if err := sendMessage(d.sender, disputeMessage); err != nil {
logger.Errorf("send dispute message: %s", err)
}
}
}

func NewDisputeCoordinator(path string) (CoordinatorSubsystem, error) {
func NewDisputeCoordinator(path string) (*Coordinator, error) {
db, err := badger.Open(badger.DefaultOptions(path))
if err != nil {
return nil, fmt.Errorf("open badger db: %w", err)
Expand All @@ -294,7 +288,7 @@ func NewDisputeCoordinator(path string) (CoordinatorSubsystem, error) {
dbBackend := NewDBBackend(db)
backend := newOverlayBackend(dbBackend)

return &disputeCoordinator{
return &Coordinator{
store: backend,
}, nil
}
15 changes: 4 additions & 11 deletions dot/parachain/dispute/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
"testing"
)

type VirtualOverseer struct {
sender *MockSender
}

type TestState struct {
validators []keystore.KeyPair
validatorPublic []parachaintypes.ValidatorID
Expand All @@ -30,7 +26,7 @@ type TestState struct {
lastBlock common.Hash
knownSession *parachaintypes.SessionIndex
db database.Database
sender *MockSender
overseer chan any
runtime *MockRuntimeInstance
}

Expand Down Expand Up @@ -97,9 +93,6 @@ func newTestState(t *testing.T) *TestState {
headers[lastBlock] = genesisHeader
blockNumToHeader[0] = lastBlock

ctrl := gomock.NewController(t)
sender := NewMockSender(ctrl)

return &TestState{
validators: validators,
validatorPublic: validatorPublic,
Expand All @@ -111,7 +104,7 @@ func newTestState(t *testing.T) *TestState {
lastBlock: lastBlock,
knownSession: nil,
db: db,
sender: sender,
overseer: make(chan any, 1),
}
}

Expand Down Expand Up @@ -174,7 +167,7 @@ func (ts *TestState) activateLeafAtSession(t *testing.T,
Number: uint32(blockNumber),
Status: overseer.LeafStatusFresh,
}
err := ts.sender.SendMessage(overseer.Signal[overseer.ActiveLeavesUpdate]{
err := sendMessage(ts.overseer, overseer.Signal[overseer.ActiveLeavesUpdate]{
Data: overseer.ActiveLeavesUpdate{Activated: &activatedLeaf},
})
require.NoError(t, err)
Expand Down Expand Up @@ -260,7 +253,7 @@ func (ts *TestState) mockResumeSyncWithEvents(t *testing.T,
Number: uint32(i),
Status: overseer.LeafStatusFresh,
}
err := ts.sender.SendMessage(overseer.Signal[overseer.ActiveLeavesUpdate]{
err := sendMessage(ts.overseer, overseer.Signal[overseer.ActiveLeavesUpdate]{
Data: overseer.ActiveLeavesUpdate{Activated: &activatedLeaf},
})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit f709d78

Please sign in to comment.