Skip to content

Commit

Permalink
Caplin: Remove kv.Proposers (#13684)
Browse files Browse the repository at this point in the history
it is actually useless and buggy. there is a more elagant way to
implement proposers endpoint with rando mixes instead.
  • Loading branch information
Giulio2002 authored Feb 4, 2025
1 parent 1744780 commit 859f3a1
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 77 deletions.
16 changes: 1 addition & 15 deletions cl/antiquary/beacon_states_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ type beaconStatesCollector struct {
balancesCollector *etl.Collector
randaoMixesCollector *etl.Collector
intraRandaoMixesCollector *etl.Collector
proposersCollector *etl.Collector
slashingsCollector *etl.Collector
blockRootsCollector *etl.Collector
stateRootsCollector *etl.Collector
Expand Down Expand Up @@ -107,7 +106,6 @@ func newBeaconStatesCollector(beaconCfg *clparams.BeaconChainConfig, tmpdir stri
balancesCollector: etl.NewCollector(kv.ValidatorBalance, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
randaoMixesCollector: etl.NewCollector(kv.RandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
intraRandaoMixesCollector: etl.NewCollector(kv.IntraRandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
proposersCollector: etl.NewCollector(kv.Proposers, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
slashingsCollector: etl.NewCollector(kv.ValidatorSlashings, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
blockRootsCollector: etl.NewCollector(kv.BlockRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
stateRootsCollector: etl.NewCollector(kv.StateRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace),
Expand Down Expand Up @@ -135,11 +133,6 @@ func (i *beaconStatesCollector) addGenesisState(ctx context.Context, state *stat
i.compressor.Reset(i.buf)

slot := state.Slot()
epoch := slot / i.beaconCfg.SlotsPerEpoch
// Setup state events handlers
if err := i.proposersCollector.Collect(base_encoding.Encode64ToBytes4(epoch), getProposerDutiesValue(state)); err != nil {
return err
}

events := state_accessors.NewStateEvents()

Expand Down Expand Up @@ -265,10 +258,6 @@ func (i *beaconStatesCollector) collectActiveIndices(epoch uint64, activeIndices
return i.activeValidatorIndiciesCollector.Collect(base_encoding.Encode64ToBytes4(slot), i.buf.Bytes())
}

func (i *beaconStatesCollector) collectFlattenedProposers(epoch uint64, proposers []byte) error {
return i.proposersCollector.Collect(base_encoding.Encode64ToBytes4(epoch), proposers)
}

func (i *beaconStatesCollector) collectCurrentSyncCommittee(slot uint64, committee *solid.SyncCommittee) error {
roundedSlot := i.beaconCfg.RoundSlotToSyncCommitteePeriod(slot)
return i.currentSyncCommitteeCollector.Collect(base_encoding.Encode64ToBytes4(roundedSlot), committee[:])
Expand Down Expand Up @@ -323,9 +312,7 @@ func (i *beaconStatesCollector) flush(ctx context.Context, tx kv.RwTx) error {
if err := i.balancesCollector.Load(tx, kv.ValidatorBalance, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}
if err := i.proposersCollector.Load(tx, kv.Proposers, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}

if err := i.slashingsCollector.Load(tx, kv.ValidatorSlashings, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}
Expand Down Expand Up @@ -374,7 +361,6 @@ func (i *beaconStatesCollector) close() {
i.balancesCollector.Close()
i.randaoMixesCollector.Close()
i.intraRandaoMixesCollector.Close()
i.proposersCollector.Close()
i.slashingsCollector.Close()
i.blockRootsCollector.Close()
i.stateRootsCollector.Close()
Expand Down
2 changes: 1 addition & 1 deletion cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
); err != nil {
return err
}
return stateAntiquaryCollector.collectFlattenedProposers(epoch, getProposerDutiesValue(s.currentState))
return nil
},
OnNewBlockRoot: func(index int, root libcommon.Hash) error {
return stateAntiquaryCollector.collectBlockRoot(s.currentState.Slot(), root)
Expand Down
73 changes: 31 additions & 42 deletions cl/beacon/handler/duties_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package handler
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"net/http"
"sync"

Expand Down Expand Up @@ -48,58 +48,47 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
return nil, err
}

if epoch < a.forkchoiceStore.FinalizedCheckpoint().Epoch {
tx, err := a.indiciesDB.BeginRo(r.Context())
if err != nil {
return nil, err
}
defer tx.Rollback()
view := a.caplinStateSnapshots.View()
defer view.Close()

indicies, err := state_accessors.ReadProposersInEpoch(state_accessors.GetValFnTxAndSnapshot(tx, view), epoch)
if err != nil {
return nil, err
}
if len(indicies) == 0 {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no proposers for this epoch. either this range was prune or not backfilled"))
}
duties := make([]proposerDuties, len(indicies))
for i, validatorIndex := range indicies {
var pk libcommon.Bytes48
pk, err := a.syncedData.ValidatorPublicKeyByIndex(int(validatorIndex))
if err != nil {
return nil, err
}
duties[i] = proposerDuties{
Pubkey: pk,
ValidatorIndex: validatorIndex,
Slot: epoch*a.beaconChainCfg.SlotsPerEpoch + uint64(i),
}
}
return newBeaconResponse(duties).
WithOptimistic(a.forkchoiceStore.IsHeadOptimistic()).
WithFinalized(true).
WithVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch)).
With("dependent_root", dependentRoot), nil
}
marginEpochs := uint64(2 << 13)

expectedSlot := epoch * a.beaconChainCfg.SlotsPerEpoch

duties := make([]proposerDuties, a.beaconChainCfg.SlotsPerEpoch)
wg := sync.WaitGroup{}

if err := a.syncedData.ViewHeadState(func(s *state.CachingBeaconState) error {
for slot := expectedSlot; slot < expectedSlot+a.beaconChainCfg.SlotsPerEpoch; slot++ {
// Lets do proposer index computation
mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) %
a.beaconChainCfg.EpochsPerHistoricalVector
// Lets do proposer index computation
mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) %
a.beaconChainCfg.EpochsPerHistoricalVector

var mix libcommon.Hash
if epoch+marginEpochs > a.forkchoiceStore.FinalizedCheckpoint().Epoch {
// Input for the seed hash.
mix := s.GetRandaoMix(int(mixPosition))
input := shuffling2.GetSeed(a.beaconChainCfg, mix, epoch, a.beaconChainCfg.DomainBeaconProposer)
mix = s.GetRandaoMix(int(mixPosition))
} else {
tx, err := a.indiciesDB.BeginRo(r.Context())
if err != nil {
return err
}
defer tx.Rollback()
view := a.caplinStateSnapshots.View()
defer view.Close()

// read the mix from the database
mix, err = a.stateReader.ReadRandaoMixBySlotAndIndex(tx, state_accessors.GetValFnTxAndSnapshot(tx, view), expectedSlot, mixPosition)
if err != nil {
return err
}
if mix == (libcommon.Hash{}) {
return beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("mix not found for slot %d and index %d. maybe block was not backfilled or range was pruned", expectedSlot, mixPosition))
}
}

for slot := expectedSlot; slot < expectedSlot+a.beaconChainCfg.SlotsPerEpoch; slot++ {

slotByteArray := make([]byte, 8)
binary.LittleEndian.PutUint64(slotByteArray, slot)

input := shuffling2.GetSeed(a.beaconChainCfg, mix, epoch, a.beaconChainCfg.DomainBeaconProposer)
// Add slot to the end of the input.
inputWithSlot := append(input[:], slotByteArray...)
hash := sha256.New()
Expand Down
18 changes: 0 additions & 18 deletions cl/persistence/state/state_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,3 @@ func ReadActiveIndicies(getFn GetValFn, slot uint64) ([]uint64, error) {
buf := bytes.NewBuffer(v)
return base_encoding.ReadRabbits(nil, buf)
}

func ReadProposersInEpoch(getFn GetValFn, epoch uint64) ([]uint64, error) {
key := base_encoding.Encode64ToBytes4(epoch)

indiciesBytes, err := getFn(kv.Proposers, key)
if err != nil {
return nil, err
}
if len(indiciesBytes) == 0 {
return nil, nil
}
var ret []uint64
for i := 0; i < len(indiciesBytes); i += 4 {
validatorIndex := binary.BigEndian.Uint32(indiciesBytes[i : i+4])
ret = append(ret, uint64(validatorIndex))
}
return ret, nil
}
1 change: 0 additions & 1 deletion turbo/snapshotsync/caplin_state_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func MakeCaplinStateSnapshotsTypes(db kv.RoDB) SnapshotTypes {
kv.Eth1DataVotes: getKvGetterForStateTable(db, kv.Eth1DataVotes),
kv.IntraRandaoMixes: getKvGetterForStateTable(db, kv.IntraRandaoMixes),
kv.RandaoMixes: getKvGetterForStateTable(db, kv.RandaoMixes),
kv.Proposers: getKvGetterForStateTable(db, kv.Proposers),
kv.BalancesDump: getKvGetterForStateTable(db, kv.BalancesDump),
kv.EffectiveBalancesDump: getKvGetterForStateTable(db, kv.EffectiveBalancesDump),
},
Expand Down

0 comments on commit 859f3a1

Please sign in to comment.