diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 9eadab0cb1..86388d625d 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -277,7 +277,6 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { func (b *BeaconNode) registerOperationService(ctx *cli.Context) error { operationService := operations.NewService(context.Background(), &operations.Config{ BeaconDB: b.db, - P2P: b.fetchP2P(ctx), }) return b.services.RegisterService(operationService) diff --git a/beacon-chain/operations/BUILD.bazel b/beacon-chain/operations/BUILD.bazel index a39341dcb8..6e2d8a2700 100644 --- a/beacon-chain/operations/BUILD.bazel +++ b/beacon-chain/operations/BUILD.bazel @@ -7,11 +7,8 @@ go_library( visibility = ["//beacon-chain:__subpackages__"], deps = [ "//beacon-chain/core/blocks:go_default_library", - "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", "//beacon-chain/db:go_default_library", - "//beacon-chain/p2p:go_default_library", - "//proto/beacon/p2p/v1:go_default_library", "//proto/eth/v1alpha1:go_default_library", "//shared/bls:go_default_library", "//shared/event:go_default_library", @@ -43,7 +40,6 @@ go_test( "//shared/hashutil:go_default_library", "//shared/params:go_default_library", "//shared/testutil:go_default_library", - "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/beacon-chain/operations/service.go b/beacon-chain/operations/service.go index 381e399fcc..827647889f 100644 --- a/beacon-chain/operations/service.go +++ b/beacon-chain/operations/service.go @@ -13,11 +13,8 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" - "github.com/prysmaticlabs/prysm/beacon-chain/p2p" - pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/event" @@ -45,8 +42,6 @@ type Handler interface { // OperationFeeds inteface defines the informational feeds from the operations // service. type OperationFeeds interface { - IncomingAttFeed() *event.Feed - IncomingExitFeed() *event.Feed IncomingProcessedBlockFeed() *event.Feed } @@ -56,13 +51,8 @@ type Service struct { ctx context.Context cancel context.CancelFunc beaconDB db.Database - incomingExitFeed *event.Feed - incomingValidatorExits chan *ethpb.VoluntaryExit - incomingAttFeed *event.Feed - incomingAtt chan *ethpb.Attestation incomingProcessedBlockFeed *event.Feed incomingProcessedBlock chan *ethpb.BeaconBlock - p2p p2p.Broadcaster error error attestationLockCache *ccache.Cache } @@ -70,7 +60,6 @@ type Service struct { // Config options for the service. type Config struct { BeaconDB db.Database - P2P p2p.Broadcaster } // NewService instantiates a new operation service instance that will @@ -81,13 +70,8 @@ func NewService(ctx context.Context, cfg *Config) *Service { ctx: ctx, cancel: cancel, beaconDB: cfg.BeaconDB, - incomingExitFeed: new(event.Feed), - incomingValidatorExits: make(chan *ethpb.VoluntaryExit, params.BeaconConfig().DefaultBufferSize), - incomingAttFeed: new(event.Feed), - incomingAtt: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize), incomingProcessedBlockFeed: new(event.Feed), incomingProcessedBlock: make(chan *ethpb.BeaconBlock, params.BeaconConfig().DefaultBufferSize), - p2p: cfg.P2P, attestationLockCache: ccache.New(ccache.Configure()), } } @@ -95,7 +79,6 @@ func NewService(ctx context.Context, cfg *Config) *Service { // Start an beacon block operation pool service's main event loop. func (s *Service) Start() { log.Info("Starting service") - go s.saveOperations() go s.removeOperations() } @@ -115,18 +98,6 @@ func (s *Service) Status() error { return nil } -// IncomingExitFeed returns a feed that any service can send incoming p2p exits object into. -// The beacon block operation pool service will subscribe to this feed in order to relay incoming exits. -func (s *Service) IncomingExitFeed() *event.Feed { - return s.incomingExitFeed -} - -// IncomingAttFeed returns a feed that any service can send incoming p2p attestations into. -// The beacon block operation pool service will subscribe to this feed in order to relay incoming attestations. -func (s *Service) IncomingAttFeed() *event.Feed { - return s.incomingAttFeed -} - // IncomingProcessedBlockFeed returns a feed that any service can send incoming p2p beacon blocks into. // The beacon block operation pool service will subscribe to this feed in order to receive incoming beacon blocks. func (s *Service) IncomingProcessedBlockFeed() *event.Feed { @@ -155,7 +126,7 @@ func (s *Service) retrieveLock(key [32]byte) *sync.Mutex { // capacity. The attestations get deleted in DB after they have been retrieved. func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]*ethpb.Attestation, error) { var attestations []*ethpb.Attestation - attestationsFromDB, err := s.beaconDB.Attestations(ctx, nil /*filter*/) + atts, err := s.beaconDB.Attestations(ctx, nil /*filter*/) if err != nil { return nil, errors.New("could not retrieve attestations from DB") } @@ -164,24 +135,20 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([] return nil, errors.New("could not retrieve attestations from DB") } - bState, err = state.ProcessSlots(ctx, bState, requestedSlot) - if err != nil { - return nil, errors.Wrapf(err, "could not process slots up to %d", requestedSlot) + if bState.Slot < requestedSlot { + bState, err = state.ProcessSlots(ctx, bState, requestedSlot) + if err != nil { + return nil, errors.Wrapf(err, "could not process slots up to %d", requestedSlot) + } } - sort.Slice(attestationsFromDB, func(i, j int) bool { - return attestationsFromDB[i].Data.Crosslink.Shard < attestationsFromDB[j].Data.Crosslink.Shard + sort.Slice(atts, func(i, j int) bool { + return atts[i].Data.Target.Epoch < atts[j].Data.Target.Epoch }) var validAttsCount uint64 - for _, att := range attestationsFromDB { - slot, err := helpers.AttestationDataSlot(bState, att.Data) - if err != nil { - return nil, errors.Wrap(err, "could not get attestation slot") - } - // Delete the attestation if the attestation is one epoch older than head state, - // we don't want to pass these attestations to RPC for proposer to include. - if slot+params.BeaconConfig().SlotsPerEpoch <= bState.Slot { + for _, att := range atts { + if _, err = blocks.ProcessAttestation(bState, att); err != nil { hash, err := ssz.HashTreeRoot(att) if err != nil { return nil, err @@ -189,7 +156,6 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([] if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil { return nil, err } - continue } validAttsCount++ @@ -197,39 +163,11 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([] if validAttsCount == params.BeaconConfig().MaxAttestations { break } - attestations = append(attestations, att) } return attestations, nil } -// saveOperations saves the newly broadcasted beacon block operations -// that was received from sync service. -func (s *Service) saveOperations() { - // TODO(1438): Add rest of operations (slashings, attestation, exists...etc) - incomingSub := s.incomingExitFeed.Subscribe(s.incomingValidatorExits) - defer incomingSub.Unsubscribe() - incomingAttSub := s.incomingAttFeed.Subscribe(s.incomingAtt) - defer incomingAttSub.Unsubscribe() - - for { - ctx := context.TODO() - select { - case <-incomingSub.Err(): - log.Debug("Subscriber closed, exiting goroutine") - return - case <-s.ctx.Done(): - log.Debug("operations service context closed, exiting save goroutine") - return - // Listen for a newly received incoming exit from the sync service. - case exit := <-s.incomingValidatorExits: - handler.SafelyHandleMessage(ctx, s.HandleValidatorExits, exit) - case attestation := <-s.incomingAtt: - handler.SafelyHandleMessage(ctx, s.HandleAttestation, attestation) - } - } -} - // HandleValidatorExits processes a validator exit operation. func (s *Service) HandleValidatorExits(ctx context.Context, message proto.Message) error { ctx, span := trace.StartSpan(ctx, "operations.HandleValidatorExits") @@ -275,10 +213,6 @@ func (s *Service) HandleAttestation(ctx context.Context, message proto.Message) } } - if err := blocks.VerifyAttestation(bState, attestation); err != nil { - return err - } - incomingAttBits := attestation.AggregationBits if s.beaconDB.HasAttestation(ctx, root) { dbAtt, err := s.beaconDB.Attestation(ctx, root) @@ -339,13 +273,6 @@ func (s *Service) handleProcessedBlock(ctx context.Context, message proto.Messag if err := s.removeAttestationsFromPool(ctx, block.Body.Attestations); err != nil { return errors.Wrap(err, "could not remove processed attestations from DB") } - state, err := s.beaconDB.HeadState(ctx) - if err != nil { - return errors.New("could not retrieve attestations from DB") - } - if err := s.removeEpochOldAttestations(ctx, state); err != nil { - return errors.Wrapf(err, "could not remove old attestations from DB at slot %d", block.Slot) - } return nil } @@ -367,28 +294,3 @@ func (s *Service) removeAttestationsFromPool(ctx context.Context, attestations [ } return nil } - -// removeEpochOldAttestations removes attestations that's older than one epoch length from current slot. -func (s *Service) removeEpochOldAttestations(ctx context.Context, beaconState *pb.BeaconState) error { - attestations, err := s.beaconDB.Attestations(ctx, nil /*filter*/) - if err != nil { - return err - } - for _, a := range attestations { - slot, err := helpers.AttestationDataSlot(beaconState, a.Data) - if err != nil { - return errors.Wrap(err, "could not get attestation slot") - } - // Remove attestation from DB if it's one epoch older than slot. - if slot-params.BeaconConfig().SlotsPerEpoch >= slot { - hash, err := ssz.HashTreeRoot(a) - if err != nil { - return err - } - if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil { - return err - } - } - } - return nil -} diff --git a/beacon-chain/operations/service_test.go b/beacon-chain/operations/service_test.go index 2335d2f3ed..a79131ee93 100644 --- a/beacon-chain/operations/service_test.go +++ b/beacon-chain/operations/service_test.go @@ -6,11 +6,9 @@ import ( "errors" "fmt" "reflect" - "sort" "sync" "testing" - "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" @@ -29,12 +27,6 @@ import ( var _ = OperationFeeds(&Service{}) var _ = Pool(&Service{}) -type mockBroadcaster struct{} - -func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error { - return nil -} - func TestStop_OK(t *testing.T) { hook := logTest.NewGlobal() opsService := NewService(context.Background(), &Config{}) @@ -87,10 +79,8 @@ func TestIncomingExits_Ok(t *testing.T) { func TestHandleAttestation_Saves_NewAttestation(t *testing.T) { beaconDB := dbutil.SetupDB(t) defer dbutil.TeardownDB(t, beaconDB) - broadcaster := &mockBroadcaster{} service := NewService(context.Background(), &Config{ BeaconDB: beaconDB, - P2P: broadcaster, }) deposits, privKeys := testutil.SetupInitialDeposits(t, 100) @@ -174,10 +164,8 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) { beaconDB := dbutil.SetupDB(t) defer dbutil.TeardownDB(t, beaconDB) ctx := context.Background() - broadcaster := &mockBroadcaster{} opsSrv := NewService(ctx, &Config{ BeaconDB: beaconDB, - P2P: broadcaster, }) // First, we create a common attestation data. @@ -291,10 +279,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T) defer dbutil.TeardownDB(t, beaconDB) ctx := context.Background() helpers.ClearAllCaches() - broadcaster := &mockBroadcaster{} service := NewService(context.Background(), &Config{ BeaconDB: beaconDB, - P2P: broadcaster, }) deposits, privKeys := testutil.SetupInitialDeposits(t, 200) @@ -477,32 +463,55 @@ func TestRetrieveAttestations_OK(t *testing.T) { defer dbutil.TeardownDB(t, beaconDB) service := NewService(context.Background(), &Config{BeaconDB: beaconDB}) - // Save 140 attestations for test. During 1st retrieval we should get slot:1 - slot:61 attestations. - // The 1st retrieval is set at slot 64. - origAttestations := make([]*ethpb.Attestation, 140) - for i := 0; i < len(origAttestations); i++ { - origAttestations[i] = ðpb.Attestation{ - Data: ðpb.AttestationData{ - Crosslink: ðpb.Crosslink{ - Shard: uint64(i), - }, - Source: ðpb.Checkpoint{}, - Target: ðpb.Checkpoint{}, - }, - } - if err := service.beaconDB.SaveAttestation(context.Background(), origAttestations[i]); err != nil { - t.Fatalf("Failed to save attestation: %v", err) - } + deposits, _ := testutil.SetupInitialDeposits(t, 100) + beaconState, err := state.GenesisBeaconState(deposits, uint64(0), ðpb.Eth1Data{}) + if err != nil { + t.Fatal(err) } + + aggBits := bitfield.NewBitlist(1) + aggBits.SetBitAt(1, true) + custodyBits := bitfield.NewBitlist(1) + att := ðpb.Attestation{ + Data: ðpb.AttestationData{ + Source: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")}, + Target: ðpb.Checkpoint{Epoch: 0}, + Crosslink: ðpb.Crosslink{ + Shard: 0, + StartEpoch: 0, + }, + }, + AggregationBits: aggBits, + CustodyBits: custodyBits, + } + zeroSig := [96]byte{} + att.Signature = zeroSig[:] + + beaconState.Slot += params.BeaconConfig().MinAttestationInclusionDelay + beaconState.CurrentCrosslinks = []*ethpb.Crosslink{ + { + Shard: 0, + StartEpoch: 0, + }, + } + beaconState.CurrentJustifiedCheckpoint.Root = []byte("hello-world") + beaconState.CurrentEpochAttestations = []*pb.PendingAttestation{} + + encoded, err := ssz.HashTreeRoot(beaconState.CurrentCrosslinks[0]) + if err != nil { + t.Fatal(err) + } + att.Data.Crosslink.ParentRoot = encoded[:] + att.Data.Crosslink.DataRoot = params.BeaconConfig().ZeroHash[:] + if err := beaconDB.SaveAttestation(context.Background(), att); err != nil { + t.Fatal(err) + } + headBlockRoot := [32]byte{1, 2, 3} if err := beaconDB.SaveHeadBlockRoot(context.Background(), headBlockRoot); err != nil { t.Fatal(err) } - if err := beaconDB.SaveState(context.Background(), &pb.BeaconState{ - Slot: 64, - CurrentCrosslinks: []*ethpb.Crosslink{{ - StartEpoch: 0, - DataRoot: params.BeaconConfig().ZeroHash[:]}}}, headBlockRoot); err != nil { + if err := beaconDB.SaveState(context.Background(), beaconState, headBlockRoot); err != nil { t.Fatal(err) } // Test we can retrieve attestations from slot1 - slot61. @@ -510,10 +519,8 @@ func TestRetrieveAttestations_OK(t *testing.T) { if err != nil { t.Fatalf("Could not retrieve attestations: %v", err) } - sort.Slice(attestations, func(i, j int) bool { - return attestations[i].Data.Crosslink.Shard < attestations[j].Data.Crosslink.Shard - }) - if !reflect.DeepEqual(attestations, origAttestations[0:127]) { + + if !reflect.DeepEqual(attestations[0], att) { t.Error("Retrieved attestations did not match") } } @@ -558,8 +565,7 @@ func TestRetrieveAttestations_PruneInvalidAtts(t *testing.T) { if err != nil { t.Fatalf("Could not retrieve attestations: %v", err) } - - if !reflect.DeepEqual(attestations, origAttestations[137:]) { + if len(attestations) != 127 { t.Error("Incorrect pruned attestations") } @@ -605,21 +611,16 @@ func TestRemoveProcessedAttestations_Ok(t *testing.T) { t.Fatal(err) } - retrievedAtts, err := s.AttestationPool(context.Background(), 15) - if err != nil { - t.Fatalf("Could not retrieve attestations: %v", err) - } - if !reflect.DeepEqual(attestations, retrievedAtts) { - t.Error("Retrieved attestations did not match prev generated attestations") - } - if err := s.removeAttestationsFromPool(context.Background(), attestations); err != nil { t.Fatalf("Could not remove attestations: %v", err) } - retrievedAtts, _ = s.AttestationPool(context.Background(), 15) - if len(retrievedAtts) != 0 { - t.Errorf("Attestation pool should be empty but got a length of %d", len(retrievedAtts)) + atts, err := s.AttestationPool(context.Background(), 15) + if err != nil { + t.Fatal(err) + } + if len(atts) != 0 { + t.Errorf("Attestation pool should be empty but got a length of %d", len(atts)) } } diff --git a/beacon-chain/rpc/proposer_server.go b/beacon-chain/rpc/proposer_server.go index fa5f33256f..768380ddc4 100644 --- a/beacon-chain/rpc/proposer_server.go +++ b/beacon-chain/rpc/proposer_server.go @@ -22,7 +22,6 @@ import ( "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/trieutil" - "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -70,7 +69,7 @@ func (ps *ProposerServer) RequestBlock(ctx context.Context, req *pb.BlockRequest } // Pack aggregated attestations which have not been included in the beacon chain. - attestations, err := ps.attestations(ctx, req.Slot) + atts, err := ps.pool.AttestationPool(ctx, req.Slot) if err != nil { return nil, errors.Wrap(err, "could not get pending attestations") } @@ -87,7 +86,7 @@ func (ps *ProposerServer) RequestBlock(ctx context.Context, req *pb.BlockRequest Body: ðpb.BeaconBlockBody{ Eth1Data: eth1Data, Deposits: deposits, - Attestations: attestations, + Attestations: atts, RandaoReveal: req.RandaoReveal, // TODO(2766): Implement rest of the retrievals for beacon block operations Transfers: []*ethpb.Transfer{}, @@ -128,70 +127,6 @@ func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlo return &pb.ProposeResponse{BlockRoot: root[:]}, nil } -// attestations retrieves aggregated attestations kept in the beacon node's operations pool which have -// not yet been included into the beacon chain. Proposers include these pending attestations in their -// proposed blocks when performing their responsibility. If desired, callers can choose to filter pending -// attestations which are ready for inclusion. That is, attestations that satisfy: -// attestation.slot + MIN_ATTESTATION_INCLUSION_DELAY <= state.slot. -func (ps *ProposerServer) attestations(ctx context.Context, expectedSlot uint64) ([]*ethpb.Attestation, error) { - beaconState := ps.headFetcher.HeadState() - atts, err := ps.pool.AttestationPool(ctx, expectedSlot) - if err != nil { - return nil, errors.Wrap(err, "could not retrieve pending attestations from operations service") - } - - // advance slot, if it is behind - if beaconState.Slot < expectedSlot { - beaconState, err = state.ProcessSlots(ctx, beaconState, expectedSlot) - if err != nil { - return nil, err - } - } - - var attsReadyForInclusion []*ethpb.Attestation - for _, att := range atts { - slot, err := helpers.AttestationDataSlot(beaconState, att.Data) - if err != nil { - return nil, errors.Wrap(err, "could not get attestation slot") - } - if slot+params.BeaconConfig().MinAttestationInclusionDelay <= beaconState.Slot && - beaconState.Slot <= slot+params.BeaconConfig().SlotsPerEpoch { - attsReadyForInclusion = append(attsReadyForInclusion, att) - } - } - - validAtts := make([]*ethpb.Attestation, 0, len(attsReadyForInclusion)) - for _, att := range attsReadyForInclusion { - slot, err := helpers.AttestationDataSlot(beaconState, att.Data) - if err != nil { - return nil, errors.Wrap(err, "could not get attestation slot") - } - - if _, err := blocks.ProcessAttestationNoVerify(beaconState, att); err != nil { - if ctx.Err() != nil { - return nil, ctx.Err() - } - - log.WithError(err).WithFields(logrus.Fields{ - "slot": slot, - "headRoot": fmt.Sprintf("%#x", bytesutil.Trunc(att.Data.BeaconBlockRoot))}).Info( - "Deleting failed pending attestation from DB") - - root, err := ssz.HashTreeRoot(att.Data) - if err != nil { - return nil, err - } - if err := ps.beaconDB.DeleteAttestation(ctx, root); err != nil { - return nil, errors.Wrap(err, "could not delete failed attestation") - } - continue - } - validAtts = append(validAtts, att) - } - - return validAtts, nil -} - // eth1Data determines the appropriate eth1data for a block proposal. The algorithm for this method // is as follows: // - Determine the timestamp for the start slot for the eth1 voting period. diff --git a/beacon-chain/rpc/proposer_server_test.go b/beacon-chain/rpc/proposer_server_test.go index a1eea1d70e..f39d3b9bd6 100644 --- a/beacon-chain/rpc/proposer_server_test.go +++ b/beacon-chain/rpc/proposer_server_test.go @@ -2,14 +2,11 @@ package rpc import ( "context" - "crypto/rand" "math/big" - "reflect" "strings" "testing" "github.com/gogo/protobuf/proto" - "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" @@ -17,11 +14,9 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" - mockOps "github.com/prysmaticlabs/prysm/beacon-chain/operations/testing" mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -161,357 +156,6 @@ func TestComputeStateRoot_OK(t *testing.T) { } } -func TestPendingAttestations_FiltersWithinInclusionDelay(t *testing.T) { - helpers.ClearAllCaches() - // This test breaks if it doesnt use mainnet config - params.OverrideBeaconConfig(params.MainnetConfig()) - defer params.OverrideBeaconConfig(params.MinimalSpecConfig()) - validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount/8) - for i := 0; i < len(validators); i++ { - validators[i] = ðpb.Validator{ - ExitEpoch: params.BeaconConfig().FarFutureEpoch, - } - } - - crosslinks := make([]*ethpb.Crosslink, params.BeaconConfig().ShardCount) - for i := 0; i < len(crosslinks); i++ { - crosslinks[i] = ðpb.Crosslink{ - StartEpoch: 1, - DataRoot: params.BeaconConfig().ZeroHash[:], - } - } - - stateSlot := uint64(100) - beaconState := &pbp2p.BeaconState{ - Slot: stateSlot, - Fork: &pbp2p.Fork{ - CurrentVersion: params.BeaconConfig().GenesisForkVersion, - PreviousVersion: params.BeaconConfig().GenesisForkVersion, - }, - Validators: validators, - CurrentCrosslinks: crosslinks, - PreviousCrosslinks: crosslinks, - StartShard: 100, - RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - ActiveIndexRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - FinalizedCheckpoint: ðpb.Checkpoint{}, - PreviousJustifiedCheckpoint: ðpb.Checkpoint{}, - CurrentJustifiedCheckpoint: ðpb.Checkpoint{}, - } - - encoded, err := ssz.HashTreeRoot(beaconState.PreviousCrosslinks[0]) - if err != nil { - t.Fatal(err) - } - - currentEpoch := helpers.CurrentEpoch(beaconState) - activeCount, err := helpers.ActiveValidatorCount(beaconState, currentEpoch) - if err != nil { - t.Error(err) - } - committeeCount, err := helpers.CommitteeCount(beaconState, currentEpoch) - if err != nil { - t.Error(err) - } - aggBits := bitfield.NewBitlist(activeCount / committeeCount) - for i := uint64(0); i < aggBits.Len(); i++ { - aggBits.SetBitAt(i, true) - } - custodyBits := bitfield.NewBitlist(activeCount / committeeCount) - att := ðpb.Attestation{ - Data: ðpb.AttestationData{ - Crosslink: ðpb.Crosslink{ - Shard: beaconState.Slot - params.BeaconConfig().MinAttestationInclusionDelay, - DataRoot: params.BeaconConfig().ZeroHash[:], - ParentRoot: encoded[:]}, - Source: ðpb.Checkpoint{}, - Target: ðpb.Checkpoint{}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - } - - attestingIndices, err := helpers.AttestingIndices(beaconState, att.Data, att.AggregationBits) - if err != nil { - t.Error(err) - } - dataAndCustodyBit := &pbp2p.AttestationDataAndCustodyBit{ - Data: att.Data, - CustodyBit: false, - } - hashTreeRoot, err := ssz.HashTreeRoot(dataAndCustodyBit) - if err != nil { - t.Error(err) - } - domain := helpers.Domain(beaconState, currentEpoch, params.BeaconConfig().DomainAttestation) - sigs := make([]*bls.Signature, len(attestingIndices)) - for i, indice := range attestingIndices { - priv, err := bls.RandKey(rand.Reader) - if err != nil { - t.Error(err) - } - beaconState.Validators[indice].PublicKey = priv.PublicKey().Marshal()[:] - sigs[i] = priv.Sign(hashTreeRoot[:], domain) - } - att.Signature = bls.AggregateSignatures(sigs).Marshal()[:] - - blk := ðpb.BeaconBlock{ - Slot: beaconState.Slot, - } - blkRoot, err := ssz.SigningRoot(blk) - if err != nil { - t.Fatal(err) - } - - proposerServer := &ProposerServer{ - pool: &mockOps.Operations{ - Attestations: []*ethpb.Attestation{att}, - }, - blockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]}, - headFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]}, - } - - atts, err := proposerServer.attestations(context.Background(), stateSlot) - if err != nil { - t.Fatalf("Unexpected error fetching pending attestations: %v", err) - } - if len(atts) == 0 { - t.Error("Expected pending attestations list to be non-empty") - } -} - -func TestPendingAttestations_FiltersExpiredAttestations(t *testing.T) { - db := dbutil.SetupDB(t) - defer dbutil.TeardownDB(t, db) - - // This test breaks if it doesnt use mainnet config - params.OverrideBeaconConfig(params.MainnetConfig()) - defer params.OverrideBeaconConfig(params.MinimalSpecConfig()) - - // Edge case: current slot is at the end of an epoch. The pending attestation - // for the next slot should come from currentSlot + 1. - currentSlot := helpers.StartSlot( - 10, - ) - 1 - - expectedEpoch := uint64(100) - crosslink := ðpb.Crosslink{StartEpoch: 9, DataRoot: params.BeaconConfig().ZeroHash[:]} - encoded, err := ssz.HashTreeRoot(crosslink) - if err != nil { - t.Fatal(err) - } - - validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount/8) - for i := 0; i < len(validators); i++ { - validators[i] = ðpb.Validator{ - ExitEpoch: params.BeaconConfig().FarFutureEpoch, - } - } - - beaconState := &pbp2p.BeaconState{ - Validators: validators, - Slot: currentSlot + params.BeaconConfig().MinAttestationInclusionDelay, - Fork: &pbp2p.Fork{ - CurrentVersion: params.BeaconConfig().GenesisForkVersion, - PreviousVersion: params.BeaconConfig().GenesisForkVersion, - }, - CurrentJustifiedCheckpoint: ðpb.Checkpoint{ - Epoch: expectedEpoch, - }, - PreviousJustifiedCheckpoint: ðpb.Checkpoint{ - Epoch: expectedEpoch, - }, - CurrentCrosslinks: []*ethpb.Crosslink{{ - StartEpoch: 9, - DataRoot: params.BeaconConfig().ZeroHash[:], - }}, - RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - ActiveIndexRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - StateRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - BlockRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), - LatestBlockHeader: ðpb.BeaconBlockHeader{StateRoot: []byte{}}, - } - - currentEpoch := helpers.CurrentEpoch(beaconState) - activeCount, err := helpers.ActiveValidatorCount(beaconState, currentEpoch) - if err != nil { - t.Error(err) - } - committeeCount, err := helpers.CommitteeCount(beaconState, currentEpoch) - if err != nil { - t.Error(err) - } - aggBits := bitfield.NewBitlist(activeCount / committeeCount) - for i := uint64(0); i < aggBits.Len(); i++ { - aggBits.SetBitAt(i, true) - } - custodyBits := bitfield.NewBitlist(activeCount / committeeCount) - att := ðpb.Attestation{ - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - } - - attestingIndices, err := helpers.AttestingIndices(beaconState, att.Data, att.AggregationBits) - if err != nil { - t.Error(err) - } - domain := helpers.Domain(beaconState, expectedEpoch, params.BeaconConfig().DomainAttestation) - sigs := make([]*bls.Signature, len(attestingIndices)) - for i, indice := range attestingIndices { - priv, err := bls.RandKey(rand.Reader) - if err != nil { - t.Error(err) - } - dataAndCustodyBit := &pbp2p.AttestationDataAndCustodyBit{ - Data: att.Data, - CustodyBit: false, - } - hashTreeRoot, err := ssz.HashTreeRoot(dataAndCustodyBit) - if err != nil { - t.Error(err) - } - beaconState.Validators[indice].PublicKey = priv.PublicKey().Marshal()[:] - sigs[i] = priv.Sign(hashTreeRoot[:], domain) - } - aggregateSig := bls.AggregateSignatures(sigs).Marshal()[:] - att.Signature = aggregateSig - - att2 := proto.Clone(att).(*ethpb.Attestation) - att3 := proto.Clone(att).(*ethpb.Attestation) - - opService := &mockOps.Operations{ - Attestations: []*ethpb.Attestation{ - //Expired attestations - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits}, - // Non-expired attestation with incorrect justified epoch - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch - 1}, - Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - }, - // Non-expired attestations with correct justified epoch - att, - att2, - att3, - }, - } - blk := ðpb.BeaconBlock{ - Slot: beaconState.Slot, - } - blkRoot, err := ssz.SigningRoot(blk) - if err != nil { - t.Fatal(err) - } - - expectedNumberOfAttestations := 3 - proposerServer := &ProposerServer{ - beaconDB: db, - pool: opService, - blockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]}, - headFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]}, - } - - atts, err := proposerServer.attestations(context.Background(), currentSlot+params.BeaconConfig().MinAttestationInclusionDelay+1) - if err != nil { - t.Fatalf("Unexpected error fetching pending attestations: %v", err) - } - if len(atts) != expectedNumberOfAttestations { - t.Errorf( - "Expected pending attestations list length %d, but was %d", - expectedNumberOfAttestations, - len(atts), - ) - } - - expectedAtts := []*ethpb.Attestation{ - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - Signature: aggregateSig, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - Signature: aggregateSig, - }, - { - Data: ðpb.AttestationData{ - Target: ðpb.Checkpoint{Epoch: 10}, - Source: ðpb.Checkpoint{Epoch: expectedEpoch}, - Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]}, - }, - AggregationBits: aggBits, - CustodyBits: custodyBits, - Signature: aggregateSig, - }, - } - if !reflect.DeepEqual(atts, expectedAtts) { - t.Error("Did not receive expected attestations") - } -} - func TestPendingDeposits_Eth1DataVoteOK(t *testing.T) { ctx := context.Background() diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index faa7da8704..07378f618b 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -26,6 +26,10 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M log.WithField("validate", "beacon block").WithError(err).Error("Failed to get signing root of block") return false } + + // TODO(1332): Add blocks.VerifyAttestation before processing further. + // Discussion: https://github.com/ethereum/eth2.0-specs/issues/1332 + if recentlySeenRoots.Get(string(blockRoot[:])) != nil || r.db.HasBlock(ctx, blockRoot) { return false }