Add and utilize seen atts map (#6993)

* DB: add block roots test

* Pool: add seen atts map

* Pool: use seen atts map

* Pool: clear seen map

* Gazelle

* Fixed an existing test

Co-authored-by: rauljordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
terence tsao
2020-08-13 14:52:27 -07:00
committed by GitHub
parent 16c34b627f
commit 3df2980cba
9 changed files with 117 additions and 32 deletions

View File

@@ -231,6 +231,8 @@ func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) err
return errors.Wrap(err, "could not migrate to cold")
}
s.attPool.ClearSeenAtts()
return s.beaconDB.SaveFinalizedCheckpoint(ctx, cp)
}

View File

@@ -20,6 +20,7 @@ go_library(
"//shared/hashutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)

View File

@@ -3,6 +3,7 @@ package kv
import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
@@ -87,6 +88,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
return errors.Wrap(err, "could not tree hash attestation")
}
// Don't save the attestation if the bitfield has been contained in previous blocks.
p.seenAggregatedAttLock.RLock()
seenBits, ok := p.seenAggregatedAtt[r]
p.seenAggregatedAttLock.RUnlock()
if ok {
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
return nil
}
}
}
copiedAtt := stateTrie.CopyAttestation(att)
p.aggregatedAttLock.Lock()
defer p.aggregatedAttLock.Unlock()
@@ -159,6 +172,15 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error {
return errors.Wrap(err, "could not tree hash attestation data")
}
p.seenAggregatedAttLock.Lock()
_, ok := p.seenAggregatedAtt[r]
if ok {
p.seenAggregatedAtt[r] = append(p.seenAggregatedAtt[r], att.AggregationBits)
} else {
p.seenAggregatedAtt[r] = []bitfield.Bitlist{att.AggregationBits}
}
p.seenAggregatedAttLock.Unlock()
p.aggregatedAttLock.Lock()
defer p.aggregatedAttLock.Unlock()
attList, ok := p.aggregatedAtt[r]
@@ -220,3 +242,10 @@ func (p *AttCaches) AggregatedAttestationCount() int {
defer p.aggregatedAttLock.RUnlock()
return len(p.aggregatedAtt)
}
// ClearSeenAtts clears the seen attestations cache.
func (p *AttCaches) ClearSeenAtts() {
p.seenAggregatedAttLock.Lock()
defer p.seenAggregatedAttLock.Unlock()
p.seenAggregatedAtt = make(map[[32]byte][]bitfield.Bitlist)
}

View File

@@ -72,6 +72,16 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
},
wantErrString: "could not tree hash attestation: incorrect fixed bytes marshalling",
},
{
name: "already seen",
att: &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 100,
},
AggregationBits: bitfield.Bitlist{0b11101001},
},
count: 0,
},
{
name: "normal save",
att: &ethpb.Attestation{
@@ -83,10 +93,15 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
count: 1,
},
}
r, err := hashFn(&ethpb.AttestationData{
Slot: 100,
})
require.NoError(t, err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAggregatedAtt[r] = []bitfield.Bitlist{{0xff}}
if len(cache.unAggregatedAtt) != 0 {
t.Errorf("Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
}

View File

@@ -7,6 +7,7 @@ import (
"sync"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/hashutil"
)
@@ -16,24 +17,27 @@ var hashFn = hashutil.HashProto
// These caches are KV store for various attestations
// such are unaggregated, aggregated or attestations within a block.
type AttCaches struct {
aggregatedAttLock sync.RWMutex
aggregatedAtt map[[32]byte][]*ethpb.Attestation
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[[32]byte]*ethpb.Attestation
forkchoiceAttLock sync.RWMutex
forkchoiceAtt map[[32]byte]*ethpb.Attestation
blockAttLock sync.RWMutex
blockAtt map[[32]byte][]*ethpb.Attestation
aggregatedAttLock sync.RWMutex
aggregatedAtt map[[32]byte][]*ethpb.Attestation
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[[32]byte]*ethpb.Attestation
forkchoiceAttLock sync.RWMutex
forkchoiceAtt map[[32]byte]*ethpb.Attestation
blockAttLock sync.RWMutex
blockAtt map[[32]byte][]*ethpb.Attestation
seenAggregatedAttLock sync.RWMutex
seenAggregatedAtt map[[32]byte][]bitfield.Bitlist
}
// NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for
// various kind of attestations.
func NewAttCaches() *AttCaches {
pool := &AttCaches{
unAggregatedAtt: make(map[[32]byte]*ethpb.Attestation),
aggregatedAtt: make(map[[32]byte][]*ethpb.Attestation),
forkchoiceAtt: make(map[[32]byte]*ethpb.Attestation),
blockAtt: make(map[[32]byte][]*ethpb.Attestation),
unAggregatedAtt: make(map[[32]byte]*ethpb.Attestation),
aggregatedAtt: make(map[[32]byte][]*ethpb.Attestation),
forkchoiceAtt: make(map[[32]byte]*ethpb.Attestation),
blockAtt: make(map[[32]byte][]*ethpb.Attestation),
seenAggregatedAtt: make(map[[32]byte][]bitfield.Bitlist),
}
return pool

View File

@@ -16,11 +16,27 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error {
return errors.New("attestation is aggregated")
}
r, err := hashFn(att)
r, err := hashFn(att.Data)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}
// Don't save the attestation if the bitfield has been contained in previous blocks.
p.seenAggregatedAttLock.RLock()
seenBits, ok := p.seenAggregatedAtt[r]
p.seenAggregatedAttLock.RUnlock()
if ok {
for _, bit := range seenBits {
if bit.Len() == att.AggregationBits.Len() && bit.Contains(att.AggregationBits) {
return nil
}
}
}
r, err = hashFn(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}
p.unAggregateAttLock.Lock()
defer p.unAggregateAttLock.Unlock()
p.unAggregatedAtt[r] = stateTrie.CopyAttestation(att) // Copied.

View File

@@ -6,6 +6,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
@@ -15,34 +16,49 @@ func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
count int
wantErrString string
}{
//{
// name: "nil attestation",
// att: nil,
//},
//{
// name: "already aggregated",
// att: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0b10101}},
// wantErrString: "attestation is aggregated",
//},
//{
// name: "invalid hash",
// att: &ethpb.Attestation{
// Data: &ethpb.AttestationData{
// BeaconBlockRoot: []byte{0b0},
// },
// },
// wantErrString: "could not tree hash attestation: incorrect fixed bytes marshalling",
//},
//{
// name: "normal save",
// att: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0b0001}},
// count: 1,
//},
{
name: "nil attestation",
att: nil,
},
{
name: "already aggregated",
att: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0b10101}},
wantErrString: "attestation is aggregated",
},
{
name: "invalid hash",
name: "already seen",
att: &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: []byte{0b0},
Slot: 100,
},
AggregationBits: bitfield.Bitlist{0b10000001},
},
wantErrString: "could not tree hash attestation: incorrect fixed bytes marshalling",
},
{
name: "normal save",
att: &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0b0001}},
count: 1,
count: 0,
},
}
r, err := hashFn(&ethpb.AttestationData{
Slot: 100,
})
require.NoError(t, err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAggregatedAtt[r] = []bitfield.Bitlist{{0xff}}
if len(cache.unAggregatedAtt) != 0 {
t.Errorf("Invalid start pool, atts: %d", len(cache.unAggregatedAtt))
}

View File

@@ -19,6 +19,7 @@ type Pool interface {
DeleteAggregatedAttestation(att *ethpb.Attestation) error
HasAggregatedAttestation(att *ethpb.Attestation) (bool, error)
AggregatedAttestationCount() int
ClearSeenAtts()
// For unaggregated attestations.
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error

View File

@@ -1930,8 +1930,9 @@ func TestDeleteAttsInPool_Aggregated(t *testing.T) {
}
sig := bls.RandKey().Sign([]byte("foo")).Marshal()
aggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b10101}, Signature: sig}, {AggregationBits: bitfield.Bitlist{0b11010}, Signature: sig}}
unaggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig}, {AggregationBits: bitfield.Bitlist{0b0001}, Signature: sig}}
d := &ethpb.AttestationData{}
aggregatedAtts := []*ethpb.Attestation{{Data: d, AggregationBits: bitfield.Bitlist{0b10101}, Signature: sig}, {Data: d, AggregationBits: bitfield.Bitlist{0b11010}, Signature: sig}}
unaggregatedAtts := []*ethpb.Attestation{{Data: d, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig}, {Data: d, AggregationBits: bitfield.Bitlist{0b0001}, Signature: sig}}
require.NoError(t, s.AttPool.SaveAggregatedAttestations(aggregatedAtts))
require.NoError(t, s.AttPool.SaveUnaggregatedAttestations(unaggregatedAtts))