Skip to content

Commit

Permalink
go/worker/compute/executor/committee: Support backup proposers
Browse files Browse the repository at this point in the history
Starting now, all executor committee workers are permitted to schedule
transactions, each with distinct per-round priority. Priority dictates
the time after which a worker can propose a new batch. The consensus
layer tracks all published executor commitments and tries to build
a new runtime block on a proposal with the highest priority.
  • Loading branch information
peternose committed Aug 23, 2023
1 parent d352667 commit 14eb14f
Show file tree
Hide file tree
Showing 35 changed files with 1,504 additions and 1,216 deletions.
7 changes: 7 additions & 0 deletions .changelog/5354.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/worker/compute/executor/committee: Support backup proposers

Starting now, all executor committee workers are permitted to schedule
transactions, each with distinct per-round priority. Priority dictates
the time after which a worker can propose a new batch. The consensus
layer tracks all published executor commitments and tries to build
a new runtime block on a proposal with the highest priority.
14 changes: 12 additions & 2 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func (app *rootHashApplication) prepareNewCommittees(
func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothash.RuntimeState, hdrType block.HeaderType) error {
blk := block.NewEmptyBlock(runtime.CurrentBlock, uint64(ctx.Now().Unix()), hdrType)

ctx.Logger().Debug("emitting empty block",
"round", blk.Header.Round,
"type", blk.Header.HeaderType,
)

runtime.CurrentBlock = blk
runtime.CurrentBlockHeight = ctx.BlockHeight() + 1 // Current height is ctx.BlockHeight() + 1
// Do not update LastNormal{Round,Height} as empty blocks are not emitted by the runtime.
Expand Down Expand Up @@ -541,7 +546,7 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
pool := rtState.ExecutorPool

// Remember the index of the transaction scheduler within the committee.
schedulerIdx, err := pool.Committee.TransactionSchedulerIdx(pool.Round)
schedulerIdx, err := pool.Committee.PrimarySchedulerIdx(pool.Round)
if err != nil {
return err
}
Expand All @@ -552,6 +557,10 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
}
livenessStats := rtState.LivenessStatistics

// Compute the proposer priority. If the pool is empty, the priority will be set to infinity.
proposerID, _ := pool.Proposer()
priority := pool.Committee.SchedulingPriority(pool.Round, proposerID)

commit, err := pool.TryFinalize(ctx.BlockHeight(), runtime.Executor.RoundTimeout, forced, true)
if err == commitment.ErrDiscrepancyDetected {
ctx.Logger().Warn("executor discrepancy detected",
Expand All @@ -561,7 +570,7 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo

ctx.EmitEvent(
tmapi.NewEventBuilder(app.Name()).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Timeout: forced}).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Priority: priority, Timeout: forced}).
TypedAttribute(&roothash.RuntimeIDAttribute{ID: runtime.ID}),
)

Expand All @@ -577,6 +586,7 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
// Round has been finalized.
ctx.Logger().Debug("finalized round",
"round", round,
"priority", priority,
)

// Record that the round was finalized and that the scheduler received enough commitments.
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (app *rootHashApplication) executorProposerTimeout(
}

// Record that the scheduler did not propose.
schedulerIdx, err := rtState.ExecutorPool.Committee.TransactionSchedulerIdx(rpt.Round)
schedulerIdx, err := rtState.ExecutorPool.Committee.PrimarySchedulerIdx(rpt.Round)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion go/consensus/cometbft/apps/roothash/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func TestMessagesGasEstimation(t *testing.T) {
emptyHash.Empty()

ec := commitment.ExecutorCommitment{
NodeID: sk.Public(),
ProposerID: sk.Public(),
NodeID: sk.Public(),
Header: commitment.ExecutorCommitmentHeader{
Header: commitment.ComputeResultsHeader{
Round: newBlk.Header.Round,
Expand Down
7 changes: 3 additions & 4 deletions go/consensus/cometbft/apps/scheduler/debug_force.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,13 @@ func (app *schedulerApplication) debugForceRoles(
mayBeAny []*scheduler.CommitteeNode
)

for i, n := range elected {
committeeNode := elected[i]
if ri, ok := state.params[n.PublicKey]; ok {
for _, committeeNode := range elected {
if ri, ok := state.params[committeeNode.PublicKey]; ok {
if ri.IsScheduler {
if mustBeScheduler != nil {
ctx.Logger().Error("already have a forced scheduler",
"existing", mustBeScheduler.PublicKey,
"new", n.PublicKey,
"new", committeeNode.PublicKey,
)
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/control/runtime_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func doRuntimeStats(cmd *cobra.Command, args []string) { //nolint:gocyclo
}
// Set committee info.
currentCommittee = state.ExecutorPool.Committee
currentScheduler, err = currentCommittee.TransactionScheduler(currentRound)
currentScheduler, err = currentCommittee.PrimaryScheduler(currentRound)
if err != nil {
logger.Error("failed to query transaction scheduler",
"err", err,
Expand Down
26 changes: 16 additions & 10 deletions go/oasis-node/cmd/debug/byzantine/byzantine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/spf13/viper"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/sgx/ias"
Expand Down Expand Up @@ -154,31 +155,36 @@ func doExecutorScenario(cmd *cobra.Command, args []string) { //nolint: gocyclo
return
}

// Get the latest roothash block.
var blk *block.Block
blk, err = getRoothashLatestBlock(ctx, b.cometbft.service, runtimeID)
if err != nil {
panic(fmt.Errorf("failed getting latest roothash block: %w", err))
}

var proposerID signature.PublicKey
cbc := newComputeBatchContext(b.chainContext, runtimeID)
switch isTxScheduler {
case true:
// If we are the transaction scheduler, we wait for transactions and schedule them.
var cont bool
cont, err = b.receiveAndScheduleTransactions(ctx, cbc, executorMode)
cont, err = b.receiveAndScheduleTransactions(ctx, cbc, blk, executorMode)
if err != nil {
panic(fmt.Sprintf("comptue transaction scheduling failed: %+v", err))
panic(fmt.Sprintf("compute transaction scheduling failed: %+v", err))
}
if !cont {
return
}

proposerID = b.identity.NodeSigner.Public()
case false:
// If we are not the scheduler, receive transactions and the proposal.
if err = cbc.receiveProposal(b.p2p); err != nil {
panic(fmt.Sprintf("compute receive proposal failed: %+v", err))
}
logger.Debug("executor: received proposal", "proposal", cbc.proposal)
}

// Get latest roothash block.
var blk *block.Block
blk, err = getRoothashLatestBlock(ctx, b.cometbft.service, runtimeID)
if err != nil {
panic(fmt.Errorf("failed getting latest roothash block: %w", err))
proposerID = cbc.proposal.NodeID
}

if err = cbc.openTrees(ctx, blk, b.storageClient); err != nil {
Expand Down Expand Up @@ -256,11 +262,11 @@ func doExecutorScenario(cmd *cobra.Command, args []string) { //nolint: gocyclo

switch executorMode {
case ModeExecutorFailureIndicating:
if err = cbc.createCommitment(b.identity, b.rak, commitment.FailureUnknown); err != nil {
if err = cbc.createCommitment(b.identity, proposerID, b.rak, commitment.FailureUnknown); err != nil {
panic(fmt.Sprintf("compute create commitment failed: %+v", err))
}
default:
if err = cbc.createCommitment(b.identity, b.rak, commitment.FailureNone); err != nil {
if err = cbc.createCommitment(b.identity, proposerID, b.rak, commitment.FailureNone); err != nil {
panic(fmt.Sprintf("compute create commitment failed: %+v", err))
}

Expand Down
12 changes: 8 additions & 4 deletions go/oasis-node/cmd/debug/byzantine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ ReceiveProposal:
}

func (cbc *computeBatchContext) openTrees(ctx context.Context, blk *block.Block, rs syncer.ReadSyncer) error {
cbc.ioTree = transaction.NewTree(nil, storage.Root{
emptyRoot := storage.Root{
Namespace: cbc.runtimeID,
Version: cbc.proposal.Header.Round,
Type: storage.RootTypeIO,
Hash: cbc.proposal.Header.BatchHash,
})
}
emptyRoot.Hash.Empty()

cbc.ioTree = transaction.NewTree(nil, emptyRoot)

// Add all transactions to the I/O tree.
for _, tx := range cbc.txs {
Expand Down Expand Up @@ -300,6 +302,7 @@ func (cbc *computeBatchContext) commitTrees(ctx context.Context) error {

func (cbc *computeBatchContext) createCommitment(
id *identity.Identity,
proposerID signature.PublicKey,
rak signature.Signer,
failure commitment.ExecutorCommitmentFailure,
) error {
Expand All @@ -316,7 +319,8 @@ func (cbc *computeBatchContext) createCommitment(
InMessagesCount: 0,
}
ec := &commitment.ExecutorCommitment{
NodeID: id.NodeSigner.Public(),
ProposerID: proposerID,
NodeID: id.NodeSigner.Public(),
Header: commitment.ExecutorCommitmentHeader{
Header: header,
},
Expand Down
19 changes: 7 additions & 12 deletions go/oasis-node/cmd/debug/byzantine/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,30 @@ func (b *byzantine) stop() error {
return nil
}

func (b *byzantine) receiveAndScheduleTransactions(ctx context.Context, cbc *computeBatchContext, mode ExecutorMode) (bool, error) {
func (b *byzantine) receiveAndScheduleTransactions(ctx context.Context, cbc *computeBatchContext, block *block.Block, mode ExecutorMode) (bool, error) {
// Receive transactions.
txs := cbc.receiveTransactions(b.p2p, time.Second)
logger.Debug("executor: received transactions", "transactions", txs)
// Get latest roothash block.
var block *block.Block
block, err := getRoothashLatestBlock(ctx, b.cometbft.service, b.runtimeID)
if err != nil {
return false, fmt.Errorf("failed getting latest roothash block: %w", err)
}

// Include transactions that nobody else has when configured to do so.
if viper.GetBool(CfgExecutorProposeBogusTx) {
logger.Debug("executor scheduler: including bogus transactions")
txs = append(txs, []byte("this is a bogus transction nr. 1"))
txs = append(txs, []byte("this is a bogus transition nr. 1"))
}

// Prepare proposal.
if err = cbc.prepareProposal(ctx, block, txs, b.identity); err != nil {
if err := cbc.prepareProposal(ctx, block, txs, b.identity); err != nil {
panic(fmt.Sprintf("executor proposing batch: %+v", err))
}

if mode == ModeExecutorFailureIndicating {
// Submit failure indicating commitment and stop.
logger.Debug("executor failure indicating: submitting commitment and stopping")
if err = cbc.createCommitment(b.identity, nil, commitment.FailureUnknown); err != nil {
proposerID := b.identity.NodeSigner.Public()
if err := cbc.createCommitment(b.identity, proposerID, nil, commitment.FailureUnknown); err != nil {
panic(fmt.Sprintf("compute create failure indicating commitment failed: %+v", err))
}
if err = cbc.publishToChain(b.cometbft.service, b.identity); err != nil {
if err := cbc.publishToChain(b.cometbft.service, b.identity); err != nil {
panic(fmt.Sprintf("compute publish to chain failed: %+v", err))
}
return false, nil
Expand Down Expand Up @@ -227,7 +222,7 @@ func initializeAndRegisterByzantineNode(
b.logger.Debug("executor schedule ok")

// Ensure we have the expected executor transaction scheduler role.
isTxScheduler := schedulerCheckTxScheduler(b.executorCommittee, b.identity.NodeSigner.Public(), 0)
isTxScheduler := schedulerCheckPrimaryScheduler(b.executorCommittee, b.identity.NodeSigner.Public(), 0)
if shouldBeExecutorProposer != isTxScheduler {
return nil, fmt.Errorf("not in expected executor transaction scheduler role")
}
Expand Down
4 changes: 2 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func schedulerCheckScheduled(committee *scheduler.Committee, nodeID signature.Pu
return fmt.Errorf("we're not scheduled")
}

func schedulerCheckTxScheduler(committee *scheduler.Committee, nodeID signature.PublicKey, round uint64) bool {
scheduler, err := committee.TransactionScheduler(round)
func schedulerCheckPrimaryScheduler(committee *scheduler.Committee, nodeID signature.PublicKey, round uint64) bool {
scheduler, err := committee.PrimaryScheduler(round)
if err != nil {
panic(err)
}
Expand Down
8 changes: 5 additions & 3 deletions go/oasis-test-runner/scenario/e2e/runtime/byzantine.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,11 @@ var (
"executor-scheduler-straggler",
"executor",
[]log.WatcherHandlerFactory{
// Scheduler straggler should trigger round failure in first round (proposer timeout).
// In round two timeout and discrepancy detection should be triggered.
oasis.LogAssertRoundFailures(),
// The scheduler straggler should not trigger a round failure in the third round,
// as the backup proposer should propose before other nodes trigger the timeout.
// However, the consensus timeout and discrepancy detection should be triggered
// since we don't allow any stragglers.
oasis.LogAssertNoRoundFailures(),
oasis.LogAssertTimeouts(),
oasis.LogAssertExecutionDiscrepancyDetected(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (sc *Scenario) initialEpochTransitionsWith(ctx context.Context, fixture *oa
}
}
if !sc.debugNoRandomInitialEpoch {
// To prevent people from writing tests that depend on very precicse
// To prevent people from writing tests that depend on very precise
// timekeeping by epoch, randomize the start epoch slightly.
//
// If this causes your test to fail, it is not this code that is
Expand Down
2 changes: 1 addition & 1 deletion go/p2p/api/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func PublicKeyToPeerID(pk signature.PublicKey) (core.PeerID, error) {
}

// PublicKeyMapToPeerIDs converts a map of public keys to a list of peer identifiers.
func PublicKeyMapToPeerIDs(pks map[signature.PublicKey]bool) ([]core.PeerID, error) {
func PublicKeyMapToPeerIDs(pks map[signature.PublicKey]struct{}) ([]core.PeerID, error) {
ids := make([]core.PeerID, 0, len(pks))
for pk := range pks {
id, err := PublicKeyToPeerID(pk)
Expand Down
2 changes: 2 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ func (e *ExecutorCommittedEvent) EventKind() string {

// ExecutionDiscrepancyDetectedEvent is an execute discrepancy detected event.
type ExecutionDiscrepancyDetectedEvent struct {
// Priority is the priority of the transaction scheduler.
Priority uint64 `json:"priority"`
// Timeout signals whether the discrepancy was due to a timeout.
Timeout bool `json:"timeout"`
}
Expand Down
3 changes: 3 additions & 0 deletions go/roothash/api/commitment/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (eh *ExecutorCommitmentHeader) MostlyEqual(other *ExecutorCommitmentHeader)

// ExecutorCommitment is a commitment to results of processing a proposed runtime block.
type ExecutorCommitment struct {
// ProposerID is the public key of the node that generated the proposal.
ProposerID signature.PublicKey `json:"proposer_id"`

// NodeID is the public key of the node that generated this commitment.
NodeID signature.PublicKey `json:"node_id"`

Expand Down
Loading

0 comments on commit 14eb14f

Please sign in to comment.