Skip to content

Commit

Permalink
Merge branch 'feature/efm-recovery' into yurii/6785-dkg-index-map-bac…
Browse files Browse the repository at this point in the history
…kward-compatible
  • Loading branch information
durkmurder authored Dec 16, 2024
2 parents 83adfc0 + c7c7e5a commit 92ec89a
Show file tree
Hide file tree
Showing 49 changed files with 1,785 additions and 611 deletions.
84 changes: 40 additions & 44 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state/state"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/io"
)
Expand Down Expand Up @@ -104,32 +103,31 @@ func main() {
insecureAccessAPI bool
accessNodeIDS []string

err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
machineAccountMetrics module.MachineAccountMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
followerDistributor *pubsub.FollowerDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
proposalDurProvider hotstuff.ProposalDurationProvider
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
dkgState *bstorage.DKGState
safeBeaconKeys *bstorage.SafeBeaconPrivateKeys
getSealingConfigs module.SealingConfigsGetter
err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
machineAccountMetrics module.MachineAccountMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
followerDistributor *pubsub.FollowerDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
proposalDurProvider hotstuff.ProposalDurationProvider
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
myBeaconKeyStateMachine *bstorage.RecoverablePrivateBeaconKeyStateMachine
getSealingConfigs module.SealingConfigsGetter
)
var deprecatedFlagBlockRateDelay time.Duration

Expand Down Expand Up @@ -214,13 +212,9 @@ func main() {
return nil
}).
Module("dkg state", func(node *cmd.NodeConfig) error {
dkgState, err = bstorage.NewDKGState(node.Metrics.Cache, node.SecretsDB)
myBeaconKeyStateMachine, err = bstorage.NewRecoverableRandomBeaconStateMachine(node.Metrics.Cache, node.SecretsDB)
return err
}).
Module("beacon keys", func(node *cmd.NodeConfig) error {
safeBeaconKeys = bstorage.NewSafeBeaconPrivateKeys(dkgState)
return nil
}).
Module("updatable sealing config", func(node *cmd.NodeConfig) error {
setter, err := updatable_configs.NewSealingConfigs(
requiredApprovalsForSealConstruction,
Expand Down Expand Up @@ -344,22 +338,24 @@ func main() {
myBeaconPublicKeyShare)
}

// store my beacon key for the first epoch post-spork
err = dkgState.InsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey)
if err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
return err
started, err := myBeaconKeyStateMachine.IsDKGStarted(epochCounter)
if err != nil {
return fmt.Errorf("could not get DKG started flag for root epoch %d: %w", epochCounter, err)
}
// mark the root DKG as successful, so it is considered safe to use the key
err = dkgState.SetDKGEndState(epochCounter, flow.DKGEndStateSuccess)
if err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
return err

// perform this only if state machine is in initial state
if !started {
// store my beacon key for the first epoch post-spork
err = myBeaconKeyStateMachine.UpsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey)
if err != nil {
return fmt.Errorf("could not upsert my beacon private key for root epoch %d: %w", epochCounter, err)
}
}

return nil
}).
Module("my beacon key epoch recovery", func(node *cmd.NodeConfig) error {
recoverMyBeaconKeyStorage := bstorage.NewEpochRecoveryMyBeaconKey(safeBeaconKeys)
myBeaconKeyRecovery, err := dkgmodule.NewBeaconKeyRecovery(node.Logger, node.Me, node.State, recoverMyBeaconKeyStorage)
myBeaconKeyRecovery, err := dkgmodule.NewBeaconKeyRecovery(node.Logger, node.Me, node.State, myBeaconKeyStateMachine)
if err != nil {
return fmt.Errorf("could not initialize my beacon key epoch recovery: %w", err)
}
Expand Down Expand Up @@ -582,7 +578,7 @@ func main() {
// wrap Main consensus committee with metrics
wrappedCommittee := committees.NewMetricsWrapper(committee, mainMetrics) // wrapper for measuring time spent determining consensus committee relations

beaconKeyStore := hotsignature.NewEpochAwareRandomBeaconKeyStore(epochLookup, safeBeaconKeys)
beaconKeyStore := hotsignature.NewEpochAwareRandomBeaconKeyStore(epochLookup, myBeaconKeyStateMachine)

// initialize the combined signer for hotstuff
var signer hotstuff.Signer
Expand Down Expand Up @@ -922,7 +918,7 @@ func main() {
node.Logger,
node.Me,
node.State,
dkgState,
myBeaconKeyStateMachine,
dkgmodule.NewControllerFactory(
node.Logger,
node.Me,
Expand Down
6 changes: 3 additions & 3 deletions cmd/util/cmd/common/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func ReadFullPartnerNodeInfos(log zerolog.Logger, partnerWeightsPath, partnerNod
}
err = ValidateNetworkPubKey(partner.NetworkPubKey)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("invalid network public key: %s", partner.NetworkPubKey))
return nil, fmt.Errorf("invalid network public key: %s", partner.NetworkPubKey)
}
err = ValidateStakingPubKey(partner.StakingPubKey)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("invalid staking public key: %s", partner.StakingPubKey))
return nil, fmt.Errorf("invalid staking public key: %s", partner.StakingPubKey)
}

weight := weights[partner.NodeID]
if valid := ValidateWeight(weight); !valid {
return nil, fmt.Errorf(fmt.Sprintf("invalid partner weight: %d", weight))
return nil, fmt.Errorf("invalid partner weight: %d", weight)
}

if weight != flow.DefaultInitialWeight {
Expand Down
5 changes: 5 additions & 0 deletions docs/RecoverableRandomBeaconStateMachine.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The `storage.RecoverableRandomBeaconStateMachine` formalizes the life-cycle of the Random Beacon keys for each epoch. On the happy path, each consensus participant for the next epoch takes part in a DKG to obtain a threshold key to participate in Flow's Random Beacon. After successfully finishing the DKG protocol, the node obtains a random beacon private key, which is stored in the database along with DKG current state `flow.DKGStateCompleted`. If for any reason the DKG fails, then the private key will be `nil` and DKG current state is set to `flow.DKGStateFailure`.
In case of failing Epoch switchover, the network goes into Epoch Fallback Mode [EFM]. The governance committee can recover the network via a special EpochRecover transaction. In this case, the set of threshold keys is specified by the governance committee.
The current implementation focuses on the scenario, where the governance committee re-uses the threshold key set from the last successful epoch transition. While injecting other threshold keys into the nodes is conceptually possible and supported, the utilities for this recovery path are not yet implemented.

[diagram](https://drive.google.com/file/d/1UnJLlTIs8IDOIHZhNUhXakeP_5Re10S4/view?usp=sharing)
2 changes: 1 addition & 1 deletion engine/common/grpc/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (f *Forwarder) reconnectingClient(i int) error {
// FaultTolerantClient implements an upstream connection that reconnects on errors
// a reasonable amount of time.
func (f *Forwarder) FaultTolerantClient() (access.AccessAPIClient, io.Closer, error) {
if f.upstream == nil || len(f.upstream) == 0 {
if len(f.upstream) == 0 {
return nil, nil, status.Errorf(codes.Unimplemented, "method not implemented")
}

Expand Down
3 changes: 3 additions & 0 deletions engine/common/rpc/convert/execution_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/onflow/flow-go/utils/unittest"
)

// TODO: fails with input non-nil ChunkBody.ServiceEventCount
func TestConvertExecutionResult(t *testing.T) {
t.Parallel()

Expand All @@ -25,6 +26,7 @@ func TestConvertExecutionResult(t *testing.T) {
assert.Equal(t, er, converted)
}

// TODO: fails with input non-nil ChunkBody.ServiceEventCount
func TestConvertExecutionResults(t *testing.T) {
t.Parallel()

Expand All @@ -43,6 +45,7 @@ func TestConvertExecutionResults(t *testing.T) {
assert.Equal(t, results, converted)
}

// TODO: fails with input non-nil ChunkBody.ServiceEventCount
func TestConvertExecutionResultMetaList(t *testing.T) {
t.Parallel()

Expand Down
67 changes: 40 additions & 27 deletions engine/consensus/dkg/reactor_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *ReactorEngine) Ready() <-chan struct{} {
if phase == flow.EpochPhaseSetup {
e.startDKGForEpoch(currentCounter, first)
} else if phase == flow.EpochPhaseCommitted {
// If we start up in EpochCommitted phase, ensure the DKG end state is set correctly.
// If we start up in EpochCommitted phase, ensure the DKG current state is set correctly.
e.handleEpochCommittedPhaseStarted(currentCounter, first)
}
})
Expand Down Expand Up @@ -155,23 +155,23 @@ func (e *ReactorEngine) startDKGForEpoch(currentEpochCounter uint64, first *flow
Logger()

// if we have started the dkg for this epoch already, exit
started, err := e.dkgState.GetDKGStarted(nextEpochCounter)
started, err := e.dkgState.IsDKGStarted(nextEpochCounter)
if err != nil {
// unexpected storage-level error
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("could not check whether DKG is started")
log.Fatal().Err(err).Msg("could not check whether DKG is dkgState")
}
if started {
log.Warn().Msg("DKG started before, skipping starting the DKG for this epoch")
return
}

// flag that we are starting the dkg for this epoch
err = e.dkgState.SetDKGStarted(nextEpochCounter)
err = e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateStarted)
if err != nil {
// unexpected storage-level error
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("could not set dkg started")
log.Fatal().Err(err).Msg("could not transition DKG state machine into state DKGStateStarted")
}

curDKGInfo, err := e.getDKGInfo(firstID)
Expand Down Expand Up @@ -246,14 +246,17 @@ func (e *ReactorEngine) startDKGForEpoch(currentEpochCounter uint64, first *flow

// handleEpochCommittedPhaseStarted is invoked upon the transition to the EpochCommitted
// phase, when the canonical beacon key vector is incorporated into the protocol state.
// Alternatively we invoke this function preemptively on startup if we are in the
// EpochCommitted Phase, in case the `EpochCommittedPhaseStarted` event was missed
// due to a crash.
//
// This function checks that the local DKG completed and that our locally computed
// key share is consistent with the canonical key vector. When this function returns,
// an end state for the just-completed DKG is guaranteed to be stored (if not, the
// the current state for the just-completed DKG is guaranteed to be stored (if not, the
// program will crash). Since this function is invoked synchronously before the end
// of the current epoch, this guarantees that when we reach the end of the current epoch
// we will either have a usable beacon key (successful DKG) or a DKG failure end state
// stored, so we can safely fall back to using our staking key.
// we will either have a usable beacon key committed (state [flow.RandomBeaconKeyCommitted])
// or we persist [flow.DKGStateFailure], so we can safely fall back to using our staking key.
//
// CAUTION: This function is not safe for concurrent use. This is not enforced within
// the ReactorEngine - instead we rely on the protocol event emission being single-threaded
Expand All @@ -267,13 +270,22 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
Uint64("next_epoch", nextEpochCounter). // the epoch the just-finished DKG was preparing for
Logger()

// Check whether we have already set the end state for this DKG.
// Check whether we have already set the current state for this DKG.
// This can happen if the DKG failed locally, if we failed to generate
// a local private beacon key, or if we crashed while performing this
// check previously.
endState, err := e.dkgState.GetDKGEndState(nextEpochCounter)
if err == nil {
log.Warn().Msgf("checking beacon key consistency: exiting because dkg end state was already set: %s", endState.String())
currentState, err := e.dkgState.GetDKGState(nextEpochCounter)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msg("failed to get dkg state, assuming this node has skipped epoch setup phase")
} else {
log.Fatal().Err(err).Msg("failed to get dkg state")
}

return
}
if currentState != flow.DKGStateCompleted {
log.Warn().Msgf("checking beacon key consistency: exiting because dkg didn't reach completed state: %s", currentState.String())
return
}

Expand All @@ -289,13 +301,13 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
return
}

myBeaconPrivKey, err := e.dkgState.RetrieveMyBeaconPrivateKey(nextEpochCounter)
myBeaconPrivKey, err := e.dkgState.UnsafeRetrieveMyBeaconPrivateKey(nextEpochCounter)
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msg("checking beacon key consistency: no key found")
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateNoKey)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("failed to set dkg end state")
log.Fatal().Err(err).Msg("failed to set dkg state")
}
return
} else if err != nil {
Expand All @@ -312,25 +324,25 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
}
localPubKey := myBeaconPrivKey.PublicKey()

// we computed a local beacon key but it is inconsistent with our canonical
// we computed a local beacon key, but it is inconsistent with our canonical
// public key - therefore it is unsafe for use
if !nextDKGPubKey.Equals(localPubKey) {
log.Warn().
Str("computed_beacon_pub_key", localPubKey.String()).
Str("canonical_beacon_pub_key", nextDKGPubKey.String()).
Msg("checking beacon key consistency: locally computed beacon public key does not match beacon public key for next epoch")
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateInconsistentKey)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("failed to set dkg end state")
log.Fatal().Err(err).Msg("failed to set dkg current state")
}
return
}

err = e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateSuccess)
err = e.dkgState.SetDKGState(nextEpochCounter, flow.RandomBeaconKeyCommitted)
if err != nil {
// TODO use irrecoverable context
e.log.Fatal().Err(err).Msg("failed to set dkg end state")
e.log.Fatal().Err(err).Msg("failed to set dkg current state")
}
log.Info().Msgf("successfully ended DKG, my beacon pub key for epoch %d is %s", nextEpochCounter, localPubKey)
}
Expand Down Expand Up @@ -423,16 +435,17 @@ func (e *ReactorEngine) end(nextEpochCounter uint64) func() error {
if crypto.IsDKGFailureError(err) {
// Failing to complete the DKG protocol is a rare but expected scenario, which we must handle.
// By convention, if we are leaving the happy path, we want to persist the _first_ failure symptom
// in the `dkgState`. If the write yields a `storage.ErrAlreadyExists`, we know the overall protocol
// has already abandoned the happy path, because on the happy path the ReactorEngine is the only writer.
// Then this function just stops and returns without error.
// in the `dkgState`. If the write yields a [storage.InvalidDKGStateTransitionError], it means that the state machine
// is in the terminal state([flow.RandomBeaconKeyCommitted]) as all other transitions(even to [flow.DKGStateFailure] -> [flow.DKGStateFailure])
// are allowed. If the protocol is in terminal state, and we have a failure symptom, then it means that recovery has happened
// before ending the DKG. In this case, we want to ignore the error and return without error.
e.log.Warn().Err(err).Msgf("node %s with index %d failed DKG locally", e.me.NodeID(), e.controller.GetIndex())
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateDKGFailure)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
if errors.Is(err, storage.ErrAlreadyExists) {
return nil // DKGEndState already being set is expected in case of epoch recovery
if storage.IsInvalidDKGStateTransitionError(err) {
return nil
}
return fmt.Errorf("failed to set dkg end state following dkg end error: %w", err)
return fmt.Errorf("failed to set dkg current state following dkg end error: %w", err)
}
return nil // local DKG protocol has failed (the expected scenario)
} else if err != nil {
Expand Down
Loading

0 comments on commit 92ec89a

Please sign in to comment.