diff --git a/beacon-chain/db/attestation.go b/beacon-chain/db/attestation.go index e909898879..ad772c944a 100644 --- a/beacon-chain/db/attestation.go +++ b/beacon-chain/db/attestation.go @@ -24,6 +24,17 @@ func (db *BeaconDB) SaveAttestation(attestation *pb.Attestation) error { }) } +// DeleteAttestation deletes the attestation record into the beacon chain db. +func (db *BeaconDB) DeleteAttestation(attestation *pb.Attestation) error { + hash := att.Key(attestation.Data) + + return db.update(func(tx *bolt.Tx) error { + a := tx.Bucket(attestationBucket) + + return a.Delete(hash[:]) + }) +} + // Attestation retrieves an attestation record from the db using its hash. func (db *BeaconDB) Attestation(hash [32]byte) (*pb.Attestation, error) { var attestation *pb.Attestation @@ -43,6 +54,29 @@ func (db *BeaconDB) Attestation(hash [32]byte) (*pb.Attestation, error) { return attestation, err } +// Attestations retrieves all the attestation records from the db. +// These are the attestations that have not been seen on the beacon chain. +func (db *BeaconDB) Attestations() ([]*pb.Attestation, error) { + var attestations []*pb.Attestation + err := db.view(func(tx *bolt.Tx) error { + a := tx.Bucket(attestationBucket) + + if err := a.ForEach(func(k, v []byte) error { + attestation, err := createAttestation(v) + if err != nil { + return err + } + attestations = append(attestations, attestation) + return nil + }); err != nil { + return err + } + return nil + }) + + return attestations, err +} + // HasAttestation checks if the attestation exists. func (db *BeaconDB) HasAttestation(hash [32]byte) bool { exists := false diff --git a/beacon-chain/db/attestation_test.go b/beacon-chain/db/attestation_test.go index 98548f23fb..53aa2a1035 100644 --- a/beacon-chain/db/attestation_test.go +++ b/beacon-chain/db/attestation_test.go @@ -2,6 +2,8 @@ package db import ( "bytes" + "reflect" + "sort" "testing" "github.com/gogo/protobuf/proto" @@ -43,6 +45,72 @@ func TestSaveAndRetrieveAttestation(t *testing.T) { } } +func TestRetrieveAttestations(t *testing.T) { + db := setupDB(t) + defer teardownDB(t, db) + + // Generate 100 unique attestations to save in DB. + attestations := make([]*pb.Attestation, 100) + for i := 0; i < len(attestations); i++ { + attestations[i] = &pb.Attestation{ + Data: &pb.AttestationData{ + Slot: uint64(i), + Shard: uint64(i), + }, + } + if err := db.SaveAttestation(attestations[i]); err != nil { + t.Fatalf("Failed to save attestation: %v", err) + } + } + + retrievedAttestations, err := db.Attestations() + if err != nil { + t.Fatalf("Could not retrieve attestations: %v", err) + } + + // Sort the retrieved attestations based on slot ordering for comparison. + sort.Slice(retrievedAttestations, func(i, j int) bool { + return retrievedAttestations[i].Data.Slot < retrievedAttestations[j].Data.Slot + }) + if !reflect.DeepEqual(retrievedAttestations, attestations) { + t.Log("Retrieved attestations did not match generated attestations") + } +} + +func TestDeleteAttestation(t *testing.T) { + db := setupDB(t) + defer teardownDB(t, db) + + a := &pb.Attestation{ + Data: &pb.AttestationData{ + Slot: 0, + Shard: 0, + }, + } + + if err := db.SaveAttestation(a); err != nil { + t.Fatalf("Could not save attestation: %v", err) + } + + aHash := att.Key(a.GetData()) + aPrime, err := db.Attestation(aHash) + if err != nil { + t.Fatalf("Could not call Attestation: %v", err) + } + + if !reflect.DeepEqual(aPrime, a) { + t.Errorf("Saved attestation and retrieved attestation are not equal") + } + + if err := db.DeleteAttestation(a); err != nil { + t.Fatalf("Could not delete attestation: %v", err) + } + + if db.HasAttestation(aHash) { + t.Error("Deleted attestation still there") + } +} + func TestNilAttestation(t *testing.T) { db := setupDB(t) defer teardownDB(t, db) diff --git a/beacon-chain/operations/BUILD.bazel b/beacon-chain/operations/BUILD.bazel index 568efd1c7c..e72274238d 100644 --- a/beacon-chain/operations/BUILD.bazel +++ b/beacon-chain/operations/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//proto/beacon/p2p/v1:go_default_library", "//shared/event:go_default_library", "//shared/hashutil:go_default_library", + "//shared/params:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], ) @@ -22,6 +23,7 @@ go_test( "//beacon-chain/internal:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/hashutil:go_default_library", + "//shared/params:go_default_library", "//shared/testutil:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/beacon-chain/operations/service.go b/beacon-chain/operations/service.go index 67fb495148..be0ecf7d46 100644 --- a/beacon-chain/operations/service.go +++ b/beacon-chain/operations/service.go @@ -3,11 +3,14 @@ package operations import ( "context" + "fmt" + "sort" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -82,6 +85,32 @@ func (s *Service) IncomingAttFeed() *event.Feed { return s.incomingAttFeed } +// Attestations returns the attestations that have not seen on the beacon chain, the attestations are +// returns in slot ascending order and up to MaxAttestations capacity. The attestations get +// deleted in DB after they have been retrieved. +func (s *Service) Attestations() ([]*pb.Attestation, error) { + var attestations []*pb.Attestation + attestationsFromDB, err := s.beaconDB.Attestations() + if err != nil { + return nil, fmt.Errorf("could not retrieve attestations from DB") + } + sort.Slice(attestationsFromDB, func(i, j int) bool { + return attestationsFromDB[i].Data.Slot < attestationsFromDB[j].Data.Slot + }) + for i, attestation := range attestationsFromDB { + // Stop the max attestation number per beacon block is reached. + if uint64(i) == params.BeaconConfig().MaxAttestations { + break + } + attestations = append(attestations, attestation) + // Delete attestation from DB after retrieval. + if err := s.beaconDB.DeleteAttestation(attestationsFromDB[i]); err != nil { + return nil, fmt.Errorf("could not delete attestation %v", attestationsFromDB[i]) + } + } + return attestations, nil +} + // saveOperations saves the newly broadcasted beacon block operations // that was received from sync service. func (s *Service) saveOperations() { diff --git a/beacon-chain/operations/service_test.go b/beacon-chain/operations/service_test.go index c6269e027e..e6770f13e3 100644 --- a/beacon-chain/operations/service_test.go +++ b/beacon-chain/operations/service_test.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "reflect" "testing" "github.com/prysmaticlabs/prysm/beacon-chain/internal" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/sirupsen/logrus" logTest "github.com/sirupsen/logrus/hooks/test" @@ -105,3 +107,53 @@ func TestIncomingAttestation_Ok(t *testing.T) { want := fmt.Sprintf("Attestation %#x saved in db", hash) testutil.AssertLogsContain(t, hook, want) } + +func TestRetrieveAttestations_Ok(t *testing.T) { + beaconDB := internal.SetupDB(t) + defer internal.TeardownDB(t, beaconDB) + service := NewOperationService(context.Background(), &Config{BeaconDB: beaconDB}) + + // Save 140 attestations for test. During 1st retrieval we should get slot:0 - slot:128 attestations, + // 2nd retrieval we should get slot:128 - slot:140 attestations. + // Max attestation config value is set to 128. + origAttestations := make([]*pb.Attestation, 140) + for i := 0; i < len(origAttestations); i++ { + origAttestations[i] = &pb.Attestation{ + Data: &pb.AttestationData{ + Slot: uint64(i), + Shard: uint64(i), + }, + } + if err := service.beaconDB.SaveAttestation(origAttestations[i]); err != nil { + t.Fatalf("Failed to save attestation: %v", err) + } + } + // Test we can retrieve attestations from slot0 - slot127 (Max attestation amount). + attestations, err := service.Attestations() + if err != nil { + t.Fatalf("Could not retrieve attestations: %v", err) + } + if !reflect.DeepEqual(attestations, origAttestations[0:params.BeaconConfig().MaxAttestations]) { + t.Errorf("Retrieved attestations did not match prev generated attestations for the first %d", + params.BeaconConfig().MaxAttestations) + } + + // Test we can retrieve attestations from slot128 - slot139. + attestations, err = service.Attestations() + if err != nil { + t.Fatalf("Could not retrieve attestations: %v", err) + } + if !reflect.DeepEqual(attestations, origAttestations[params.BeaconConfig().MaxAttestations:]) { + t.Errorf("Retrieved attestations did not match prev generated attestations for the first %d", + params.BeaconConfig().MaxAttestations) + } + + // Verify attestation pool is empty now we have retrieved everything. + attestations, err = service.Attestations() + if err != nil { + t.Fatalf("Could not retrieve attestations: %v", err) + } + if len(attestations) != 0 { + t.Errorf("Attestation pool should be empty but got a length of %d", len(attestations)) + } +}