Skip to content

Commit

Permalink
Merge pull request #65 from DarkLord017/Refactor-consensus
Browse files Browse the repository at this point in the history
Refactor consensus
  • Loading branch information
star-gazer111 authored Nov 4, 2024
2 parents d97fa10 + ea8a406 commit 5b17eca
Showing 1 changed file with 166 additions and 86 deletions.
252 changes: 166 additions & 86 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package consensus


///NOTE: only these imports are required others are in package already
// uses rpc
// uses config for networks
// uses common for datatypes
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
Expand Down Expand Up @@ -181,48 +181,72 @@ func (con ConsensusClient) Expected_current_slot() uint64 {
}

func sync_fallback(inner *Inner, fallback *string) error {
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {
return errors.Wrap(err, "failed to fetch checkpoint from API")
}
return inner.sync(cf)
// Create a buffered channel to receive any errors from the goroutine
errorChan := make(chan error, 1)

go func() {
// Attempt to fetch the latest checkpoint from the API
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {

errorChan <- err
return
}

if err := inner.sync(cf); err != nil {

errorChan <- err
return
}

errorChan <- nil
}()

return <-errorChan
}

func sync_all_fallback(inner *Inner, chainID uint64) error {
var n config.Network
network, err := n.ChainID(chainID)
if err != nil {
return err
}
errorChan := make(chan error, 1)

ch := checkpoints.CheckpointFallback{}
go func() {

checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
return err
}
ch := checkpoints.CheckpointFallback{}

chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
return errors.New("chain id not recognized")
}
checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
errorChan <- errWhileCheckpoint
return
}

chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
errorChan <- errors.New("chain id not recognized")
return
}

// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)
// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)

// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
return err
}
// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
errorChan <- err
}

return nil
errorChan <- nil
}()
return <-errorChan
}

func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockSend chan *common.Block, checkpointSend chan *[]byte, config *config.Config) *Inner {
Expand All @@ -240,21 +264,40 @@ func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockS

}
func (in *Inner) Check_rpc() error {
chainID, err := in.RPC.ChainId()
if err != nil {
return err
}
if chainID != in.Config.Chain.ChainID {
return ErrIncorrectRpcNetwork
}
return nil
errorChan := make(chan error, 1)

go func() {
chainID, err := in.RPC.ChainId()
if err != nil {
errorChan <- err
return
}
if chainID != in.Config.Chain.ChainID {
errorChan <- ErrIncorrectRpcNetwork
return
}
errorChan <- nil
}()
return <-errorChan
}
func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*consensus_core.ExecutionPayload, error) {
block, err := in.RPC.GetBlock(*slot)
if err != nil {
func (in *Inner) get_execution_payload(slot *uint64) (*consensus_core.ExecutionPayload, error) {
errorChan := make(chan error, 1)
blockChan := make(chan consensus_core.BeaconBlock, 1)
go func() {
var err error
block, err := in.RPC.GetBlock(*slot)
if err != nil {
errorChan <- err
}
errorChan <- nil
blockChan <- block
}()

if err := <-errorChan; err != nil {
return nil, err
}

block := <-blockChan
blockHash, err := utils.TreeHashRoot(block.Body.ToBytes())
if err != nil {
return nil, err
Expand Down Expand Up @@ -288,7 +331,7 @@ func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*cons
return &payload, nil
}

func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([]interface{}, error) {
func (in *Inner) Get_payloads(startSlot, endSlot uint64) ([]interface{}, error) {
var payloads []interface{}

// Fetch the block at endSlot to get the initial parent hash
Expand Down Expand Up @@ -345,11 +388,22 @@ func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([
}
}
func (in *Inner) advance() error {
// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
ErrorChan := make(chan error, 1)
finalityChan := make(chan consensus_core.FinalityUpdate, 1)

go func() {
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
ErrorChan <- err
return
}
finalityChan <- finalityUpdate
ErrorChan <- nil
}()
if ErrorChan != nil {
return <-ErrorChan
}
finalityUpdate := <-finalityChan
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
Expand Down Expand Up @@ -394,60 +448,72 @@ func (in *Inner) sync(checkpoint [32]byte) error {
// Perform bootstrap with the given checkpoint
in.bootstrap(checkpoint)

// Calculate the current sync period

currentPeriod := utils.CalcSyncPeriod(in.Store.FinalizedHeader.Slot)

// Fetch updates
updates, err := in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
return err
}
errorChan := make(chan error, 1)
var updates []consensus_core.Update
var err error
go func() {
updates, err = in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
errorChan <- err
}

// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
return err
// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
errorChan <- err
return
}
in.apply_update(&update)
}
in.apply_update(&update)
}

// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
in.apply_finality_update(&finalityUpdate)
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
errorChan <- err
return
}
in.apply_finality_update(&finalityUpdate)

// Fetch and apply optimistic update
// Fetch and apply optimistic update

optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
return err
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
errorChan <- err
return
}
in.apply_optimistic_update(&optimisticUpdate)
errorChan <- nil
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))
}()

if err := <-errorChan; err != nil {
return err
}
in.apply_optimistic_update(&optimisticUpdate)

// Log the success message
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))

return nil
}
func (in *Inner) send_blocks() error {
// Get slot from the optimistic header
slot := in.Store.OptimisticHeader.Slot
payload, err := in.get_execution_payload(context.Background(), &slot)
payload, err := in.get_execution_payload(&slot)
if err != nil {
return err
}

// Get finalized slot from the finalized header
finalizedSlot := in.Store.FinalizedHeader.Slot
finalizedPayload, err := in.get_execution_payload(context.Background(), &finalizedSlot)
finalizedPayload, err := in.get_execution_payload(&finalizedSlot)
if err != nil {
return err
}
Expand Down Expand Up @@ -493,12 +559,25 @@ func (in *Inner) duration_until_next_update() time.Duration {
return time.Duration(nextUpdate) * time.Second
}
func (in *Inner) bootstrap(checkpoint [32]byte) {

bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)
if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan := make(chan error, 1)
bootstrapChan := make(chan consensus_core.Bootstrap, 1)
go func() {
bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)

if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan <- errInBootstrap
return
}
bootstrapChan <- bootstrap
errorChan <- nil
}()
if err := <-errorChan; err != nil {
return
}
bootstrap := <-bootstrapChan



isValid := in.is_valid_checkpoint(bootstrap.Header.Slot)
if !isValid {
Expand Down Expand Up @@ -547,7 +626,7 @@ func apply_bootstrap(store *LightClientStore, bootstrap consensus_core.Bootstrap

func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlot uint64, store *LightClientStore, genesisRoots []byte, forks consensus_core.Forks) error {
{
bits := getBits(update.SyncAggregate.SyncCommitteeBits)
bits := getBits(update.SyncAggregate.SyncCommitteeBits[:])
if bits == 0 {
return ErrInsufficientParticipation
}
Expand Down Expand Up @@ -614,6 +693,7 @@ func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlo

forkVersion := utils.CalculateForkVersion(&forks, update.SignatureSlot)
forkDataRoot := utils.ComputeForkDataRoot(forkVersion, consensus_core.Bytes32(in.Config.Chain.GenesisRoot))


if !verifySyncCommitteeSignature(pks, &update.AttestedHeader, &update.SyncAggregate.SyncCommitteeSignature, forkDataRoot) {
return ErrInvalidSignature
Expand Down Expand Up @@ -653,7 +733,7 @@ func (in *Inner) verify_optimistic_update(update *consensus_core.OptimisticUpdat
return in.verify_generic_update(&genUpdate, in.expected_current_slot(), &in.Store, in.Config.Chain.GenesisRoot, in.Config.Forks)
}
func (in *Inner) apply_generic_update(store *LightClientStore, update *GenericUpdate) *[]byte {
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits)
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits[:])

// Update max active participants
if committeeBits > store.CurrentMaxActiveParticipants {
Expand Down Expand Up @@ -766,7 +846,7 @@ func (in *Inner) apply_optimistic_update(update *consensus_core.OptimisticUpdate
}
}
func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
Expand All @@ -784,7 +864,7 @@ func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
)
}
func (in *Inner) Log_optimistic_update(update *consensus_core.OptimisticUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
Expand Down Expand Up @@ -1005,7 +1085,7 @@ func processTransaction(txBytes *[1073741824]byte, blockHash consensus_core.Byte
}

// getBits counts the number of bits set to 1 in a [64]byte array
func getBits(bitfield [64]byte) uint64 {
func getBits(bitfield []byte) uint64 {
var count uint64
for _, b := range bitfield {
count += uint64(popCount(b))
Expand Down

0 comments on commit 5b17eca

Please sign in to comment.