Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor types & consensus core #66

Merged
Prev Previous commit
Next Next commit
Refactor consensus.go
DarkLord017 authored Oct 1, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 1eb60402ed407218a33ca10becb0adec19a1911a
251 changes: 165 additions & 86 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ package consensus
// uses common for datatypes
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
@@ -181,48 +180,72 @@ func (con ConsensusClient) Expected_current_slot() uint64 {
}

func sync_fallback(inner *Inner, fallback *string) error {
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {
return errors.Wrap(err, "failed to fetch checkpoint from API")
}
return inner.sync(cf)
// Create a buffered channel to receive any errors from the goroutine
errorChan := make(chan error, 1)

go func() {
// Attempt to fetch the latest checkpoint from the API
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {

errorChan <- err
return
}

if err := inner.sync(cf); err != nil {

errorChan <- err
return
}

errorChan <- nil
}()

return <-errorChan
}

func sync_all_fallback(inner *Inner, chainID uint64) error {
var n config.Network
network, err := n.ChainID(chainID)
if err != nil {
return err
}
errorChan := make(chan error, 1)

ch := checkpoints.CheckpointFallback{}
go func() {

checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
return err
}
ch := checkpoints.CheckpointFallback{}

chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
return errors.New("chain id not recognized")
}
checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
errorChan <- errWhileCheckpoint
return
}

// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)
chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
errorChan <- errors.New("chain id not recognized")
return
}

// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
return err
}
// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)

return nil
// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
errorChan <- err
}

errorChan <- nil
}()
return <-errorChan
}

func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockSend chan *common.Block, checkpointSend chan *[]byte, config *config.Config) *Inner {
@@ -240,21 +263,40 @@ func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockS

}
func (in *Inner) Check_rpc() error {
chainID, err := in.RPC.ChainId()
if err != nil {
return err
}
if chainID != in.Config.Chain.ChainID {
return ErrIncorrectRpcNetwork
}
return nil
errorChan := make(chan error, 1)

go func() {
chainID, err := in.RPC.ChainId()
if err != nil {
errorChan <- err
return
}
if chainID != in.Config.Chain.ChainID {
errorChan <- ErrIncorrectRpcNetwork
return
}
errorChan <- nil
}()
return <-errorChan
}
func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*consensus_core.ExecutionPayload, error) {
block, err := in.RPC.GetBlock(*slot)
if err != nil {
func (in *Inner) get_execution_payload(slot *uint64) (*consensus_core.ExecutionPayload, error) {
errorChan := make(chan error, 1)
blockChan := make(chan consensus_core.BeaconBlock, 1)
go func() {
var err error
block, err := in.RPC.GetBlock(*slot)
if err != nil {
errorChan <- err
}
errorChan <- nil
blockChan <- block
}()

if err := <-errorChan; err != nil {
return nil, err
}

block := <-blockChan
blockHash, err := utils.TreeHashRoot(block.Body.ToBytes())
if err != nil {
return nil, err
@@ -288,7 +330,7 @@ func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*cons
return &payload, nil
}

func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([]interface{}, error) {
func (in *Inner) Get_payloads(startSlot, endSlot uint64) ([]interface{}, error) {
var payloads []interface{}

// Fetch the block at endSlot to get the initial parent hash
@@ -345,11 +387,22 @@ func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([
}
}
func (in *Inner) advance() error {
// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
ErrorChan := make(chan error, 1)
finalityChan := make(chan consensus_core.FinalityUpdate, 1)

go func() {
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
ErrorChan <- err
return
}
finalityChan <- finalityUpdate
ErrorChan <- nil
}()
if ErrorChan != nil {
return <-ErrorChan
}
finalityUpdate := <-finalityChan
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
@@ -394,60 +447,72 @@ func (in *Inner) sync(checkpoint [32]byte) error {
// Perform bootstrap with the given checkpoint
in.bootstrap(checkpoint)

// Calculate the current sync period

currentPeriod := utils.CalcSyncPeriod(in.Store.FinalizedHeader.Slot)

// Fetch updates
updates, err := in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
return err
}
errorChan := make(chan error, 1)
var updates []consensus_core.Update
var err error
go func() {
updates, err = in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
errorChan <- err
}

// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
return err
// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
errorChan <- err
return
}
in.apply_update(&update)
}
in.apply_update(&update)
}

// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
in.apply_finality_update(&finalityUpdate)
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
errorChan <- err
return
}
in.apply_finality_update(&finalityUpdate)

// Fetch and apply optimistic update
// Fetch and apply optimistic update

optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
return err
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
errorChan <- err
return
}
in.apply_optimistic_update(&optimisticUpdate)
errorChan <- nil
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))
}()

if err := <-errorChan; err != nil {
return err
}
in.apply_optimistic_update(&optimisticUpdate)

// Log the success message
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))

return nil
}
func (in *Inner) send_blocks() error {
// Get slot from the optimistic header
slot := in.Store.OptimisticHeader.Slot
payload, err := in.get_execution_payload(context.Background(), &slot)
payload, err := in.get_execution_payload(&slot)
if err != nil {
return err
}

// Get finalized slot from the finalized header
finalizedSlot := in.Store.FinalizedHeader.Slot
finalizedPayload, err := in.get_execution_payload(context.Background(), &finalizedSlot)
finalizedPayload, err := in.get_execution_payload(&finalizedSlot)
if err != nil {
return err
}
@@ -493,12 +558,25 @@ func (in *Inner) duration_until_next_update() time.Duration {
return time.Duration(nextUpdate) * time.Second
}
func (in *Inner) bootstrap(checkpoint [32]byte) {

bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)
if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan := make(chan error, 1)
bootstrapChan := make(chan consensus_core.Bootstrap, 1)
go func() {
bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)

if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan <- errInBootstrap
return
}
bootstrapChan <- bootstrap
errorChan <- nil
}()
if err := <-errorChan; err != nil {
return
}
bootstrap := <-bootstrapChan



isValid := in.is_valid_checkpoint(bootstrap.Header.Slot)
if !isValid {
@@ -547,7 +625,7 @@ func apply_bootstrap(store *LightClientStore, bootstrap consensus_core.Bootstrap

func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlot uint64, store *LightClientStore, genesisRoots []byte, forks consensus_core.Forks) error {
{
bits := getBits(update.SyncAggregate.SyncCommitteeBits)
bits := getBits(update.SyncAggregate.SyncCommitteeBits[:])
if bits == 0 {
return ErrInsufficientParticipation
}
@@ -614,6 +692,7 @@ func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlo

forkVersion := utils.CalculateForkVersion(&forks, update.SignatureSlot)
forkDataRoot := utils.ComputeForkDataRoot(forkVersion, consensus_core.Bytes32(in.Config.Chain.GenesisRoot))


if !verifySyncCommitteeSignature(pks, &update.AttestedHeader, &update.SyncAggregate.SyncCommitteeSignature, forkDataRoot) {
return ErrInvalidSignature
@@ -653,7 +732,7 @@ func (in *Inner) verify_optimistic_update(update *consensus_core.OptimisticUpdat
return in.verify_generic_update(&genUpdate, in.expected_current_slot(), &in.Store, in.Config.Chain.GenesisRoot, in.Config.Forks)
}
func (in *Inner) apply_generic_update(store *LightClientStore, update *GenericUpdate) *[]byte {
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits)
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits[:])

// Update max active participants
if committeeBits > store.CurrentMaxActiveParticipants {
@@ -766,7 +845,7 @@ func (in *Inner) apply_optimistic_update(update *consensus_core.OptimisticUpdate
}
}
func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
@@ -784,7 +863,7 @@ func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
)
}
func (in *Inner) Log_optimistic_update(update *consensus_core.OptimisticUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
@@ -1005,7 +1084,7 @@ func processTransaction(txBytes *[1073741824]byte, blockHash consensus_core.Byte
}

// getBits counts the number of bits set to 1 in a [64]byte array
func getBits(bitfield [64]byte) uint64 {
func getBits(bitfield []byte) uint64 {
var count uint64
for _, b := range bitfield {
count += uint64(popCount(b))