Operation service prunes invalid attestation (#2439)

* prune atts > one epoch old

* use VerifyAttestation instead

* added test to prune invalid att

* space

* added ctx

* fixed existing tests

* gaz

* gazelle
This commit is contained in:
terence tsao
2019-04-29 13:03:28 -07:00
committed by Raul Jordan
parent aa80c308ce
commit bff774de32
7 changed files with 111 additions and 25 deletions

View File

@@ -499,6 +499,7 @@ func VerifyAttestation(beaconState *pb.BeaconState, att *pb.Attestation, verifyS
}
crosslinkFromAttestation := att.Data.LatestCrosslink
crosslinkFromState := beaconState.LatestCrosslinks[shard]
if !(reflect.DeepEqual(crosslinkFromState, crosslink) ||
reflect.DeepEqual(crosslinkFromState, crosslinkFromAttestation)) {
return fmt.Errorf(

View File

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/event:go_default_library",
@@ -26,6 +27,7 @@ go_test(
deps = [
"//beacon-chain/internal:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"sort"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
@@ -112,21 +113,36 @@ func (s *Service) IncomingProcessedBlockFeed() *event.Feed {
// PendingAttestations returns the attestations that have not seen on the beacon chain, the attestations are
// returns in slot ascending order and up to MaxAttestations capacity. The attestations get
// deleted in DB after they have been retrieved.
func (s *Service) PendingAttestations() ([]*pb.Attestation, error) {
func (s *Service) PendingAttestations(ctx context.Context) ([]*pb.Attestation, error) {
var attestations []*pb.Attestation
attestationsFromDB, err := s.beaconDB.Attestations()
if err != nil {
return nil, fmt.Errorf("could not retrieve attestations from DB")
}
state, err := s.beaconDB.HeadState(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve attestations from DB")
}
sort.Slice(attestationsFromDB, func(i, j int) bool {
return attestationsFromDB[i].Data.Slot < attestationsFromDB[j].Data.Slot
})
for i := range attestationsFromDB {
var validAttsCount uint64
for _, att := range attestationsFromDB {
// Delete the attestation if it fails to verify using head state,
// we don't want to pass the attestation to the proposer.
if err := blocks.VerifyAttestation(state, att, false /* verify signature */); err != nil {
if err := s.beaconDB.DeleteAttestation(att); err != nil {
return nil, err
}
continue
}
validAttsCount++
// Stop the max attestation number per beacon block is reached.
if uint64(i) == params.BeaconConfig().MaxAttestations {
if validAttsCount == params.BeaconConfig().MaxAttestations {
break
}
attestations = append(attestations, attestationsFromDB[i])
attestations = append(attestations, att)
}
return attestations, nil
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
@@ -126,29 +127,80 @@ func TestRetrieveAttestations_OK(t *testing.T) {
defer internal.TeardownDB(t, beaconDB)
service := NewOpsPoolService(context.Background(), &Config{BeaconDB: beaconDB})
// Save 140 attestations for test. During 1st retrieval we should get slot:0 - slot:128 attestations,
// 2nd retrieval we should get slot:128 - slot:140 attestations.
// Max attestation config value is set to 128.
// Save 140 attestations for test. During 1st retrieval we should get slot:1 - slot:61 attestations.
// The 1st retrieval is set at slot 64.
origAttestations := make([]*pb.Attestation, 140)
for i := 0; i < len(origAttestations); i++ {
origAttestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: uint64(i),
Shard: uint64(i),
Slot: params.BeaconConfig().GenesisSlot + uint64(i),
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:],
},
}
if err := service.beaconDB.SaveAttestation(context.Background(), origAttestations[i]); err != nil {
t.Fatalf("Failed to save attestation: %v", err)
}
}
// Test we can retrieve attestations from slot0 - slot127 (Max attestation amount).
attestations, err := service.PendingAttestations()
if err := beaconDB.SaveState(context.Background(), &pb.BeaconState{
Slot: params.BeaconConfig().GenesisSlot + 64,
LatestCrosslinks: []*pb.Crosslink{{
Epoch: params.BeaconConfig().GenesisEpoch,
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:]}}}); err != nil {
t.Fatal(err)
}
// Test we can retrieve attestations from slot1 - slot61.
attestations, err := service.PendingAttestations(context.Background())
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
if !reflect.DeepEqual(attestations, origAttestations[0:params.BeaconConfig().MaxAttestations]) {
t.Errorf("Retrieved attestations did not match prev generated attestations for the first %d",
params.BeaconConfig().MaxAttestations)
if !reflect.DeepEqual(attestations, origAttestations[1:61]) {
t.Error("Retrieved attestations did not match")
}
}
func TestRetrieveAttestations_PruneInvalidAtts(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
service := NewOpsPoolService(context.Background(), &Config{BeaconDB: beaconDB})
// Save 140 attestations for slots 0 to 139.
origAttestations := make([]*pb.Attestation, 140)
for i := 0; i < len(origAttestations); i++ {
origAttestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: params.BeaconConfig().GenesisSlot + uint64(i),
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:],
},
}
if err := service.beaconDB.SaveAttestation(context.Background(), origAttestations[i]); err != nil {
t.Fatalf("Failed to save attestation: %v", err)
}
}
// At slot 200 only attestations up to from slot 137 to 139 are valid attestations.
if err := beaconDB.SaveState(context.Background(), &pb.BeaconState{
Slot: params.BeaconConfig().GenesisSlot + 200,
LatestCrosslinks: []*pb.Crosslink{{
Epoch: params.BeaconConfig().GenesisEpoch + 2,
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:]}}}); err != nil {
t.Fatal(err)
}
attestations, err := service.PendingAttestations(context.Background())
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
if !reflect.DeepEqual(attestations, origAttestations[137:]) {
t.Error("Incorrect pruned attestations")
}
// Verify the invalid attestations are deleted.
hash, err := hashutil.HashProto(origAttestations[136])
if err != nil {
t.Fatal(err)
}
if service.beaconDB.HasAttestation(hash) {
t.Error("Invalid attestation is not deleted")
}
}
@@ -161,16 +213,23 @@ func TestRemoveProcessedAttestations_Ok(t *testing.T) {
for i := 0; i < len(attestations); i++ {
attestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: uint64(i),
Shard: uint64(i),
Slot: params.BeaconConfig().GenesisSlot + uint64(i),
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:],
},
}
if err := s.beaconDB.SaveAttestation(context.Background(), attestations[i]); err != nil {
t.Fatalf("Failed to save attestation: %v", err)
}
}
if err := db.SaveState(context.Background(), &pb.BeaconState{
Slot: params.BeaconConfig().GenesisSlot + 15,
LatestCrosslinks: []*pb.Crosslink{{
Epoch: params.BeaconConfig().GenesisEpoch,
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:]}}}); err != nil {
t.Fatal(err)
}
retrievedAtts, err := s.PendingAttestations()
retrievedAtts, err := s.PendingAttestations(context.Background())
if err != nil {
t.Fatalf("Could not retrieve attestations: %v", err)
}
@@ -182,7 +241,7 @@ func TestRemoveProcessedAttestations_Ok(t *testing.T) {
t.Fatalf("Could not remove pending attestations: %v", err)
}
retrievedAtts, _ = s.PendingAttestations()
retrievedAtts, _ = s.PendingAttestations(context.Background())
if len(retrievedAtts) != 0 {
t.Errorf("Attestation pool should be empty but got a length of %d", len(retrievedAtts))
}
@@ -233,8 +292,8 @@ func TestReceiveBlkRemoveOps_Ok(t *testing.T) {
for i := 0; i < len(attestations); i++ {
attestations[i] = &pb.Attestation{
Data: &pb.AttestationData{
Slot: uint64(i),
Shard: uint64(i),
Slot: params.BeaconConfig().GenesisSlot + uint64(i),
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:],
},
}
if err := s.beaconDB.SaveAttestation(context.Background(), attestations[i]); err != nil {
@@ -242,7 +301,15 @@ func TestReceiveBlkRemoveOps_Ok(t *testing.T) {
}
}
atts, _ := s.PendingAttestations()
if err := db.SaveState(context.Background(), &pb.BeaconState{
Slot: params.BeaconConfig().GenesisSlot + 15,
LatestCrosslinks: []*pb.Crosslink{{
Epoch: params.BeaconConfig().GenesisEpoch,
CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:]}}}); err != nil {
t.Fatal(err)
}
atts, _ := s.PendingAttestations(context.Background())
if len(atts) != len(attestations) {
t.Errorf("Attestation pool should be %d but got a length of %d",
len(attestations), len(atts))
@@ -259,7 +326,7 @@ func TestReceiveBlkRemoveOps_Ok(t *testing.T) {
t.Error(err)
}
atts, _ = s.PendingAttestations()
atts, _ = s.PendingAttestations(context.Background())
if len(atts) != 0 {
t.Errorf("Attestation pool should be empty but got a length of %d", len(atts))
}

View File

@@ -101,7 +101,7 @@ func (ps *ProposerServer) PendingAttestations(ctx context.Context, req *pb.Pendi
if err != nil {
return nil, fmt.Errorf("could not retrieve beacon state: %v", err)
}
atts, err := ps.operationService.PendingAttestations()
atts, err := ps.operationService.PendingAttestations(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve pending attestations from operations service: %v", err)
}

View File

@@ -39,7 +39,7 @@ type chainService interface {
}
type operationService interface {
PendingAttestations() ([]*pbp2p.Attestation, error)
PendingAttestations(ctx context.Context) ([]*pbp2p.Attestation, error)
HandleAttestations(context.Context, proto.Message) error
IncomingAttFeed() *event.Feed
}

View File

@@ -47,7 +47,7 @@ func (ms *mockOperationService) HandleAttestations(_ context.Context, _ proto.Me
return nil
}
func (ms *mockOperationService) PendingAttestations() ([]*pb.Attestation, error) {
func (ms *mockOperationService) PendingAttestations(_ context.Context) ([]*pb.Attestation, error) {
if ms.pendingAttestations != nil {
return ms.pendingAttestations, nil
}