Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kanishkatn committed Nov 25, 2023
1 parent f709d78 commit 8a2933d
Show file tree
Hide file tree
Showing 22 changed files with 405 additions and 494 deletions.
4 changes: 0 additions & 4 deletions dot/parachain/collator-protocol/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,5 @@

package collatorprotocol

<<<<<<<< HEAD:dot/parachain/collator-protocol/mocks_generate_test.go
//go:generate mockgen -destination=mocks_test.go -package=$GOPACKAGE . Network
========
//go:generate mockgen -destination=mocks_test.go -package=$GOPACKAGE . PoVRequestor
//go:generate mockgen -destination=mocks_runtime_test.go -package $GOPACKAGE github.com/ChainSafe/gossamer/dot/parachain/runtime RuntimeInstance
>>>>>>>> 7c50d399 (moved `lib/parachain` to `dot/parachain` (#3429)):dot/parachain/mocks_generate_test.go
6 changes: 3 additions & 3 deletions dot/parachain/dispute/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ChainSafe/gossamer/lib/common"
)

// Backend is the backend for the dispute coordinator module.
// Backend is the backend for the disputes coordinator module.
type Backend interface {
// GetEarliestSession returns the earliest session index, if any.
GetEarliestSession() (*parachainTypes.SessionIndex, error)
Expand All @@ -28,7 +28,7 @@ type Backend interface {
SetCandidateVotes(session parachainTypes.SessionIndex, candidateHash common.Hash, votes *types.CandidateVotes) error
}

// OverlayBackend is the overlay backend for the dispute coordinator module.
// OverlayBackend is the overlay backend for the disputes coordinator module.
type OverlayBackend interface {
Backend

Expand All @@ -42,7 +42,7 @@ type OverlayBackend interface {
NoteEarliestSession(session parachainTypes.SessionIndex) error
}

// DBBackend is the backend for the dispute coordinator module that uses a database.
// DBBackend is the backend for the disputes coordinator module that uses a database.
type DBBackend interface {
Backend

Expand Down
15 changes: 6 additions & 9 deletions dot/parachain/dispute/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func getBlockNumber(overseerChannel chan<- any, receipt parachainTypes.Candidate
return 0, fmt.Errorf("get hash: %w", err)
}

message := overseer.ChainAPIMessage[overseer.BlockNumberRequest]{
Message: overseer.BlockNumberRequest{Hash: relayParent},
message := overseer.ChainAPIMessage[overseer.BlockNumber]{
Message: overseer.BlockNumber{Hash: relayParent},
ResponseChannel: respCh,
}
result, err := call(overseerChannel, message, message.ResponseChannel)
Expand Down Expand Up @@ -48,16 +48,13 @@ func sendMessage(channel chan<- any, message any) error {
}

func call(channel chan<- any, message any, responseChan chan any) (any, error) {
// Send with timeout
if err := sendMessage(channel, message); err != nil {
return nil, fmt.Errorf("send message: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case channel <- message:
case <-ctx.Done():
return nil, ctx.Err()
}

select {
case response := <-responseChan:
return response, nil
Expand Down
145 changes: 74 additions & 71 deletions dot/parachain/dispute/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,12 @@ type Coordinator struct {
store *overlayBackend
runtime parachain.RuntimeInstance

sender chan<- any
overseer chan<- any
receiver <-chan any
}

func (d *Coordinator) Run() error {
initResult, err := d.initialize()
if err != nil {
return fmt.Errorf("initialize dispute coordinator: %w", err)
}

initData := InitialData{
Participation: initResult.participation,
Votes: initResult.votes,
Leaf: initResult.activatedLeaf,
}

if err := initResult.initialized.Run(d.sender, d.store.inner, &initData); err != nil {
return fmt.Errorf("run initialized state: %w", err)
}

return nil
}

type startupResult struct {
participation []ParticipationRequestWithPriority
participation []ParticipationData
votes []parachainTypes.ScrapedOnChainVotes
spamSlots SpamSlots
orderingProvider scraping.ChainScraper
Expand All @@ -57,20 +38,69 @@ type startupResult struct {
}

type initializeResult struct {
participation []ParticipationRequestWithPriority
participation []ParticipationData
votes []parachainTypes.ScrapedOnChainVotes
activatedLeaf *overseer.ActivatedLeaf
initialized *Initialized

Check failure on line 44 in dot/parachain/dispute/coordinator.go

View workflow job for this annotation

GitHub Actions / linting

`initialized` is a misspelling of `initialised` (misspell)
}

func (d *Coordinator) sendDisputeMessages(
env types.CandidateEnvironment,
voteState types.CandidateVoteState,
) {
ownVotes, err := voteState.Own.Votes()
if err != nil {
logger.Errorf("get own votes: %s", err)
return
}

for _, vote := range ownVotes {
keypair, err := types.GetValidatorKeyPair(d.keystore, env.Session.Validators, vote.ValidatorIndex)
if err != nil {
logger.Errorf("get validator key pair: %s", err)
continue
}

candidateHash, err := voteState.Votes.CandidateReceipt.Hash()
if err != nil {
logger.Errorf("get candidate hash: %s", err)
continue
}

isValid, err := vote.DisputeStatement.IsValid()
if err != nil {
logger.Errorf("check if dispute statement is valid: %s", err)
continue
}

signedDisputeStatement, err := types.NewSignedDisputeStatement(keypair, isValid, candidateHash, env.SessionIndex)
if err != nil {
logger.Errorf("create signed dispute statement: %s", err)
continue
}

disputeMessage, err := types.NewDisputeMessage(keypair, voteState.Votes, &signedDisputeStatement, vote.ValidatorIndex, env.Session)
if err != nil {
logger.Errorf("create dispute message: %s", err)
continue
}

if err := sendMessage(d.overseer, disputeMessage); err != nil {
logger.Errorf("send dispute message: %s", err)
}
}
}

func (d *Coordinator) waitForFirstLeaf() (*overseer.ActivatedLeaf, error) {

Check failure on line 94 in dot/parachain/dispute/coordinator.go

View workflow job for this annotation

GitHub Actions / linting

(*Coordinator).waitForFirstLeaf - result 1 (error) is always nil (unparam)
// TODO: handle other messages
for {
select {
case overseerMessage := <-d.receiver:
switch message := overseerMessage.(type) {
case overseer.Signal[overseer.ActiveLeavesUpdate]:
return message.Data.Activated, nil
default:
logger.Warnf("Received message before first active leaves update. "+
"This is not expected - message will be dropped. %T", message)
}
}
}
Expand Down Expand Up @@ -100,7 +130,7 @@ func (d *Coordinator) initialize() (
participation: startupData.participation,
votes: startupData.votes,
activatedLeaf: firstLeaf,
initialized: NewInitializedState(d.sender,
initialized: NewInitializedState(d.overseer,

Check failure on line 133 in dot/parachain/dispute/coordinator.go

View workflow job for this annotation

GitHub Actions / linting

`initialized` is a misspelling of `initialised` (misspell)
d.runtime,
startupData.spamSlots,
&startupData.orderingProvider,
Expand All @@ -127,28 +157,29 @@ func (d *Coordinator) handleStartup(initialHead *overseer.ActivatedLeaf) (
}

gapsInCache := false
for idx := highestSession - (Window - 1); idx <= highestSession; idx++ {
_, err = d.runtime.ParachainHostSessionInfo(initialHead.Hash, idx)
if err != nil {
for idx := saturatingSub(uint32(highestSession), Window-1); idx <= uint32(highestSession); idx++ {
sessionInfo, err := d.runtime.ParachainHostSessionInfo(initialHead.Hash, parachainTypes.SessionIndex(idx))
if err != nil || sessionInfo == nil {
logger.Debugf("no session info for session %d", idx)
gapsInCache = true
continue
}
}

// prune obsolete disputes
if err := d.store.NoteEarliestSession(highestSession); err != nil {
earliestSession := parachainTypes.SessionIndex(saturatingSub(uint32(highestSession), Window-1))
if err := d.store.NoteEarliestSession(earliestSession); err != nil {
return nil, fmt.Errorf("note earliest session: %w", err)
}

// for every dispute in activeDisputes
// get candidate votes
// check if it is a potential spam
// participate if needed, if not distribute the own vote
var participationRequests []ParticipationRequestWithPriority
var participationRequests []ParticipationData
spamDisputes := make(map[unconfirmedKey]*treeset.Set)
leafHash := initialHead.Hash
scraper, scrapedVotes, err := scraping.NewChainScraper(d.sender, d.runtime, initialHead)
scraper, scrapedVotes, err := scraping.NewChainScraper(d.overseer, d.runtime, initialHead)
if err != nil {
return nil, fmt.Errorf("new chain scraper: %w", err)
}
Expand Down Expand Up @@ -206,7 +237,7 @@ func (d *Coordinator) handleStartup(initialHead *overseer.ActivatedLeaf) (
priority = ParticipationPriorityBestEffort
}

participationRequests = append(participationRequests, ParticipationRequestWithPriority{
participationRequests = append(participationRequests, ParticipationData{
request: ParticipationRequest{
candidateHash: dispute.Comparator.CandidateHash,
candidateReceipt: voteState.Votes.CandidateReceipt,
Expand All @@ -232,51 +263,23 @@ func (d *Coordinator) handleStartup(initialHead *overseer.ActivatedLeaf) (
}, nil
}

func (d *Coordinator) sendDisputeMessages(
env types.CandidateEnvironment,
voteState types.CandidateVoteState,
) {
ownVotes, err := voteState.Own.Votes()
func (d *Coordinator) Run() error {
initResult, err := d.initialize()
if err != nil {
logger.Errorf("get own votes: %s", err)
return
return fmt.Errorf("initialize dispute coordinator: %w", err)
}

for _, vote := range ownVotes {
keypair, err := types.GetValidatorKeyPair(d.keystore, env.Session.Validators, vote.ValidatorIndex)
if err != nil {
logger.Errorf("get validator key pair: %s", err)
continue
}

candidateHash, err := voteState.Votes.CandidateReceipt.Hash()
if err != nil {
logger.Errorf("get candidate hash: %s", err)
continue
}

isValid, err := vote.DisputeStatement.IsValid()
if err != nil {
logger.Errorf("check if dispute statement is valid: %s", err)
continue
}

signedDisputeStatement, err := types.NewSignedDisputeStatement(keypair, isValid, candidateHash, env.SessionIndex)
if err != nil {
logger.Errorf("create signed dispute statement: %s", err)
continue
}

disputeMessage, err := types.NewDisputeMessage(keypair, voteState.Votes, &signedDisputeStatement, vote.ValidatorIndex, env.Session)
if err != nil {
logger.Errorf("create dispute message: %s", err)
continue
}
initData := InitialData{
Participation: initResult.participation,
Votes: initResult.votes,
Leaf: initResult.activatedLeaf,
}

if err := sendMessage(d.sender, disputeMessage); err != nil {
logger.Errorf("send dispute message: %s", err)
}
if err := initResult.initialized.Run(d.overseer, d.store.inner, &initData); err != nil {
return fmt.Errorf("run initialized state: %w", err)

Check failure on line 279 in dot/parachain/dispute/coordinator.go

View workflow job for this annotation

GitHub Actions / linting

`initialized` is a misspelling of `initialised` (misspell)
}

return nil
}

func NewDisputeCoordinator(path string) (*Coordinator, error) {
Expand Down
6 changes: 3 additions & 3 deletions dot/parachain/dispute/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ func (b *BadgerBackend) setVotesCleanupTxn(txn *badger.Txn, earliestSession para
return fmt.Errorf("get watermark: %w", err)
}

cleanUntil := earliestSession - watermark
cleanUntil := saturatingSub(uint32(earliestSession), uint32(watermark))
if cleanUntil > MaxCleanBatchSize {
cleanUntil = MaxCleanBatchSize
}

for i := watermark; i < cleanUntil; i++ {
for i := watermark; i < parachainTypes.SessionIndex(cleanUntil); i++ {
prefix := newCandidateVotesSessionPrefix(i)
it := txn.NewIterator(badger.DefaultIteratorOptions)

Expand All @@ -263,7 +263,7 @@ func (b *BadgerBackend) setVotesCleanupTxn(txn *badger.Txn, earliestSession para
}

// new watermark
if err := b.setWatermarkTxn(txn, cleanUntil); err != nil {
if err := b.setWatermarkTxn(txn, parachainTypes.SessionIndex(cleanUntil)); err != nil {
return fmt.Errorf("set watermark: %w", err)
}

Expand Down
Loading

0 comments on commit 8a2933d

Please sign in to comment.