mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
EIP-7549: attestation pool (#14121)
* implementation * test fixes * Electra tests * remove aggregator tests * id comments and tests * make Id equal to [33]byte
This commit is contained in:
@@ -21,12 +21,13 @@ go_library(
|
||||
"//config/features:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/hash:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
|
||||
"//time:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_hashicorp_golang_lru//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
|
||||
@@ -16,9 +16,10 @@ go_library(
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/hash:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"@com_github_patrickmn_go_cache//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
@@ -39,14 +40,15 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/bls:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_patrickmn_go_cache//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_fastssz//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -9,7 +9,9 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -32,28 +34,28 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
|
||||
_, span := trace.StartSpan(ctx, "operations.attestations.kv.aggregateUnaggregatedAtts")
|
||||
defer span.End()
|
||||
|
||||
attsByDataRoot := make(map[[32]byte][]ethpb.Att, len(unaggregatedAtts))
|
||||
attsByVerAndDataRoot := make(map[attestation.Id][]ethpb.Att, len(unaggregatedAtts))
|
||||
for _, att := range unaggregatedAtts {
|
||||
attDataRoot, err := att.GetData().HashTreeRoot()
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
|
||||
attsByVerAndDataRoot[id] = append(attsByVerAndDataRoot[id], att)
|
||||
}
|
||||
|
||||
// Aggregate unaggregated attestations from the pool and save them in the pool.
|
||||
// Track the unaggregated attestations that aren't able to aggregate.
|
||||
leftOverUnaggregatedAtt := make(map[[32]byte]bool)
|
||||
leftOverUnaggregatedAtt := make(map[attestation.Id]bool)
|
||||
|
||||
leftOverUnaggregatedAtt = c.aggregateParallel(attsByDataRoot, leftOverUnaggregatedAtt)
|
||||
leftOverUnaggregatedAtt = c.aggregateParallel(attsByVerAndDataRoot, leftOverUnaggregatedAtt)
|
||||
|
||||
// Remove the unaggregated attestations from the pool that were successfully aggregated.
|
||||
for _, att := range unaggregatedAtts {
|
||||
h, err := hashFn(att)
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
if leftOverUnaggregatedAtt[h] {
|
||||
if leftOverUnaggregatedAtt[id] {
|
||||
continue
|
||||
}
|
||||
if err := c.DeleteUnaggregatedAttestation(att); err != nil {
|
||||
@@ -66,7 +68,7 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
|
||||
// aggregateParallel aggregates attestations in parallel for `atts` and saves them in the pool,
|
||||
// returns the unaggregated attestations that weren't able to aggregate.
|
||||
// Given `n` CPU cores, it creates a channel of size `n` and spawns `n` goroutines to aggregate attestations
|
||||
func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver map[[32]byte]bool) map[[32]byte]bool {
|
||||
func (c *AttCaches) aggregateParallel(atts map[attestation.Id][]ethpb.Att, leftOver map[attestation.Id]bool) map[attestation.Id]bool {
|
||||
var leftoverLock sync.Mutex
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
@@ -92,13 +94,13 @@ func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver ma
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
h, err := hashFn(aggregated)
|
||||
id, err := attestation.NewId(aggregated, attestation.Full)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("could not hash attestation")
|
||||
log.WithError(err).Error("Could not create attestation ID")
|
||||
continue
|
||||
}
|
||||
leftoverLock.Lock()
|
||||
leftOver[h] = true
|
||||
leftOver[id] = true
|
||||
leftoverLock.Unlock()
|
||||
}
|
||||
}
|
||||
@@ -139,17 +141,18 @@ func (c *AttCaches) SaveAggregatedAttestation(att ethpb.Att) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
r, err := hashFn(att.GetData())
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
copiedAtt := att.Copy()
|
||||
|
||||
c.aggregatedAttLock.Lock()
|
||||
defer c.aggregatedAttLock.Unlock()
|
||||
atts, ok := c.aggregatedAtt[r]
|
||||
atts, ok := c.aggregatedAtt[id]
|
||||
if !ok {
|
||||
atts := []ethpb.Att{copiedAtt}
|
||||
c.aggregatedAtt[r] = atts
|
||||
c.aggregatedAtt[id] = atts
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -157,7 +160,7 @@ func (c *AttCaches) SaveAggregatedAttestation(att ethpb.Att) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.aggregatedAtt[r] = atts
|
||||
c.aggregatedAtt[id] = atts
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -191,17 +194,56 @@ func (c *AttCaches) AggregatedAttestations() []ethpb.Att {
|
||||
|
||||
// AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache,
|
||||
// filtered by committee index and slot.
|
||||
func (c *AttCaches) AggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att {
|
||||
func (c *AttCaches) AggregatedAttestationsBySlotIndex(
|
||||
ctx context.Context,
|
||||
slot primitives.Slot,
|
||||
committeeIndex primitives.CommitteeIndex,
|
||||
) []*ethpb.Attestation {
|
||||
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndex")
|
||||
defer span.End()
|
||||
|
||||
atts := make([]ethpb.Att, 0)
|
||||
atts := make([]*ethpb.Attestation, 0)
|
||||
|
||||
c.aggregatedAttLock.RLock()
|
||||
defer c.aggregatedAttLock.RUnlock()
|
||||
for _, a := range c.aggregatedAtt {
|
||||
if slot == a[0].GetData().Slot && committeeIndex == a[0].GetData().CommitteeIndex {
|
||||
atts = append(atts, a...)
|
||||
for _, as := range c.aggregatedAtt {
|
||||
if as[0].Version() == version.Phase0 && slot == as[0].GetData().Slot && committeeIndex == as[0].GetData().CommitteeIndex {
|
||||
for _, a := range as {
|
||||
att, ok := a.(*ethpb.Attestation)
|
||||
// This will never fail in practice because we asserted the version
|
||||
if ok {
|
||||
atts = append(atts, att)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return atts
|
||||
}
|
||||
|
||||
// AggregatedAttestationsBySlotIndexElectra returns the aggregated attestations in cache,
|
||||
// filtered by committee index and slot.
|
||||
func (c *AttCaches) AggregatedAttestationsBySlotIndexElectra(
|
||||
ctx context.Context,
|
||||
slot primitives.Slot,
|
||||
committeeIndex primitives.CommitteeIndex,
|
||||
) []*ethpb.AttestationElectra {
|
||||
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndexElectra")
|
||||
defer span.End()
|
||||
|
||||
atts := make([]*ethpb.AttestationElectra, 0)
|
||||
|
||||
c.aggregatedAttLock.RLock()
|
||||
defer c.aggregatedAttLock.RUnlock()
|
||||
for _, as := range c.aggregatedAtt {
|
||||
if as[0].Version() == version.Electra && slot == as[0].GetData().Slot && as[0].CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
|
||||
for _, a := range as {
|
||||
att, ok := a.(*ethpb.AttestationElectra)
|
||||
// This will never fail in practice because we asserted the version
|
||||
if ok {
|
||||
atts = append(atts, att)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,18 +258,19 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
|
||||
if !helpers.IsAggregated(att) {
|
||||
return errors.New("attestation is not aggregated")
|
||||
}
|
||||
r, err := hashFn(att.GetData())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation data")
|
||||
}
|
||||
|
||||
if err := c.insertSeenBit(att); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.aggregatedAttLock.Lock()
|
||||
defer c.aggregatedAttLock.Unlock()
|
||||
attList, ok := c.aggregatedAtt[r]
|
||||
attList, ok := c.aggregatedAtt[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
@@ -241,9 +284,9 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(c.aggregatedAtt, r)
|
||||
delete(c.aggregatedAtt, id)
|
||||
} else {
|
||||
c.aggregatedAtt[r] = filtered
|
||||
c.aggregatedAtt[id] = filtered
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -254,14 +297,15 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return false, err
|
||||
}
|
||||
r, err := hashFn(att.GetData())
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "could not tree hash attestation")
|
||||
return false, errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.aggregatedAttLock.RLock()
|
||||
defer c.aggregatedAttLock.RUnlock()
|
||||
if atts, ok := c.aggregatedAtt[r]; ok {
|
||||
if atts, ok := c.aggregatedAtt[id]; ok {
|
||||
for _, a := range atts {
|
||||
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
|
||||
return false, err
|
||||
@@ -273,7 +317,7 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
|
||||
|
||||
c.blockAttLock.RLock()
|
||||
defer c.blockAttLock.RUnlock()
|
||||
if atts, ok := c.blockAtt[r]; ok {
|
||||
if atts, ok := c.blockAtt[id]; ok {
|
||||
for _, a := range atts {
|
||||
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
|
||||
return false, err
|
||||
|
||||
@@ -7,10 +7,11 @@ import (
|
||||
|
||||
c "github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
fssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/util"
|
||||
@@ -69,7 +70,7 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
|
||||
}),
|
||||
AggregationBits: bitfield.Bitlist{0b10111},
|
||||
},
|
||||
wantErrString: "could not tree hash attestation: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")",
|
||||
wantErrString: "could not create attestation ID",
|
||||
},
|
||||
{
|
||||
name: "already seen",
|
||||
@@ -92,15 +93,13 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
|
||||
count: 1,
|
||||
},
|
||||
}
|
||||
r, err := hashFn(util.HydrateAttestationData(ðpb.AttestationData{
|
||||
Slot: 100,
|
||||
}))
|
||||
id, err := attestation.NewId(util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 100}}), attestation.Data)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cache := NewAttCaches()
|
||||
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
|
||||
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
|
||||
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
|
||||
|
||||
err := cache.SaveAggregatedAttestation(tt.att)
|
||||
@@ -230,7 +229,7 @@ func TestKV_Aggregated_DeleteAggregatedAttestation(t *testing.T) {
|
||||
},
|
||||
}
|
||||
err := cache.DeleteAggregatedAttestation(att)
|
||||
wantErr := "could not tree hash attestation data: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")"
|
||||
wantErr := "could not create attestation ID"
|
||||
assert.ErrorContains(t, wantErr, err)
|
||||
})
|
||||
|
||||
@@ -500,3 +499,49 @@ func TestKV_Aggregated_DuplicateAggregatedAttestations(t *testing.T) {
|
||||
assert.DeepSSZEqual(t, att2, returned[0], "Did not receive correct aggregated atts")
|
||||
assert.Equal(t, 1, len(returned), "Did not receive correct aggregated atts")
|
||||
}
|
||||
|
||||
func TestKV_Aggregated_AggregatedAttestationsBySlotIndex(t *testing.T) {
|
||||
cache := NewAttCaches()
|
||||
|
||||
att1 := util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1011}})
|
||||
att2 := util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
|
||||
att3 := util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
|
||||
atts := []*ethpb.Attestation{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveAggregatedAttestation(att))
|
||||
}
|
||||
ctx := context.Background()
|
||||
returned := cache.AggregatedAttestationsBySlotIndex(ctx, 1, 1)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
|
||||
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
|
||||
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 2, 1)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
|
||||
}
|
||||
|
||||
func TestKV_Aggregated_AggregatedAttestationsBySlotIndexElectra(t *testing.T) {
|
||||
cache := NewAttCaches()
|
||||
|
||||
committeeBits := primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(1, true)
|
||||
att1 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1011}, CommitteeBits: committeeBits})
|
||||
committeeBits = primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(2, true)
|
||||
att2 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
|
||||
committeeBits = primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(1, true)
|
||||
att3 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
|
||||
atts := []*ethpb.AttestationElectra{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveAggregatedAttestation(att))
|
||||
}
|
||||
ctx := context.Background()
|
||||
returned := cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 1)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att1}, returned)
|
||||
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 2)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att2}, returned)
|
||||
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 2, 1)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att3}, returned)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package kv
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
// SaveBlockAttestation saves an block attestation in cache.
|
||||
@@ -10,14 +11,15 @@ func (c *AttCaches) SaveBlockAttestation(att ethpb.Att) error {
|
||||
if att == nil {
|
||||
return nil
|
||||
}
|
||||
r, err := hashFn(att.GetData())
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.blockAttLock.Lock()
|
||||
defer c.blockAttLock.Unlock()
|
||||
atts, ok := c.blockAtt[r]
|
||||
atts, ok := c.blockAtt[id]
|
||||
if !ok {
|
||||
atts = make([]ethpb.Att, 0, 1)
|
||||
}
|
||||
@@ -31,7 +33,7 @@ func (c *AttCaches) SaveBlockAttestation(att ethpb.Att) error {
|
||||
}
|
||||
}
|
||||
|
||||
c.blockAtt[r] = append(atts, att.Copy())
|
||||
c.blockAtt[id] = append(atts, att.Copy())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -54,14 +56,15 @@ func (c *AttCaches) DeleteBlockAttestation(att ethpb.Att) error {
|
||||
if att == nil {
|
||||
return nil
|
||||
}
|
||||
r, err := hashFn(att.GetData())
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.blockAttLock.Lock()
|
||||
defer c.blockAttLock.Unlock()
|
||||
delete(c.blockAtt, r)
|
||||
delete(c.blockAtt, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package kv
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
// SaveForkchoiceAttestation saves an forkchoice attestation in cache.
|
||||
@@ -10,14 +11,15 @@ func (c *AttCaches) SaveForkchoiceAttestation(att ethpb.Att) error {
|
||||
if att == nil {
|
||||
return nil
|
||||
}
|
||||
r, err := hashFn(att)
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.forkchoiceAttLock.Lock()
|
||||
defer c.forkchoiceAttLock.Unlock()
|
||||
c.forkchoiceAtt[r] = att.Copy()
|
||||
c.forkchoiceAtt[id] = att
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -51,14 +53,15 @@ func (c *AttCaches) DeleteForkchoiceAttestation(att ethpb.Att) error {
|
||||
if att == nil {
|
||||
return nil
|
||||
}
|
||||
r, err := hashFn(att)
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.forkchoiceAttLock.Lock()
|
||||
defer c.forkchoiceAttLock.Unlock()
|
||||
delete(c.forkchoiceAtt, r)
|
||||
delete(c.forkchoiceAtt, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,24 +9,22 @@ import (
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
var hashFn = hash.Proto
|
||||
|
||||
// AttCaches defines the caches used to satisfy attestation pool interface.
|
||||
// These caches are KV store for various attestations
|
||||
// such are unaggregated, aggregated or attestations within a block.
|
||||
type AttCaches struct {
|
||||
aggregatedAttLock sync.RWMutex
|
||||
aggregatedAtt map[[32]byte][]ethpb.Att
|
||||
aggregatedAtt map[attestation.Id][]ethpb.Att
|
||||
unAggregateAttLock sync.RWMutex
|
||||
unAggregatedAtt map[[32]byte]ethpb.Att
|
||||
unAggregatedAtt map[attestation.Id]ethpb.Att
|
||||
forkchoiceAttLock sync.RWMutex
|
||||
forkchoiceAtt map[[32]byte]ethpb.Att
|
||||
forkchoiceAtt map[attestation.Id]ethpb.Att
|
||||
blockAttLock sync.RWMutex
|
||||
blockAtt map[[32]byte][]ethpb.Att
|
||||
blockAtt map[attestation.Id][]ethpb.Att
|
||||
seenAtt *cache.Cache
|
||||
}
|
||||
|
||||
@@ -36,10 +34,10 @@ func NewAttCaches() *AttCaches {
|
||||
secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
c := cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second)
|
||||
pool := &AttCaches{
|
||||
unAggregatedAtt: make(map[[32]byte]ethpb.Att),
|
||||
aggregatedAtt: make(map[[32]byte][]ethpb.Att),
|
||||
forkchoiceAtt: make(map[[32]byte]ethpb.Att),
|
||||
blockAtt: make(map[[32]byte][]ethpb.Att),
|
||||
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
|
||||
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
|
||||
forkchoiceAtt: make(map[attestation.Id]ethpb.Att),
|
||||
blockAtt: make(map[attestation.Id][]ethpb.Att),
|
||||
seenAtt: c,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,21 +1,20 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
func (c *AttCaches) insertSeenBit(att ethpb.Att) error {
|
||||
r, err := hashFn(att.GetData())
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
v, ok := c.seenAtt.Get(string(r[:]))
|
||||
v, ok := c.seenAtt.Get(id.String())
|
||||
if ok {
|
||||
seenBits, ok := v.([]bitfield.Bitlist)
|
||||
if !ok {
|
||||
@@ -24,7 +23,7 @@ func (c *AttCaches) insertSeenBit(att ethpb.Att) error {
|
||||
alreadyExists := false
|
||||
for _, bit := range seenBits {
|
||||
if c, err := bit.Contains(att.GetAggregationBits()); err != nil {
|
||||
return fmt.Errorf("failed to check seen bits on attestation when inserting bit: %w", err)
|
||||
return err
|
||||
} else if c {
|
||||
alreadyExists = true
|
||||
break
|
||||
@@ -33,21 +32,21 @@ func (c *AttCaches) insertSeenBit(att ethpb.Att) error {
|
||||
if !alreadyExists {
|
||||
seenBits = append(seenBits, att.GetAggregationBits())
|
||||
}
|
||||
c.seenAtt.Set(string(r[:]), seenBits, cache.DefaultExpiration /* one epoch */)
|
||||
c.seenAtt.Set(id.String(), seenBits, cache.DefaultExpiration /* one epoch */)
|
||||
return nil
|
||||
}
|
||||
|
||||
c.seenAtt.Set(string(r[:]), []bitfield.Bitlist{att.GetAggregationBits()}, cache.DefaultExpiration /* one epoch */)
|
||||
c.seenAtt.Set(id.String(), []bitfield.Bitlist{att.GetAggregationBits()}, cache.DefaultExpiration /* one epoch */)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *AttCaches) hasSeenBit(att ethpb.Att) (bool, error) {
|
||||
r, err := hashFn(att.GetData())
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
v, ok := c.seenAtt.Get(string(r[:]))
|
||||
v, ok := c.seenAtt.Get(id.String())
|
||||
if ok {
|
||||
seenBits, ok := v.([]bitfield.Bitlist)
|
||||
if !ok {
|
||||
@@ -55,7 +54,7 @@ func (c *AttCaches) hasSeenBit(att ethpb.Att) (bool, error) {
|
||||
}
|
||||
for _, bit := range seenBits {
|
||||
if c, err := bit.Contains(att.GetAggregationBits()); err != nil {
|
||||
return false, fmt.Errorf("failed to check seen bits on attestation when reading bit: %w", err)
|
||||
return false, err
|
||||
} else if c {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/util"
|
||||
)
|
||||
@@ -39,18 +40,18 @@ func TestAttCaches_hasSeenBit(t *testing.T) {
|
||||
func TestAttCaches_insertSeenBitDuplicates(t *testing.T) {
|
||||
c := NewAttCaches()
|
||||
att1 := util.HydrateAttestation(ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b10000011}})
|
||||
r, err := hashFn(att1.Data)
|
||||
id, err := attestation.NewId(att1, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, c.insertSeenBit(att1))
|
||||
require.Equal(t, 1, c.seenAtt.ItemCount())
|
||||
|
||||
_, expirationTime1, ok := c.seenAtt.GetWithExpiration(string(r[:]))
|
||||
_, expirationTime1, ok := c.seenAtt.GetWithExpiration(id.String())
|
||||
require.Equal(t, true, ok)
|
||||
|
||||
// Make sure that duplicates are not inserted, but expiration time gets updated.
|
||||
require.NoError(t, c.insertSeenBit(att1))
|
||||
require.Equal(t, 1, c.seenAtt.ItemCount())
|
||||
_, expirationprysmTime, ok := c.seenAtt.GetWithExpiration(string(r[:]))
|
||||
_, expirationprysmTime, ok := c.seenAtt.GetWithExpiration(id.String())
|
||||
require.Equal(t, true, ok)
|
||||
require.Equal(t, true, expirationprysmTime.After(expirationTime1), "Expiration time is not updated")
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -27,13 +29,14 @@ func (c *AttCaches) SaveUnaggregatedAttestation(att ethpb.Att) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
r, err := hashFn(att)
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.unAggregateAttLock.Lock()
|
||||
defer c.unAggregateAttLock.Unlock()
|
||||
c.unAggregatedAtt[r] = att.Copy()
|
||||
c.unAggregatedAtt[id] = att
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -69,19 +72,56 @@ func (c *AttCaches) UnaggregatedAttestations() ([]ethpb.Att, error) {
|
||||
|
||||
// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
|
||||
// filtered by committee index and slot.
|
||||
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att {
|
||||
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(
|
||||
ctx context.Context,
|
||||
slot primitives.Slot,
|
||||
committeeIndex primitives.CommitteeIndex,
|
||||
) []*ethpb.Attestation {
|
||||
_, span := trace.StartSpan(ctx, "operations.attestations.kv.UnaggregatedAttestationsBySlotIndex")
|
||||
defer span.End()
|
||||
|
||||
atts := make([]ethpb.Att, 0)
|
||||
atts := make([]*ethpb.Attestation, 0)
|
||||
|
||||
c.unAggregateAttLock.RLock()
|
||||
defer c.unAggregateAttLock.RUnlock()
|
||||
|
||||
unAggregatedAtts := c.unAggregatedAtt
|
||||
for _, a := range unAggregatedAtts {
|
||||
if slot == a.GetData().Slot && committeeIndex == a.GetData().CommitteeIndex {
|
||||
atts = append(atts, a)
|
||||
if a.Version() == version.Phase0 && slot == a.GetData().Slot && committeeIndex == a.GetData().CommitteeIndex {
|
||||
att, ok := a.(*ethpb.Attestation)
|
||||
// This will never fail in practice because we asserted the version
|
||||
if ok {
|
||||
atts = append(atts, att)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return atts
|
||||
}
|
||||
|
||||
// UnaggregatedAttestationsBySlotIndexElectra returns the unaggregated attestations in cache,
|
||||
// filtered by committee index and slot.
|
||||
func (c *AttCaches) UnaggregatedAttestationsBySlotIndexElectra(
|
||||
ctx context.Context,
|
||||
slot primitives.Slot,
|
||||
committeeIndex primitives.CommitteeIndex,
|
||||
) []*ethpb.AttestationElectra {
|
||||
_, span := trace.StartSpan(ctx, "operations.attestations.kv.UnaggregatedAttestationsBySlotIndexElectra")
|
||||
defer span.End()
|
||||
|
||||
atts := make([]*ethpb.AttestationElectra, 0)
|
||||
|
||||
c.unAggregateAttLock.RLock()
|
||||
defer c.unAggregateAttLock.RUnlock()
|
||||
|
||||
unAggregatedAtts := c.unAggregatedAtt
|
||||
for _, a := range unAggregatedAtts {
|
||||
if a.Version() == version.Electra && slot == a.GetData().Slot && a.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
|
||||
att, ok := a.(*ethpb.AttestationElectra)
|
||||
// This will never fail in practice because we asserted the version
|
||||
if ok {
|
||||
atts = append(atts, att)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,14 +141,14 @@ func (c *AttCaches) DeleteUnaggregatedAttestation(att ethpb.Att) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err := hashFn(att)
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not tree hash attestation")
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.unAggregateAttLock.Lock()
|
||||
defer c.unAggregateAttLock.Unlock()
|
||||
delete(c.unAggregatedAtt, r)
|
||||
delete(c.unAggregatedAtt, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -7,10 +7,11 @@ import (
|
||||
"testing"
|
||||
|
||||
c "github.com/patrickmn/go-cache"
|
||||
fssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/util"
|
||||
@@ -39,7 +40,7 @@ func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
|
||||
BeaconBlockRoot: []byte{0b0},
|
||||
},
|
||||
},
|
||||
wantErrString: fssz.ErrBytesLength.Error(),
|
||||
wantErrString: "could not create attestation ID",
|
||||
},
|
||||
{
|
||||
name: "normal save",
|
||||
@@ -57,13 +58,13 @@ func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
|
||||
count: 0,
|
||||
},
|
||||
}
|
||||
r, err := hashFn(util.HydrateAttestationData(ðpb.AttestationData{Slot: 100}))
|
||||
id, err := attestation.NewId(util.HydrateAttestation(ðpb.Attestation{Data: ðpb.AttestationData{Slot: 100}}), attestation.Data)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cache := NewAttCaches()
|
||||
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
|
||||
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
|
||||
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
|
||||
|
||||
if tt.att != nil && tt.att.GetSignature() == nil {
|
||||
@@ -246,9 +247,35 @@ func TestKV_Unaggregated_UnaggregatedAttestationsBySlotIndex(t *testing.T) {
|
||||
}
|
||||
ctx := context.Background()
|
||||
returned := cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 1)
|
||||
assert.DeepEqual(t, []ethpb.Att{att1}, returned)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
|
||||
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)
|
||||
assert.DeepEqual(t, []ethpb.Att{att2}, returned)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
|
||||
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 1)
|
||||
assert.DeepEqual(t, []ethpb.Att{att3}, returned)
|
||||
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
|
||||
}
|
||||
|
||||
func TestKV_Unaggregated_UnaggregatedAttestationsBySlotIndexElectra(t *testing.T) {
|
||||
cache := NewAttCaches()
|
||||
|
||||
committeeBits := primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(1, true)
|
||||
att1 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101}, CommitteeBits: committeeBits})
|
||||
committeeBits = primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(2, true)
|
||||
att2 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b110}, CommitteeBits: committeeBits})
|
||||
committeeBits = primitives.NewAttestationCommitteeBits()
|
||||
committeeBits.SetBitAt(1, true)
|
||||
att3 := util.HydrateAttestationElectra(ðpb.AttestationElectra{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b110}, CommitteeBits: committeeBits})
|
||||
atts := []*ethpb.AttestationElectra{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
|
||||
}
|
||||
ctx := context.Background()
|
||||
returned := cache.UnaggregatedAttestationsBySlotIndexElectra(ctx, 1, 1)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att1}, returned)
|
||||
returned = cache.UnaggregatedAttestationsBySlotIndexElectra(ctx, 1, 2)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att2}, returned)
|
||||
returned = cache.UnaggregatedAttestationsBySlotIndexElectra(ctx, 2, 1)
|
||||
assert.DeepEqual(t, []*ethpb.AttestationElectra{att3}, returned)
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ type Pool interface {
|
||||
SaveAggregatedAttestation(att ethpb.Att) error
|
||||
SaveAggregatedAttestations(atts []ethpb.Att) error
|
||||
AggregatedAttestations() []ethpb.Att
|
||||
AggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att
|
||||
AggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.Attestation
|
||||
AggregatedAttestationsBySlotIndexElectra(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.AttestationElectra
|
||||
DeleteAggregatedAttestation(att ethpb.Att) error
|
||||
HasAggregatedAttestation(att ethpb.Att) (bool, error)
|
||||
AggregatedAttestationCount() int
|
||||
@@ -26,7 +27,8 @@ type Pool interface {
|
||||
SaveUnaggregatedAttestation(att ethpb.Att) error
|
||||
SaveUnaggregatedAttestations(atts []ethpb.Att) error
|
||||
UnaggregatedAttestations() ([]ethpb.Att, error)
|
||||
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att
|
||||
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.Attestation
|
||||
UnaggregatedAttestationsBySlotIndexElectra(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.AttestationElectra
|
||||
DeleteUnaggregatedAttestation(att ethpb.Att) error
|
||||
DeleteSeenUnaggregatedAttestations() (int, error)
|
||||
UnaggregatedAttestationCount() int
|
||||
|
||||
@@ -3,14 +3,14 @@ package attestations
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v5/time/slots"
|
||||
"go.opencensus.io/trace"
|
||||
@@ -67,7 +67,7 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
|
||||
atts := append(s.cfg.Pool.AggregatedAttestations(), s.cfg.Pool.BlockAttestations()...)
|
||||
atts = append(atts, s.cfg.Pool.ForkchoiceAttestations()...)
|
||||
|
||||
attsByDataRoot := make(map[[32]byte][]ethpb.Att, len(atts))
|
||||
attsByVerAndDataRoot := make(map[attestation.Id][]ethpb.Att, len(atts))
|
||||
|
||||
// Consolidate attestations by aggregating them by similar data root.
|
||||
for _, att := range atts {
|
||||
@@ -79,14 +79,14 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
attDataRoot, err := att.GetData().HashTreeRoot()
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
|
||||
attsByVerAndDataRoot[id] = append(attsByVerAndDataRoot[id], att)
|
||||
}
|
||||
|
||||
for _, atts := range attsByDataRoot {
|
||||
for _, atts := range attsByVerAndDataRoot {
|
||||
if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -119,12 +119,12 @@ func (s *Service) aggregateAndSaveForkChoiceAtts(atts []ethpb.Att) error {
|
||||
// This checks if the attestation has previously been aggregated for fork choice
|
||||
// return true if yes, false if no.
|
||||
func (s *Service) seen(att ethpb.Att) (bool, error) {
|
||||
attRoot, err := hash.Proto(att.GetData())
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
incomingBits := att.GetAggregationBits()
|
||||
savedBits, ok := s.forkChoiceProcessedRoots.Get(attRoot)
|
||||
savedBits, ok := s.forkChoiceProcessedAtts.Get(id)
|
||||
if ok {
|
||||
savedBitlist, ok := savedBits.(bitfield.Bitlist)
|
||||
if !ok {
|
||||
@@ -149,6 +149,6 @@ func (s *Service) seen(att ethpb.Att) (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
s.forkChoiceProcessedRoots.Add(attRoot, incomingBits)
|
||||
s.forkChoiceProcessedAtts.Add(id, incomingBits)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -13,16 +13,16 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
)
|
||||
|
||||
var forkChoiceProcessedRootsSize = 1 << 16
|
||||
var forkChoiceProcessedAttsSize = 1 << 16
|
||||
|
||||
// Service of attestation pool operations.
|
||||
type Service struct {
|
||||
cfg *Config
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
err error
|
||||
forkChoiceProcessedRoots *lru.Cache
|
||||
genesisTime uint64
|
||||
cfg *Config
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
err error
|
||||
forkChoiceProcessedAtts *lru.Cache
|
||||
genesisTime uint64
|
||||
}
|
||||
|
||||
// Config options for the service.
|
||||
@@ -35,7 +35,7 @@ type Config struct {
|
||||
// NewService instantiates a new attestation pool service instance that will
|
||||
// be registered into a running beacon node.
|
||||
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
cache := lruwrpr.New(forkChoiceProcessedRootsSize)
|
||||
cache := lruwrpr.New(forkChoiceProcessedAttsSize)
|
||||
|
||||
if cfg.pruneInterval == 0 {
|
||||
// Prune expired attestations from the pool every slot interval.
|
||||
@@ -44,10 +44,10 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
forkChoiceProcessedRoots: cache,
|
||||
cfg: cfg,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
forkChoiceProcessedAtts: cache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/core"
|
||||
@@ -100,12 +99,8 @@ func (vs *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
|
||||
best = aggregatedAtt
|
||||
}
|
||||
}
|
||||
att, ok := best.(*ethpb.Attestation)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("best attestation has wrong type (expected %T, got %T)", ðpb.Attestation{}, best)
|
||||
}
|
||||
a := ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: att,
|
||||
Aggregate: best,
|
||||
SelectionProof: req.SlotSignature,
|
||||
AggregatorIndex: validatorIndex,
|
||||
}
|
||||
|
||||
@@ -2,10 +2,14 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["attestation_utils.go"],
|
||||
srcs = [
|
||||
"attestation_utils.go",
|
||||
"id.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
@@ -20,7 +24,10 @@ go_library(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["attestation_utils_test.go"],
|
||||
srcs = [
|
||||
"attestation_utils_test.go",
|
||||
"id_test.go",
|
||||
],
|
||||
deps = [
|
||||
":go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
@@ -29,6 +36,7 @@ go_test(
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -12,6 +12,7 @@ go_library(
|
||||
"//crypto/bls:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation/aggregation:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
)
|
||||
|
||||
// MaxCoverAttestationAggregation relies on Maximum Coverage greedy algorithm for aggregation.
|
||||
@@ -171,11 +172,21 @@ func aggregateAttestations(atts []ethpb.Att, keys []int, coverage *bitfield.Bitl
|
||||
}
|
||||
}
|
||||
// Put aggregated attestation at a position of the first selected attestation.
|
||||
atts[targetIdx] = ðpb.Attestation{
|
||||
// Append size byte, which will be unnecessary on switch to Bitlist64.
|
||||
AggregationBits: coverage.ToBitlist(),
|
||||
Data: data,
|
||||
Signature: aggregateSignatures(signs).Marshal(),
|
||||
if atts[0].Version() == version.Phase0 {
|
||||
atts[targetIdx] = ðpb.Attestation{
|
||||
// Append size byte, which will be unnecessary on switch to Bitlist64.
|
||||
AggregationBits: coverage.ToBitlist(),
|
||||
Data: data,
|
||||
Signature: aggregateSignatures(signs).Marshal(),
|
||||
}
|
||||
} else {
|
||||
atts[targetIdx] = ðpb.AttestationElectra{
|
||||
// Append size byte, which will be unnecessary on switch to Bitlist64.
|
||||
AggregationBits: coverage.ToBitlist(),
|
||||
CommitteeBits: atts[0].CommitteeBitsVal().Bytes(),
|
||||
Data: data,
|
||||
Signature: aggregateSignatures(signs).Marshal(),
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
71
proto/prysm/v1alpha1/attestation/id.go
Normal file
71
proto/prysm/v1alpha1/attestation/id.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package attestation
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
)
|
||||
|
||||
// IdSource represents the part of attestation that will be used to generate the Id.
|
||||
type IdSource uint8
|
||||
|
||||
const (
|
||||
// Full generates the Id from the whole attestation.
|
||||
Full IdSource = iota
|
||||
// Data generates the Id from the tuple (slot, committee index, beacon block root, source, target).
|
||||
Data
|
||||
)
|
||||
|
||||
// Id represents an attestation ID. Its uniqueness depends on the IdSource provided when constructing the Id.
|
||||
type Id [33]byte
|
||||
|
||||
// NewId --
|
||||
func NewId(att ethpb.Att, source IdSource) (Id, error) {
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return Id{}, err
|
||||
}
|
||||
if att.Version() < 0 || att.Version() > 255 {
|
||||
return Id{}, errors.New("attestation version must be between 0 and 255")
|
||||
}
|
||||
|
||||
var id Id
|
||||
id[0] = byte(att.Version())
|
||||
|
||||
switch source {
|
||||
case Full:
|
||||
h, err := att.HashTreeRoot()
|
||||
if err != nil {
|
||||
return Id{}, err
|
||||
}
|
||||
copy(id[1:], h[:])
|
||||
return id, nil
|
||||
case Data:
|
||||
data := att.GetData()
|
||||
if att.Version() >= version.Electra {
|
||||
committeeIndices := att.CommitteeBitsVal().BitIndices()
|
||||
if len(committeeIndices) != 1 {
|
||||
return Id{}, fmt.Errorf("%d committee bits are set instead of 1", len(committeeIndices))
|
||||
}
|
||||
dataCopy := ethpb.CopyAttestationData(att.GetData())
|
||||
dataCopy.CommitteeIndex = primitives.CommitteeIndex(committeeIndices[0])
|
||||
data = dataCopy
|
||||
}
|
||||
h, err := data.HashTreeRoot()
|
||||
if err != nil {
|
||||
return Id{}, err
|
||||
}
|
||||
copy(id[1:], h[:])
|
||||
return id, nil
|
||||
default:
|
||||
return Id{}, errors.New("invalid source requested")
|
||||
}
|
||||
}
|
||||
|
||||
// String --
|
||||
func (id Id) String() string {
|
||||
return string(id[:])
|
||||
}
|
||||
63
proto/prysm/v1alpha1/attestation/id_test.go
Normal file
63
proto/prysm/v1alpha1/attestation/id_test.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package attestation_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/util"
|
||||
)
|
||||
|
||||
func TestNewId(t *testing.T) {
|
||||
t.Run("full source", func(t *testing.T) {
|
||||
att := util.HydrateAttestation(ðpb.Attestation{})
|
||||
_, err := attestation.NewId(att, attestation.Full)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("data source Phase 0", func(t *testing.T) {
|
||||
att := util.HydrateAttestation(ðpb.Attestation{})
|
||||
_, err := attestation.NewId(att, attestation.Data)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("data source Electra", func(t *testing.T) {
|
||||
cb := primitives.NewAttestationCommitteeBits()
|
||||
cb.SetBitAt(0, true)
|
||||
att := util.HydrateAttestationElectra(ðpb.AttestationElectra{CommitteeBits: cb})
|
||||
_, err := attestation.NewId(att, attestation.Data)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("ID is different between versions", func(t *testing.T) {
|
||||
phase0Att := util.HydrateAttestation(ðpb.Attestation{})
|
||||
phase0Id, err := attestation.NewId(phase0Att, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
cb := primitives.NewAttestationCommitteeBits()
|
||||
cb.SetBitAt(0, true) // setting committee bit 0 for Electra corresponds to attestation data's committee index 0 for Phase 0
|
||||
electraAtt := util.HydrateAttestationElectra(ðpb.AttestationElectra{CommitteeBits: cb})
|
||||
electraId, err := attestation.NewId(electraAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, phase0Id, electraId)
|
||||
})
|
||||
t.Run("invalid source", func(t *testing.T) {
|
||||
att := util.HydrateAttestation(ðpb.Attestation{})
|
||||
_, err := attestation.NewId(att, 123)
|
||||
assert.ErrorContains(t, "invalid source requested", err)
|
||||
})
|
||||
t.Run("data source Electra - 0 bits set", func(t *testing.T) {
|
||||
cb := primitives.NewAttestationCommitteeBits()
|
||||
att := util.HydrateAttestationElectra(ðpb.AttestationElectra{CommitteeBits: cb})
|
||||
_, err := attestation.NewId(att, attestation.Data)
|
||||
assert.ErrorContains(t, "0 committee bits are set", err)
|
||||
})
|
||||
t.Run("data source Electra - multiple bits set", func(t *testing.T) {
|
||||
cb := primitives.NewAttestationCommitteeBits()
|
||||
cb.SetBitAt(0, true)
|
||||
cb.SetBitAt(1, true)
|
||||
att := util.HydrateAttestationElectra(ðpb.AttestationElectra{CommitteeBits: cb})
|
||||
_, err := attestation.NewId(att, attestation.Data)
|
||||
assert.ErrorContains(t, "2 committee bits are set", err)
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user