Pool: Use a TTL cache for seen bits (#7015)

This commit is contained in:
terence tsao
2020-08-21 16:27:51 -07:00
committed by GitHub
parent e69ed7c778
commit f2afeed9da
17 changed files with 202 additions and 83 deletions

View File

@@ -226,7 +226,6 @@ func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) err
return err
}
s.clearInitSyncBlocks()
s.attPool.ClearSeenAtts()
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, cp); err != nil {
return err

View File

@@ -8,6 +8,7 @@ go_library(
"block.go",
"forkchoice.go",
"kv.go",
"seen_bits.go",
"unaggregated.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations/kv",
@@ -18,6 +19,8 @@ go_library(
"//beacon-chain/state/stateutil:go_default_library",
"//shared/aggregation/attestations:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
@@ -31,6 +34,7 @@ go_test(
"benchmark_test.go",
"block_test.go",
"forkchoice_test.go",
"seen_bits_test.go",
"unaggregated_test.go",
],
embed = [":go_default_library"],
@@ -38,6 +42,7 @@ go_test(
"//shared/bls:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],

View File

@@ -3,7 +3,6 @@ package kv
import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
@@ -15,7 +14,10 @@ import (
// It tracks the unaggregated attestations that weren't able to aggregate to prevent
// the deletion of unaggregated attestations in the pool.
func (p *AttCaches) AggregateUnaggregatedAttestations() error {
unaggregatedAtts := p.UnaggregatedAttestations()
unaggregatedAtts, err := p.UnaggregatedAttestations()
if err != nil {
return err
}
attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(unaggregatedAtts))
for _, att := range unaggregatedAtts {
attDataRoot, err := stateutil.AttestationDataRoot(att.Data)
@@ -83,23 +85,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
return nil
}
seen, err := p.hasSeenBit(att)
if err != nil {
return err
}
if seen {
return nil
}
r, err := hashFn(att.Data)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}
// Don't save the attestation if the bitfield has been contained in previous blocks.
p.seenAggregatedAttLock.RLock()
seenBits, ok := p.seenAggregatedAtt[r]
p.seenAggregatedAttLock.RUnlock()
if ok {
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
return nil
}
}
}
copiedAtt := stateTrie.CopyAttestation(att)
p.aggregatedAttLock.Lock()
defer p.aggregatedAttLock.Unlock()
@@ -172,14 +169,9 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error {
return errors.Wrap(err, "could not tree hash attestation data")
}
p.seenAggregatedAttLock.Lock()
_, ok := p.seenAggregatedAtt[r]
if ok {
p.seenAggregatedAtt[r] = append(p.seenAggregatedAtt[r], att.AggregationBits)
} else {
p.seenAggregatedAtt[r] = []bitfield.Bitlist{att.AggregationBits}
if err := p.insertSeenBit(att); err != nil {
return err
}
p.seenAggregatedAttLock.Unlock()
p.aggregatedAttLock.Lock()
defer p.aggregatedAttLock.Unlock()
@@ -242,10 +234,3 @@ func (p *AttCaches) AggregatedAttestationCount() int {
defer p.aggregatedAttLock.RUnlock()
return len(p.aggregatedAtt)
}
// ClearSeenAtts clears the seen attestations cache.
func (p *AttCaches) ClearSeenAtts() {
p.seenAggregatedAttLock.Lock()
defer p.seenAggregatedAttLock.Unlock()
p.seenAggregatedAtt = make(map[[32]byte][]bitfield.Bitlist)
}

View File

@@ -5,6 +5,7 @@ import (
"sort"
"testing"
c "github.com/patrickmn/go-cache"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -101,7 +102,7 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAggregatedAtt[r] = []bitfield.Bitlist{{0xff}}
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
if len(cache.unAggregatedAtt) != 0 {
t.Errorf("Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
}

View File

@@ -5,10 +5,12 @@ package kv
import (
"sync"
"time"
"github.com/patrickmn/go-cache"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
var hashFn = hashutil.HashProto
@@ -17,27 +19,28 @@ var hashFn = hashutil.HashProto
// These caches are KV store for various attestations
// such are unaggregated, aggregated or attestations within a block.
type AttCaches struct {
aggregatedAttLock sync.RWMutex
aggregatedAtt map[[32]byte][]*ethpb.Attestation
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[[32]byte]*ethpb.Attestation
forkchoiceAttLock sync.RWMutex
forkchoiceAtt map[[32]byte]*ethpb.Attestation
blockAttLock sync.RWMutex
blockAtt map[[32]byte][]*ethpb.Attestation
seenAggregatedAttLock sync.RWMutex
seenAggregatedAtt map[[32]byte][]bitfield.Bitlist
aggregatedAttLock sync.RWMutex
aggregatedAtt map[[32]byte][]*ethpb.Attestation
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[[32]byte]*ethpb.Attestation
forkchoiceAttLock sync.RWMutex
forkchoiceAtt map[[32]byte]*ethpb.Attestation
blockAttLock sync.RWMutex
blockAtt map[[32]byte][]*ethpb.Attestation
seenAtt *cache.Cache
}
// NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for
// various kind of attestations.
func NewAttCaches() *AttCaches {
secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot)
c := cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second)
pool := &AttCaches{
unAggregatedAtt: make(map[[32]byte]*ethpb.Attestation),
aggregatedAtt: make(map[[32]byte][]*ethpb.Attestation),
forkchoiceAtt: make(map[[32]byte]*ethpb.Attestation),
blockAtt: make(map[[32]byte][]*ethpb.Attestation),
seenAggregatedAtt: make(map[[32]byte][]bitfield.Bitlist),
unAggregatedAtt: make(map[[32]byte]*ethpb.Attestation),
aggregatedAtt: make(map[[32]byte][]*ethpb.Attestation),
forkchoiceAtt: make(map[[32]byte]*ethpb.Attestation),
blockAtt: make(map[[32]byte][]*ethpb.Attestation),
seenAtt: c,
}
return pool

View File

@@ -0,0 +1,50 @@
package kv
import (
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
)
func (p *AttCaches) insertSeenBit(att *ethpb.Attestation) error {
r, err := hashFn(att.Data)
if err != nil {
return err
}
v, ok := p.seenAtt.Get(string(r[:]))
if ok {
seenBits, ok := v.([]bitfield.Bitlist)
if !ok {
return errors.New("could not convert to bitlist type")
}
seenBits = append(seenBits, att.AggregationBits)
p.seenAtt.Set(string(r[:]), seenBits, cache.DefaultExpiration /* one epoch */)
return nil
}
p.seenAtt.Set(string(r[:]), []bitfield.Bitlist{att.AggregationBits}, cache.DefaultExpiration /* one epoch */)
return nil
}
func (p *AttCaches) hasSeenBit(att *ethpb.Attestation) (bool, error) {
r, err := hashFn(att.Data)
if err != nil {
return false, err
}
v, ok := p.seenAtt.Get(string(r[:]))
if ok {
seenBits, ok := v.([]bitfield.Bitlist)
if !ok {
return false, errors.New("could not convert to bitlist type")
}
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
return true, nil
}
}
}
return false, nil
}

View File

@@ -0,0 +1,36 @@
package kv
import (
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestAttCaches_hasSeenBit(t *testing.T) {
c := NewAttCaches()
d := &ethpb.AttestationData{}
seenA1 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10000011}}
seenA2 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100000}}
require.NoError(t, c.insertSeenBit(seenA1))
require.NoError(t, c.insertSeenBit(seenA2))
tests := []struct {
att *ethpb.Attestation
want bool
}{
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10000000}}, want: true},
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10000001}}, want: true},
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100000}}, want: true},
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10000011}}, want: true},
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b00001000}}, want: false},
{att: &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11110111}}, want: false},
}
for _, tt := range tests {
got, err := c.hasSeenBit(tt.att)
require.NoError(t, err)
if got != tt.want {
t.Errorf("hasSeenBit() got = %v, want %v", got, tt.want)
}
}
}

View File

@@ -3,6 +3,7 @@ package kv
import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
)
@@ -16,24 +17,15 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error {
return errors.New("attestation is aggregated")
}
r, err := hashFn(att.Data)
seen, err := p.hasSeenBit(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
return err
}
if seen {
return nil
}
// Don't save the attestation if the bitfield has been contained in previous blocks.
p.seenAggregatedAttLock.RLock()
seenBits, ok := p.seenAggregatedAtt[r]
p.seenAggregatedAttLock.RUnlock()
if ok {
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
return nil
}
}
}
r, err = hashFn(att)
r, err := hashFn(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}
@@ -56,16 +48,38 @@ func (p *AttCaches) SaveUnaggregatedAttestations(atts []*ethpb.Attestation) erro
}
// UnaggregatedAttestations returns all the unaggregated attestations in cache.
func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation {
p.unAggregateAttLock.RLock()
defer p.unAggregateAttLock.RUnlock()
func (p *AttCaches) UnaggregatedAttestations() ([]*ethpb.Attestation, error) {
p.unAggregateAttLock.Lock()
defer p.unAggregateAttLock.Unlock()
unAggregatedAtts := p.unAggregatedAtt
atts := make([]*ethpb.Attestation, 0, len(unAggregatedAtts))
for _, att := range unAggregatedAtts {
r, err := hashFn(att.Data)
if err != nil {
return nil, errors.Wrap(err, "could not tree hash attestation")
}
v, ok := p.seenAtt.Get(string(r[:]))
if ok {
seenBits, ok := v.([]bitfield.Bitlist)
if !ok {
return nil, errors.New("could not convert to bitlist type")
}
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
r, err := hashFn(att)
if err != nil {
return nil, errors.Wrap(err, "could not tree hash attestation")
}
delete(p.unAggregatedAtt, r)
continue
}
}
}
atts := make([]*ethpb.Attestation, 0, len(p.unAggregatedAtt))
for _, att := range p.unAggregatedAtt {
atts = append(atts, stateTrie.CopyAttestation(att) /* Copied */)
}
return atts
return atts, nil
}
// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
@@ -75,7 +89,9 @@ func (p *AttCaches) UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIn
p.unAggregateAttLock.RLock()
defer p.unAggregateAttLock.RUnlock()
for _, a := range p.unAggregatedAtt {
unAggregatedAtts := p.unAggregatedAtt
for _, a := range unAggregatedAtts {
if slot == a.Data.Slot && committeeIndex == a.Data.CommitteeIndex {
atts = append(atts, a)
}
@@ -93,6 +109,10 @@ func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
return errors.New("attestation is aggregated")
}
if err := p.insertSeenBit(att); err != nil {
return err
}
r, err := hashFn(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")

View File

@@ -3,6 +3,7 @@ package kv
import (
"testing"
c "github.com/patrickmn/go-cache"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -32,7 +33,7 @@ func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
BeaconBlockRoot: []byte{0b0},
},
},
wantErrString: "could not tree hash attestation: incorrect fixed bytes marshalling",
wantErrString: "incorrect fixed bytes marshalling",
},
{
name: "normal save",
@@ -63,7 +64,7 @@ func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAggregatedAtt[r] = []bitfield.Bitlist{{0xff}}
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
if len(cache.unAggregatedAtt) != 0 {
t.Errorf("Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
}
@@ -171,7 +172,8 @@ func TestKV_Unaggregated_DeleteUnaggregatedAttestation(t *testing.T) {
t.Error(err)
}
}
returned := cache.UnaggregatedAttestations()
returned, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
assert.DeepEqual(t, []*ethpb.Attestation{}, returned)
})
}

View File

@@ -19,11 +19,10 @@ type Pool interface {
DeleteAggregatedAttestation(att *ethpb.Attestation) error
HasAggregatedAttestation(att *ethpb.Attestation) (bool, error)
AggregatedAttestationCount() int
ClearSeenAtts()
// For unaggregated attestations.
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestations() []*ethpb.Attestation
UnaggregatedAttestations() ([]*ethpb.Attestation, error)
UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation
DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
UnaggregatedAttestationCount() int

View File

@@ -35,7 +35,11 @@ func (s *Service) pruneExpiredAtts() {
}
}
unAggregatedAtts := s.pool.UnaggregatedAttestations()
unAggregatedAtts, err := s.pool.UnaggregatedAttestations()
if err != nil {
log.WithError(err).Error("Could not get unaggregated attestations")
return
}
for _, att := range unAggregatedAtts {
if s.expired(att.Data.Slot) {
if err := s.pool.DeleteUnaggregatedAttestation(att); err != nil {

View File

@@ -53,7 +53,9 @@ func TestPruneExpired_Ticker(t *testing.T) {
done := make(chan struct{}, 1)
runutil.RunEvery(ctx, 500*time.Millisecond, func() {
for _, attestation := range s.pool.UnaggregatedAttestations() {
atts, err := s.pool.UnaggregatedAttestations()
require.NoError(t, err)
for _, attestation := range atts {
if attestation.Data.Slot == 0 {
return
}

View File

@@ -678,7 +678,10 @@ func (vs *Server) packAttestations(ctx context.Context, latestState *stateTrie.B
// If there is any room left in the block, consider unaggregated attestations as well.
numAtts := uint64(len(atts))
if numAtts < params.BeaconConfig().MaxAttestations {
uAtts := vs.AttPool.UnaggregatedAttestations()
uAtts, err := vs.AttPool.UnaggregatedAttestations()
if err != nil {
return nil, errors.Wrap(err, "could not get unaggregated attestations")
}
uAtts, err = vs.filterAttestationsForBlockInclusion(ctx, latestState, uAtts)
atts = append(atts, uAtts...)

View File

@@ -1941,7 +1941,9 @@ func TestDeleteAttsInPool_Aggregated(t *testing.T) {
require.NoError(t, err)
require.NoError(t, s.deleteAttsInPool(context.Background(), append(aa, unaggregatedAtts...)))
assert.Equal(t, 0, len(s.AttPool.AggregatedAttestations()), "Did not delete aggregated attestation")
assert.Equal(t, 0, len(s.AttPool.UnaggregatedAttestations()), "Did not delete unaggregated attestation")
atts, err := s.AttPool.UnaggregatedAttestations()
require.NoError(t, err)
assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation")
}
func TestSortProfitableAtts(t *testing.T) {

View File

@@ -88,8 +88,10 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a}}
require.NoError(t, r.processPendingAtts(context.Background()))
assert.Equal(t, 1, len(r.attPool.UnaggregatedAttestations()), "Did not save unaggregated att")
assert.DeepEqual(t, a.Aggregate, r.attPool.UnaggregatedAttestations()[0], "Incorrect saved att")
atts, err := r.attPool.UnaggregatedAttestations()
require.NoError(t, err)
assert.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, a.Aggregate, atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.attPool.AggregatedAttestations()), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
}
@@ -173,7 +175,9 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
assert.Equal(t, 1, len(r.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.attPool.AggregatedAttestations()[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.attPool.UnaggregatedAttestations()), "Did save unaggregated att")
atts, err := r.attPool.UnaggregatedAttestations()
require.NoError(t, err)
assert.Equal(t, 0, len(atts), "Did save unaggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
}

View File

@@ -38,5 +38,8 @@ func TestBeaconAggregateProofSubscriber_CanSaveUnaggregatedAttestation(t *testin
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{}}, AggregationBits: bitfield.Bitlist{0x03}}, AggregatorIndex: 100}}
require.NoError(t, r.beaconAggregateProofSubscriber(context.Background(), a))
assert.DeepEqual(t, []*ethpb.Attestation{a.Message.Aggregate}, r.attPool.UnaggregatedAttestations(), "Did not save unaggregated attestation")
atts, err := r.attPool.UnaggregatedAttestations()
require.NoError(t, err)
assert.DeepEqual(t, []*ethpb.Attestation{a.Message.Aggregate}, atts, "Did not save unaggregated attestation")
}

View File

@@ -91,7 +91,8 @@ func TestService_committeeIndexBeaconAttestationSubscriber_ValidMessage(t *testi
time.Sleep(time.Second * 1)
ua := r.attPool.UnaggregatedAttestations()
ua, err := r.attPool.UnaggregatedAttestations()
require.NoError(t, err)
if len(ua) == 0 {
t.Error("No attestations put into pool")
}