Redesign of the attestation pool #14324
proto/prysm/v1alpha1/attestation.go
Outdated
// IsNil --
func (a *Attestation) IsNil() bool {
	return a == nil ||
		a.Data == nil ||
		a.Data.Source == nil ||
		a.Data.Target == nil ||
		a.AggregationBits == nil
}

// IsAggregated --
func (a *Attestation) IsAggregated() bool {
	return a.AggregationBits.Count() > 1
}
I created these two methods because I ran into import cycles when trying to use existing helpers. That aside, I think having such methods is nicer.
type AttestationCache struct {
	a *AttestationConsensusData
// AttestationDataCache stores cached results of AttestationData requests.
type AttestationDataCache struct {
I renamed this because I wanted the new pool to be called AttestationCache
@@ -70,12 +70,6 @@ func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
	return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}

// IsAggregated returns true if the attestation is an aggregated attestation,
// false otherwise.
func IsAggregated(attestation ethpb.Att) bool {
Moved to a method defined on the attestation interface
@@ -0,0 +1,87 @@
package forkchoice
Moved to a separate package to avoid dependency cycles
)

// Attestations --
type Attestations struct {
I created a new type to hold forkchoice attestations to be able to reuse it between the old and new pools
attCount = promauto.NewGauge(
	prometheus.GaugeOpts{
		Name: "attestations_in_pool_total",
		Help: "The number of attestations in the pool.",
	},
)
expiredAtts = promauto.NewCounter(prometheus.CounterOpts{
	Name: "expired_atts_total",
	Help: "The number of expired and deleted attestations in the pool.",
})
There is no longer a need to have separate metrics for unaggregated and aggregated atts, so I created new metrics for the new design
	log.WithError(err).Error("Could not get expiry slot")
	continue
}
numExpired := s.cfg.Cache.PruneBefore(expirySlot)
Instead of checking each attestation for expiry, we pass the expiry slot to the cache. That way we don't need to hash anything.
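A minimal sketch of slot-based pruning, to illustrate why no hashing is needed. The `group` and `cache` types, the string key, and the choice to count pruned attestations (rather than groups) are assumptions made for this example, not the PR's actual types.

```go
package main

import "fmt"

// Slot is a stand-in for primitives.Slot.
type Slot uint64

// group is a simplified, hypothetical stand-in for the pool's attGroup.
// It records only the slot shared by all attestations in the group.
type group struct {
	slot Slot
	atts int // number of attestations held by the group
}

// cache maps an opaque key to a group. The string key is an assumption;
// the cache in the PR is keyed by attestation.Id.
type cache struct {
	groups map[string]*group
}

// PruneBefore deletes every group whose slot is lower than expirySlot and
// returns how many attestations were removed. Only the slot stored on the
// group is compared, so no attestation has to be hashed.
func (c *cache) PruneBefore(expirySlot Slot) int {
	pruned := 0
	for id, g := range c.groups {
		if g.slot < expirySlot {
			pruned += g.atts
			delete(c.groups, id) // deleting while ranging over a map is safe in Go
		}
	}
	return pruned
}

func main() {
	c := &cache{groups: map[string]*group{
		"a": {slot: 10, atts: 3},
		"b": {slot: 11, atts: 1},
		"c": {slot: 12, atts: 2},
	}}
	fmt.Println(c.PruneBefore(12), len(c.groups)) // prints "4 1"
}
```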
	if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
		log.WithError(err).Error("Could not save unaggregated attestation")
		return
if features.Get().EnableExperimentalAttestationPool {
I don't think we should spin up a goroutine here as is the case with the old design. Adding an attestation is very fast.
@@ -409,15 +411,15 @@ func (a proposerAtts) dedup() (proposerAtts, error) {
}

// This filters the input attestations to return a list of valid attestations to be packaged inside a beacon block.
func (vs *Server) validateAndDeleteAttsInPool(ctx context.Context, st state.BeaconState, atts []ethpb.Att) ([]ethpb.Att, error) {
func (vs *Server) validateAndDeleteAttsInPool(ctx context.Context, st state.BeaconState, atts []ethpb.Att) []ethpb.Att {
I changed the signature of this function because not being able to delete an attestation from the pool should not stop proposing
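A rough sketch of the "log and keep going" shape this signature change implies. The function name `filterAndDelete`, the string attestations, and the `isValid`/`del` callbacks are hypothetical and stand in for the real validation and pool calls.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errDelete is a hypothetical error used to simulate a failing pool.
var errDelete = errors.New("pool unavailable")

// filterAndDelete returns the valid attestations and tries to delete the
// invalid ones from the pool. A failed deletion is logged, not returned,
// so it can never prevent the caller from building a block.
func filterAndDelete(atts []string, isValid func(string) bool, del func(string) error) []string {
	kept := make([]string, 0, len(atts))
	for _, a := range atts {
		if isValid(a) {
			kept = append(kept, a)
			continue
		}
		if err := del(a); err != nil {
			log.Printf("could not delete attestation %q: %v", a, err)
		}
	}
	return kept
}

func main() {
	atts := []string{"valid-1", "invalid", "valid-2"}
	isValid := func(a string) bool { return a != "invalid" }
	failingDelete := func(string) error { return errDelete }
	// Deletion fails, yet the valid attestations are still returned.
	fmt.Println(filterAndDelete(atts, isValid, failingDelete))
}
```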
if err != nil {
	return errors.Wrap(err, "could not determine if attestation pool has this attestation")
if features.Get().EnableExperimentalAttestationPool {
	return s.cfg.attestationCache.Add(a)
It doesn't make sense to perform a redundancy check here (similarly to how the old design calls HasAggregateAttestation) because this function deals with unaggregated atts.
	return count
}

func (c *AttestationCache) DeleteCovered(att ethpb.Att) error {
DeleteCovered is much more accurate than the current DeleteAggregatedAttestation. The current name is misleading because we do not delete the passed attestation, but all attestations in the pool whose bits are covered by the incoming attestation.
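To make the "covered" semantics concrete, here is a small sketch over raw byte bitfields. The helpers `covers` and `deleteCovered` are illustrative names, and plain `[]byte` is a simplification of the bitlist type the pool actually uses.

```go
package main

import "fmt"

// covers reports whether every bit set in b is also set in a.
// Bitfields of different lengths are treated as not comparable.
func covers(a, b []byte) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i]&b[i] != b[i] {
			return false
		}
	}
	return true
}

// deleteCovered drops every pooled bitfield that incoming covers and
// returns the ones that remain. The incoming bitfield itself is never
// stored or deleted; it only acts as the mask.
func deleteCovered(pool [][]byte, incoming []byte) [][]byte {
	kept := make([][]byte, 0, len(pool))
	for _, bits := range pool {
		if !covers(incoming, bits) {
			kept = append(kept, bits)
		}
	}
	return kept
}

func main() {
	pool := [][]byte{{0b0001}, {0b0011}, {0b1000}}
	incoming := []byte{0b0111}
	// 0b0001 and 0b0011 are covered by 0b0111; 0b1000 is not.
	fmt.Println(len(deleteCovered(pool, incoming))) // prints "1"
}
```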
	return pruneCount
}

func (c *AttestationCache) AggregateIsRedundant(att ethpb.Att) (bool, error) {
AggregateIsRedundant is much more accurate than the current HasAggregateAttestation. The current name is misleading because we check if there's an attestation in the pool whose bits cover all bits of the incoming attestation.
Are you sure about this? At least in forkchoice, attestations that come from a block are treated differently from those that don't.
	return err
if features.Get().EnableExperimentalAttestationPool {
	if err := s.cfg.AttestationCache.DeleteCovered(att); err != nil {
		return errors.Wrap(err, "could not delete attestation")
	}
} else {
} else {

	if att.IsAggregated() {
		if err := s.cfg.AttPool.DeleteAggregatedAttestation(att); err != nil {
			return err
		}
	} else {
		if err := s.cfg.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
			return err
		}
} else if att.IsAggregated() {
	if err := s.cfg.AttPool.DeleteAggregatedAttestation(att); err != nil {
		return err
	}
} else {
	if err := s.cfg.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
		return err
	}
}
beacon-chain/cache/attestation.go
Outdated

type attGroup struct {
	slot  primitives.Slot
	local ethpb.Att
Just to document somewhere that this design in principle can result in worse aggregates since we lose the ability to aggregate some single bit attestations in case of overlaps with incoming aggregates.
+1
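A worked example of the trade-off described above, using `uint64` bitmasks in place of real aggregation bits. The functions `eager` and `deferred` are hypothetical; they only compare the size of the best aggregate under the two strategies.

```go
package main

import (
	"fmt"
	"math/bits"
)

// overlaps reports whether two aggregation bitfields share a set bit.
// Two aggregates that overlap cannot be combined into one.
func overlaps(a, b uint64) bool { return a&b != 0 }

// bestOf returns the bit count of the largest aggregate obtainable from x
// and y: their union if they are disjoint, otherwise the larger of the two.
func bestOf(x, y uint64) int {
	if !overlaps(x, y) {
		return bits.OnesCount64(x | y)
	}
	cx, cy := bits.OnesCount64(x), bits.OnesCount64(y)
	if cx > cy {
		return cx
	}
	return cy
}

// eager folds the single-bit attestation into the local aggregate as soon
// as it arrives, and only then meets the incoming aggregate.
func eager(local, single, incoming uint64) int {
	return bestOf(local|single, incoming)
}

// deferred keeps the single-bit attestation apart, so the untouched local
// aggregate can still be combined with the incoming aggregate.
func deferred(local, single, incoming uint64) int {
	withSingle := bestOf(local|single, incoming)
	withoutSingle := bestOf(local, incoming)
	if withSingle > withoutSingle {
		return withSingle
	}
	return withoutSingle
}

func main() {
	local := uint64(0b0011)    // local aggregate: validators 0 and 1
	single := uint64(0b0100)   // single-bit attestation: validator 2
	incoming := uint64(0b1100) // incoming aggregate: validators 2 and 3
	fmt.Println(eager(local, single, incoming), deferred(local, single, incoming)) // prints "3 4"
}
```

In this example the eager merge ends with a 3-bit aggregate because bit 2 overlaps, while keeping the single-bit attestation separate would have allowed a 4-bit aggregate.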
}

type AttestationCache struct {
	atts map[attestation.Id]*attGroup
Need to monitor for memory leaks as this will be a high impact path.

// NewId --
func NewId(att ethpb.Att, source IdSource) (Id, error) {
	if err := helpers.ValidateNilAttestation(att); err != nil {
		return Id{}, err
	if att.IsNil() {
Just to document somewhere that I dislike this function, it uses a string method separating with commas for no reason that I can see, on a hot path.
I tried using bytes instead of strings and came up with this code (because the size of int is machine-dependent, I have to convert it to a fixed-size integer; binary.Write works only with fixed-size data):
indicesUint := make([]uint16, len(committeeIndices))
for i := range committeeIndices {
indicesUint[i] = uint16(committeeIndices[i])
}
buf := new(bytes.Buffer)
if err = binary.Write(buf, binary.LittleEndian, indicesUint); err != nil {
return Id{}, err
}
h = hash.Hash(append(dataHash[:], buf.Bytes()...))
Running NewId with 16 bits set:
BenchmarkNewId-12 249559 5036 ns/op
Running the string version:
BenchmarkNewId-12 275055 4297 ns/op
Do you have an idea how to make this faster?
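One direction that might be worth benchmarking is to skip `bytes.Buffer` and `binary.Write` and append the fixed-size integers straight into a preallocated slice with `binary.LittleEndian.AppendUint16` (available since Go 1.19). This is an untested suggestion as far as speed goes: the sketch below only shows that the two encodings produce the same hash. `sha256.Sum256` stands in for `hash.Hash`, and both function names are made up for the example.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// hashWithWrite mirrors the snippet above: convert the indices to []uint16,
// serialize them with binary.Write, then hash dataHash || indices.
func hashWithWrite(dataHash [32]byte, committeeIndices []int) ([32]byte, error) {
	indicesUint := make([]uint16, len(committeeIndices))
	for i := range committeeIndices {
		indicesUint[i] = uint16(committeeIndices[i])
	}
	buf := new(bytes.Buffer)
	if err := binary.Write(buf, binary.LittleEndian, indicesUint); err != nil {
		return [32]byte{}, err
	}
	return sha256.Sum256(append(dataHash[:], buf.Bytes()...)), nil
}

// hashWithAppend builds the same byte string in one preallocated slice,
// without the intermediate []uint16 and bytes.Buffer.
func hashWithAppend(dataHash [32]byte, committeeIndices []int) [32]byte {
	buf := make([]byte, 0, len(dataHash)+2*len(committeeIndices))
	buf = append(buf, dataHash[:]...)
	for _, idx := range committeeIndices {
		buf = binary.LittleEndian.AppendUint16(buf, uint16(idx))
	}
	return sha256.Sum256(buf)
}

func main() {
	dataHash := sha256.Sum256([]byte("attestation data"))
	indices := []int{0, 5, 17, 63}
	a, err := hashWithWrite(dataHash, indices)
	if err != nil {
		panic(err)
	}
	b := hashWithAppend(dataHash, indices)
	fmt.Println(a == b) // prints "true"
}
```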
	}
}

func (c *AttestationCache) Add(att ethpb.Att) error {
Godoc missing
	return pruneCount
}

func (c *AttestationCache) AggregateIsRedundant(att ethpb.Att) (bool, error) {
Godoc missing
		} else if redundant {
			return true, nil
		}
	}
Why don't we check against our local version?
not sure, I will add this
	return c.forkchoiceAtts.DeleteForkchoiceAttestation(att)
}

func GetBySlotAndCommitteeIndex[T ethpb.Att](c *AttestationCache, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []T {
Godoc missing
beacon-chain/cache/attestation.go
Outdated
local, ok := group.local.(T)
if ok {
	if local.GetData().Slot == slot && local.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
		result = append(result, local)
		for _, a := range group.external {
			a, ok := a.(T)
			if ok {
				result = append(result, a)
			}
		}
	}
} else if len(group.external) > 0 {
	// We can safely compare the first attestation because all attestations in a group
	// must have the same slot and committee index, since they are under the same key.
	a, ok := group.external[0].(T)
	if ok && a.GetData().Slot == slot && a.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
		for _, a := range group.external {
			a, ok := a.(T)
			if ok {
				result = append(result, a)
			}
		}
	}
}
This seems better: it has less nesting and no repeated snippets, and it avoids extra checks when the slot or committee index is simply wrong.
addExternal := false
local, ok := group.local.(T)
if ok {
	if local.GetData().Slot != slot || !local.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
		continue
	}
	result = append(result, local)
	addExternal = true
} else if len(group.external) > 0 {
	// We can safely compare the first attestation because all attestations in a group
	// must have the same slot and committee index, since they are under the same key.
	a, ok := group.external[0].(T)
	if ok && a.GetData().Slot == slot && a.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
		addExternal = true
	}
}
if !addExternal {
	continue
}
for _, a := range group.external {
	a, ok := a.(T)
	if ok {
		result = append(result, a)
	}
}
21c8a27
to
dd2305f
Compare
# Conflicts: # CHANGELOG.md # beacon-chain/rpc/eth/beacon/handlers_pool.go # beacon-chain/rpc/eth/validator/handlers.go

// SaveForkchoiceAttestation saves a forkchoice attestation.
func (a *Attestations) SaveForkchoiceAttestation(att ethpb.Att) error {
	if att == nil {
you will need to integrate that nil check that will be merged in
# Conflicts: # CHANGELOG.md # beacon-chain/operations/attestations/kv/forkchoice.go # beacon-chain/operations/attestations/kv/unaggregated.go # proto/prysm/v1alpha1/attestation.go
}

a := group.atts[0]
bit := att.GetAggregationBits().BitIndices()[0]
Is it guaranteed that the bitfield will have a non-zero number of bits?
I added a guard at the beginning of the function
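For illustration, a guard of this kind could look roughly like the following. The actual guard added in the PR is not shown in this thread, so `firstBit`, `bitIndices`, and `errNoBitsSet` are hypothetical, and the plain `[]byte` bitfield ignores the length bit that a real bitlist carries.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoBitsSet is a hypothetical sentinel error for an empty bitfield.
var errNoBitsSet = errors.New("attestation has no aggregation bits set")

// bitIndices returns the indices of all set bits, least significant bit
// first within each byte. It is a simplified stand-in for BitIndices().
func bitIndices(field []byte) []int {
	var out []int
	for i, b := range field {
		for j := 0; j < 8; j++ {
			if b&(1<<j) != 0 {
				out = append(out, i*8+j)
			}
		}
	}
	return out
}

// firstBit returns the index of the first set bit. It checks the length
// of the index list before indexing, so an empty bitfield yields an error
// instead of an out-of-range panic.
func firstBit(field []byte) (int, error) {
	indices := bitIndices(field)
	if len(indices) == 0 {
		return 0, errNoBitsSet
	}
	return indices[0], nil
}

func main() {
	fmt.Println(firstBit([]byte{0b0100})) // prints "2 <nil>"
	_, err := firstBit([]byte{0})
	fmt.Println(err)
}
```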

for _, a := range group.atts {
	if redundant, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
		return true, err
Why are we returning true here? Shouldn't it be false, because we got an error while checking it?
I am not sure which one is better. I chose to return true in error paths because the aggregate being redundant is less of a happy path, as opposed to an aggregate being not redundant (hence useful).
@@ -2656,7 +2656,7 @@ func TestProposer_FilterAttestation(t *testing.T) {
	HeadFetcher: &mock.ChainService{State: st, Root: genesisRoot[:]},
}
atts := tt.inputAtts()
received, err := proposerServer.validateAndDeleteAttsInPool(context.Background(), st, atts)
received := proposerServer.validateAndDeleteAttsInPool(context.Background(), st, atts)
if tt.wantedErr != "" {
The error is removed from this signature, so should you delete the error handling things below?
thanks, fixed
Partial review, will resume later
LGTM
# Conflicts: # CHANGELOG.md # beacon-chain/rpc/eth/beacon/handlers_pool.go # beacon-chain/sync/validate_aggregate_proof.go
)

// Attestations --
type Attestations struct {
I don't get this. What makes it "fork choice"? Seems like generic attestation cache
I renamed the package to attmap. See 2610005
@@ -22,12 +21,12 @@ const (
)

// Id represents an attestation ID. Its uniqueness depends on the IdSource provided when constructing the Id.
type Id [33]byte
Why change this?
This is what Potuz told me:
I will strongly recommend that you just replace this with [32]byte and do the first byte to be the version, and the remaining 31 to be the hash[1:32]
having slices of unaligned data will either double the memory footprint of all these instances or add CPU latency to them
unnecessarily since you don't really care about the full hash anyway it seems
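The layout recommended in the quote can be sketched as follows. The `IdSource` constant names and the `newId` helper are hypothetical, and `sha256.Sum256` stands in for whatever hash the real `NewId` uses.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// IdSource says which parts of the attestation were hashed.
// The constant names below are assumptions made for this sketch.
type IdSource uint8

const (
	sourceFull IdSource = iota
	sourceData
)

// Id is 32 bytes: one byte for the source, then 31 bytes of the hash.
type Id [32]byte

// newId stores the source in byte 0 and copies hash[1:32] into the rest.
// The first byte of the hash is dropped, which keeps the Id at 32 bytes.
func newId(source IdSource, payload []byte) Id {
	h := sha256.Sum256(payload)
	var id Id
	id[0] = byte(source)
	copy(id[1:], h[1:])
	return id
}

func main() {
	a := newId(sourceFull, []byte("attestation"))
	b := newId(sourceData, []byte("attestation"))
	// Same payload, different source: the Ids differ only in byte 0.
	fmt.Println(a == b, a[0], b[0]) // prints "false 0 1"
}
```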
2610005
to
0aebdf4
Compare
# Conflicts: # CHANGELOG.md
Idea
The new attestation pool is called an AttestationCache. The cache holds a map[attestation.Id]*attGroup, where an attGroup is defined as:

It groups together all attestations for a single slot. When we add an attestation to the cache by calling Add(), we either create a new group with this attestation (if this is the first attestation for some slot) or two things can happen:

The first bullet point above means that we keep one aggregate attestation to which we keep appending bits as new single-bit attestations arrive. This means that at any point during seconds 0-4 of a slot we will have only one attestation for this slot in the cache. This results in much less bookkeeping and hashing of attestations for comparisons etc.
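A loose sketch of the Add() flow described above, under stated assumptions: the string key replaces attestation.Id, `uint64` bitmasks replace real aggregation bits, and storing incoming aggregates in an `external` list is inferred from the field names seen in the review hunks rather than confirmed by this description.

```go
package main

import (
	"fmt"
	"math/bits"
)

// att is a simplified attestation: a group key, a slot, and a bitmask.
type att struct {
	id   string // stand-in for attestation.Id
	slot uint64
	bits uint64
}

// attGroup holds one running local aggregate plus aggregates that arrived
// already aggregated (an assumption for this sketch).
type attGroup struct {
	slot     uint64
	local    uint64
	external []uint64
}

type attestationCache struct {
	groups map[string]*attGroup
}

// Add creates the group on first sight. A single-bit attestation is folded
// into the local aggregate; anything else is kept as an external aggregate.
func (c *attestationCache) Add(a att) {
	g, ok := c.groups[a.id]
	if !ok {
		g = &attGroup{slot: a.slot}
		c.groups[a.id] = g
	}
	if bits.OnesCount64(a.bits) == 1 {
		g.local |= a.bits
		return
	}
	g.external = append(g.external, a.bits)
}

func main() {
	c := &attestationCache{groups: map[string]*attGroup{}}
	for _, b := range []uint64{0b001, 0b010, 0b100} {
		c.Add(att{id: "slot-7", slot: 7, bits: b})
	}
	g := c.groups["slot-7"]
	// Three single-bit attestations collapse into one local aggregate.
	fmt.Println(len(c.groups), bits.OnesCount64(g.local), len(g.external)) // prints "1 3 0"
}
```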
Impact
I took profiles of the total allocated space over a span of 30 minutes. The first image is from the old design, the second image is from the new one. There's an 87.5% reduction, from 20% to 2.5%, in the NewId function, which hashes attestations.
Open questions