Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EFM] Recoverable Random Beacon State Machine #6771

Open
wants to merge 35 commits into
base: feature/efm-recovery
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
100a92b
Changed structure of interfaces and corresponding implementation for …
durkmurder Nov 20, 2024
719b6a6
Removed 'DKG started' from storage.
durkmurder Nov 22, 2024
01bbdcf
Updated DKG states to have extra states. They no more represent the e…
durkmurder Nov 22, 2024
1d3c6a4
Updated usages of DKG storage in reactor engine
durkmurder Nov 22, 2024
6c34886
Added back GetDKGStarted for easier usage in reactor engine. Updated …
durkmurder Nov 22, 2024
8aac526
Implemented allowed state transitions for recoverable random beacon s…
durkmurder Nov 22, 2024
064b651
Fixed unit test compilation. Updated allowed state transitions
durkmurder Nov 22, 2024
6bcaf38
Renamed interface methods
durkmurder Nov 22, 2024
c13c7f6
Updated mocks
durkmurder Nov 22, 2024
489c871
Fixed tests for reactor engine
durkmurder Nov 22, 2024
10aab93
Updated godoc and reduced number of states for Recoverable state machine
durkmurder Nov 25, 2024
fb28249
Updated usages of DKG state. Updated naming, godocs
durkmurder Nov 25, 2024
fb161f7
Removed flow.RandomBeaconKeyRecovered state. Cleanup
durkmurder Nov 28, 2024
650b8b8
Updated how recovery happens in terms of inserting values
durkmurder Nov 28, 2024
2386623
Implemented test for enforcing invariants of the uninitialized state
durkmurder Nov 29, 2024
bfe95b0
Added additional test cases.
durkmurder Nov 29, 2024
79aac04
Updated logic for state transitions
durkmurder Nov 29, 2024
1119c8c
Added additional test for Completed state
durkmurder Nov 29, 2024
c8dfca6
Added tests for failure state
durkmurder Nov 29, 2024
c9182c5
Added extra tests for Random Beacon Key Committed state
durkmurder Nov 29, 2024
577712a
Updated godoc for DKG tests
durkmurder Dec 2, 2024
7e728aa
Godoc updates
durkmurder Dec 2, 2024
1fa11c6
Updated mocks
durkmurder Dec 2, 2024
a92d8b7
Naming updates
durkmurder Dec 2, 2024
f0be4ba
Fixed broken tests
durkmurder Dec 2, 2024
62d399d
Linted
durkmurder Dec 2, 2024
bea9c1a
Fixed broken integration tests for DKG
durkmurder Dec 2, 2024
89005a8
Merge branch 'feature/efm-recovery' into yurii/6725-recoverable-rando…
durkmurder Dec 3, 2024
0b9f732
Fixed initialization of beacon private key state machine
durkmurder Dec 3, 2024
5a98f76
Updated godoc. Added specific sentinel for error handling
durkmurder Dec 3, 2024
b639742
Updated assertions with the account for sentinel errors
durkmurder Dec 3, 2024
be18c57
Updated logging
durkmurder Dec 3, 2024
d0df4fb
Linted
durkmurder Dec 3, 2024
0f98cf7
Fixed invalid exit logic in DKG reactor engine
durkmurder Dec 3, 2024
6ea64d6
Fixed broken test
durkmurder Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 40 additions & 44 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state/state"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/io"
)
Expand Down Expand Up @@ -104,32 +103,31 @@ func main() {
insecureAccessAPI bool
accessNodeIDS []string

err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
machineAccountMetrics module.MachineAccountMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
followerDistributor *pubsub.FollowerDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
proposalDurProvider hotstuff.ProposalDurationProvider
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
dkgState *bstorage.DKGState
safeBeaconKeys *bstorage.SafeBeaconPrivateKeys
getSealingConfigs module.SealingConfigsGetter
err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
machineAccountMetrics module.MachineAccountMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
followerDistributor *pubsub.FollowerDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
proposalDurProvider hotstuff.ProposalDurationProvider
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
myBeaconKeyStateMachine *bstorage.RecoverablePrivateBeaconKeyStateMachine
getSealingConfigs module.SealingConfigsGetter
)
var deprecatedFlagBlockRateDelay time.Duration

Expand Down Expand Up @@ -214,13 +212,9 @@ func main() {
return nil
}).
Module("dkg state", func(node *cmd.NodeConfig) error {
dkgState, err = bstorage.NewDKGState(node.Metrics.Cache, node.SecretsDB)
myBeaconKeyStateMachine, err = bstorage.NewRecoverableRandomBeaconStateMachine(node.Metrics.Cache, node.SecretsDB)
return err
}).
Module("beacon keys", func(node *cmd.NodeConfig) error {
safeBeaconKeys = bstorage.NewSafeBeaconPrivateKeys(dkgState)
return nil
}).
Module("updatable sealing config", func(node *cmd.NodeConfig) error {
setter, err := updatable_configs.NewSealingConfigs(
requiredApprovalsForSealConstruction,
Expand Down Expand Up @@ -344,22 +338,24 @@ func main() {
myBeaconPublicKeyShare)
}

// store my beacon key for the first epoch post-spork
err = dkgState.InsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey)
if err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
return err
started, err := myBeaconKeyStateMachine.GetDKGStarted(epochCounter)
if err != nil {
return fmt.Errorf("could not get DKG started flag for root epoch %d: %w", epochCounter, err)
}
// mark the root DKG as successful, so it is considered safe to use the key
err = dkgState.SetDKGEndState(epochCounter, flow.DKGEndStateSuccess)
if err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
return err

// perform this only if state machine is in initial state
if !started {
// store my beacon key for the first epoch post-spork
err = myBeaconKeyStateMachine.UpsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey)
if err != nil {
return fmt.Errorf("could not upsert my beacon private key for root epoch %d: %w", epochCounter, err)
}
}

return nil
}).
Module("my beacon key epoch recovery", func(node *cmd.NodeConfig) error {
recoverMyBeaconKeyStorage := bstorage.NewEpochRecoveryMyBeaconKey(safeBeaconKeys)
myBeaconKeyRecovery, err := dkgmodule.NewBeaconKeyRecovery(node.Logger, node.Me, node.State, recoverMyBeaconKeyStorage)
myBeaconKeyRecovery, err := dkgmodule.NewBeaconKeyRecovery(node.Logger, node.Me, node.State, myBeaconKeyStateMachine)
if err != nil {
return fmt.Errorf("could not initialize my beacon key epoch recovery: %w", err)
}
Expand Down Expand Up @@ -582,7 +578,7 @@ func main() {
// wrap Main consensus committee with metrics
wrappedCommittee := committees.NewMetricsWrapper(committee, mainMetrics) // wrapper for measuring time spent determining consensus committee relations

beaconKeyStore := hotsignature.NewEpochAwareRandomBeaconKeyStore(epochLookup, safeBeaconKeys)
beaconKeyStore := hotsignature.NewEpochAwareRandomBeaconKeyStore(epochLookup, myBeaconKeyStateMachine)

// initialize the combined signer for hotstuff
var signer hotstuff.Signer
Expand Down Expand Up @@ -922,7 +918,7 @@ func main() {
node.Logger,
node.Me,
node.State,
dkgState,
myBeaconKeyStateMachine,
dkgmodule.NewControllerFactory(
node.Logger,
node.Me,
Expand Down
6 changes: 3 additions & 3 deletions cmd/util/cmd/common/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func ReadFullPartnerNodeInfos(log zerolog.Logger, partnerWeightsPath, partnerNod
}
err = ValidateNetworkPubKey(partner.NetworkPubKey)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("invalid network public key: %s", partner.NetworkPubKey))
return nil, fmt.Errorf("invalid network public key: %s", partner.NetworkPubKey)
}
err = ValidateStakingPubKey(partner.StakingPubKey)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("invalid staking public key: %s", partner.StakingPubKey))
return nil, fmt.Errorf("invalid staking public key: %s", partner.StakingPubKey)
}

weight := weights[partner.NodeID]
if valid := ValidateWeight(weight); !valid {
return nil, fmt.Errorf(fmt.Sprintf("invalid partner weight: %d", weight))
return nil, fmt.Errorf("invalid partner weight: %d", weight)
}

if weight != flow.DefaultInitialWeight {
Expand Down
2 changes: 1 addition & 1 deletion engine/common/grpc/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (f *Forwarder) reconnectingClient(i int) error {
// FaultTolerantClient implements an upstream connection that reconnects on errors
// for a reasonable amount of time.
func (f *Forwarder) FaultTolerantClient() (access.AccessAPIClient, io.Closer, error) {
if f.upstream == nil || len(f.upstream) == 0 {
if len(f.upstream) == 0 {
return nil, nil, status.Errorf(codes.Unimplemented, "method not implemented")
}

Expand Down
30 changes: 17 additions & 13 deletions engine/consensus/dkg/reactor_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,24 @@ func (e *ReactorEngine) startDKGForEpoch(currentEpochCounter uint64, first *flow
Hex("first_block_id", firstID[:]). // id of first block in EpochSetup phase
Logger()

// if we have started the dkg for this epoch already, exit
// if we have started the dkg for this epoch already, exit
started, err := e.dkgState.GetDKGStarted(nextEpochCounter)
if err != nil {
// unexpected storage-level error
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("could not check whether DKG is started")
log.Fatal().Err(err).Msg("could not check whether DKG is dkgState")
}
if started {
log.Warn().Msg("DKG started before, skipping starting the DKG for this epoch")
return
}

// flag that we are starting the dkg for this epoch
err = e.dkgState.SetDKGStarted(nextEpochCounter)
err = e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateStarted)
if err != nil {
// unexpected storage-level error
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("could not set dkg started")
log.Fatal().Err(err).Msg("could not set dkg dkgState")
}

curDKGInfo, err := e.getDKGInfo(firstID)
Expand Down Expand Up @@ -271,9 +271,13 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
// This can happen if the DKG failed locally, if we failed to generate
// a local private beacon key, or if we crashed while performing this
// check previously.
endState, err := e.dkgState.GetDKGEndState(nextEpochCounter)
if err == nil {
log.Warn().Msgf("checking beacon key consistency: exiting because dkg end state was already set: %s", endState.String())
currentState, err := e.dkgState.GetDKGState(nextEpochCounter)
if err != nil {
log.Fatal().Err(err).Msg("failed to get dkg state, by this point it should have been set")
return
}
if currentState != flow.DKGStateCompleted {
log.Warn().Msgf("checking beacon key consistency: exiting because dkg didn't reach completed state: %s", currentState.String())
return
}

Expand All @@ -289,10 +293,10 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
return
}

myBeaconPrivKey, err := e.dkgState.RetrieveMyBeaconPrivateKey(nextEpochCounter)
myBeaconPrivKey, err := e.dkgState.UnsafeRetrieveMyBeaconPrivateKey(nextEpochCounter)
if errors.Is(err, storage.ErrNotFound) {
log.Warn().Msg("checking beacon key consistency: no key found")
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateNoKey)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("failed to set dkg end state")
Expand All @@ -319,15 +323,15 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
Str("computed_beacon_pub_key", localPubKey.String()).
Str("canonical_beacon_pub_key", nextDKGPubKey.String()).
Msg("checking beacon key consistency: locally computed beacon public key does not match beacon public key for next epoch")
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateInconsistentKey)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("failed to set dkg end state")
}
return
}

err = e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateSuccess)
err = e.dkgState.SetDKGState(nextEpochCounter, flow.RandomBeaconKeyCommitted)
if err != nil {
// TODO use irrecoverable context
e.log.Fatal().Err(err).Msg("failed to set dkg end state")
Expand Down Expand Up @@ -427,10 +431,10 @@ func (e *ReactorEngine) end(nextEpochCounter uint64) func() error {
// has already abandoned the happy path, because on the happy path the ReactorEngine is the only writer.
// Then this function just stops and returns without error.
e.log.Warn().Err(err).Msgf("node %s with index %d failed DKG locally", e.me.NodeID(), e.controller.GetIndex())
err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateDKGFailure)
err := e.dkgState.SetDKGState(nextEpochCounter, flow.DKGStateFailure)
if err != nil {
if errors.Is(err, storage.ErrAlreadyExists) {
return nil // DKGEndState already being set is expected in case of epoch recovery
return nil // DKGState already being set is expected in case of epoch recovery
}
return fmt.Errorf("failed to set dkg end state following dkg end error: %w", err)
}
Expand Down
Loading
Loading