Skip to content

Commit

Permalink
go/worker/compute/executor/committee: Support backup proposers
Browse files Browse the repository at this point in the history
Starting now, all executor committee workers are permitted to schedule
transactions, each with distinct per-round priority. Priority dictates
the time after which a worker can propose a new batch. The consensus
layer tracks all published executor commitments and tries to build
a new runtime block on a proposal with the highest priority.
  • Loading branch information
peternose committed Aug 24, 2023
1 parent cee0595 commit f9112a0
Show file tree
Hide file tree
Showing 51 changed files with 1,580 additions and 1,271 deletions.
7 changes: 7 additions & 0 deletions .changelog/5354.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/worker/compute/executor/committee: Support backup proposers

Starting now, all executor committee workers are permitted to schedule
transactions, each with distinct per-round priority. Priority dictates
the time after which a worker can propose a new batch. The consensus
layer tracks all published executor commitments and tries to build
a new runtime block on a proposal with the highest priority.
2 changes: 1 addition & 1 deletion go/consensus/cometbft/apps/registry/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func TestRegisterRuntime(t *testing.T) {
BatchFlushTimeout: 100_000_000,
MaxBatchSize: 100,
MaxBatchSizeBytes: 100_000_000,
ProposerTimeout: 2,
SchedulerTimeout: 2,
},
Deployments: []*registry.VersionInfo{
{
Expand Down
5 changes: 2 additions & 3 deletions go/consensus/cometbft/apps/roothash/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/errors"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
registryState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/state"
roothashApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/roothash/api"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (app *rootHashApplication) processRuntimeMessages(
return events, nil
}

func (app *rootHashApplication) doBeforeSchedule(ctx *api.Context, msg interface{}) (interface{}, error) {
func (app *rootHashApplication) doBeforeSchedule(ctx *tmapi.Context, msg interface{}) (interface{}, error) {
epoch := msg.(beacon.EpochTime)

ctx.Logger().Debug("processing liveness statistics before scheduling",
Expand All @@ -109,7 +108,7 @@ func (app *rootHashApplication) doBeforeSchedule(ctx *api.Context, msg interface
return nil, nil
}

func (app *rootHashApplication) changeParameters(ctx *api.Context, msg interface{}, apply bool) (interface{}, error) {
func (app *rootHashApplication) changeParameters(ctx *tmapi.Context, msg interface{}, apply bool) (interface{}, error) {
// Unmarshal changes and check if they should be applied to this module.
proposal, ok := msg.(*governance.ChangeParametersProposal)
if !ok {
Expand Down
27 changes: 23 additions & 4 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func (app *rootHashApplication) prepareNewCommittees(
func (app *rootHashApplication) emitEmptyBlock(ctx *tmapi.Context, runtime *roothash.RuntimeState, hdrType block.HeaderType) error {
blk := block.NewEmptyBlock(runtime.CurrentBlock, uint64(ctx.Now().Unix()), hdrType)

ctx.Logger().Debug("emitting empty block",
"round", blk.Header.Round,
"type", blk.Header.HeaderType,
)

runtime.CurrentBlock = blk
runtime.CurrentBlockHeight = ctx.BlockHeight() + 1 // Current height is ctx.BlockHeight() + 1
// Do not update LastNormal{Round,Height} as empty blocks are not emitted by the runtime.
Expand Down Expand Up @@ -541,7 +546,7 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
pool := rtState.ExecutorPool

// Remember the index of the transaction scheduler within the committee.
schedulerIdx, err := pool.Committee.TransactionSchedulerIdx(pool.Round)
schedulerIdx, err := pool.Committee.SchedulerIdx(pool.Round)
if err != nil {
return err
}
Expand All @@ -552,16 +557,22 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
}
livenessStats := rtState.LivenessStatistics

// Compute the priority of the pool scheduler.
// If the pool is empty, the priority will be set to infinity.
poolSchedulerID, _ := pool.Scheduler()
priority := pool.Committee.SchedulingPriority(pool.Round, poolSchedulerID)

commit, err := pool.TryFinalize(ctx.BlockHeight(), runtime.Executor.RoundTimeout, forced, true)
if err == commitment.ErrDiscrepancyDetected {
ctx.Logger().Warn("executor discrepancy detected",
"round", round,
"priority", priority,
logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected,
)

ctx.EmitEvent(
tmapi.NewEventBuilder(app.Name()).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Timeout: forced}).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Priority: priority, Timeout: forced}).
TypedAttribute(&roothash.RuntimeIDAttribute{ID: runtime.ID}),
)

Expand All @@ -577,11 +588,19 @@ func (app *rootHashApplication) tryFinalizeExecutorCommits( //nolint: gocyclo
// Round has been finalized.
ctx.Logger().Debug("finalized round",
"round", round,
"priority", priority,
)

// Record that the round was finalized and that the scheduler received enough commitments.
livenessStats.TotalRounds++
livenessStats.FinalizedProposals[schedulerIdx]++

// Record if the transaction scheduler received enough commitments.
schedulerID := pool.Committee.Members[schedulerIdx].PublicKey
switch schedulerID.Equal(poolSchedulerID) {
case true:
livenessStats.FinalizedProposals[schedulerIdx]++
case false:
livenessStats.MissedProposals[schedulerIdx]++
}

ec := commit.ToDDResult().(*commitment.ExecutorCommitment)

Expand Down
6 changes: 3 additions & 3 deletions go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (app *rootHashApplication) executorProposerTimeout(
}

// Ensure enough blocks have passed since round start.
proposerTimeout := rtState.Runtime.TxnScheduler.ProposerTimeout
proposerTimeout := rtState.Runtime.TxnScheduler.SchedulerTimeout
currentBlockHeight := rtState.CurrentBlockHeight
if height := ctx.BlockHeight(); height < currentBlockHeight+proposerTimeout {
ctx.Logger().Debug("failed requesting proposer round timeout, timeout not allowed yet",
Expand All @@ -86,7 +86,7 @@ func (app *rootHashApplication) executorProposerTimeout(
}

// Ensure request is valid.
if err = rtState.ExecutorPool.CheckProposerTimeout(ctx, rtState.CurrentBlock, nl, ctx.TxSigner(), rpt.Round); err != nil {
if err = rtState.ExecutorPool.CheckSchedulerTimeout(ctx, rtState.CurrentBlock, nl, ctx.TxSigner(), rpt.Round); err != nil {
ctx.Logger().Debug("failed requesting proposer round timeout",
"err", err,
"round", rtState.CurrentBlock.Header.Round,
Expand All @@ -96,7 +96,7 @@ func (app *rootHashApplication) executorProposerTimeout(
}

// Record that the scheduler did not propose.
schedulerIdx, err := rtState.ExecutorPool.Committee.TransactionSchedulerIdx(rpt.Round)
schedulerIdx, err := rtState.ExecutorPool.Committee.SchedulerIdx(rpt.Round)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions go/consensus/cometbft/apps/roothash/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
schedulerState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/scheduler/state"
stakingState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/staking/state"
genesisTestHelpers "github.com/oasisprotocol/oasis-core/go/genesis/tests"
"github.com/oasisprotocol/oasis-core/go/governance/api"
governance "github.com/oasisprotocol/oasis-core/go/governance/api"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
Expand Down Expand Up @@ -197,9 +196,9 @@ func TestMessagesGasEstimation(t *testing.T) {
// Each update_runtime message costs 3000 gas.
{Registry: &message.RegistryMessage{UpdateRuntime: &registry.Runtime{}}},
// Each cast vote message costs 1000 gas.
{Governance: &message.GovernanceMessage{CastVote: &api.ProposalVote{}}},
{Governance: &message.GovernanceMessage{CastVote: &governance.ProposalVote{}}},
// Each submit proposal message costs 2000 gas.
{Governance: &message.GovernanceMessage{SubmitProposal: &api.ProposalContent{}}},
{Governance: &message.GovernanceMessage{SubmitProposal: &governance.ProposalContent{}}},
}
msgsHash := message.MessagesHash(msgs)

Expand All @@ -209,6 +208,7 @@ func TestMessagesGasEstimation(t *testing.T) {
ec := commitment.ExecutorCommitment{
NodeID: sk.Public(),
Header: commitment.ExecutorCommitmentHeader{
SchedulerID: sk.Public(),
Header: commitment.ComputeResultsHeader{
Round: newBlk.Header.Round,
PreviousHash: newBlk.Header.PreviousHash,
Expand Down
7 changes: 3 additions & 4 deletions go/consensus/cometbft/apps/scheduler/debug_force.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,13 @@ func (app *schedulerApplication) debugForceRoles(
mayBeAny []*scheduler.CommitteeNode
)

for i, n := range elected {
committeeNode := elected[i]
if ri, ok := state.params[n.PublicKey]; ok {
for _, committeeNode := range elected {
if ri, ok := state.params[committeeNode.PublicKey]; ok {
if ri.IsScheduler {
if mustBeScheduler != nil {
ctx.Logger().Error("already have a forced scheduler",
"existing", mustBeScheduler.PublicKey,
"new", n.PublicKey,
"new", committeeNode.PublicKey,
)
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestGenesisSanityCheck(t *testing.T) {
BatchFlushTimeout: 1 * time.Second,
MaxBatchSize: 1,
MaxBatchSizeBytes: 1024,
ProposerTimeout: 20,
SchedulerTimeout: 20,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-net-runner/fixtures/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) {
MaxBatchSize: 1000,
MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB
BatchFlushTimeout: 1 * time.Second,
ProposerTimeout: 20,
SchedulerTimeout: 20,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/control/runtime_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func doRuntimeStats(cmd *cobra.Command, args []string) { //nolint:gocyclo
}
// Set committee info.
currentCommittee = state.ExecutorPool.Committee
currentScheduler, err = currentCommittee.TransactionScheduler(currentRound)
currentScheduler, err = currentCommittee.Scheduler(currentRound)
if err != nil {
logger.Error("failed to query transaction scheduler",
"err", err,
Expand Down
26 changes: 16 additions & 10 deletions go/oasis-node/cmd/debug/byzantine/byzantine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/spf13/viper"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/sgx/ias"
Expand Down Expand Up @@ -154,31 +155,36 @@ func doExecutorScenario(cmd *cobra.Command, args []string) { //nolint: gocyclo
return
}

// Get the latest roothash block.
var blk *block.Block
blk, err = getRoothashLatestBlock(ctx, b.cometbft.service, runtimeID)
if err != nil {
panic(fmt.Errorf("failed getting latest roothash block: %w", err))
}

var schedulerID signature.PublicKey
cbc := newComputeBatchContext(b.chainContext, runtimeID)
switch isTxScheduler {
case true:
// If we are the transaction scheduler, we wait for transactions and schedule them.
var cont bool
cont, err = b.receiveAndScheduleTransactions(ctx, cbc, executorMode)
cont, err = b.receiveAndScheduleTransactions(ctx, cbc, blk, executorMode)
if err != nil {
panic(fmt.Sprintf("comptue transaction scheduling failed: %+v", err))
panic(fmt.Sprintf("compute transaction scheduling failed: %+v", err))
}
if !cont {
return
}

schedulerID = b.identity.NodeSigner.Public()
case false:
// If we are not the scheduler, receive transactions and the proposal.
if err = cbc.receiveProposal(b.p2p); err != nil {
panic(fmt.Sprintf("compute receive proposal failed: %+v", err))
}
logger.Debug("executor: received proposal", "proposal", cbc.proposal)
}

// Get latest roothash block.
var blk *block.Block
blk, err = getRoothashLatestBlock(ctx, b.cometbft.service, runtimeID)
if err != nil {
panic(fmt.Errorf("failed getting latest roothash block: %w", err))
schedulerID = cbc.proposal.NodeID
}

if err = cbc.openTrees(ctx, blk, b.storageClient); err != nil {
Expand Down Expand Up @@ -256,11 +262,11 @@ func doExecutorScenario(cmd *cobra.Command, args []string) { //nolint: gocyclo

switch executorMode {
case ModeExecutorFailureIndicating:
if err = cbc.createCommitment(b.identity, b.rak, commitment.FailureUnknown); err != nil {
if err = cbc.createCommitment(b.identity, schedulerID, b.rak, commitment.FailureUnknown); err != nil {
panic(fmt.Sprintf("compute create commitment failed: %+v", err))
}
default:
if err = cbc.createCommitment(b.identity, b.rak, commitment.FailureNone); err != nil {
if err = cbc.createCommitment(b.identity, schedulerID, b.rak, commitment.FailureNone); err != nil {
panic(fmt.Sprintf("compute create commitment failed: %+v", err))
}

Expand Down
4 changes: 3 additions & 1 deletion go/oasis-node/cmd/debug/byzantine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (cbc *computeBatchContext) commitTrees(ctx context.Context) error {

func (cbc *computeBatchContext) createCommitment(
id *identity.Identity,
schedulerID signature.PublicKey,
rak signature.Signer,
failure commitment.ExecutorCommitmentFailure,
) error {
Expand All @@ -320,7 +321,8 @@ func (cbc *computeBatchContext) createCommitment(
ec := &commitment.ExecutorCommitment{
NodeID: id.NodeSigner.Public(),
Header: commitment.ExecutorCommitmentHeader{
Header: header,
SchedulerID: schedulerID,
Header: header,
},
}
if rak != nil {
Expand Down
21 changes: 8 additions & 13 deletions go/oasis-node/cmd/debug/byzantine/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,30 @@ func (b *byzantine) stop() error {
return nil
}

func (b *byzantine) receiveAndScheduleTransactions(ctx context.Context, cbc *computeBatchContext, mode ExecutorMode) (bool, error) {
func (b *byzantine) receiveAndScheduleTransactions(ctx context.Context, cbc *computeBatchContext, block *block.Block, mode ExecutorMode) (bool, error) {
// Receive transactions.
txs := cbc.receiveTransactions(b.p2p, time.Second)
logger.Debug("executor: received transactions", "transactions", txs)
// Get latest roothash block.
var block *block.Block
block, err := getRoothashLatestBlock(ctx, b.cometbft.service, b.runtimeID)
if err != nil {
return false, fmt.Errorf("failed getting latest roothash block: %w", err)
}

// Include transactions that nobody else has when configured to do so.
if viper.GetBool(CfgExecutorProposeBogusTx) {
logger.Debug("executor scheduler: including bogus transactions")
txs = append(txs, []byte("this is a bogus transction nr. 1"))
txs = append(txs, []byte("this is a bogus transition nr. 1"))
}

// Prepare proposal.
if err = cbc.prepareProposal(ctx, block, txs, b.identity); err != nil {
if err := cbc.prepareProposal(ctx, block, txs, b.identity); err != nil {
panic(fmt.Sprintf("executor proposing batch: %+v", err))
}

if mode == ModeExecutorFailureIndicating {
// Submit failure indicating commitment and stop.
logger.Debug("executor failure indicating: submitting commitment and stopping")
if err = cbc.createCommitment(b.identity, nil, commitment.FailureUnknown); err != nil {
schedulerID := b.identity.NodeSigner.Public()
if err := cbc.createCommitment(b.identity, schedulerID, nil, commitment.FailureUnknown); err != nil {
panic(fmt.Sprintf("compute create failure indicating commitment failed: %+v", err))
}
if err = cbc.publishToChain(b.cometbft.service, b.identity); err != nil {
if err := cbc.publishToChain(b.cometbft.service, b.identity); err != nil {
panic(fmt.Sprintf("compute publish to chain failed: %+v", err))
}
return false, nil
Expand Down Expand Up @@ -227,8 +222,8 @@ func initializeAndRegisterByzantineNode(
b.logger.Debug("executor schedule ok")

// Ensure we have the expected executor transaction scheduler role.
isTxScheduler := schedulerCheckTxScheduler(b.executorCommittee, b.identity.NodeSigner.Public(), 0)
if shouldBeExecutorProposer != isTxScheduler {
isScheduler := schedulerCheckScheduler(b.executorCommittee, b.identity.NodeSigner.Public(), 0)
if shouldBeExecutorProposer != isScheduler {
return nil, fmt.Errorf("not in expected executor transaction scheduler role")
}
b.logger.Debug("executor tx scheduler role ok")
Expand Down
4 changes: 2 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func schedulerCheckScheduled(committee *scheduler.Committee, nodeID signature.Pu
return fmt.Errorf("we're not scheduled")
}

func schedulerCheckTxScheduler(committee *scheduler.Committee, nodeID signature.PublicKey, round uint64) bool {
scheduler, err := committee.TransactionScheduler(round)
func schedulerCheckScheduler(committee *scheduler.Committee, nodeID signature.PublicKey, round uint64) bool {
scheduler, err := committee.Scheduler(round)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/txsource/workload/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func getRuntime(entityID signature.PublicKey, id common.Namespace, epoch beacon.
BatchFlushTimeout: 1 * time.Second,
MaxBatchSize: 1,
MaxBatchSizeBytes: 1024,
ProposerTimeout: 5,
SchedulerTimeout: 5,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (
MaxBatchSize: 1,
MaxBatchSizeBytes: 1024,
BatchFlushTimeout: 20 * time.Second,
ProposerTimeout: 20,
SchedulerTimeout: 20,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-test-runner/scenario/e2e/registry_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (sc *registryCLIImpl) testRuntime(ctx context.Context, childEnv *env.Env, c
BatchFlushTimeout: 11 * time.Second,
MaxBatchSize: 12,
MaxBatchSizeBytes: 1024,
ProposerTimeout: 5,
SchedulerTimeout: 5,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
EntityWhitelist: &registry.EntityWhitelistRuntimeAdmissionPolicy{
Expand Down
Loading

0 comments on commit f9112a0

Please sign in to comment.