Skip to content

Commit

Permalink
go/worker/compute/executor: Prioritize authoritative discrepancy events
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Oct 4, 2023
1 parent f870326 commit 88ff0dc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 23 deletions.
50 changes: 39 additions & 11 deletions go/worker/compute/executor/committee/discrepancy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

// discrepancyEvent describes a detected execution discrepancy that is passed
// to the round worker: the rank it concerns, the consensus height at which it
// was seen, and whether it is authoritative. Events built from a roothash
// ExecutionDiscrepancyDetected consensus event are marked authoritative;
// events derived from locally observed executor commitments are not.
type discrepancyEvent struct {
rank uint64
height uint64
rank uint64
height uint64
authoritative bool
}

func (n *Node) NotifyDiscrepancy(info *discrepancyEvent) {
Expand All @@ -23,44 +24,70 @@ func (n *Node) NotifyDiscrepancy(info *discrepancyEvent) {
n.discrepancyCh <- info
}

// handleDiscrepancy processes a discrepancy event received by the round
// worker. It logs the event, increments the discrepancy-detected metric, asks
// the runtime to sync consensus up to the event's height, and finally stores
// the event in n.discrepancy unless an authoritative event is already stored.
func (n *Node) handleDiscrepancy(ctx context.Context, info *discrepancyEvent) {
func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) {
n.logger.Warn("execution discrepancy detected",
"rank", info.rank,
"height", info.height,
"rank", ev.rank,
"height", ev.height,
"authoritative", ev.authoritative,
)

crash.Here(crashPointDiscrepancyDetectedAfter)

discrepancyDetectedCount.With(n.getMetricLabels()).Inc()

// Make sure that the runtime has synced this consensus block.
err := n.rt.ConsensusSync(ctx, info.height)
err := n.rt.ConsensusSync(ctx, ev.height)
if err != nil {
// A sync failure is only logged; handling continues and the event may
// still be recorded below.
n.logger.Warn("failed to ask the runtime to sync the latest consensus block",
"err", err,
"height", info.height,
"height", ev.height,
)
}

// Always prioritize authoritative discrepancy events because they originate
// from the consensus layer, are final, and limited to at most one per round.
// Non-authoritative events, on the other hand, are merely estimates used
// by the backup workers.
if n.discrepancy != nil && n.discrepancy.authoritative {
return
}

// Nothing is stored yet, or the stored event is non-authoritative, so the
// incoming event replaces it. This also means a later non-authoritative
// event overwrites an earlier non-authoritative one.
n.discrepancy = ev
}

func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitment.ExecutorCommitment) {
// Don't do anything if we are not a backup worker or we are an executor worker.
id := n.commonNode.Identity.NodeSigner.Public()
if !n.committee.IsBackupWorker(id) || n.committee.IsWorker(id) {
return
}

n.logger.Debug("observed executor commitment",
"commitment", ec,
)

// TODO: Handle equivocation detection.

// Don't do anything if the discrepancy has already been detected.
if n.commitPool.Discrepancy {
n.logger.Debug("ignoring bad observed executor commitment, discrepancy already detected",
"node_id", ec.NodeID,
)
return
}

// Verify and add the commitment.
rt := n.epoch.GetRuntime()
if err := commitment.VerifyExecutorCommitment(ctx, n.blockInfo.RuntimeBlock, rt, n.committee.ValidFor, ec, nil, n.epoch); err != nil {
n.logger.Debug("ignoring bad observed executor commitment",
n.logger.Debug("ignoring bad observed executor commitment, verification failed",
"err", err,
"node_id", ec.NodeID,
)
return
}

if err := n.commitPool.AddVerifiedExecutorCommitment(n.committee, ec); err != nil {
n.logger.Debug("ignoring bad observed executor commitment",
n.logger.Debug("ignoring bad observed executor commitment, insertion failed",
"err", err,
"node_id", ec.NodeID,
)
Expand All @@ -75,7 +102,8 @@ func (n *Node) handleObservedExecutorCommitment(ctx context.Context, ec *commitm
n.logger.Warn("observed commitments indicate discrepancy")

n.NotifyDiscrepancy(&discrepancyEvent{
rank: n.commitPool.HighestRank,
height: uint64(n.blockInfo.ConsensusBlock.Height),
rank: n.commitPool.HighestRank,
height: uint64(n.blockInfo.ConsensusBlock.Height),
authoritative: false,
})
}
5 changes: 3 additions & 2 deletions go/worker/compute/executor/committee/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func (n *Node) HandleNewEventLocked(ev *roothash.Event) {
switch {
case ev.ExecutionDiscrepancyDetected != nil:
n.NotifyDiscrepancy(&discrepancyEvent{
rank: ev.ExecutionDiscrepancyDetected.Rank,
height: uint64(ev.Height),
rank: ev.ExecutionDiscrepancyDetected.Rank,
height: uint64(ev.Height),
authoritative: true,
})
case ev.ExecutorCommitted != nil:
n.NotifySchedulerCommitment(&ev.ExecutorCommitted.Commit)
Expand Down
19 changes: 9 additions & 10 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Node struct { // nolint: maligned
blockInfo *runtime.BlockInfo
rtState *roothash.RuntimeState
roundResults *roothash.RoundResults
discrepancy *discrepancyEvent
submitted map[uint64]struct{}
rank uint64
proposedBatch *proposedBatch
Expand Down Expand Up @@ -1478,19 +1479,17 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
schedulerRankTicker.Start()
defer schedulerRankTicker.Stop()

// Remember if discrepancy was detected.
var discrepancy *discrepancyEvent
// Reset discrepancy detection.
n.discrepancy = nil
n.commitPool = commitment.NewPool()

// Reset submitted proposals/commitments.
n.submitted = make(map[uint64]struct{})

// Reset commitment pool for early discrepancy detection.
n.commitPool = commitment.NewPool()

// Main loop.
for {
// Update state, propose or schedule.
switch discrepancy {
switch n.discrepancy {
case nil:
limit := min(schedulerRank, poolRank, n.rank)
proposal, rank, ok := n.proposals.Best(round, 0, limit, n.submitted)
Expand All @@ -1505,15 +1504,15 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
n.scheduleBatch(ctx, round, flush)
}
default:
n.updateState(ctx, discrepancy.rank, discrepancy.rank, true)
n.updateState(ctx, n.discrepancy.rank, n.discrepancy.rank, true)

limit := discrepancy.rank
limit := n.discrepancy.rank
proposal, rank, ok := n.proposals.Best(round, limit, limit, n.submitted)
switch {
case ok:
// Try to process the discrepant proposal.
n.processProposal(ctx, proposal, rank, true)
case n.rank == discrepancy.rank:
case n.rank == n.discrepancy.rank:
// Try to schedule a batch.
n.scheduleBatch(ctx, round, true)
}
Expand All @@ -1527,7 +1526,7 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
case txs := <-n.txCh:
// Check any queued transactions.
n.handleNewCheckedTransactions(txs)
case discrepancy = <-n.discrepancyCh:
case discrepancy := <-n.discrepancyCh:
// Discrepancy has been detected.
n.handleDiscrepancy(ctx, discrepancy)
case ec := <-n.ecCh:
Expand Down

0 comments on commit 88ff0dc

Please sign in to comment.