Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-10 05:47:59 -05:00)

Compare commits (3 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | fc7c6776f6 |  |
|  | b150acffca |  |
|  | 54a42ce4a8 |  |
@@ -131,13 +131,13 @@ func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*pb.State
 		}
 	}
 	if summary == nil {
-		return s.recoverStateSummary(ctx, blockRoot)
+		return s.RecoverStateSummary(ctx, blockRoot)
 	}
 	return summary, nil
 }
 
-// This recovers state summary object of a given block root by using the saved block in DB.
-func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
+// RecoverStateSummary recovers state summary object of a given block root by using the saved block in DB.
+func (s *State) RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
 	if s.beaconDB.HasBlock(ctx, blockRoot) {
 		b, err := s.beaconDB.Block(ctx, blockRoot)
 		if err != nil {
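
For context, the tail of `stateSummary` shown above falls through to recovery only when neither the cache nor the DB has the summary. A minimal sketch of that lookup order; the cache and DB accessors here are assumptions inferred from the rest of the diff, only the tail lines are actually visible in the hunk:

	func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
		var summary *pb.StateSummary
		var err error
		// Assumed: consult the in-memory summary cache first.
		if s.stateSummaryCache.Has(blockRoot) {
			summary = s.stateSummaryCache.Get(blockRoot)
		} else {
			// Assumed: fall back to the summary persisted in the DB.
			summary, err = s.beaconDB.StateSummary(ctx, blockRoot)
			if err != nil {
				return nil, err
			}
		}
		// Visible in the hunk: the last resort is rebuilding from the saved block.
		if summary == nil {
			return s.RecoverStateSummary(ctx, blockRoot)
		}
		return summary, nil
	}
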
@@ -149,7 +149,7 @@ func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*p
 		}
 		return summary, nil
 	}
-	return nil, errUnknownStateSummary
+	return nil, errors.New("could not find block in DB")
 }
 
 // This loads a beacon state from either the cache or DB then replay blocks up the requested block root.
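
Putting the two hunks together, the recovery path reads the saved block and synthesizes a summary from it. A sketch of the full method under that assumption; the summary construction and the `SaveStateSummary` write are assumed (the guard, the block read, and the error return are what the hunks show, and `SaveStateSummary` appears in the tests below):

	func (s *State) RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
		if s.beaconDB.HasBlock(ctx, blockRoot) {
			b, err := s.beaconDB.Block(ctx, blockRoot)
			if err != nil {
				return nil, err
			}
			// Assumed: a summary is just the block's slot paired with its root.
			summary := &pb.StateSummary{Slot: b.Block.Slot, Root: blockRoot[:]}
			if err := s.beaconDB.SaveStateSummary(ctx, summary); err != nil {
				return nil, err
			}
			return summary, nil
		}
		return nil, errors.New("could not find block in DB")
	}
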
@@ -232,7 +232,7 @@ func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
 	r := [32]byte{'a'}
 	summary := &pb.StateSummary{Slot: 100}
 	_, err := service.stateSummary(ctx, r)
-	require.ErrorContains(t, errUnknownStateSummary.Error(), err)
+	require.ErrorContains(t, "could not find block in DB", err)
 
 	service.stateSummaryCache.Put(r, summary)
 	got, err := service.stateSummary(ctx, r)
@@ -244,7 +244,7 @@ func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
 	r = [32]byte{'b'}
 	summary = &pb.StateSummary{Root: r[:], Slot: 101}
 	_, err = service.stateSummary(ctx, r)
-	require.ErrorContains(t, errUnknownStateSummary.Error(), err)
+	require.ErrorContains(t, "could not find block in DB", err)
 
 	require.NoError(t, service.beaconDB.SaveStateSummary(ctx, summary))
 	got, err = service.stateSummary(ctx, r)
@@ -36,6 +36,10 @@ const (
 	noRequiredPeersErrRefreshInterval = 15 * time.Second
 	// maxResetAttempts number of times stale FSM is reset, before backtracking is triggered.
 	maxResetAttempts = 4
+	// startBackSlots defines the number of slots before the current head at which the initial
+	// state machine starts. This adds robustness for the case where normal sync has set the head
+	// to an orphaned block: starting earlier and re-fetching blocks allows the chain to reorganize.
+	startBackSlots = 32
 )
 
 var (
@@ -173,6 +177,9 @@ func (q *blocksQueue) loop() {
 
 	// Define initial state machines.
 	startSlot := q.chain.HeadSlot()
+	if startSlot > startBackSlots {
+		startSlot -= startBackSlots
+	}
 	blocksPerRequest := q.blocksFetcher.blocksPerSecond
 	for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
 		q.smm.addStateMachine(i)
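
The back-off itself is a saturating subtraction on unsigned slots; without the guard, a head within 32 slots of genesis would underflow the uint64. A standalone sketch of the same logic with a hypothetical name:

	// initialStartSlot is a hypothetical standalone version of the guard above.
	func initialStartSlot(headSlot uint64) uint64 {
		const startBackSlots = 32
		if headSlot > startBackSlots {
			// Step back so an orphaned head block can be re-fetched and replaced.
			return headSlot - startBackSlots
		}
		// Close to genesis: nothing to step back over.
		return headSlot
	}

For example, with the head at slot 85 (as in the test further down) the first machine starts at slot 53, re-covering the range where an orphaned head would otherwise stall progress, while a head at slot 10 starts at slot 10 unchanged.
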
@@ -4,6 +4,7 @@ import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/kevinms/leakybucket-go"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -1255,3 +1256,120 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
 		}
 	})
 }
+
+func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	beaconDB, _ := dbtest.SetupDB(t)
+	p2p := p2pt.NewTestP2P(t)
+
+	chain := extendBlockSequence(t, []*eth.SignedBeaconBlock{}, 128)
+	finalizedSlot := uint64(82)
+	finalizedEpoch := helpers.SlotToEpoch(finalizedSlot)
+
+	genesisBlock := chain[0]
+	require.NoError(t, beaconDB.SaveBlock(context.Background(), genesisBlock))
+	genesisRoot, err := genesisBlock.Block.HashTreeRoot()
+	require.NoError(t, err)
+
+	st := testutil.NewBeaconState()
+	mc := &mock.ChainService{
+		State: st,
+		Root:  genesisRoot[:],
+		DB:    beaconDB,
+		FinalizedCheckPoint: &eth.Checkpoint{
+			Epoch: finalizedEpoch,
+			Root:  []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
+		},
+	}
+
+	// Populate the database with the first part of the chain; the orphaned block will be added on top.
+	for _, blk := range chain[1:84] {
+		parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
+		// Save block only if parent root is already in database or cache.
+		if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
+			require.NoError(t, beaconDB.SaveBlock(ctx, blk))
+			require.NoError(t, st.SetSlot(blk.Block.Slot))
+		}
+	}
+	require.Equal(t, uint64(83), mc.HeadSlot())
+	require.Equal(t, chain[83].Block.Slot, mc.HeadSlot())
+
+	// Set head to slot 85 while the block at slot 84 is not in the DB, so the head block is orphaned.
+	// Moreover, the block at slot 85 is a forked block and should be replaced with a block from a peer.
+	orphanedBlock := testutil.NewBeaconBlock()
+	orphanedBlock.Block.Slot = 85
+	orphanedBlock.Block.StateRoot = testutil.Random32Bytes(t)
+	require.NoError(t, beaconDB.SaveBlock(ctx, orphanedBlock))
+	require.NoError(t, st.SetSlot(orphanedBlock.Block.Slot))
+	require.Equal(t, uint64(85), mc.HeadSlot())
+
+	fetcher := newBlocksFetcher(
+		ctx,
+		&blocksFetcherConfig{
+			chain: mc,
+			p2p:   p2p,
+			db:    beaconDB,
+		},
+	)
+	fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
+
+	// Connect peer that has all the blocks available.
+	allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers())
+	defer func() {
+		p2p.Peers().SetConnectionState(allBlocksPeer, peers.PeerDisconnected)
+	}()
+
+	// Queue should be able to fetch the whole chain (including the slot that comes before the currently set head).
+	queue := newBlocksQueue(ctx, &blocksQueueConfig{
+		blocksFetcher:       fetcher,
+		chain:               mc,
+		highestExpectedSlot: uint64(len(chain) - 1),
+		mode:                modeNonConstrained,
+	})
+
+	require.NoError(t, queue.start())
+	isProcessedBlock := func(ctx context.Context, blk *eth.SignedBeaconBlock, blkRoot [32]byte) bool {
+		finalizedSlot, err := helpers.StartSlot(mc.FinalizedCheckpt().Epoch)
+		if err != nil {
+			return false
+		}
+		if blk.Block.Slot <= finalizedSlot || (beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot)) {
+			return true
+		}
+		return false
+	}
+
+	select {
+	case <-time.After(3 * time.Second):
+		t.Fatal("test takes too long to complete")
+	case data := <-queue.fetchedData:
+		for _, blk := range data.blocks {
+			blkRoot, err := blk.Block.HashTreeRoot()
+			require.NoError(t, err)
+			if isProcessedBlock(ctx, blk, blkRoot) {
+				log.Errorf("slot: %d , root %#x: %v", blk.Block.Slot, blkRoot, errBlockAlreadyProcessed)
+				continue
+			}
+
+			parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
+			if !beaconDB.HasBlock(ctx, parentRoot) && !mc.HasInitSyncBlock(parentRoot) {
+				log.Errorf("%v: %#x", errParentDoesNotExist, blk.Block.ParentRoot)
+				continue
+			}
+
+			// Block is not already processed, and parent exists in database - process.
+			require.NoError(t, beaconDB.SaveBlock(ctx, blk))
+			require.NoError(t, st.SetSlot(blk.Block.Slot))
+		}
+	}
+	require.NoError(t, queue.stop())
+
+	// Check that all blocks available in the chain were produced by the queue.
+	for _, blk := range chain[:orphanedBlock.Block.Slot+32] {
+		blkRoot, err := blk.Block.HashTreeRoot()
+		require.NoError(t, err)
+		require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot), "slot %d", blk.Block.Slot)
+	}
+}
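
The test relies on a helper to build a linear chain of test blocks; its implementation is not part of this diff. A hypothetical sketch of what `extendBlockSequence` is assumed to do, using the same imports as the test file above (each block links to its predecessor by parent root, so the "save only if parent known" loop can walk the chain in order):

	// extendBlockSequenceSketch is an illustrative stand-in, not the helper used above:
	// it appends `size` blocks to `seq`, each pointing at the previous block's hash tree root.
	func extendBlockSequenceSketch(t *testing.T, seq []*eth.SignedBeaconBlock, size int) []*eth.SignedBeaconBlock {
		for i := 0; i < size; i++ {
			blk := testutil.NewBeaconBlock()
			blk.Block.Slot = uint64(len(seq))
			if len(seq) > 0 {
				parentRoot, err := seq[len(seq)-1].Block.HashTreeRoot()
				require.NoError(t, err)
				blk.Block.ParentRoot = parentRoot[:]
			}
			seq = append(seq, blk)
		}
		return seq
	}
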
@@ -18,6 +18,7 @@ import (
 	p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
 	p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
 	"github.com/prysmaticlabs/prysm/beacon-chain/state"
+	"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
 	pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
 	"github.com/prysmaticlabs/prysm/shared/rand"
 	"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -44,6 +45,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
 		slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
 		seenPendingBlocks:   make(map[[32]byte]bool),
 		stateSummaryCache:   stateSummaryCache,
+		stateGen:            stategen.New(db, stateSummaryCache),
 	}
 	err := r.initCaches()
 	require.NoError(t, err)
@@ -172,6 +174,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
 		slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
 		seenPendingBlocks:   make(map[[32]byte]bool),
 		stateSummaryCache:   stateSummaryCache,
+		stateGen:            stategen.New(db, stateSummaryCache),
 	}
 	err := r.initCaches()
 	require.NoError(t, err)
@@ -152,7 +152,10 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk *ethpb.SignedBeac
 	hasStateSummaryDB := s.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
 	hasStateSummaryCache := s.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot))
 	if !hasStateSummaryDB && !hasStateSummaryCache {
-		return errors.New("no access to parent state")
+		_, err := s.stateGen.RecoverStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
+		if err != nil {
+			return err
+		}
 	}
 	parentState, err := s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
 	if err != nil {
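
The effect of this change: a gossip block whose parent block is saved in the DB but whose parent state summary was never written (for example, after a restart cleared the cache) is no longer rejected outright; the summary is rebuilt on the fly and validation proceeds to load the parent state. A condensed sketch of the resulting control flow, with the repeated root conversion factored into a local for readability (names otherwise as in the hunk):

	parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
	if !s.db.HasStateSummary(ctx, parentRoot) && !s.stateSummaryCache.Has(parentRoot) {
		// Rebuild the missing summary from the saved parent block instead of
		// returning the old "no access to parent state" error.
		if _, err := s.stateGen.RecoverStateSummary(ctx, parentRoot); err != nil {
			return err
		}
	}
	parentState, err := s.stateGen.StateByRoot(ctx, parentRoot)
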
@@ -121,6 +121,67 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
 	assert.Equal(t, false, result)
 }
 
+func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
+	db, stateSummaryCache := dbtest.SetupDB(t)
+	p := p2ptest.NewTestP2P(t)
+	ctx := context.Background()
+	beaconState, privKeys := testutil.DeterministicGenesisState(t, 100)
+	parentBlock := testutil.NewBeaconBlock()
+	require.NoError(t, db.SaveBlock(ctx, parentBlock))
+	bRoot, err := parentBlock.Block.HashTreeRoot()
+	require.NoError(t, err)
+	require.NoError(t, db.SaveState(ctx, beaconState, bRoot))
+	copied := beaconState.Copy()
+	require.NoError(t, copied.SetSlot(1))
+	proposerIdx, err := helpers.BeaconProposerIndex(copied)
+	require.NoError(t, err)
+	msg := testutil.NewBeaconBlock()
+	msg.Block.ParentRoot = bRoot[:]
+	msg.Block.Slot = 1
+	msg.Block.ProposerIndex = proposerIdx
+	msg.Signature, err = helpers.ComputeDomainAndSign(beaconState, 0, msg.Block, params.BeaconConfig().DomainBeaconProposer, privKeys[proposerIdx])
+	require.NoError(t, err)
+
+	c, err := lru.New(10)
+	require.NoError(t, err)
+	c2, err := lru.New(10)
+	require.NoError(t, err)
+	stateGen := stategen.New(db, stateSummaryCache)
+	chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0),
+		State: beaconState,
+		FinalizedCheckPoint: &ethpb.Checkpoint{
+			Epoch: 0,
+			Root:  make([]byte, 32),
+		},
+	}
+	r := &Service{
+		db:                  db,
+		p2p:                 p,
+		initialSync:         &mockSync.Sync{IsSyncing: false},
+		chain:               chainService,
+		blockNotifier:       chainService.BlockNotifier(),
+		seenBlockCache:      c,
+		badBlockCache:       c2,
+		slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
+		seenPendingBlocks:   make(map[[32]byte]bool),
+		stateSummaryCache:   stateSummaryCache,
+		stateGen:            stateGen,
+	}
+	buf := new(bytes.Buffer)
+	_, err = p.Encoding().EncodeGossip(buf, msg)
+	require.NoError(t, err)
+	topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
+	m := &pubsub.Message{
+		Message: &pubsubpb.Message{
+			Data:  buf.Bytes(),
+			Topic: &topic,
+		},
+	}
+	result := r.validateBeaconBlockPubSub(ctx, "", m) == pubsub.ValidationAccept
+	assert.Equal(t, true, result)
+	assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
+}
+
 func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
 	db, stateSummaryCache := dbtest.SetupDB(t)
 	p := p2ptest.NewTestP2P(t)
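
For reference, the `pubsub.ValidationAccept` compared against above is one of three results a go-libp2p-pubsub extended topic validator can return. A generic registration sketch, independent of Prysm's wiring (which this diff does not show); the topic name is illustrative:

	// Sketch of the standard go-libp2p-pubsub extended validator shape.
	func registerValidator(ps *pubsub.PubSub) error {
		return ps.RegisterTopicValidator("beacon_block", func(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
			// ValidationAccept: deliver locally and relay to peers.
			// ValidationIgnore: drop silently (e.g., cannot validate yet).
			// ValidationReject: drop and penalize the sending peer.
			return pubsub.ValidationAccept
		})
	}
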
@@ -51,11 +51,11 @@ func (store *Store) ProposalHistoryForSlot(ctx context.Context, publicKey [48]by
 	var err error
 	var proposalExists bool
 	signingRoot := [32]byte{}
-	err = store.view(func(tx *bolt.Tx) error {
+	err = store.update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(newHistoricProposalsBucket)
-		valBucket := bucket.Bucket(publicKey[:])
-		if valBucket == nil {
-			return fmt.Errorf("validator history empty for public key: %#x", publicKey)
+		valBucket, err := bucket.CreateBucketIfNotExists(publicKey[:])
+		if err != nil {
+			return fmt.Errorf("could not create bucket for public key %#x", publicKey[:])
 		}
 		signingRootBytes := valBucket.Get(bytesutil.Uint64ToBytesBigEndian(slot))
 		if signingRootBytes == nil {
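
The switch from `store.view` to `store.update` is forced by `CreateBucketIfNotExists`: bucket creation mutates the database, and bolt rejects writes inside a read-only transaction. A minimal standalone illustration, assuming the go.etcd.io/bbolt package aliased as `bolt` (as the `*bolt.Tx` signature suggests); bucket and key names are made up for the example:

	import (
		bolt "go.etcd.io/bbolt"
	)

	func readOrInit(db *bolt.DB, key []byte) ([]byte, error) {
		var val []byte
		// Must be Update, not View: CreateBucketIfNotExists inside a read-only
		// transaction fails with bolt.ErrTxNotWritable.
		err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("proposals"))
			if err != nil {
				return err
			}
			if v := b.Get(key); v != nil {
				// Copy: slices returned by Get are only valid inside the transaction.
				val = append([]byte(nil), v...)
			}
			// A nil Get result now means "no history yet" rather than an error,
			// which is exactly the behavior the renamed test below asserts.
			return nil
		})
		return val, err
	}
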
@@ -22,12 +22,13 @@ func TestProposalHistoryForSlot_InitializesNewPubKeys(t *testing.T) {
 	}
 }
 
-func TestNewProposalHistoryForSlot_NilDB(t *testing.T) {
+func TestNewProposalHistoryForSlot_ReturnsNilIfNoHistory(t *testing.T) {
 	valPubkey := [48]byte{1, 2, 3}
 	db := setupDB(t, [][48]byte{})
 
-	_, _, err := db.ProposalHistoryForSlot(context.Background(), valPubkey, 0)
-	require.ErrorContains(t, "validator history empty for public key", err, "Unexpected error for nil DB")
+	_, proposalExists, err := db.ProposalHistoryForSlot(context.Background(), valPubkey, 0)
+	require.NoError(t, err)
+	assert.Equal(t, false, proposalExists)
 }
 
 func TestSaveProposalHistoryForSlot_OK(t *testing.T) {