Compare commits

...

3 Commits

Author SHA1 Message Date
Raul Jordan
fc7c6776f6 Create Bucket If Not Exists When Fetching Proposal History (#8011)
* create bucket if not exist when fetching proposal history

* adding unit test
2020-12-01 08:00:03 -06:00
terence tsao
b150acffca Recover state summary for sync validation (#7994)
* Recover state summary

* Add test

* Fix tests

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
2020-11-29 20:42:45 -06:00
Victor Farazdagi
54a42ce4a8 more robust processing of invalid head slot (#7990) 2020-11-29 15:03:25 -06:00
9 changed files with 207 additions and 14 deletions

View File

@@ -131,13 +131,13 @@ func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*pb.State
}
}
if summary == nil {
return s.recoverStateSummary(ctx, blockRoot)
return s.RecoverStateSummary(ctx, blockRoot)
}
return summary, nil
}
// This recovers state summary object of a given block root by using the saved block in DB.
func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
// RecoverStateSummary recovers state summary object of a given block root by using the saved block in DB.
func (s *State) RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
if s.beaconDB.HasBlock(ctx, blockRoot) {
b, err := s.beaconDB.Block(ctx, blockRoot)
if err != nil {
@@ -149,7 +149,7 @@ func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*p
}
return summary, nil
}
return nil, errUnknownStateSummary
return nil, errors.New("could not find block in DB")
}
// This loads a beacon state from either the cache or DB then replay blocks up the requested block root.

View File

@@ -232,7 +232,7 @@ func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
r := [32]byte{'a'}
summary := &pb.StateSummary{Slot: 100}
_, err := service.stateSummary(ctx, r)
require.ErrorContains(t, errUnknownStateSummary.Error(), err)
require.ErrorContains(t, "could not find block in DB", err)
service.stateSummaryCache.Put(r, summary)
got, err := service.stateSummary(ctx, r)
@@ -244,7 +244,7 @@ func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
r = [32]byte{'b'}
summary = &pb.StateSummary{Root: r[:], Slot: 101}
_, err = service.stateSummary(ctx, r)
require.ErrorContains(t, errUnknownStateSummary.Error(), err)
require.ErrorContains(t, "could not find block in DB", err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, summary))
got, err = service.stateSummary(ctx, r)

View File

@@ -36,6 +36,10 @@ const (
noRequiredPeersErrRefreshInterval = 15 * time.Second
// maxResetAttempts number of times stale FSM is reset, before backtracking is triggered.
maxResetAttempts = 4
// startBackSlots defines number of slots before the current head, which defines a start position
// of the initial machine. This allows more robustness in case of normal sync sets head to some
// orphaned block: in that case starting earlier and re-fetching blocks allows to reorganize chain.
startBackSlots = 32
)
var (
@@ -173,6 +177,9 @@ func (q *blocksQueue) loop() {
// Define initial state machines.
startSlot := q.chain.HeadSlot()
if startSlot > startBackSlots {
startSlot -= startBackSlots
}
blocksPerRequest := q.blocksFetcher.blocksPerSecond
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
q.smm.addStateMachine(i)

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/peer"
@@ -1255,3 +1256,120 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
}
})
}
func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
beaconDB, _ := dbtest.SetupDB(t)
p2p := p2pt.NewTestP2P(t)
chain := extendBlockSequence(t, []*eth.SignedBeaconBlock{}, 128)
finalizedSlot := uint64(82)
finalizedEpoch := helpers.SlotToEpoch(finalizedSlot)
genesisBlock := chain[0]
require.NoError(t, beaconDB.SaveBlock(context.Background(), genesisBlock))
genesisRoot, err := genesisBlock.Block.HashTreeRoot()
require.NoError(t, err)
st := testutil.NewBeaconState()
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
DB: beaconDB,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
}
// Populate database with blocks with part of the chain, orphaned block will be added on top.
for _, blk := range chain[1:84] {
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
// Save block only if parent root is already in database or cache.
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
require.NoError(t, beaconDB.SaveBlock(ctx, blk))
require.NoError(t, st.SetSlot(blk.Block.Slot))
}
}
require.Equal(t, uint64(83), mc.HeadSlot())
require.Equal(t, chain[83].Block.Slot, mc.HeadSlot())
// Set head to slot 85, while we do not have block with slot 84 in DB, so block is orphaned.
// Moreover, block with slot 85 is a forked block and should be replaced, with block from peer.
orphanedBlock := testutil.NewBeaconBlock()
orphanedBlock.Block.Slot = 85
orphanedBlock.Block.StateRoot = testutil.Random32Bytes(t)
require.NoError(t, beaconDB.SaveBlock(ctx, orphanedBlock))
require.NoError(t, st.SetSlot(orphanedBlock.Block.Slot))
require.Equal(t, uint64(85), mc.HeadSlot())
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
chain: mc,
p2p: p2p,
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
// Connect peer that has all the blocks available.
allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers())
defer func() {
p2p.Peers().SetConnectionState(allBlocksPeer, peers.PeerDisconnected)
}()
// Queue should be able to fetch whole chain (including slot which comes before the currently set head).
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: uint64(len(chain) - 1),
mode: modeNonConstrained,
})
require.NoError(t, queue.start())
isProcessedBlock := func(ctx context.Context, blk *eth.SignedBeaconBlock, blkRoot [32]byte) bool {
finalizedSlot, err := helpers.StartSlot(mc.FinalizedCheckpt().Epoch)
if err != nil {
return false
}
if blk.Block.Slot <= finalizedSlot || (beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot)) {
return true
}
return false
}
select {
case <-time.After(3 * time.Second):
t.Fatal("test takes to long to complete")
case data := <-queue.fetchedData:
for _, blk := range data.blocks {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
if isProcessedBlock(ctx, blk, blkRoot) {
log.Errorf("slot: %d , root %#x: %v", blk.Block.Slot, blkRoot, errBlockAlreadyProcessed)
continue
}
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if !beaconDB.HasBlock(ctx, parentRoot) && !mc.HasInitSyncBlock(parentRoot) {
log.Errorf("%v: %#x", errParentDoesNotExist, blk.Block.ParentRoot)
continue
}
// Block is not already processed, and parent exists in database - process.
require.NoError(t, beaconDB.SaveBlock(ctx, blk))
require.NoError(t, st.SetSlot(blk.Block.Slot))
}
}
require.NoError(t, queue.stop())
// Check that all blocks available in chain are produced by queue.
for _, blk := range chain[:orphanedBlock.Block.Slot+32] {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot), "slot %d", blk.Block.Slot)
}
}

View File

@@ -18,6 +18,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -44,6 +45,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
stateSummaryCache: stateSummaryCache,
stateGen: stategen.New(db, stateSummaryCache),
}
err := r.initCaches()
require.NoError(t, err)
@@ -172,6 +174,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
stateSummaryCache: stateSummaryCache,
stateGen: stategen.New(db, stateSummaryCache),
}
err := r.initCaches()
require.NoError(t, err)

View File

@@ -152,7 +152,10 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk *ethpb.SignedBeac
hasStateSummaryDB := s.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
hasStateSummaryCache := s.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot))
if !hasStateSummaryDB && !hasStateSummaryCache {
return errors.New("no access to parent state")
_, err := s.stateGen.RecoverStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
if err != nil {
return err
}
}
parentState, err := s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
if err != nil {

View File

@@ -121,6 +121,67 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
assert.Equal(t, false, result)
}
func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
db, stateSummaryCache := dbtest.SetupDB(t)
p := p2ptest.NewTestP2P(t)
ctx := context.Background()
beaconState, privKeys := testutil.DeterministicGenesisState(t, 100)
parentBlock := testutil.NewBeaconBlock()
require.NoError(t, db.SaveBlock(ctx, parentBlock))
bRoot, err := parentBlock.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, beaconState, bRoot))
copied := beaconState.Copy()
require.NoError(t, copied.SetSlot(1))
proposerIdx, err := helpers.BeaconProposerIndex(copied)
require.NoError(t, err)
msg := testutil.NewBeaconBlock()
msg.Block.ParentRoot = bRoot[:]
msg.Block.Slot = 1
msg.Block.ProposerIndex = proposerIdx
msg.Signature, err = helpers.ComputeDomainAndSign(beaconState, 0, msg.Block, params.BeaconConfig().DomainBeaconProposer, privKeys[proposerIdx])
require.NoError(t, err)
c, err := lru.New(10)
require.NoError(t, err)
c2, err := lru.New(10)
require.NoError(t, err)
stateGen := stategen.New(db, stateSummaryCache)
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0),
State: beaconState,
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
},
}
r := &Service{
db: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
blockNotifier: chainService.BlockNotifier(),
seenBlockCache: c,
badBlockCache: c2,
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
stateSummaryCache: stateSummaryCache,
stateGen: stateGen,
}
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
Topic: &topic,
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
}
func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
db, stateSummaryCache := dbtest.SetupDB(t)
p := p2ptest.NewTestP2P(t)

View File

@@ -51,11 +51,11 @@ func (store *Store) ProposalHistoryForSlot(ctx context.Context, publicKey [48]by
var err error
var proposalExists bool
signingRoot := [32]byte{}
err = store.view(func(tx *bolt.Tx) error {
err = store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(newHistoricProposalsBucket)
valBucket := bucket.Bucket(publicKey[:])
if valBucket == nil {
return fmt.Errorf("validator history empty for public key: %#x", publicKey)
valBucket, err := bucket.CreateBucketIfNotExists(publicKey[:])
if err != nil {
return fmt.Errorf("could not create bucket for public key %#x", publicKey[:])
}
signingRootBytes := valBucket.Get(bytesutil.Uint64ToBytesBigEndian(slot))
if signingRootBytes == nil {

View File

@@ -22,12 +22,13 @@ func TestProposalHistoryForSlot_InitializesNewPubKeys(t *testing.T) {
}
}
func TestNewProposalHistoryForSlot_NilDB(t *testing.T) {
func TestNewProposalHistoryForSlot_ReturnsNilIfNoHistory(t *testing.T) {
valPubkey := [48]byte{1, 2, 3}
db := setupDB(t, [][48]byte{})
_, _, err := db.ProposalHistoryForSlot(context.Background(), valPubkey, 0)
require.ErrorContains(t, "validator history empty for public key", err, "Unexpected error for nil DB")
_, proposalExists, err := db.ProposalHistoryForSlot(context.Background(), valPubkey, 0)
require.NoError(t, err)
assert.Equal(t, false, proposalExists)
}
func TestSaveProposalHistoryForSlot_OK(t *testing.T) {