From 859f3a1cd11ca8f6023c16ee95e3086d4bd557cc Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Tue, 4 Feb 2025 06:49:34 +0100 Subject: [PATCH] Caplin: Remove `kv.Proposers` (#13684) it is actually useless and buggy. there is a more elagant way to implement proposers endpoint with rando mixes instead. --- cl/antiquary/beacon_states_collector.go | 16 +---- cl/antiquary/state_antiquary.go | 2 +- cl/beacon/handler/duties_proposer.go | 73 +++++++++----------- cl/persistence/state/state_accessors.go | 18 ----- turbo/snapshotsync/caplin_state_snapshots.go | 1 - 5 files changed, 33 insertions(+), 77 deletions(-) diff --git a/cl/antiquary/beacon_states_collector.go b/cl/antiquary/beacon_states_collector.go index 2f602e14755..e148cf2b990 100644 --- a/cl/antiquary/beacon_states_collector.go +++ b/cl/antiquary/beacon_states_collector.go @@ -63,7 +63,6 @@ type beaconStatesCollector struct { balancesCollector *etl.Collector randaoMixesCollector *etl.Collector intraRandaoMixesCollector *etl.Collector - proposersCollector *etl.Collector slashingsCollector *etl.Collector blockRootsCollector *etl.Collector stateRootsCollector *etl.Collector @@ -107,7 +106,6 @@ func newBeaconStatesCollector(beaconCfg *clparams.BeaconChainConfig, tmpdir stri balancesCollector: etl.NewCollector(kv.ValidatorBalance, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), randaoMixesCollector: etl.NewCollector(kv.RandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), intraRandaoMixesCollector: etl.NewCollector(kv.IntraRandaoMixes, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), - proposersCollector: etl.NewCollector(kv.Proposers, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), slashingsCollector: etl.NewCollector(kv.ValidatorSlashings, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), blockRootsCollector: etl.NewCollector(kv.BlockRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), stateRootsCollector: etl.NewCollector(kv.StateRoot, tmpdir, makeETLBuffer(), logger).LogLvl(log.LvlTrace), @@ -135,11 +133,6 @@ func (i *beaconStatesCollector) addGenesisState(ctx context.Context, state *stat i.compressor.Reset(i.buf) slot := state.Slot() - epoch := slot / i.beaconCfg.SlotsPerEpoch - // Setup state events handlers - if err := i.proposersCollector.Collect(base_encoding.Encode64ToBytes4(epoch), getProposerDutiesValue(state)); err != nil { - return err - } events := state_accessors.NewStateEvents() @@ -265,10 +258,6 @@ func (i *beaconStatesCollector) collectActiveIndices(epoch uint64, activeIndices return i.activeValidatorIndiciesCollector.Collect(base_encoding.Encode64ToBytes4(slot), i.buf.Bytes()) } -func (i *beaconStatesCollector) collectFlattenedProposers(epoch uint64, proposers []byte) error { - return i.proposersCollector.Collect(base_encoding.Encode64ToBytes4(epoch), proposers) -} - func (i *beaconStatesCollector) collectCurrentSyncCommittee(slot uint64, committee *solid.SyncCommittee) error { roundedSlot := i.beaconCfg.RoundSlotToSyncCommitteePeriod(slot) return i.currentSyncCommitteeCollector.Collect(base_encoding.Encode64ToBytes4(roundedSlot), committee[:]) @@ -323,9 +312,7 @@ func (i *beaconStatesCollector) flush(ctx context.Context, tx kv.RwTx) error { if err := i.balancesCollector.Load(tx, kv.ValidatorBalance, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil { return err } - if err := i.proposersCollector.Load(tx, kv.Proposers, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err - } + if err := i.slashingsCollector.Load(tx, kv.ValidatorSlashings, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil { return err } @@ -374,7 +361,6 @@ func (i *beaconStatesCollector) close() { i.balancesCollector.Close() i.randaoMixesCollector.Close() i.intraRandaoMixesCollector.Close() - i.proposersCollector.Close() i.slashingsCollector.Close() i.blockRootsCollector.Close() i.stateRootsCollector.Close() diff --git a/cl/antiquary/state_antiquary.go b/cl/antiquary/state_antiquary.go index 4e6cb2e9412..57a12568d65 100644 --- a/cl/antiquary/state_antiquary.go +++ b/cl/antiquary/state_antiquary.go @@ -321,7 +321,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error { ); err != nil { return err } - return stateAntiquaryCollector.collectFlattenedProposers(epoch, getProposerDutiesValue(s.currentState)) + return nil }, OnNewBlockRoot: func(index int, root libcommon.Hash) error { return stateAntiquaryCollector.collectBlockRoot(s.currentState.Slot(), root) diff --git a/cl/beacon/handler/duties_proposer.go b/cl/beacon/handler/duties_proposer.go index 6caa34c243f..52d91fea19b 100644 --- a/cl/beacon/handler/duties_proposer.go +++ b/cl/beacon/handler/duties_proposer.go @@ -19,7 +19,7 @@ package handler import ( "crypto/sha256" "encoding/binary" - "errors" + "fmt" "net/http" "sync" @@ -48,41 +48,7 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) ( return nil, err } - if epoch < a.forkchoiceStore.FinalizedCheckpoint().Epoch { - tx, err := a.indiciesDB.BeginRo(r.Context()) - if err != nil { - return nil, err - } - defer tx.Rollback() - view := a.caplinStateSnapshots.View() - defer view.Close() - - indicies, err := state_accessors.ReadProposersInEpoch(state_accessors.GetValFnTxAndSnapshot(tx, view), epoch) - if err != nil { - return nil, err - } - if len(indicies) == 0 { - return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no proposers for this epoch. either this range was prune or not backfilled")) - } - duties := make([]proposerDuties, len(indicies)) - for i, validatorIndex := range indicies { - var pk libcommon.Bytes48 - pk, err := a.syncedData.ValidatorPublicKeyByIndex(int(validatorIndex)) - if err != nil { - return nil, err - } - duties[i] = proposerDuties{ - Pubkey: pk, - ValidatorIndex: validatorIndex, - Slot: epoch*a.beaconChainCfg.SlotsPerEpoch + uint64(i), - } - } - return newBeaconResponse(duties). - WithOptimistic(a.forkchoiceStore.IsHeadOptimistic()). - WithFinalized(true). - WithVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch)). - With("dependent_root", dependentRoot), nil - } + marginEpochs := uint64(2 << 13) expectedSlot := epoch * a.beaconChainCfg.SlotsPerEpoch @@ -90,16 +56,39 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) ( wg := sync.WaitGroup{} if err := a.syncedData.ViewHeadState(func(s *state.CachingBeaconState) error { - for slot := expectedSlot; slot < expectedSlot+a.beaconChainCfg.SlotsPerEpoch; slot++ { - // Lets do proposer index computation - mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % - a.beaconChainCfg.EpochsPerHistoricalVector + // Lets do proposer index computation + mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % + a.beaconChainCfg.EpochsPerHistoricalVector + + var mix libcommon.Hash + if epoch+marginEpochs > a.forkchoiceStore.FinalizedCheckpoint().Epoch { // Input for the seed hash. - mix := s.GetRandaoMix(int(mixPosition)) - input := shuffling2.GetSeed(a.beaconChainCfg, mix, epoch, a.beaconChainCfg.DomainBeaconProposer) + mix = s.GetRandaoMix(int(mixPosition)) + } else { + tx, err := a.indiciesDB.BeginRo(r.Context()) + if err != nil { + return err + } + defer tx.Rollback() + view := a.caplinStateSnapshots.View() + defer view.Close() + + // read the mix from the database + mix, err = a.stateReader.ReadRandaoMixBySlotAndIndex(tx, state_accessors.GetValFnTxAndSnapshot(tx, view), expectedSlot, mixPosition) + if err != nil { + return err + } + if mix == (libcommon.Hash{}) { + return beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("mix not found for slot %d and index %d. maybe block was not backfilled or range was pruned", expectedSlot, mixPosition)) + } + } + + for slot := expectedSlot; slot < expectedSlot+a.beaconChainCfg.SlotsPerEpoch; slot++ { + slotByteArray := make([]byte, 8) binary.LittleEndian.PutUint64(slotByteArray, slot) + input := shuffling2.GetSeed(a.beaconChainCfg, mix, epoch, a.beaconChainCfg.DomainBeaconProposer) // Add slot to the end of the input. inputWithSlot := append(input[:], slotByteArray...) hash := sha256.New() diff --git a/cl/persistence/state/state_accessors.go b/cl/persistence/state/state_accessors.go index 3340f9425c7..3de95d97965 100644 --- a/cl/persistence/state/state_accessors.go +++ b/cl/persistence/state/state_accessors.go @@ -174,21 +174,3 @@ func ReadActiveIndicies(getFn GetValFn, slot uint64) ([]uint64, error) { buf := bytes.NewBuffer(v) return base_encoding.ReadRabbits(nil, buf) } - -func ReadProposersInEpoch(getFn GetValFn, epoch uint64) ([]uint64, error) { - key := base_encoding.Encode64ToBytes4(epoch) - - indiciesBytes, err := getFn(kv.Proposers, key) - if err != nil { - return nil, err - } - if len(indiciesBytes) == 0 { - return nil, nil - } - var ret []uint64 - for i := 0; i < len(indiciesBytes); i += 4 { - validatorIndex := binary.BigEndian.Uint32(indiciesBytes[i : i+4]) - ret = append(ret, uint64(validatorIndex)) - } - return ret, nil -} diff --git a/turbo/snapshotsync/caplin_state_snapshots.go b/turbo/snapshotsync/caplin_state_snapshots.go index 5579d71cb89..8f257ac6724 100644 --- a/turbo/snapshotsync/caplin_state_snapshots.go +++ b/turbo/snapshotsync/caplin_state_snapshots.go @@ -110,7 +110,6 @@ func MakeCaplinStateSnapshotsTypes(db kv.RoDB) SnapshotTypes { kv.Eth1DataVotes: getKvGetterForStateTable(db, kv.Eth1DataVotes), kv.IntraRandaoMixes: getKvGetterForStateTable(db, kv.IntraRandaoMixes), kv.RandaoMixes: getKvGetterForStateTable(db, kv.RandaoMixes), - kv.Proposers: getKvGetterForStateTable(db, kv.Proposers), kv.BalancesDump: getKvGetterForStateTable(db, kv.BalancesDump), kv.EffectiveBalancesDump: getKvGetterForStateTable(db, kv.EffectiveBalancesDump), },