Retrieve Attestations from Operation Service (#1606)

* fixed epoch_processing

* test p2p

* attestation pool to retrieve attestations up to max config amount

* lint

* update comments

* lint
This commit is contained in:
terence tsao
2019-02-15 11:27:45 -08:00
committed by Raul Jordan
parent a57912dd5a
commit c8a170dbad
5 changed files with 185 additions and 0 deletions

View File

@@ -24,6 +24,17 @@ func (db *BeaconDB) SaveAttestation(attestation *pb.Attestation) error {
})
}
// DeleteAttestation deletes the attestation record into the beacon chain db.
func (db *BeaconDB) DeleteAttestation(attestation *pb.Attestation) error {
hash := att.Key(attestation.Data)
return db.update(func(tx *bolt.Tx) error {
a := tx.Bucket(attestationBucket)
return a.Delete(hash[:])
})
}
// Attestation retrieves an attestation record from the db using its hash.
func (db *BeaconDB) Attestation(hash [32]byte) (*pb.Attestation, error) {
var attestation *pb.Attestation
@@ -43,6 +54,29 @@ func (db *BeaconDB) Attestation(hash [32]byte) (*pb.Attestation, error) {
return attestation, err
}
// Attestations retrieves all the attestation records from the db.
// These are the attestations that have not been seen on the beacon chain.
func (db *BeaconDB) Attestations() ([]*pb.Attestation, error) {
var attestations []*pb.Attestation
err := db.view(func(tx *bolt.Tx) error {
a := tx.Bucket(attestationBucket)
if err := a.ForEach(func(k, v []byte) error {
attestation, err := createAttestation(v)
if err != nil {
return err
}
attestations = append(attestations, attestation)
return nil
}); err != nil {
return err
}
return nil
})
return attestations, err
}
// HasAttestation checks if the attestation exists.
func (db *BeaconDB) HasAttestation(hash [32]byte) bool {
exists := false

View File

@@ -2,6 +2,8 @@ package db
import (
"bytes"
"reflect"
"sort"
"testing"
"github.com/gogo/protobuf/proto"
@@ -43,6 +45,72 @@ func TestSaveAndRetrieveAttestation(t *testing.T) {
}
}
func TestRetrieveAttestations(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
// Generate 100 unique attestations to save in DB.
attestations := make([]*pb.Attestation, 100)
for i := 0; i < len(attestations); i++ {
attestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: uint64(i),
Shard: uint64(i),
},
}
if err := db.SaveAttestation(attestations[i]); err != nil {
t.Fatalf("Failed to save attestation: %v", err)
}
}
retrievedAttestations, err := db.Attestations()
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
// Sort the retrieved attestations based on slot ordering for comparison.
sort.Slice(retrievedAttestations, func(i, j int) bool {
return retrievedAttestations[i].Data.Slot < retrievedAttestations[j].Data.Slot
})
if !reflect.DeepEqual(retrievedAttestations, attestations) {
t.Log("Retrieved attestations did not match generated attestations")
}
}
func TestDeleteAttestation(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
a := &pb.Attestation{
Data: &pb.AttestationData{
Slot: 0,
Shard: 0,
},
}
if err := db.SaveAttestation(a); err != nil {
t.Fatalf("Could not save attestation: %v", err)
}
aHash := att.Key(a.GetData())
aPrime, err := db.Attestation(aHash)
if err != nil {
t.Fatalf("Could not call Attestation: %v", err)
}
if !reflect.DeepEqual(aPrime, a) {
t.Errorf("Saved attestation and retrieved attestation are not equal")
}
if err := db.DeleteAttestation(a); err != nil {
t.Fatalf("Could not delete attestation: %v", err)
}
if db.HasAttestation(aHash) {
t.Error("Deleted attestation still there")
}
}
func TestNilAttestation(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)

View File

@@ -10,6 +10,7 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
@@ -22,6 +23,7 @@ go_test(
"//beacon-chain/internal:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",

View File

@@ -3,11 +3,14 @@ package operations
import (
"context"
"fmt"
"sort"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
@@ -82,6 +85,32 @@ func (s *Service) IncomingAttFeed() *event.Feed {
return s.incomingAttFeed
}
// Attestations returns the attestations that have not seen on the beacon chain, the attestations are
// returns in slot ascending order and up to MaxAttestations capacity. The attestations get
// deleted in DB after they have been retrieved.
func (s *Service) Attestations() ([]*pb.Attestation, error) {
var attestations []*pb.Attestation
attestationsFromDB, err := s.beaconDB.Attestations()
if err != nil {
return nil, fmt.Errorf("could not retrieve attestations from DB")
}
sort.Slice(attestationsFromDB, func(i, j int) bool {
return attestationsFromDB[i].Data.Slot < attestationsFromDB[j].Data.Slot
})
for i, attestation := range attestationsFromDB {
// Stop the max attestation number per beacon block is reached.
if uint64(i) == params.BeaconConfig().MaxAttestations {
break
}
attestations = append(attestations, attestation)
// Delete attestation from DB after retrieval.
if err := s.beaconDB.DeleteAttestation(attestationsFromDB[i]); err != nil {
return nil, fmt.Errorf("could not delete attestation %v", attestationsFromDB[i])
}
}
return attestations, nil
}
// saveOperations saves the newly broadcasted beacon block operations
// that was received from sync service.
func (s *Service) saveOperations() {

View File

@@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
@@ -105,3 +107,53 @@ func TestIncomingAttestation_Ok(t *testing.T) {
want := fmt.Sprintf("Attestation %#x saved in db", hash)
testutil.AssertLogsContain(t, hook, want)
}
func TestRetrieveAttestations_Ok(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
service := NewOperationService(context.Background(), &Config{BeaconDB: beaconDB})
// Save 140 attestations for test. During 1st retrieval we should get slot:0 - slot:128 attestations,
// 2nd retrieval we should get slot:128 - slot:140 attestations.
// Max attestation config value is set to 128.
origAttestations := make([]*pb.Attestation, 140)
for i := 0; i < len(origAttestations); i++ {
origAttestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: uint64(i),
Shard: uint64(i),
},
}
if err := service.beaconDB.SaveAttestation(origAttestations[i]); err != nil {
t.Fatalf("Failed to save attestation: %v", err)
}
}
// Test we can retrieve attestations from slot0 - slot127 (Max attestation amount).
attestations, err := service.Attestations()
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
if !reflect.DeepEqual(attestations, origAttestations[0:params.BeaconConfig().MaxAttestations]) {
t.Errorf("Retrieved attestations did not match prev generated attestations for the first %d",
params.BeaconConfig().MaxAttestations)
}
// Test we can retrieve attestations from slot128 - slot139.
attestations, err = service.Attestations()
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
if !reflect.DeepEqual(attestations, origAttestations[params.BeaconConfig().MaxAttestations:]) {
t.Errorf("Retrieved attestations did not match prev generated attestations for the first %d",
params.BeaconConfig().MaxAttestations)
}
// Verify attestation pool is empty now we have retrieved everything.
attestations, err = service.Attestations()
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
if len(attestations) != 0 {
t.Errorf("Attestation pool should be empty but got a length of %d", len(attestations))
}
}