Remove unused attestation operations in DB (#6664)

* Remove attestation usages in DB
* Merge refs/heads/master into rm-unused
* Merge refs/heads/master into rm-unused
* Fixed export wrapper
* Merge branch 'rm-unused' of github.com:prysmaticlabs/prysm into rm-unused
This commit is contained in:
terence tsao
2020-07-21 11:17:33 -07:00
committed by GitHub
parent ca54c1d480
commit 367738e83b
8 changed files with 1 additions and 834 deletions

View File

@@ -18,10 +18,6 @@ import (
// ReadOnlyDatabase defines a struct which only has read access to database methods. // ReadOnlyDatabase defines a struct which only has read access to database methods.
type ReadOnlyDatabase interface { type ReadOnlyDatabase interface {
// Attestation related methods.
AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error)
Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error)
HasAttestation(ctx context.Context, attDataRoot [32]byte) bool
// Block related methods. // Block related methods.
Block(ctx context.Context, blockRoot [32]byte) (*eth.SignedBeaconBlock, error) Block(ctx context.Context, blockRoot [32]byte) (*eth.SignedBeaconBlock, error)
Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.SignedBeaconBlock, error) Blocks(ctx context.Context, f *filters.QueryFilter) ([]*eth.SignedBeaconBlock, error)
@@ -67,11 +63,6 @@ type ReadOnlyDatabase interface {
type NoHeadAccessDatabase interface { type NoHeadAccessDatabase interface {
ReadOnlyDatabase ReadOnlyDatabase
// Attestation related methods.
DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error
DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error
SaveAttestation(ctx context.Context, att *eth.Attestation) error
SaveAttestations(ctx context.Context, atts []*eth.Attestation) error
// Block related methods. // Block related methods.
SaveBlock(ctx context.Context, block *eth.SignedBeaconBlock) error SaveBlock(ctx context.Context, block *eth.SignedBeaconBlock) error
SaveBlocks(ctx context.Context, blocks []*eth.SignedBeaconBlock) error SaveBlocks(ctx context.Context, blocks []*eth.SignedBeaconBlock) error

View File

@@ -80,29 +80,6 @@ func (e Exporter) Close() error {
return e.db.Close() return e.db.Close()
} }
// SaveAttestation publishes to the kafka topic for attestations.
func (e Exporter) SaveAttestation(ctx context.Context, att *eth.Attestation) error {
go func() {
if err := e.publish(ctx, "beacon_attestation", att); err != nil {
log.WithError(err).Error("Failed to publish attestation")
}
}()
return e.db.SaveAttestation(ctx, att)
}
// SaveAttestations publishes to the kafka topic for beacon attestations.
func (e Exporter) SaveAttestations(ctx context.Context, atts []*eth.Attestation) error {
go func() {
for _, att := range atts {
if err := e.publish(ctx, "beacon_attestation", att); err != nil {
log.WithError(err).Error("Failed to publish attestation")
}
}
}()
return e.db.SaveAttestations(ctx, atts)
}
// SaveBlock publishes to the kafka topic for beacon blocks. // SaveBlock publishes to the kafka topic for beacon blocks.
func (e Exporter) SaveBlock(ctx context.Context, block *eth.SignedBeaconBlock) error { func (e Exporter) SaveBlock(ctx context.Context, block *eth.SignedBeaconBlock) error {
go func() { go func() {

View File

@@ -28,31 +28,6 @@ func (e Exporter) Backup(ctx context.Context) error {
return e.db.Backup(ctx) return e.db.Backup(ctx)
} }
// AttestationsByDataRoot -- passthrough.
func (e Exporter) AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*eth.Attestation, error) {
return e.db.AttestationsByDataRoot(ctx, attDataRoot)
}
// Attestations -- passthrough.
func (e Exporter) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*eth.Attestation, error) {
return e.db.Attestations(ctx, f)
}
// HasAttestation -- passthrough.
func (e Exporter) HasAttestation(ctx context.Context, attDataRoot [32]byte) bool {
return e.db.HasAttestation(ctx, attDataRoot)
}
// DeleteAttestation -- passthrough.
func (e Exporter) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error {
return e.db.DeleteAttestation(ctx, attDataRoot)
}
// DeleteAttestations -- passthrough.
func (e Exporter) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error {
return e.db.DeleteAttestations(ctx, attDataRoots)
}
// Block -- passthrough. // Block -- passthrough.
func (e Exporter) Block(ctx context.Context, blockRoot [32]byte) (*eth.SignedBeaconBlock, error) { func (e Exporter) Block(ctx context.Context, blockRoot [32]byte) (*eth.SignedBeaconBlock, error) {
return e.db.Block(ctx, blockRoot) return e.db.Block(ctx, blockRoot)

View File

@@ -6,7 +6,6 @@ go_library(
srcs = [ srcs = [
"archive.go", "archive.go",
"archived_point.go", "archived_point.go",
"attestations.go",
"backup.go", "backup.go",
"blocks.go", "blocks.go",
"check_historical_state.go", "check_historical_state.go",
@@ -65,7 +64,6 @@ go_test(
srcs = [ srcs = [
"archive_test.go", "archive_test.go",
"archived_point_test.go", "archived_point_test.go",
"attestations_test.go",
"backup_test.go", "backup_test.go",
"blocks_test.go", "blocks_test.go",
"check_historical_test_test.go", "check_historical_test_test.go",
@@ -95,7 +93,6 @@ go_test(
"@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library", "@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library", "@io_etcd_go_bbolt//:go_default_library",

View File

@@ -1,279 +0,0 @@
package kv
import (
"context"
"fmt"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
dbpb "github.com/prysmaticlabs/prysm/proto/beacon/db"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// AttestationsByDataRoot returns any (aggregated) attestations matching this data root.
func (kv *Store) AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte) ([]*ethpb.Attestation, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Attestation")
defer span.End()
var atts []*ethpb.Attestation
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
enc := bkt.Get(attDataRoot[:])
if enc == nil {
return nil
}
ac := &dbpb.AttestationContainer{}
if err := decode(ctx, enc, ac); err != nil {
return err
}
atts = ac.ToAttestations()
return nil
})
if err != nil {
traceutil.AnnotateError(span, err)
}
return atts, err
}
// Attestations retrieves a list of attestations by filter criteria.
func (kv *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Attestation, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Attestations")
defer span.End()
atts := make([]*ethpb.Attestation, 0)
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
// If no filter criteria are specified, return an error.
if f == nil {
return errors.New("must specify a filter criteria for retrieving attestations")
}
// Creates a list of indices from the passed in filter values, such as:
// []byte("parent-root-0x2093923"), etc. to be used for looking up
// block roots that were stored under each of those indices for O(1) lookup.
indicesByBucket, err := createAttestationIndicesFromFilters(ctx, f)
if err != nil {
return errors.Wrap(err, "could not determine lookup indices")
}
// Once we have a list of attestation data roots that correspond to each
// lookup index, we find the intersection across all of them and use
// that list of roots to lookup the attestations. These attestations will
// meet the filter criteria.
keys := sliceutil.IntersectionByteSlices(lookupValuesForIndices(ctx, indicesByBucket, tx)...)
for i := 0; i < len(keys); i++ {
encoded := bkt.Get(keys[i])
ac := &dbpb.AttestationContainer{}
if err := decode(ctx, encoded, ac); err != nil {
return err
}
atts = append(atts, ac.ToAttestations()...)
}
return nil
})
return atts, err
}
// HasAttestation checks if an attestation by its attestation data root exists in the db.
func (kv *Store) HasAttestation(ctx context.Context, attDataRoot [32]byte) bool {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HasAttestation")
defer span.End()
exists := false
if err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
exists = bkt.Get(attDataRoot[:]) != nil
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
}
return exists
}
// DeleteAttestation by attestation data root.
func (kv *Store) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteAttestation")
defer span.End()
return kv.DeleteAttestations(ctx, [][32]byte{attDataRoot})
}
// DeleteAttestations by attestation data roots.
func (kv *Store) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteAttestations")
defer span.End()
return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
for _, attDataRoot := range attDataRoots {
enc := bkt.Get(attDataRoot[:])
if enc == nil {
continue
}
ac := &dbpb.AttestationContainer{}
if err := decode(ctx, enc, ac); err != nil {
return err
}
indicesByBucket := createAttestationIndicesFromData(ctx, ac.Data)
if err := deleteValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
if err := bkt.Delete(attDataRoot[:]); err != nil {
return err
}
}
return nil
})
}
// SaveAttestation to the db.
func (kv *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveAttestation")
defer span.End()
return kv.SaveAttestations(ctx, []*ethpb.Attestation{att})
}
// SaveAttestations via batch updates to the db.
func (kv *Store) SaveAttestations(ctx context.Context, atts []*ethpb.Attestation) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveAttestations")
defer span.End()
err := kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
for _, att := range atts {
// Aggregation bits are required to store attestations within the attestation container. Missing
// this field may cause silent failures or unexpected results.
if att.AggregationBits == nil {
traceutil.AnnotateError(span, errors.New("attestation has nil aggregation bitlist"))
continue
}
attDataRoot, err := stateutil.AttestationDataRoot(att.Data)
if err != nil {
return err
}
ac := &dbpb.AttestationContainer{
Data: att.Data,
}
existingEnc := bkt.Get(attDataRoot[:])
if existingEnc != nil {
if err := decode(ctx, existingEnc, ac); err != nil {
return err
}
}
ac.InsertAttestation(att)
enc, err := encode(ctx, ac)
if err != nil {
return err
}
indicesByBucket := createAttestationIndicesFromData(ctx, att.Data)
if err := updateValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
if err := bkt.Put(attDataRoot[:], enc); err != nil {
return err
}
}
return nil
})
if err != nil {
traceutil.AnnotateError(span, err)
}
return err
}
// createAttestationIndicesFromData takes in attestation data and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createAttestationIndicesFromData(ctx context.Context, attData *ethpb.AttestationData) map[string][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createAttestationIndicesFromData")
defer span.End()
indicesByBucket := make(map[string][]byte)
buckets := make([][]byte, 0)
indices := make([][]byte, 0)
if attData.Source != nil {
buckets = append(buckets, attestationSourceEpochIndicesBucket)
indices = append(indices, bytesutil.Uint64ToBytesLittleEndian(attData.Source.Epoch))
if attData.Source.Root != nil && len(attData.Source.Root) > 0 {
buckets = append(buckets, attestationSourceRootIndicesBucket)
indices = append(indices, attData.Source.Root)
}
}
if attData.Target != nil {
buckets = append(buckets, attestationTargetEpochIndicesBucket)
indices = append(indices, bytesutil.Uint64ToBytesLittleEndian(attData.Target.Epoch))
if attData.Target.Root != nil && len(attData.Target.Root) > 0 {
buckets = append(buckets, attestationTargetRootIndicesBucket)
indices = append(indices, attData.Target.Root)
}
}
if attData.BeaconBlockRoot != nil && len(attData.BeaconBlockRoot) > 0 {
buckets = append(buckets, attestationHeadBlockRootBucket)
indices = append(indices, attData.BeaconBlockRoot)
}
for i := 0; i < len(buckets); i++ {
indicesByBucket[string(buckets[i])] = indices[i]
}
return indicesByBucket
}
// createAttestationIndicesFromFilters takes in filter criteria and returns
// a list of of byte keys used to retrieve the values stored
// for the indices from the DB.
//
// For attestations, these are list of hash tree roots of attestation.Data
// objects. If a certain filter criterion does not apply to
// attestations, an appropriate error is returned.
func createAttestationIndicesFromFilters(ctx context.Context, f *filters.QueryFilter) (map[string][]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createAttestationIndicesFromFilters")
defer span.End()
indicesByBucket := make(map[string][]byte)
for k, v := range f.Filters() {
switch k {
case filters.HeadBlockRoot:
headBlockRoot, ok := v.([]byte)
if !ok {
return nil, errors.New("headBlockRoot is not type []byte")
}
indicesByBucket[string(attestationHeadBlockRootBucket)] = headBlockRoot
case filters.SourceRoot:
sourceRoot, ok := v.([]byte)
if !ok {
return nil, errors.New("sourceRoot is not type []byte")
}
indicesByBucket[string(attestationSourceRootIndicesBucket)] = sourceRoot
case filters.SourceEpoch:
sourceEpoch, ok := v.(uint64)
if !ok {
return nil, errors.New("sourceEpoch is not type uint64")
}
indicesByBucket[string(attestationSourceEpochIndicesBucket)] = bytesutil.Uint64ToBytesLittleEndian(sourceEpoch)
case filters.TargetEpoch:
targetEpoch, ok := v.(uint64)
if !ok {
return nil, errors.New("targetEpoch is not type uint64")
}
indicesByBucket[string(attestationTargetEpochIndicesBucket)] = bytesutil.Uint64ToBytesLittleEndian(targetEpoch)
case filters.TargetRoot:
targetRoot, ok := v.([]byte)
if !ok {
return nil, errors.New("targetRoot is not type []byte")
}
indicesByBucket[string(attestationTargetRootIndicesBucket)] = targetRoot
default:
return nil, fmt.Errorf("filter criterion %v not supported for attestations", k)
}
}
return indicesByBucket, nil
}

View File

@@ -1,477 +0,0 @@
package kv
import (
"bytes"
"context"
"reflect"
"sort"
"sync"
"testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
)
func TestStore_AttestationCRUD(t *testing.T) {
db := setupDB(t)
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 10,
BeaconBlockRoot: make([]byte, 32),
Source: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
Target: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b00000001, 0b1},
}
ctx := context.Background()
attDataRoot, err := stateutil.AttestationDataRoot(att.Data)
if err != nil {
t.Fatal(err)
}
retrievedAtts, err := db.AttestationsByDataRoot(ctx, attDataRoot)
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 0 {
t.Errorf("Expected no attestations, received %v", retrievedAtts)
}
if err := db.SaveAttestation(ctx, att); err != nil {
t.Fatal(err)
}
if !db.HasAttestation(ctx, attDataRoot) {
t.Error("Expected attestation to exist in the db")
}
retrievedAtts, err = db.AttestationsByDataRoot(ctx, attDataRoot)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(att, retrievedAtts[0]) {
t.Errorf("Wanted %v, received %v", att, retrievedAtts[0])
}
if err := db.DeleteAttestation(ctx, attDataRoot); err != nil {
t.Fatal(err)
}
if db.HasAttestation(ctx, attDataRoot) {
t.Error("Expected attestation to have been deleted from the db")
}
}
func TestStore_AttestationsBatchDelete(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
numAtts := 10
blockRoot := bytesutil.PadTo([]byte("head"), 32)
totalAtts := make([]*ethpb.Attestation, numAtts)
// We track the data roots for the even indexed attestations.
attDataRoots := make([][32]byte, 0)
oddAtts := make([]*ethpb.Attestation, 0)
for i := 0; i < len(totalAtts); i++ {
totalAtts[i] = &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: uint64(i),
BeaconBlockRoot: blockRoot,
Source: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
Target: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b00000001, 0b1},
}
if i%2 == 0 {
r, err := stateutil.AttestationDataRoot(totalAtts[i].Data)
if err != nil {
t.Fatal(err)
}
attDataRoots = append(attDataRoots, r)
} else {
oddAtts = append(oddAtts, totalAtts[i])
}
}
if err := db.SaveAttestations(ctx, totalAtts); err != nil {
t.Fatal(err)
}
retrieved, err := db.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(blockRoot))
if err != nil {
t.Fatal(err)
}
if len(retrieved) != numAtts {
t.Errorf("Received %d attestations, wanted 1000", len(retrieved))
}
// We delete all even indexed attestation.
if err := db.DeleteAttestations(ctx, attDataRoots); err != nil {
t.Fatal(err)
}
// When we retrieve the data, only the odd indexed attestations should remain.
retrieved, err = db.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(blockRoot))
if err != nil {
t.Fatal(err)
}
sort.Slice(retrieved, func(i, j int) bool {
return retrieved[i].Data.Slot < retrieved[j].Data.Slot
})
if !reflect.DeepEqual(retrieved, oddAtts) {
t.Errorf("Wanted %v, received %v", oddAtts, retrieved)
}
}
func TestStore_BoltDontPanic(t *testing.T) {
db := setupDB(t)
var wg sync.WaitGroup
for i := 0; i <= 100; i++ {
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: uint64(i),
BeaconBlockRoot: make([]byte, 32),
Source: &ethpb.Checkpoint{
Root: make([]byte, 32)},
Target: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b11},
}
ctx := context.Background()
attDataRoot, err := stateutil.AttestationDataRoot(att.Data)
if err != nil {
t.Fatal(err)
}
retrievedAtts, err := db.AttestationsByDataRoot(ctx, attDataRoot)
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 0 {
t.Errorf("Expected no attestation, received %v", retrievedAtts)
}
if err := db.SaveAttestation(ctx, att); err != nil {
t.Fatal(err)
}
}
// if indices are improperly deleted this test will then panic.
for i := 0; i <= 100; i++ {
startEpoch := i + 1
wg.Add(1)
go func() {
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: uint64(startEpoch),
BeaconBlockRoot: make([]byte, 32),
Source: &ethpb.Checkpoint{
Root: make([]byte, 32)},
Target: &ethpb.Checkpoint{
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b11},
}
ctx := context.Background()
attDataRoot, err := stateutil.AttestationDataRoot(att.Data)
if err != nil {
t.Fatal(err)
}
if err := db.DeleteAttestation(ctx, attDataRoot); err != nil {
t.Fatal(err)
}
if db.HasAttestation(ctx, attDataRoot) {
t.Error("Expected attestation to have been deleted from the db")
}
wg.Done()
}()
}
wg.Wait()
}
func TestStore_Attestations_FiltersCorrectly(t *testing.T) {
db := setupDB(t)
someRoot := [32]byte{1, 2, 3}
otherRoot := [32]byte{4, 5, 6}
atts := []*ethpb.Attestation{
{
Data: &ethpb.AttestationData{
BeaconBlockRoot: someRoot[:],
Source: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 5,
},
Target: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 7,
},
},
AggregationBits: bitfield.Bitlist{0b11},
},
{
Data: &ethpb.AttestationData{
BeaconBlockRoot: someRoot[:],
Source: &ethpb.Checkpoint{
Root: otherRoot[:],
Epoch: 5,
},
Target: &ethpb.Checkpoint{
Root: otherRoot[:],
Epoch: 7,
},
},
AggregationBits: bitfield.Bitlist{0b11},
},
{
Data: &ethpb.AttestationData{
BeaconBlockRoot: otherRoot[:],
Source: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 7,
},
Target: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 5,
},
},
AggregationBits: bitfield.Bitlist{0b11},
},
}
ctx := context.Background()
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}
tests := []struct {
filter *filters.QueryFilter
expectedNumAtt int
}{
{
filter: filters.NewFilter().
SetSourceEpoch(5),
expectedNumAtt: 2,
},
{
filter: filters.NewFilter().
SetHeadBlockRoot(someRoot[:]),
expectedNumAtt: 2,
},
{
filter: filters.NewFilter().
SetHeadBlockRoot(otherRoot[:]),
expectedNumAtt: 1,
},
{
filter: filters.NewFilter().SetTargetEpoch(7),
expectedNumAtt: 2,
},
{
// Only two attestation in the list meet the composite filter criteria above.
filter: filters.NewFilter().
SetHeadBlockRoot(someRoot[:]).
SetTargetEpoch(7),
expectedNumAtt: 2,
},
{
// No attestation meets the criteria below.
filter: filters.NewFilter().
SetTargetEpoch(1000),
expectedNumAtt: 0,
},
}
for _, tt := range tests {
retrievedAtts, err := db.Attestations(ctx, tt.filter)
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != tt.expectedNumAtt {
t.Errorf("Expected %d attestations, received %d", tt.expectedNumAtt, len(retrievedAtts))
}
}
}
func TestStore_DuplicatedAttestations_FiltersCorrectly(t *testing.T) {
db := setupDB(t)
someRoot := [32]byte{1, 2, 3}
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: someRoot[:],
Source: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 5,
},
Target: &ethpb.Checkpoint{
Root: someRoot[:],
Epoch: 7,
},
},
AggregationBits: bitfield.Bitlist{0b11},
}
atts := []*ethpb.Attestation{att, att, att}
ctx := context.Background()
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}
retrievedAtts, err := db.Attestations(ctx, filters.NewFilter().
SetHeadBlockRoot(someRoot[:]))
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 1 {
t.Errorf("Expected %d attestations, received %d", 1, len(retrievedAtts))
}
att1, ok := proto.Clone(att).(*ethpb.Attestation)
if !ok {
t.Error("Entity is not of type *ethpb.Attestation")
}
att1.Data.Source.Epoch = 6
atts = []*ethpb.Attestation{att, att, att, att1, att1, att1}
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}
retrievedAtts, err = db.Attestations(ctx, filters.NewFilter().
SetHeadBlockRoot(someRoot[:]))
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 2 {
t.Errorf("Expected %d attestations, received %d", 1, len(retrievedAtts))
}
retrievedAtts, err = db.Attestations(ctx, filters.NewFilter().
SetHeadBlockRoot(someRoot[:]).SetSourceEpoch(5))
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 1 {
t.Errorf("Expected %d attestations, received %d", 1, len(retrievedAtts))
}
retrievedAtts, err = db.Attestations(ctx, filters.NewFilter().
SetHeadBlockRoot(someRoot[:]).SetSourceEpoch(6))
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != 1 {
t.Errorf("Expected %d attestations, received %d", 1, len(retrievedAtts))
}
}
func TestStore_Attestations_BitfieldLogic(t *testing.T) {
commonData := &ethpb.AttestationData{Slot: 10}
tests := []struct {
name string
input []*ethpb.Attestation
output []*ethpb.Attestation
}{
{
name: "all distinct aggregation bitfields",
input: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b10000001},
},
{
Data: commonData,
AggregationBits: []byte{0b10000010},
},
},
output: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b10000001},
},
{
Data: commonData,
AggregationBits: []byte{0b10000010},
},
},
},
{
name: "Incoming attestation is fully contained already",
input: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b11111111},
},
{
Data: commonData,
AggregationBits: []byte{0b10000010},
},
},
output: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b11111111},
},
},
},
{
name: "Existing attestations are fully contained incoming attestation",
input: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b10000001},
},
{
Data: commonData,
AggregationBits: []byte{0b10000010},
},
{
Data: commonData,
AggregationBits: []byte{0b11111111},
},
},
output: []*ethpb.Attestation{
{
Data: commonData,
AggregationBits: []byte{0b11111111},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
if err := db.SaveAttestations(ctx, tt.input); err != nil {
t.Fatal(err)
}
r, err := ssz.HashTreeRoot(tt.input[0].Data)
if err != nil {
t.Fatal(err)
}
output, err := db.AttestationsByDataRoot(ctx, r)
if err != nil {
t.Fatal(err)
}
if len(output) != len(tt.output) {
t.Fatalf(
"Wrong number of attestations returned. Got %d attestations but wanted %d",
len(output),
len(tt.output),
)
}
sort.Slice(output, func(i, j int) bool {
return output[i].AggregationBits.Bytes()[0] < output[j].AggregationBits.Bytes()[0]
})
sort.Slice(tt.output, func(i, j int) bool {
return tt.output[i].AggregationBits.Bytes()[0] < tt.output[j].AggregationBits.Bytes()[0]
})
for i, att := range output {
if !bytes.Equal(att.AggregationBits, tt.output[i].AggregationBits) {
t.Errorf("Aggregation bits are not the same. Got %b, wanted %b", att.AggregationBits, tt.output[i].AggregationBits)
}
}
})
}
}

View File

@@ -8,11 +8,9 @@ go_library(
visibility = ["//visibility:private"], visibility = ["//visibility:private"],
deps = [ deps = [
"//beacon-chain/cache:go_default_library", "//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library", "//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/filters:go_default_library",
"//beacon-chain/state/stateutil:go_default_library", "//beacon-chain/state/stateutil:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bytesutil:go_default_library", "//shared/bytesutil:go_default_library",
"@com_github_emicklei_dot//:go_default_library", "@com_github_emicklei_dot//:go_default_library",
], ],

View File

@@ -17,11 +17,9 @@ import (
"github.com/emicklei/dot" "github.com/emicklei/dot"
"github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/bytesutil"
) )
@@ -68,8 +66,6 @@ func main() {
} }
m[r] = &node{score: make(map[uint64]bool)} m[r] = &node{score: make(map[uint64]bool)}
// Gather votes from the attestations voted for this block
atts, err := db.Attestations(context.Background(), filters.NewFilter().SetHeadBlockRoot(r[:]))
state, err := db.State(context.Background(), r) state, err := db.State(context.Background(), r)
if err != nil { if err != nil {
panic(err) panic(err)
@@ -92,21 +88,10 @@ func main() {
panic(err) panic(err)
} }
} }
// Retrieve attestation indices
for _, att := range atts {
committee, err := helpers.BeaconCommitteeFromState(state, att.Data.Slot, att.Data.CommitteeIndex)
if err != nil {
panic(err)
}
indices := attestationutil.AttestingIndices(att.AggregationBits, committee)
for _, i := range indices {
m[r].score[i] = true
}
}
// Construct label of each node. // Construct label of each node.
rStr := hex.EncodeToString(r[:2]) rStr := hex.EncodeToString(r[:2])
label := "slot: " + strconv.Itoa(int(b.Block.Slot)) + "\n root: " + rStr + "\n votes: " + strconv.Itoa(len(m[r].score)) label := "slot: " + strconv.Itoa(int(b.Block.Slot)) + "\n root: " + rStr
dotN := graph.Node(rStr).Box().Attr("label", label) dotN := graph.Node(rStr).Box().Attr("label", label)
n := &node{ n := &node{