mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Handle attestations with missing block (#4705)
* Fmt * Starting * Cont * Store aggregate attestation is better * Conflict * Done * Merge branch 'master' of git+ssh://github.com/prysmaticlabs/prysm into process-pending-atts * Comment * Better logs * Better logs * Fix existing tests * Update metric names * Preston's feedback * Broadcast atts once it's valid * Gazelle * Test for validating atts and pruning * Tests * Removed debug log * Conflict * Feedback * Merge refs/heads/master into process-pending-atts * Merge refs/heads/master into process-pending-atts
This commit is contained in:
@@ -22,9 +22,10 @@ type ChainInfoFetcher interface {
|
||||
FinalizationFetcher
|
||||
}
|
||||
|
||||
// GenesisTimeFetcher retrieves the Eth2 genesis timestamp.
|
||||
type GenesisTimeFetcher interface {
|
||||
// TimeFetcher retrieves the Eth2 data that's related to time.
|
||||
type TimeFetcher interface {
|
||||
GenesisTime() time.Time
|
||||
CurrentSlot() uint64
|
||||
}
|
||||
|
||||
// HeadFetcher defines a common interface for methods in blockchain service which
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
|
||||
// Ensure Service implements chain info interface.
|
||||
var _ = ChainInfoFetcher(&Service{})
|
||||
var _ = GenesisTimeFetcher(&Service{})
|
||||
var _ = TimeFetcher(&Service{})
|
||||
var _ = ForkFetcher(&Service{})
|
||||
|
||||
func TestFinalizedCheckpt_Nil(t *testing.T) {
|
||||
|
||||
@@ -20,6 +20,11 @@ import (
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// CurrentSlot returns the current slot based on time.
|
||||
func (s *Service) CurrentSlot() uint64 {
|
||||
return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
|
||||
}
|
||||
|
||||
// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
|
||||
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
|
||||
// is in the correct time window.
|
||||
@@ -199,7 +204,7 @@ func (s *Service) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot
|
||||
// Otherwise, delay incorporation of new justified checkpoint until next epoch boundary.
|
||||
// See https://ethresear.ch/t/prevention-of-bouncing-attack-on-ffg/6114 for more detailed analysis and discussion.
|
||||
func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustifiedCheckpt *ethpb.Checkpoint) (bool, error) {
|
||||
if helpers.SlotsSinceEpochStarts(s.currentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
|
||||
if helpers.SlotsSinceEpochStarts(s.CurrentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
|
||||
return true, nil
|
||||
}
|
||||
newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
|
||||
@@ -261,11 +266,6 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
|
||||
// currentSlot returns the current slot based on time.
|
||||
func (s *Service) currentSlot() uint64 {
|
||||
return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
|
||||
}
|
||||
|
||||
// This saves every finalized state in DB during initial sync, needed as part of optimization to
|
||||
// use cache state during initial sync in case of restart.
|
||||
func (s *Service) saveInitState(ctx context.Context, state *stateTrie.BeaconState) error {
|
||||
|
||||
@@ -193,6 +193,11 @@ func (ms *ChainService) GenesisTime() time.Time {
|
||||
return ms.Genesis
|
||||
}
|
||||
|
||||
// CurrentSlot mocks the same method in the chain service.
|
||||
func (ms *ChainService) CurrentSlot() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Participation mocks the same method in the chain service.
|
||||
func (ms *ChainService) Participation(epoch uint64) *precompute.Balance {
|
||||
return ms.Balance
|
||||
|
||||
@@ -26,7 +26,7 @@ type Server struct {
|
||||
Server *grpc.Server
|
||||
BeaconDB db.ReadOnlyDatabase
|
||||
PeersFetcher p2p.PeersProvider
|
||||
GenesisTimeFetcher blockchain.GenesisTimeFetcher
|
||||
GenesisTimeFetcher blockchain.TimeFetcher
|
||||
}
|
||||
|
||||
// GetSyncStatus checks the current network sync status of the node.
|
||||
|
||||
@@ -58,7 +58,7 @@ type Service struct {
|
||||
forkFetcher blockchain.ForkFetcher
|
||||
finalizationFetcher blockchain.FinalizationFetcher
|
||||
participationFetcher blockchain.ParticipationFetcher
|
||||
genesisTimeFetcher blockchain.GenesisTimeFetcher
|
||||
genesisTimeFetcher blockchain.TimeFetcher
|
||||
attestationReceiver blockchain.AttestationReceiver
|
||||
blockReceiver blockchain.BlockReceiver
|
||||
powChainService powchain.Chain
|
||||
@@ -104,7 +104,7 @@ type Config struct {
|
||||
BlockReceiver blockchain.BlockReceiver
|
||||
POWChainService powchain.Chain
|
||||
ChainStartFetcher powchain.ChainStartFetcher
|
||||
GenesisTimeFetcher blockchain.GenesisTimeFetcher
|
||||
GenesisTimeFetcher blockchain.TimeFetcher
|
||||
MockEth1Votes bool
|
||||
AttestationsPool attestations.Pool
|
||||
ExitPool *voluntaryexits.Pool
|
||||
|
||||
@@ -9,6 +9,7 @@ go_library(
|
||||
"error.go",
|
||||
"log.go",
|
||||
"metrics.go",
|
||||
"pending_attestations_queue.go",
|
||||
"pending_blocks_queue.go",
|
||||
"rpc.go",
|
||||
"rpc_beacon_blocks_by_range.go",
|
||||
@@ -79,6 +80,7 @@ go_test(
|
||||
size = "small",
|
||||
srcs = [
|
||||
"error_test.go",
|
||||
"pending_attestations_queue_test.go",
|
||||
"pending_blocks_queue_test.go",
|
||||
"rpc_beacon_blocks_by_range_test.go",
|
||||
"rpc_beacon_blocks_by_root_test.go",
|
||||
|
||||
@@ -33,4 +33,28 @@ var (
|
||||
Help: "Count the number of times a node resyncs.",
|
||||
},
|
||||
)
|
||||
numberOfBlocksRecoveredFromAtt = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "beacon_blocks_recovered_from_attestation_total",
|
||||
Help: "Count the number of times a missing block recovered from attestation vote.",
|
||||
},
|
||||
)
|
||||
numberOfBlocksNotRecoveredFromAtt = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "beacon_blocks_not_recovered_from_attestation_total",
|
||||
Help: "Count the number of times a missing block not recovered and pruned from attestation vote.",
|
||||
},
|
||||
)
|
||||
numberOfAttsRecovered = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "beacon_attestations_recovered_total",
|
||||
Help: "Count the number of times attestation recovered because of missing block",
|
||||
},
|
||||
)
|
||||
numberOfAttsNotRecovered = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "beacon_attestations_not_recovered_total",
|
||||
Help: "Count the number of times attestation not recovered and pruned because of missing block",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
162
beacon-chain/sync/pending_attestations_queue.go
Normal file
162
beacon-chain/sync/pending_attestations_queue.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/runutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/exp/rand"
|
||||
)
|
||||
|
||||
// This defines how often a node cleans up and processes pending attestations in the queue.
|
||||
var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
|
||||
|
||||
// This processes pending attestation queues on every `processPendingAttsPeriod`.
|
||||
func (s *Service) processPendingAttsQueue() {
|
||||
ctx := context.Background()
|
||||
runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
|
||||
s.processPendingAtts(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// This defines how pending attestations are processed. It contains features:
|
||||
// 1. Clean up invalid pending attestations from the queue.
|
||||
// 2. Check if pending attestations can be processed when the block has arrived.
|
||||
// 3. Request block from a random peer if unable to proceed step 2.
|
||||
func (s *Service) processPendingAtts(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
|
||||
defer span.End()
|
||||
|
||||
pids := s.p2p.Peers().Connected()
|
||||
|
||||
// Before a node processes pending attestations queue, it verifies
|
||||
// the attestations in the queue are still valid. Attestations will
|
||||
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
|
||||
s.validatePendingAtts(ctx, s.chain.CurrentSlot())
|
||||
|
||||
for bRoot, attestations := range s.blkRootToPendingAtts {
|
||||
// Has the pending attestation's missing block arrived yet?
|
||||
if s.db.HasBlock(ctx, bRoot) {
|
||||
numberOfBlocksRecoveredFromAtt.Inc()
|
||||
for _, att := range attestations {
|
||||
// The pending attestations can arrive in both aggregated and unaggregated forms,
|
||||
// each from has distinct validation steps.
|
||||
if helpers.IsAggregated(att.Aggregate) {
|
||||
// Save the pending aggregated attestation to the pool if it passes the aggregated
|
||||
// validation steps.
|
||||
if s.validateAggregatedAtt(ctx, att) {
|
||||
if err := s.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil {
|
||||
return err
|
||||
}
|
||||
numberOfAttsRecovered.Inc()
|
||||
|
||||
// Broadcasting the attestation again once a node is able to process it.
|
||||
if err := s.p2p.Broadcast(ctx, att); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Save the pending unaggregated attestation to the pool if the BLS signature is
|
||||
// valid.
|
||||
if _, err := bls.SignatureFromBytes(att.Aggregate.Signature); err != nil {
|
||||
continue
|
||||
}
|
||||
if err := s.attPool.SaveUnaggregatedAttestation(att.Aggregate); err != nil {
|
||||
return err
|
||||
}
|
||||
numberOfAttsRecovered.Inc()
|
||||
|
||||
// Broadcasting the attestation again once a node is able to process it.
|
||||
if err := s.p2p.Broadcast(ctx, att); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast")
|
||||
}
|
||||
}
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
|
||||
"pendingAttsCount": len(attestations),
|
||||
}).Info("Verified and saved pending attestations to pool")
|
||||
|
||||
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
|
||||
delete(s.blkRootToPendingAtts, bRoot)
|
||||
} else {
|
||||
// Pending attestation's missing block has not arrived yet.
|
||||
log.WithField("blockRoot", hex.EncodeToString(bytesutil.Trunc(bRoot[:]))).Info("Requesting block for pending attestation")
|
||||
|
||||
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
|
||||
// have a head slot newer or equal to the pending attestation's target boundary slot.
|
||||
pid := pids[rand.Int()%len(pids)]
|
||||
targetSlot := helpers.SlotToEpoch(attestations[0].Aggregate.Data.Target.Epoch)
|
||||
for _, p := range pids {
|
||||
if cs, _ := s.p2p.Peers().ChainState(p); cs != nil && cs.HeadSlot >= targetSlot {
|
||||
pid = p
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
req := [][32]byte{bRoot}
|
||||
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
log.Errorf("Could not send recent block request: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This defines how pending attestations is saved in the map. The key is the
|
||||
// root of the missing block. The value is the list of pending attestations
|
||||
// that voted for that block root.
|
||||
func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
|
||||
s.pendingAttsLock.Lock()
|
||||
defer s.pendingAttsLock.Unlock()
|
||||
|
||||
root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)
|
||||
|
||||
_, ok := s.blkRootToPendingAtts[root]
|
||||
if !ok {
|
||||
s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att}
|
||||
return
|
||||
}
|
||||
|
||||
s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att)
|
||||
}
|
||||
|
||||
// This validates the pending attestations in the queue are still valid.
|
||||
// If not valid, a node will remove it in the queue in place. The validity
|
||||
// check specifies the pending attestation could not fall one epoch behind
|
||||
// of the current slot.
|
||||
func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
|
||||
s.pendingAttsLock.Lock()
|
||||
defer s.pendingAttsLock.Unlock()
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "validatePendingAtts")
|
||||
defer span.End()
|
||||
|
||||
for bRoot, atts := range s.blkRootToPendingAtts {
|
||||
for i := len(atts) - 1; i >= 0; i-- {
|
||||
if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
|
||||
// Remove the pending attestation from the list in place.
|
||||
atts = append(atts[:i], atts[i+1:]...)
|
||||
numberOfAttsNotRecovered.Inc()
|
||||
}
|
||||
}
|
||||
s.blkRootToPendingAtts[bRoot] = atts
|
||||
|
||||
// If the pending attestations list of a given block root is empty,
|
||||
// a node will remove the key from the map to avoid dangling keys.
|
||||
if len(s.blkRootToPendingAtts[bRoot]) == 0 {
|
||||
delete(s.blkRootToPendingAtts, bRoot)
|
||||
numberOfBlocksNotRecoveredFromAtt.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
257
beacon-chain/sync/pending_attestations_queue_test.go
Normal file
257
beacon-chain/sync/pending_attestations_queue_test.go
Normal file
@@ -0,0 +1,257 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/attestationutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
db := dbtest.SetupDB(t)
|
||||
defer dbtest.TeardownDB(t, db)
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
if len(p1.Host.Network().Peers()) != 1 {
|
||||
t.Error("Expected peers to be connected")
|
||||
}
|
||||
p1.Peers().Add(p2.PeerID(), nil, network.DirOutbound)
|
||||
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
|
||||
p1.Peers().SetChainState(p2.PeerID(), &pb.Status{})
|
||||
|
||||
r := &Service{
|
||||
p2p: p1,
|
||||
db: db,
|
||||
chain: &mock.ChainService{},
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
}
|
||||
|
||||
a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{}}}}
|
||||
r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.AggregateAttestationAndProof{a}
|
||||
if err := r.processPendingAtts(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Requesting block for pending attestation")
|
||||
}
|
||||
|
||||
func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
db := dbtest.SetupDB(t)
|
||||
defer dbtest.TeardownDB(t, db)
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
|
||||
r := &Service{
|
||||
p2p: p1,
|
||||
db: db,
|
||||
chain: &mock.ChainService{},
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
attPool: attestations.NewPool(),
|
||||
}
|
||||
|
||||
a := ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: ðpb.Attestation{
|
||||
Signature: bls.RandKey().Sign([]byte("foo"), 0).Marshal(),
|
||||
AggregationBits: bitfield.Bitlist{0x02},
|
||||
Data: ðpb.AttestationData{
|
||||
Target: ðpb.Checkpoint{}}}}
|
||||
|
||||
b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}
|
||||
r32, _ := ssz.HashTreeRoot(b.Block)
|
||||
r.db.SaveBlock(context.Background(), b)
|
||||
|
||||
r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{a}
|
||||
if err := r.processPendingAtts(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(r.attPool.UnaggregatedAttestations()) != 1 {
|
||||
t.Error("Did not save unaggregated att")
|
||||
}
|
||||
if !reflect.DeepEqual(r.attPool.UnaggregatedAttestations()[0], a.Aggregate) {
|
||||
t.Error("Incorrect saved att")
|
||||
}
|
||||
if len(r.attPool.AggregatedAttestations()) != 0 {
|
||||
t.Error("Did save aggregated att")
|
||||
}
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Verified and saved pending attestations to pool")
|
||||
}
|
||||
|
||||
func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
db := dbtest.SetupDB(t)
|
||||
defer dbtest.TeardownDB(t, db)
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
validators := uint64(256)
|
||||
beaconState, privKeys := testutil.DeterministicGenesisState(t, validators)
|
||||
|
||||
sb := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}
|
||||
db.SaveBlock(context.Background(), sb)
|
||||
root, _ := ssz.HashTreeRoot(sb.Block)
|
||||
|
||||
aggBits := bitfield.NewBitlist(3)
|
||||
aggBits.SetBitAt(0, true)
|
||||
aggBits.SetBitAt(1, true)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
BeaconBlockRoot: root[:],
|
||||
Source: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")},
|
||||
Target: ðpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")},
|
||||
},
|
||||
AggregationBits: aggBits,
|
||||
}
|
||||
|
||||
committee, err := helpers.BeaconCommitteeFromState(beaconState, att.Data.Slot, att.Data.CommitteeIndex)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
attestingIndices, err := attestationutil.AttestingIndices(att.AggregationBits, committee)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
hashTreeRoot, err := ssz.HashTreeRoot(att.Data)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
domain := helpers.Domain(beaconState.Fork(), 0, params.BeaconConfig().DomainBeaconAttester)
|
||||
sigs := make([]*bls.Signature, len(attestingIndices))
|
||||
for i, indice := range attestingIndices {
|
||||
sig := privKeys[indice].Sign(hashTreeRoot[:], domain)
|
||||
sigs[i] = sig
|
||||
}
|
||||
att.Signature = bls.AggregateSignatures(sigs).Marshal()[:]
|
||||
|
||||
slotRoot, err := ssz.HashTreeRoot(att.Data.Slot)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sig := privKeys[154].Sign(slotRoot[:], domain)
|
||||
aggregateAndProof := ðpb.AggregateAttestationAndProof{
|
||||
SelectionProof: sig.Marshal(),
|
||||
Aggregate: att,
|
||||
AggregatorIndex: 154,
|
||||
}
|
||||
|
||||
if err := beaconState.SetGenesisTime(uint64(time.Now().Unix())); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := &Service{
|
||||
p2p: p1,
|
||||
db: db,
|
||||
chain: &mock.ChainService{Genesis: time.Now(),
|
||||
State: beaconState,
|
||||
FinalizedCheckPoint: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
}},
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
attPool: attestations.NewPool(),
|
||||
}
|
||||
|
||||
sb = ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}
|
||||
r32, _ := ssz.HashTreeRoot(sb.Block)
|
||||
r.db.SaveBlock(context.Background(), sb)
|
||||
|
||||
r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{aggregateAndProof}
|
||||
if err := r.processPendingAtts(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(r.attPool.AggregatedAttestations()) != 1 {
|
||||
t.Error("Did not save aggregated att")
|
||||
}
|
||||
if !reflect.DeepEqual(r.attPool.AggregatedAttestations()[0], att) {
|
||||
t.Error("Incorrect saved att")
|
||||
}
|
||||
if len(r.attPool.UnaggregatedAttestations()) != 0 {
|
||||
t.Error("Did save unaggregated att")
|
||||
}
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Verified and saved pending attestations to pool")
|
||||
}
|
||||
|
||||
func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
defer dbtest.TeardownDB(t, db)
|
||||
|
||||
s := &Service{
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
}
|
||||
|
||||
// 100 Attestations per block root.
|
||||
r1 := [32]byte{'A'}
|
||||
r2 := [32]byte{'B'}
|
||||
r3 := [32]byte{'C'}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
s.savePendingAtt(ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r1[:]}}})
|
||||
s.savePendingAtt(ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r2[:]}}})
|
||||
s.savePendingAtt(ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r3[:]}}})
|
||||
}
|
||||
|
||||
if len(s.blkRootToPendingAtts[r1]) != 100 {
|
||||
t.Error("Did not save pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r2]) != 100 {
|
||||
t.Error("Did not save pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r3]) != 100 {
|
||||
t.Error("Did not save pending atts")
|
||||
}
|
||||
|
||||
// Set current slot to 50, it should prune 19 attestations. (50 - 31)
|
||||
s.validatePendingAtts(context.Background(), 50)
|
||||
if len(s.blkRootToPendingAtts[r1]) != 81 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r2]) != 81 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r3]) != 81 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
|
||||
// Set current slot to 100 + slot_duration, it should prune all the attestations.
|
||||
s.validatePendingAtts(context.Background(), 100+params.BeaconConfig().SlotsPerEpoch)
|
||||
if len(s.blkRootToPendingAtts[r1]) != 0 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r2]) != 0 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
if len(s.blkRootToPendingAtts[r3]) != 0 {
|
||||
t.Error("Did not delete pending atts")
|
||||
}
|
||||
|
||||
// Verify the keys are deleted.
|
||||
if len(s.blkRootToPendingAtts) != 0 {
|
||||
t.Error("Did not delete block keys")
|
||||
}
|
||||
}
|
||||
@@ -68,7 +68,7 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
if !inPendingQueue && !inDB && hasPeer {
|
||||
log.WithFields(logrus.Fields{
|
||||
"currentSlot": b.Block.Slot,
|
||||
"parentRoot": hex.EncodeToString(b.Block.ParentRoot),
|
||||
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
|
||||
}).Info("Requesting parent block")
|
||||
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
|
||||
|
||||
@@ -112,10 +112,14 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
delete(r.seenPendingBlocks, blkRoot)
|
||||
r.pendingQueueLock.Unlock()
|
||||
|
||||
log.Infof("Processed ancestor block with slot %d and cleared pending block cache", s)
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": s,
|
||||
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
|
||||
}).Info("Processed pending block and cleared it in cache")
|
||||
|
||||
span.End()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -39,25 +39,26 @@ type blockchainService interface {
|
||||
blockchain.FinalizationFetcher
|
||||
blockchain.ForkFetcher
|
||||
blockchain.AttestationReceiver
|
||||
blockchain.GenesisTimeFetcher
|
||||
blockchain.TimeFetcher
|
||||
}
|
||||
|
||||
// NewRegularSync service.
|
||||
func NewRegularSync(cfg *Config) *Service {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r := &Service{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
db: cfg.DB,
|
||||
p2p: cfg.P2P,
|
||||
attPool: cfg.AttPool,
|
||||
exitPool: cfg.ExitPool,
|
||||
chain: cfg.Chain,
|
||||
initialSync: cfg.InitialSync,
|
||||
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
stateNotifier: cfg.StateNotifier,
|
||||
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
db: cfg.DB,
|
||||
p2p: cfg.P2P,
|
||||
attPool: cfg.AttPool,
|
||||
exitPool: cfg.ExitPool,
|
||||
chain: cfg.Chain,
|
||||
initialSync: cfg.InitialSync,
|
||||
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
stateNotifier: cfg.StateNotifier,
|
||||
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
|
||||
}
|
||||
|
||||
r.registerRPCHandlers()
|
||||
@@ -69,21 +70,23 @@ func NewRegularSync(cfg *Config) *Service {
|
||||
// Service is responsible for handling all run time p2p related operations as the
|
||||
// main entry point for network messages.
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
p2p p2p.P2P
|
||||
db db.NoHeadAccessDatabase
|
||||
attPool attestations.Pool
|
||||
exitPool *voluntaryexits.Pool
|
||||
chain blockchainService
|
||||
slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
|
||||
seenPendingBlocks map[[32]byte]bool
|
||||
pendingQueueLock sync.RWMutex
|
||||
chainStarted bool
|
||||
initialSync Checker
|
||||
validateBlockLock sync.RWMutex
|
||||
stateNotifier statefeed.Notifier
|
||||
blocksRateLimiter *leakybucket.Collector
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
p2p p2p.P2P
|
||||
db db.NoHeadAccessDatabase
|
||||
attPool attestations.Pool
|
||||
exitPool *voluntaryexits.Pool
|
||||
chain blockchainService
|
||||
slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
|
||||
seenPendingBlocks map[[32]byte]bool
|
||||
blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof
|
||||
pendingAttsLock sync.RWMutex
|
||||
pendingQueueLock sync.RWMutex
|
||||
chainStarted bool
|
||||
initialSync Checker
|
||||
validateBlockLock sync.RWMutex
|
||||
stateNotifier statefeed.Notifier
|
||||
blocksRateLimiter *leakybucket.Collector
|
||||
}
|
||||
|
||||
// Start the regular sync service.
|
||||
@@ -91,6 +94,7 @@ func (r *Service) Start() {
|
||||
r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
|
||||
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
|
||||
r.processPendingBlocksQueue()
|
||||
r.processPendingAttsQueue()
|
||||
r.maintainPeerStatuses()
|
||||
r.resyncIfBehind()
|
||||
}
|
||||
|
||||
@@ -49,8 +49,6 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
|
||||
return false
|
||||
}
|
||||
|
||||
attSlot := m.Aggregate.Data.Slot
|
||||
|
||||
// Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally.
|
||||
seen, err := r.attPool.HasAggregatedAttestation(m.Aggregate)
|
||||
if err != nil {
|
||||
@@ -61,8 +59,25 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
|
||||
return false
|
||||
}
|
||||
|
||||
// Verify the block being voted for passes validation. The block should have passed validation if it's in the DB.
|
||||
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(m.Aggregate.Data.BeaconBlockRoot)) {
|
||||
if !r.validateAggregatedAtt(ctx, m) {
|
||||
return false
|
||||
}
|
||||
|
||||
msg.ValidatorData = m
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool {
|
||||
ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt")
|
||||
defer span.End()
|
||||
|
||||
attSlot := a.Aggregate.Data.Slot
|
||||
|
||||
// Verify the block being voted is in DB. The block should have passed validation if it's in the DB.
|
||||
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) {
|
||||
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
|
||||
r.savePendingAtt(a)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -90,25 +105,23 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
|
||||
}
|
||||
|
||||
// Verify validator index is within the aggregate's committee.
|
||||
if err := validateIndexInCommittee(ctx, s, m.Aggregate, m.AggregatorIndex); err != nil {
|
||||
if err := validateIndexInCommittee(ctx, s, a.Aggregate, a.AggregatorIndex); err != nil {
|
||||
traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate index in committee"))
|
||||
return false
|
||||
}
|
||||
|
||||
// Verify selection proof reflects to the right validator and signature is valid.
|
||||
if err := validateSelection(ctx, s, m.Aggregate.Data, m.AggregatorIndex, m.SelectionProof); err != nil {
|
||||
traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate selection for validator %d", m.AggregatorIndex))
|
||||
if err := validateSelection(ctx, s, a.Aggregate.Data, a.AggregatorIndex, a.SelectionProof); err != nil {
|
||||
traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate selection for validator %d", a.AggregatorIndex))
|
||||
return false
|
||||
}
|
||||
|
||||
// Verify aggregated attestation has a valid signature.
|
||||
if err := blocks.VerifyAttestation(ctx, s, m.Aggregate); err != nil {
|
||||
if err := blocks.VerifyAttestation(ctx, s, a.Aggregate); err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
return false
|
||||
}
|
||||
|
||||
msg.ValidatorData = m
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -128,10 +128,11 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
r := &Service{
|
||||
p2p: p,
|
||||
db: db,
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
attPool: attestations.NewPool(),
|
||||
p2p: p,
|
||||
db: db,
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
attPool: attestations.NewPool(),
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
@@ -2,7 +2,6 @@ package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
@@ -19,8 +18,6 @@ import (
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
var errPointsToBlockNotInDatabase = errors.New("attestation points to a block which is not in the database")
|
||||
|
||||
// Validation
|
||||
// - The attestation's committee index (attestation.data.index) is for the correct subnet.
|
||||
// - The attestation is unaggregated -- that is, it has exactly one participating validator (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1).
|
||||
@@ -76,13 +73,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
return false
|
||||
}
|
||||
|
||||
// Attestation's block must exist in database (only valid blocks are stored).
|
||||
// Verify the block being voted is in DB. The block should have passed validation if it's in the DB.
|
||||
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) {
|
||||
log.WithField(
|
||||
"blockRoot",
|
||||
fmt.Sprintf("%#x", att.Data.BeaconBlockRoot),
|
||||
).WithError(errPointsToBlockNotInDatabase).Debug("Ignored incoming attestation that points to a block which is not in the database")
|
||||
traceutil.AnnotateError(span, errPointsToBlockNotInDatabase)
|
||||
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
|
||||
s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att})
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
|
||||
chain: &mockChain.ChainService{
|
||||
Genesis: time.Now().Add(time.Duration(-64*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second), // 64 slots ago
|
||||
},
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
}
|
||||
|
||||
blk := ðpb.SignedBeaconBlock{
|
||||
|
||||
Reference in New Issue
Block a user