mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
Clean up operation service (#3468)
* Cleaned up operation service * Fixed all the tests * Fixed node.go * Review feedback * Todo
This commit is contained in:
committed by
Raul Jordan
parent
bef58620fc
commit
bf07cfcdab
@@ -277,7 +277,6 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
|
||||
func (b *BeaconNode) registerOperationService(ctx *cli.Context) error {
|
||||
operationService := operations.NewService(context.Background(), &operations.Config{
|
||||
BeaconDB: b.db,
|
||||
P2P: b.fetchP2P(ctx),
|
||||
})
|
||||
|
||||
return b.services.RegisterService(operationService)
|
||||
|
||||
@@ -7,11 +7,8 @@ go_library(
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/state:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//proto/beacon/p2p/v1:go_default_library",
|
||||
"//proto/eth/v1alpha1:go_default_library",
|
||||
"//shared/bls:go_default_library",
|
||||
"//shared/event:go_default_library",
|
||||
@@ -43,7 +40,6 @@ go_test(
|
||||
"//shared/hashutil:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
|
||||
@@ -13,11 +13,8 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/event"
|
||||
@@ -45,8 +42,6 @@ type Handler interface {
|
||||
// OperationFeeds inteface defines the informational feeds from the operations
|
||||
// service.
|
||||
type OperationFeeds interface {
|
||||
IncomingAttFeed() *event.Feed
|
||||
IncomingExitFeed() *event.Feed
|
||||
IncomingProcessedBlockFeed() *event.Feed
|
||||
}
|
||||
|
||||
@@ -56,13 +51,8 @@ type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
beaconDB db.Database
|
||||
incomingExitFeed *event.Feed
|
||||
incomingValidatorExits chan *ethpb.VoluntaryExit
|
||||
incomingAttFeed *event.Feed
|
||||
incomingAtt chan *ethpb.Attestation
|
||||
incomingProcessedBlockFeed *event.Feed
|
||||
incomingProcessedBlock chan *ethpb.BeaconBlock
|
||||
p2p p2p.Broadcaster
|
||||
error error
|
||||
attestationLockCache *ccache.Cache
|
||||
}
|
||||
@@ -70,7 +60,6 @@ type Service struct {
|
||||
// Config options for the service.
|
||||
type Config struct {
|
||||
BeaconDB db.Database
|
||||
P2P p2p.Broadcaster
|
||||
}
|
||||
|
||||
// NewService instantiates a new operation service instance that will
|
||||
@@ -81,13 +70,8 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
beaconDB: cfg.BeaconDB,
|
||||
incomingExitFeed: new(event.Feed),
|
||||
incomingValidatorExits: make(chan *ethpb.VoluntaryExit, params.BeaconConfig().DefaultBufferSize),
|
||||
incomingAttFeed: new(event.Feed),
|
||||
incomingAtt: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize),
|
||||
incomingProcessedBlockFeed: new(event.Feed),
|
||||
incomingProcessedBlock: make(chan *ethpb.BeaconBlock, params.BeaconConfig().DefaultBufferSize),
|
||||
p2p: cfg.P2P,
|
||||
attestationLockCache: ccache.New(ccache.Configure()),
|
||||
}
|
||||
}
|
||||
@@ -95,7 +79,6 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
// Start an beacon block operation pool service's main event loop.
|
||||
func (s *Service) Start() {
|
||||
log.Info("Starting service")
|
||||
go s.saveOperations()
|
||||
go s.removeOperations()
|
||||
}
|
||||
|
||||
@@ -115,18 +98,6 @@ func (s *Service) Status() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IncomingExitFeed returns a feed that any service can send incoming p2p exits object into.
|
||||
// The beacon block operation pool service will subscribe to this feed in order to relay incoming exits.
|
||||
func (s *Service) IncomingExitFeed() *event.Feed {
|
||||
return s.incomingExitFeed
|
||||
}
|
||||
|
||||
// IncomingAttFeed returns a feed that any service can send incoming p2p attestations into.
|
||||
// The beacon block operation pool service will subscribe to this feed in order to relay incoming attestations.
|
||||
func (s *Service) IncomingAttFeed() *event.Feed {
|
||||
return s.incomingAttFeed
|
||||
}
|
||||
|
||||
// IncomingProcessedBlockFeed returns a feed that any service can send incoming p2p beacon blocks into.
|
||||
// The beacon block operation pool service will subscribe to this feed in order to receive incoming beacon blocks.
|
||||
func (s *Service) IncomingProcessedBlockFeed() *event.Feed {
|
||||
@@ -155,7 +126,7 @@ func (s *Service) retrieveLock(key [32]byte) *sync.Mutex {
|
||||
// capacity. The attestations get deleted in DB after they have been retrieved.
|
||||
func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]*ethpb.Attestation, error) {
|
||||
var attestations []*ethpb.Attestation
|
||||
attestationsFromDB, err := s.beaconDB.Attestations(ctx, nil /*filter*/)
|
||||
atts, err := s.beaconDB.Attestations(ctx, nil /*filter*/)
|
||||
if err != nil {
|
||||
return nil, errors.New("could not retrieve attestations from DB")
|
||||
}
|
||||
@@ -164,24 +135,20 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]
|
||||
return nil, errors.New("could not retrieve attestations from DB")
|
||||
}
|
||||
|
||||
bState, err = state.ProcessSlots(ctx, bState, requestedSlot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not process slots up to %d", requestedSlot)
|
||||
if bState.Slot < requestedSlot {
|
||||
bState, err = state.ProcessSlots(ctx, bState, requestedSlot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not process slots up to %d", requestedSlot)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(attestationsFromDB, func(i, j int) bool {
|
||||
return attestationsFromDB[i].Data.Crosslink.Shard < attestationsFromDB[j].Data.Crosslink.Shard
|
||||
sort.Slice(atts, func(i, j int) bool {
|
||||
return atts[i].Data.Target.Epoch < atts[j].Data.Target.Epoch
|
||||
})
|
||||
|
||||
var validAttsCount uint64
|
||||
for _, att := range attestationsFromDB {
|
||||
slot, err := helpers.AttestationDataSlot(bState, att.Data)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
// Delete the attestation if the attestation is one epoch older than head state,
|
||||
// we don't want to pass these attestations to RPC for proposer to include.
|
||||
if slot+params.BeaconConfig().SlotsPerEpoch <= bState.Slot {
|
||||
for _, att := range atts {
|
||||
if _, err = blocks.ProcessAttestation(bState, att); err != nil {
|
||||
hash, err := ssz.HashTreeRoot(att)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -189,7 +156,6 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]
|
||||
if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
validAttsCount++
|
||||
@@ -197,39 +163,11 @@ func (s *Service) AttestationPool(ctx context.Context, requestedSlot uint64) ([]
|
||||
if validAttsCount == params.BeaconConfig().MaxAttestations {
|
||||
break
|
||||
}
|
||||
|
||||
attestations = append(attestations, att)
|
||||
}
|
||||
return attestations, nil
|
||||
}
|
||||
|
||||
// saveOperations saves the newly broadcasted beacon block operations
|
||||
// that was received from sync service.
|
||||
func (s *Service) saveOperations() {
|
||||
// TODO(1438): Add rest of operations (slashings, attestation, exists...etc)
|
||||
incomingSub := s.incomingExitFeed.Subscribe(s.incomingValidatorExits)
|
||||
defer incomingSub.Unsubscribe()
|
||||
incomingAttSub := s.incomingAttFeed.Subscribe(s.incomingAtt)
|
||||
defer incomingAttSub.Unsubscribe()
|
||||
|
||||
for {
|
||||
ctx := context.TODO()
|
||||
select {
|
||||
case <-incomingSub.Err():
|
||||
log.Debug("Subscriber closed, exiting goroutine")
|
||||
return
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("operations service context closed, exiting save goroutine")
|
||||
return
|
||||
// Listen for a newly received incoming exit from the sync service.
|
||||
case exit := <-s.incomingValidatorExits:
|
||||
handler.SafelyHandleMessage(ctx, s.HandleValidatorExits, exit)
|
||||
case attestation := <-s.incomingAtt:
|
||||
handler.SafelyHandleMessage(ctx, s.HandleAttestation, attestation)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandleValidatorExits processes a validator exit operation.
|
||||
func (s *Service) HandleValidatorExits(ctx context.Context, message proto.Message) error {
|
||||
ctx, span := trace.StartSpan(ctx, "operations.HandleValidatorExits")
|
||||
@@ -275,10 +213,6 @@ func (s *Service) HandleAttestation(ctx context.Context, message proto.Message)
|
||||
}
|
||||
}
|
||||
|
||||
if err := blocks.VerifyAttestation(bState, attestation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
incomingAttBits := attestation.AggregationBits
|
||||
if s.beaconDB.HasAttestation(ctx, root) {
|
||||
dbAtt, err := s.beaconDB.Attestation(ctx, root)
|
||||
@@ -339,13 +273,6 @@ func (s *Service) handleProcessedBlock(ctx context.Context, message proto.Messag
|
||||
if err := s.removeAttestationsFromPool(ctx, block.Body.Attestations); err != nil {
|
||||
return errors.Wrap(err, "could not remove processed attestations from DB")
|
||||
}
|
||||
state, err := s.beaconDB.HeadState(ctx)
|
||||
if err != nil {
|
||||
return errors.New("could not retrieve attestations from DB")
|
||||
}
|
||||
if err := s.removeEpochOldAttestations(ctx, state); err != nil {
|
||||
return errors.Wrapf(err, "could not remove old attestations from DB at slot %d", block.Slot)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -367,28 +294,3 @@ func (s *Service) removeAttestationsFromPool(ctx context.Context, attestations [
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeEpochOldAttestations removes attestations that's older than one epoch length from current slot.
|
||||
func (s *Service) removeEpochOldAttestations(ctx context.Context, beaconState *pb.BeaconState) error {
|
||||
attestations, err := s.beaconDB.Attestations(ctx, nil /*filter*/)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, a := range attestations {
|
||||
slot, err := helpers.AttestationDataSlot(beaconState, a.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
// Remove attestation from DB if it's one epoch older than slot.
|
||||
if slot-params.BeaconConfig().SlotsPerEpoch >= slot {
|
||||
hash, err := ssz.HashTreeRoot(a)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.beaconDB.DeleteAttestation(ctx, hash); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6,11 +6,9 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
|
||||
@@ -29,12 +27,6 @@ import (
|
||||
var _ = OperationFeeds(&Service{})
|
||||
var _ = Pool(&Service{})
|
||||
|
||||
type mockBroadcaster struct{}
|
||||
|
||||
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestStop_OK(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
opsService := NewService(context.Background(), &Config{})
|
||||
@@ -87,10 +79,8 @@ func TestIncomingExits_Ok(t *testing.T) {
|
||||
func TestHandleAttestation_Saves_NewAttestation(t *testing.T) {
|
||||
beaconDB := dbutil.SetupDB(t)
|
||||
defer dbutil.TeardownDB(t, beaconDB)
|
||||
broadcaster := &mockBroadcaster{}
|
||||
service := NewService(context.Background(), &Config{
|
||||
BeaconDB: beaconDB,
|
||||
P2P: broadcaster,
|
||||
})
|
||||
|
||||
deposits, privKeys := testutil.SetupInitialDeposits(t, 100)
|
||||
@@ -174,10 +164,8 @@ func TestHandleAttestation_Aggregates_LargeNumValidators(t *testing.T) {
|
||||
beaconDB := dbutil.SetupDB(t)
|
||||
defer dbutil.TeardownDB(t, beaconDB)
|
||||
ctx := context.Background()
|
||||
broadcaster := &mockBroadcaster{}
|
||||
opsSrv := NewService(ctx, &Config{
|
||||
BeaconDB: beaconDB,
|
||||
P2P: broadcaster,
|
||||
})
|
||||
|
||||
// First, we create a common attestation data.
|
||||
@@ -291,10 +279,8 @@ func TestHandleAttestation_Skips_PreviouslyAggregatedAttestations(t *testing.T)
|
||||
defer dbutil.TeardownDB(t, beaconDB)
|
||||
ctx := context.Background()
|
||||
helpers.ClearAllCaches()
|
||||
broadcaster := &mockBroadcaster{}
|
||||
service := NewService(context.Background(), &Config{
|
||||
BeaconDB: beaconDB,
|
||||
P2P: broadcaster,
|
||||
})
|
||||
|
||||
deposits, privKeys := testutil.SetupInitialDeposits(t, 200)
|
||||
@@ -477,32 +463,55 @@ func TestRetrieveAttestations_OK(t *testing.T) {
|
||||
defer dbutil.TeardownDB(t, beaconDB)
|
||||
service := NewService(context.Background(), &Config{BeaconDB: beaconDB})
|
||||
|
||||
// Save 140 attestations for test. During 1st retrieval we should get slot:1 - slot:61 attestations.
|
||||
// The 1st retrieval is set at slot 64.
|
||||
origAttestations := make([]*ethpb.Attestation, 140)
|
||||
for i := 0; i < len(origAttestations); i++ {
|
||||
origAttestations[i] = ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Crosslink: ðpb.Crosslink{
|
||||
Shard: uint64(i),
|
||||
},
|
||||
Source: ðpb.Checkpoint{},
|
||||
Target: ðpb.Checkpoint{},
|
||||
},
|
||||
}
|
||||
if err := service.beaconDB.SaveAttestation(context.Background(), origAttestations[i]); err != nil {
|
||||
t.Fatalf("Failed to save attestation: %v", err)
|
||||
}
|
||||
deposits, _ := testutil.SetupInitialDeposits(t, 100)
|
||||
beaconState, err := state.GenesisBeaconState(deposits, uint64(0), ðpb.Eth1Data{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
aggBits := bitfield.NewBitlist(1)
|
||||
aggBits.SetBitAt(1, true)
|
||||
custodyBits := bitfield.NewBitlist(1)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Source: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")},
|
||||
Target: ðpb.Checkpoint{Epoch: 0},
|
||||
Crosslink: ðpb.Crosslink{
|
||||
Shard: 0,
|
||||
StartEpoch: 0,
|
||||
},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
}
|
||||
zeroSig := [96]byte{}
|
||||
att.Signature = zeroSig[:]
|
||||
|
||||
beaconState.Slot += params.BeaconConfig().MinAttestationInclusionDelay
|
||||
beaconState.CurrentCrosslinks = []*ethpb.Crosslink{
|
||||
{
|
||||
Shard: 0,
|
||||
StartEpoch: 0,
|
||||
},
|
||||
}
|
||||
beaconState.CurrentJustifiedCheckpoint.Root = []byte("hello-world")
|
||||
beaconState.CurrentEpochAttestations = []*pb.PendingAttestation{}
|
||||
|
||||
encoded, err := ssz.HashTreeRoot(beaconState.CurrentCrosslinks[0])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
att.Data.Crosslink.ParentRoot = encoded[:]
|
||||
att.Data.Crosslink.DataRoot = params.BeaconConfig().ZeroHash[:]
|
||||
if err := beaconDB.SaveAttestation(context.Background(), att); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
headBlockRoot := [32]byte{1, 2, 3}
|
||||
if err := beaconDB.SaveHeadBlockRoot(context.Background(), headBlockRoot); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := beaconDB.SaveState(context.Background(), &pb.BeaconState{
|
||||
Slot: 64,
|
||||
CurrentCrosslinks: []*ethpb.Crosslink{{
|
||||
StartEpoch: 0,
|
||||
DataRoot: params.BeaconConfig().ZeroHash[:]}}}, headBlockRoot); err != nil {
|
||||
if err := beaconDB.SaveState(context.Background(), beaconState, headBlockRoot); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Test we can retrieve attestations from slot1 - slot61.
|
||||
@@ -510,10 +519,8 @@ func TestRetrieveAttestations_OK(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve attestations: %v", err)
|
||||
}
|
||||
sort.Slice(attestations, func(i, j int) bool {
|
||||
return attestations[i].Data.Crosslink.Shard < attestations[j].Data.Crosslink.Shard
|
||||
})
|
||||
if !reflect.DeepEqual(attestations, origAttestations[0:127]) {
|
||||
|
||||
if !reflect.DeepEqual(attestations[0], att) {
|
||||
t.Error("Retrieved attestations did not match")
|
||||
}
|
||||
}
|
||||
@@ -558,8 +565,7 @@ func TestRetrieveAttestations_PruneInvalidAtts(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve attestations: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(attestations, origAttestations[137:]) {
|
||||
if len(attestations) != 127 {
|
||||
t.Error("Incorrect pruned attestations")
|
||||
}
|
||||
|
||||
@@ -605,21 +611,16 @@ func TestRemoveProcessedAttestations_Ok(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
retrievedAtts, err := s.AttestationPool(context.Background(), 15)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve attestations: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(attestations, retrievedAtts) {
|
||||
t.Error("Retrieved attestations did not match prev generated attestations")
|
||||
}
|
||||
|
||||
if err := s.removeAttestationsFromPool(context.Background(), attestations); err != nil {
|
||||
t.Fatalf("Could not remove attestations: %v", err)
|
||||
}
|
||||
|
||||
retrievedAtts, _ = s.AttestationPool(context.Background(), 15)
|
||||
if len(retrievedAtts) != 0 {
|
||||
t.Errorf("Attestation pool should be empty but got a length of %d", len(retrievedAtts))
|
||||
atts, err := s.AttestationPool(context.Background(), 15)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(atts) != 0 {
|
||||
t.Errorf("Attestation pool should be empty but got a length of %d", len(atts))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/hashutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/trieutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -70,7 +69,7 @@ func (ps *ProposerServer) RequestBlock(ctx context.Context, req *pb.BlockRequest
|
||||
}
|
||||
|
||||
// Pack aggregated attestations which have not been included in the beacon chain.
|
||||
attestations, err := ps.attestations(ctx, req.Slot)
|
||||
atts, err := ps.pool.AttestationPool(ctx, req.Slot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get pending attestations")
|
||||
}
|
||||
@@ -87,7 +86,7 @@ func (ps *ProposerServer) RequestBlock(ctx context.Context, req *pb.BlockRequest
|
||||
Body: ðpb.BeaconBlockBody{
|
||||
Eth1Data: eth1Data,
|
||||
Deposits: deposits,
|
||||
Attestations: attestations,
|
||||
Attestations: atts,
|
||||
RandaoReveal: req.RandaoReveal,
|
||||
// TODO(2766): Implement rest of the retrievals for beacon block operations
|
||||
Transfers: []*ethpb.Transfer{},
|
||||
@@ -128,70 +127,6 @@ func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlo
|
||||
return &pb.ProposeResponse{BlockRoot: root[:]}, nil
|
||||
}
|
||||
|
||||
// attestations retrieves aggregated attestations kept in the beacon node's operations pool which have
|
||||
// not yet been included into the beacon chain. Proposers include these pending attestations in their
|
||||
// proposed blocks when performing their responsibility. If desired, callers can choose to filter pending
|
||||
// attestations which are ready for inclusion. That is, attestations that satisfy:
|
||||
// attestation.slot + MIN_ATTESTATION_INCLUSION_DELAY <= state.slot.
|
||||
func (ps *ProposerServer) attestations(ctx context.Context, expectedSlot uint64) ([]*ethpb.Attestation, error) {
|
||||
beaconState := ps.headFetcher.HeadState()
|
||||
atts, err := ps.pool.AttestationPool(ctx, expectedSlot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not retrieve pending attestations from operations service")
|
||||
}
|
||||
|
||||
// advance slot, if it is behind
|
||||
if beaconState.Slot < expectedSlot {
|
||||
beaconState, err = state.ProcessSlots(ctx, beaconState, expectedSlot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var attsReadyForInclusion []*ethpb.Attestation
|
||||
for _, att := range atts {
|
||||
slot, err := helpers.AttestationDataSlot(beaconState, att.Data)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
if slot+params.BeaconConfig().MinAttestationInclusionDelay <= beaconState.Slot &&
|
||||
beaconState.Slot <= slot+params.BeaconConfig().SlotsPerEpoch {
|
||||
attsReadyForInclusion = append(attsReadyForInclusion, att)
|
||||
}
|
||||
}
|
||||
|
||||
validAtts := make([]*ethpb.Attestation, 0, len(attsReadyForInclusion))
|
||||
for _, att := range attsReadyForInclusion {
|
||||
slot, err := helpers.AttestationDataSlot(beaconState, att.Data)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get attestation slot")
|
||||
}
|
||||
|
||||
if _, err := blocks.ProcessAttestationNoVerify(beaconState, att); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
log.WithError(err).WithFields(logrus.Fields{
|
||||
"slot": slot,
|
||||
"headRoot": fmt.Sprintf("%#x", bytesutil.Trunc(att.Data.BeaconBlockRoot))}).Info(
|
||||
"Deleting failed pending attestation from DB")
|
||||
|
||||
root, err := ssz.HashTreeRoot(att.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := ps.beaconDB.DeleteAttestation(ctx, root); err != nil {
|
||||
return nil, errors.Wrap(err, "could not delete failed attestation")
|
||||
}
|
||||
continue
|
||||
}
|
||||
validAtts = append(validAtts, att)
|
||||
}
|
||||
|
||||
return validAtts, nil
|
||||
}
|
||||
|
||||
// eth1Data determines the appropriate eth1data for a block proposal. The algorithm for this method
|
||||
// is as follows:
|
||||
// - Determine the timestamp for the start slot for the eth1 voting period.
|
||||
|
||||
@@ -2,14 +2,11 @@ package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
|
||||
@@ -17,11 +14,9 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
|
||||
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
mockOps "github.com/prysmaticlabs/prysm/beacon-chain/operations/testing"
|
||||
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
|
||||
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/hashutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
@@ -161,357 +156,6 @@ func TestComputeStateRoot_OK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingAttestations_FiltersWithinInclusionDelay(t *testing.T) {
|
||||
helpers.ClearAllCaches()
|
||||
// This test breaks if it doesnt use mainnet config
|
||||
params.OverrideBeaconConfig(params.MainnetConfig())
|
||||
defer params.OverrideBeaconConfig(params.MinimalSpecConfig())
|
||||
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount/8)
|
||||
for i := 0; i < len(validators); i++ {
|
||||
validators[i] = ðpb.Validator{
|
||||
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
|
||||
}
|
||||
}
|
||||
|
||||
crosslinks := make([]*ethpb.Crosslink, params.BeaconConfig().ShardCount)
|
||||
for i := 0; i < len(crosslinks); i++ {
|
||||
crosslinks[i] = ðpb.Crosslink{
|
||||
StartEpoch: 1,
|
||||
DataRoot: params.BeaconConfig().ZeroHash[:],
|
||||
}
|
||||
}
|
||||
|
||||
stateSlot := uint64(100)
|
||||
beaconState := &pbp2p.BeaconState{
|
||||
Slot: stateSlot,
|
||||
Fork: &pbp2p.Fork{
|
||||
CurrentVersion: params.BeaconConfig().GenesisForkVersion,
|
||||
PreviousVersion: params.BeaconConfig().GenesisForkVersion,
|
||||
},
|
||||
Validators: validators,
|
||||
CurrentCrosslinks: crosslinks,
|
||||
PreviousCrosslinks: crosslinks,
|
||||
StartShard: 100,
|
||||
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
ActiveIndexRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
FinalizedCheckpoint: ðpb.Checkpoint{},
|
||||
PreviousJustifiedCheckpoint: ðpb.Checkpoint{},
|
||||
CurrentJustifiedCheckpoint: ðpb.Checkpoint{},
|
||||
}
|
||||
|
||||
encoded, err := ssz.HashTreeRoot(beaconState.PreviousCrosslinks[0])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
currentEpoch := helpers.CurrentEpoch(beaconState)
|
||||
activeCount, err := helpers.ActiveValidatorCount(beaconState, currentEpoch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
committeeCount, err := helpers.CommitteeCount(beaconState, currentEpoch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
aggBits := bitfield.NewBitlist(activeCount / committeeCount)
|
||||
for i := uint64(0); i < aggBits.Len(); i++ {
|
||||
aggBits.SetBitAt(i, true)
|
||||
}
|
||||
custodyBits := bitfield.NewBitlist(activeCount / committeeCount)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Crosslink: ðpb.Crosslink{
|
||||
Shard: beaconState.Slot - params.BeaconConfig().MinAttestationInclusionDelay,
|
||||
DataRoot: params.BeaconConfig().ZeroHash[:],
|
||||
ParentRoot: encoded[:]},
|
||||
Source: ðpb.Checkpoint{},
|
||||
Target: ðpb.Checkpoint{},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
}
|
||||
|
||||
attestingIndices, err := helpers.AttestingIndices(beaconState, att.Data, att.AggregationBits)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dataAndCustodyBit := &pbp2p.AttestationDataAndCustodyBit{
|
||||
Data: att.Data,
|
||||
CustodyBit: false,
|
||||
}
|
||||
hashTreeRoot, err := ssz.HashTreeRoot(dataAndCustodyBit)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
domain := helpers.Domain(beaconState, currentEpoch, params.BeaconConfig().DomainAttestation)
|
||||
sigs := make([]*bls.Signature, len(attestingIndices))
|
||||
for i, indice := range attestingIndices {
|
||||
priv, err := bls.RandKey(rand.Reader)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
beaconState.Validators[indice].PublicKey = priv.PublicKey().Marshal()[:]
|
||||
sigs[i] = priv.Sign(hashTreeRoot[:], domain)
|
||||
}
|
||||
att.Signature = bls.AggregateSignatures(sigs).Marshal()[:]
|
||||
|
||||
blk := ðpb.BeaconBlock{
|
||||
Slot: beaconState.Slot,
|
||||
}
|
||||
blkRoot, err := ssz.SigningRoot(blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
proposerServer := &ProposerServer{
|
||||
pool: &mockOps.Operations{
|
||||
Attestations: []*ethpb.Attestation{att},
|
||||
},
|
||||
blockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
|
||||
headFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
|
||||
}
|
||||
|
||||
atts, err := proposerServer.attestations(context.Background(), stateSlot)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error fetching pending attestations: %v", err)
|
||||
}
|
||||
if len(atts) == 0 {
|
||||
t.Error("Expected pending attestations list to be non-empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingAttestations_FiltersExpiredAttestations(t *testing.T) {
|
||||
db := dbutil.SetupDB(t)
|
||||
defer dbutil.TeardownDB(t, db)
|
||||
|
||||
// This test breaks if it doesnt use mainnet config
|
||||
params.OverrideBeaconConfig(params.MainnetConfig())
|
||||
defer params.OverrideBeaconConfig(params.MinimalSpecConfig())
|
||||
|
||||
// Edge case: current slot is at the end of an epoch. The pending attestation
|
||||
// for the next slot should come from currentSlot + 1.
|
||||
currentSlot := helpers.StartSlot(
|
||||
10,
|
||||
) - 1
|
||||
|
||||
expectedEpoch := uint64(100)
|
||||
crosslink := ðpb.Crosslink{StartEpoch: 9, DataRoot: params.BeaconConfig().ZeroHash[:]}
|
||||
encoded, err := ssz.HashTreeRoot(crosslink)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount/8)
|
||||
for i := 0; i < len(validators); i++ {
|
||||
validators[i] = ðpb.Validator{
|
||||
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
|
||||
}
|
||||
}
|
||||
|
||||
beaconState := &pbp2p.BeaconState{
|
||||
Validators: validators,
|
||||
Slot: currentSlot + params.BeaconConfig().MinAttestationInclusionDelay,
|
||||
Fork: &pbp2p.Fork{
|
||||
CurrentVersion: params.BeaconConfig().GenesisForkVersion,
|
||||
PreviousVersion: params.BeaconConfig().GenesisForkVersion,
|
||||
},
|
||||
CurrentJustifiedCheckpoint: ðpb.Checkpoint{
|
||||
Epoch: expectedEpoch,
|
||||
},
|
||||
PreviousJustifiedCheckpoint: ðpb.Checkpoint{
|
||||
Epoch: expectedEpoch,
|
||||
},
|
||||
CurrentCrosslinks: []*ethpb.Crosslink{{
|
||||
StartEpoch: 9,
|
||||
DataRoot: params.BeaconConfig().ZeroHash[:],
|
||||
}},
|
||||
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
ActiveIndexRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
StateRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
BlockRoots: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
|
||||
LatestBlockHeader: ðpb.BeaconBlockHeader{StateRoot: []byte{}},
|
||||
}
|
||||
|
||||
currentEpoch := helpers.CurrentEpoch(beaconState)
|
||||
activeCount, err := helpers.ActiveValidatorCount(beaconState, currentEpoch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
committeeCount, err := helpers.CommitteeCount(beaconState, currentEpoch)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
aggBits := bitfield.NewBitlist(activeCount / committeeCount)
|
||||
for i := uint64(0); i < aggBits.Len(); i++ {
|
||||
aggBits.SetBitAt(i, true)
|
||||
}
|
||||
custodyBits := bitfield.NewBitlist(activeCount / committeeCount)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
}
|
||||
|
||||
attestingIndices, err := helpers.AttestingIndices(beaconState, att.Data, att.AggregationBits)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
domain := helpers.Domain(beaconState, expectedEpoch, params.BeaconConfig().DomainAttestation)
|
||||
sigs := make([]*bls.Signature, len(attestingIndices))
|
||||
for i, indice := range attestingIndices {
|
||||
priv, err := bls.RandKey(rand.Reader)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dataAndCustodyBit := &pbp2p.AttestationDataAndCustodyBit{
|
||||
Data: att.Data,
|
||||
CustodyBit: false,
|
||||
}
|
||||
hashTreeRoot, err := ssz.HashTreeRoot(dataAndCustodyBit)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
beaconState.Validators[indice].PublicKey = priv.PublicKey().Marshal()[:]
|
||||
sigs[i] = priv.Sign(hashTreeRoot[:], domain)
|
||||
}
|
||||
aggregateSig := bls.AggregateSignatures(sigs).Marshal()[:]
|
||||
att.Signature = aggregateSig
|
||||
|
||||
att2 := proto.Clone(att).(*ethpb.Attestation)
|
||||
att3 := proto.Clone(att).(*ethpb.Attestation)
|
||||
|
||||
opService := &mockOps.Operations{
|
||||
Attestations: []*ethpb.Attestation{
|
||||
//Expired attestations
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits},
|
||||
// Non-expired attestation with incorrect justified epoch
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch - 1},
|
||||
Crosslink: ðpb.Crosslink{DataRoot: params.BeaconConfig().ZeroHash[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
},
|
||||
// Non-expired attestations with correct justified epoch
|
||||
att,
|
||||
att2,
|
||||
att3,
|
||||
},
|
||||
}
|
||||
blk := ðpb.BeaconBlock{
|
||||
Slot: beaconState.Slot,
|
||||
}
|
||||
blkRoot, err := ssz.SigningRoot(blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedNumberOfAttestations := 3
|
||||
proposerServer := &ProposerServer{
|
||||
beaconDB: db,
|
||||
pool: opService,
|
||||
blockReceiver: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
|
||||
headFetcher: &mock.ChainService{State: beaconState, Root: blkRoot[:]},
|
||||
}
|
||||
|
||||
atts, err := proposerServer.attestations(context.Background(), currentSlot+params.BeaconConfig().MinAttestationInclusionDelay+1)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error fetching pending attestations: %v", err)
|
||||
}
|
||||
if len(atts) != expectedNumberOfAttestations {
|
||||
t.Errorf(
|
||||
"Expected pending attestations list length %d, but was %d",
|
||||
expectedNumberOfAttestations,
|
||||
len(atts),
|
||||
)
|
||||
}
|
||||
|
||||
expectedAtts := []*ethpb.Attestation{
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
Signature: aggregateSig,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
Signature: aggregateSig,
|
||||
},
|
||||
{
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{Epoch: 10},
|
||||
Source: ðpb.Checkpoint{Epoch: expectedEpoch},
|
||||
Crosslink: ðpb.Crosslink{EndEpoch: 10, DataRoot: params.BeaconConfig().ZeroHash[:], ParentRoot: encoded[:]},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
CustodyBits: custodyBits,
|
||||
Signature: aggregateSig,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(atts, expectedAtts) {
|
||||
t.Error("Did not receive expected attestations")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingDeposits_Eth1DataVoteOK(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
|
||||
@@ -26,6 +26,10 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M
|
||||
log.WithField("validate", "beacon block").WithError(err).Error("Failed to get signing root of block")
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO(1332): Add blocks.VerifyAttestation before processing further.
|
||||
// Discussion: https://github.com/ethereum/eth2.0-specs/issues/1332
|
||||
|
||||
if recentlySeenRoots.Get(string(blockRoot[:])) != nil || r.db.HasBlock(ctx, blockRoot) {
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user