Skip to content

Commit

Permalink
add participation
Browse files Browse the repository at this point in the history
  • Loading branch information
kanishkatn committed Jul 11, 2023
1 parent 39d1b68 commit ace8c36
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 18 deletions.
13 changes: 13 additions & 0 deletions dot/parachain/dispute/overseer/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// TODO: This is just a temporary file to complete the participation module. The type definitions here are not complete.
// We need to remove this file once we have implemented the overseer.

package overseer

type Sender interface {
SendMessage(msg any) error
Feed(msg any) error
}

type Context struct {
Sender Sender
}
15 changes: 15 additions & 0 deletions dot/parachain/dispute/overseer/leaf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// TODO: This is just a temporary file to complete the participation module. The type definitions here are not complete.
// We need to remove this file once we have implemented the leaf update interfaces

package overseer

import "github.com/ChainSafe/gossamer/lib/common"

type ActivatedLeaf struct {
Hash common.Hash
Number uint32
}

type ActiveLeavesUpdate struct {
Activated *ActivatedLeaf
}
42 changes: 42 additions & 0 deletions dot/parachain/dispute/overseer/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package overseer

import (
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/parachain"
)

type ChainAPIMessage struct {
RelayParent common.Hash
ResponseChannel chan *uint32
}

type PersistedValidationData struct {
ParentHead []byte
RelayParentNumber uint32
RelayParentStorageRoot common.Hash
MaxPOVSize uint32
}

type AvailableData struct {
POV []byte
ValidationData PersistedValidationData
}

type RecoveryError uint32

const (
RecoveryErrorInvalid RecoveryError = iota
RecoveryErrorUnavailable
)

type AvailabilityRecoveryResponse struct {
AvailableData *AvailableData
Error RecoveryError
}

type AvailabilityRecoveryMessage struct {
CandidateReceipt parachain.CandidateReceipt
SessionIndex parachain.SessionIndex
GroupIndex *uint32
ResponseChannel chan AvailabilityRecoveryResponse
}
296 changes: 296 additions & 0 deletions dot/parachain/dispute/participation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package dispute

import (
"fmt"
"github.com/ChainSafe/gossamer/dot/parachain/dispute/overseer"
"github.com/ChainSafe/gossamer/dot/parachain/dispute/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/parachain"
"github.com/ChainSafe/gossamer/pkg/scale"
"sync"
"sync/atomic"
)

// CandidateComparator comparator for ordering of disputes for candidate.
type CandidateComparator struct {
relayParentBlockNumber *uint32
candidateHash common.Hash
}

// NewCandidateComparator creates a new CandidateComparator.
func NewCandidateComparator(relayParentBlockNumber *uint32, receipt parachain.CandidateReceipt) (CandidateComparator, error) {
encodedReceipt, err := scale.Marshal(receipt)
if err != nil {
return CandidateComparator{}, fmt.Errorf("encode candidate receipt: %w", err)
}

candidateHash, err := common.Blake2bHash(encodedReceipt)
if err != nil {
return CandidateComparator{}, fmt.Errorf("hash candidate receipt: %w", err)
}

return CandidateComparator{
relayParentBlockNumber: relayParentBlockNumber,
candidateHash: candidateHash,
}, nil
}

// ParticipationRequest a dispute participation request
type ParticipationRequest struct {
candidateHash common.Hash
candidateReceipt parachain.CandidateReceipt
session parachain.SessionIndex
//TODO: requestTimer for metrics
}

// ParticipationStatement is a statement as result of the validation process.
type ParticipationStatement struct {
Session parachain.SessionIndex
CandidateHash common.Hash
CandidateReceipt parachain.CandidateReceipt
Outcome types.ParticipationOutcome
}

// Participation keeps track of the disputes we need to participate in.
type Participation interface {
// Queue a dispute for the node to participate in
Queue(context overseer.Context, request ParticipationRequest, priority ParticipationPriority) error

// Clear clears a participation request. This is called when we have the dispute result.
Clear(candidateHash common.Hash) error

// ProcessActiveLeavesUpdate processes an active leaves update
ProcessActiveLeavesUpdate(update overseer.ActiveLeavesUpdate) error

// BumpPriority bumps the priority for the given receipts
BumpPriority(ctx overseer.Context, receipts []parachain.CandidateReceipt) error
}

type block struct {
Number uint32
Hash common.Hash
}

// ParticipationHandler handles dispute participation.
type ParticipationHandler struct {
runningParticipation sync.Map
workers atomic.Int32

queue Queue
sender overseer.Sender // TODO: revisit this once we have the overseer
recentBlock *block

//TODO: metrics
}

const MaxParallelParticipation = 3

func (p *ParticipationHandler) Queue(ctx overseer.Context, request ParticipationRequest, priority ParticipationPriority) error {
if _, ok := p.runningParticipation.Load(request.candidateHash); ok {
return nil
}

// if we already have a recent block, participate right away
if p.recentBlock != nil && p.numberOfWorkers() < MaxParallelParticipation {
if err := p.forkParticipation(&request, p.recentBlock.Hash); err != nil {
return fmt.Errorf("fork ParticipationHandler: %w", err)
}

return nil
}

blockNumber, err := getBlockNumber(ctx.Sender, request.candidateReceipt)
if err != nil {
return fmt.Errorf("get block number: %w", err)
}

comparator, err := NewCandidateComparator(&blockNumber, request.candidateReceipt)
if err != nil {
return fmt.Errorf("create candidate comparator: %w", err)
}

if err := p.queue.Queue(comparator, &request, priority); err != nil {
return fmt.Errorf("queue ParticipationHandler request: %w", err)
}

return nil
}

func (p *ParticipationHandler) Clear(candidateHash common.Hash) error {
p.runningParticipation.Delete(candidateHash)
p.workers.Add(-1)

if p.recentBlock == nil {
panic("we never ever reset recentBlock to nil and we already received a result, so it must have been set before. qed")
}

if err := p.dequeueUntilCapacity(p.recentBlock.Hash); err != nil {
return fmt.Errorf("dequeue until capacity: %w", err)
}

return nil
}

func (p *ParticipationHandler) ProcessActiveLeavesUpdate(update overseer.ActiveLeavesUpdate) error {
if update.Activated == nil {
return nil
}

if p.recentBlock == nil {
p.recentBlock = &block{
Number: update.Activated.Number,
Hash: update.Activated.Hash,
}

if err := p.dequeueUntilCapacity(update.Activated.Hash); err != nil {
return fmt.Errorf("dequeue until capacity: %w", err)
}
} else {
if update.Activated.Number > p.recentBlock.Number {
p.recentBlock.Number = update.Activated.Number
p.recentBlock.Hash = update.Activated.Hash
}
}

return nil
}

func (p *ParticipationHandler) BumpPriority(ctx overseer.Context, receipts []parachain.CandidateReceipt) error {
for _, receipt := range receipts {
blockNumber, err := getBlockNumber(ctx.Sender, receipt)
if err != nil {
log.Errorf("failed to get block number: %s", err)
continue
}

comparator, err := NewCandidateComparator(&blockNumber, receipt)
if err != nil {
log.Errorf("failed to create candidate comparator: %s", err)
continue
}

if err := p.queue.PrioritiseIfPresent(comparator); err != nil {
log.Errorf("failed to prioritise candidate comparator: %s", err)
}
}

return nil
}

func (p *ParticipationHandler) numberOfWorkers() int {
return int(p.workers.Load())
}

func (p *ParticipationHandler) dequeueUntilCapacity(recentHead common.Hash) error {
for {
if p.numberOfWorkers() >= MaxParallelParticipation {
break
}

if request := p.queue.Dequeue(); request != nil {
if err := p.forkParticipation(request.request, recentHead); err != nil {
log.Errorf("failed to fork ParticipationHandler: %s", err)
}
} else {
break
}
}

return nil
}

func (p *ParticipationHandler) forkParticipation(request *ParticipationRequest, recentHead common.Hash) error {
_, ok := p.runningParticipation.LoadOrStore(request.candidateHash, nil)
if ok {
return nil
}

p.workers.Add(1)
go func() {
if err := p.participate(recentHead, *request); err != nil {
log.Errorf("failed to participate in dispute: %s", err)
}
}()

return nil
}

func (p *ParticipationHandler) participate(blockHash common.Hash, request ParticipationRequest) error {
// get available data from the sender
availableDataTx := make(chan overseer.AvailabilityRecoveryResponse, 1)
if err := p.sender.SendMessage(overseer.AvailabilityRecoveryMessage{
CandidateReceipt: request.candidateReceipt,
SessionIndex: request.session,
GroupIndex: nil,
ResponseChannel: availableDataTx,
}); err != nil {
return fmt.Errorf("send availability recovery message: %w", err)
}

availableData := <-availableDataTx
switch availableData.Error {
case overseer.RecoveryErrorInvalid:
sendResult(p.sender, request, types.ParticipationOutcomeInvalid)
case overseer.RecoveryErrorUnavailable:
sendResult(p.sender, request, types.ParticipationOutcomeValid)
default:
return fmt.Errorf("unexpected recovery error: %d", availableData.Error)
}

// TODO: get validation code from the runtime

// TODO: validate the request and send the result

panic("not implemented")
}

var _ Participation = &ParticipationHandler{}

func NewParticipation(sender overseer.Sender) *ParticipationHandler {
return &ParticipationHandler{
runningParticipation: sync.Map{},
queue: NewQueue(),
sender: sender,
}
}

func getBlockNumber(sender overseer.Sender, receipt parachain.CandidateReceipt) (uint32, error) {
tx := make(chan *uint32, 1)
relayParent, err := receipt.Hash()
if err != nil {
return 0, fmt.Errorf("get hash: %w", err)
}

if err := sender.SendMessage(overseer.ChainAPIMessage{
RelayParent: relayParent,
ResponseChannel: tx,
}); err != nil {
return 0, fmt.Errorf("send message: %w", err)
}

result := <-tx
if result == nil {
return 0, fmt.Errorf("failed to get block number")
}

return *result, nil
}

func sendResult(sender overseer.Sender, request ParticipationRequest, outcome types.ParticipationOutcomeType) {
participationOutcome, err := types.NewCustomParticipationOutcome(outcome)
if err != nil {
log.Errorf("failed to create participation outcome: %s", err)
return
}

statement := ParticipationStatement{
Session: request.session,
CandidateHash: request.candidateHash,
CandidateReceipt: request.candidateReceipt,
Outcome: participationOutcome,
}
if err := sender.Feed(statement); err != nil {
log.Errorf("sending back participation result failed. Dispute coordinator not working properly!: %s", err)
}
}
Loading

0 comments on commit ace8c36

Please sign in to comment.