Fix finalized block root index (#3950)

* progress on fixing #3947

* upsert after, but only if not already exists

* return an error in some case

* format

* handle case where there has not been any finality yet

* it works

* remove todo

* use variable with comments

* regression test

* revert off by one issue, fix tests

* rename receiver

* @nisdas pr feedback
This commit is contained in:
Preston Van Loon
2019-11-07 22:00:47 -05:00
committed by Nishant Das
parent 768c2bd812
commit b87d0abc6c
7 changed files with 218 additions and 26 deletions

View File

@@ -21,6 +21,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//proto/beacon/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
@@ -63,6 +64,7 @@ go_test(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",

View File

@@ -252,6 +252,10 @@ func TestStore_Blocks_FiltersCorrectly(t *testing.T) {
filter: filters.NewFilter().SetEndSlot(8),
expectedNumBlocks: 5,
},
{
filter: filters.NewFilter().SetStartSlot(5).SetEndSlot(10),
expectedNumBlocks: 4,
},
{
// Composite filter criteria.
filter: filters.NewFilter().

View File

@@ -90,6 +90,6 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
if err := bucket.Put(finalizedCheckpointKey, enc); err != nil {
return err
}
return updateFinalizedBlockRoots(ctx, tx, checkpoint)
return k.updateFinalizedBlockRoots(ctx, tx, checkpoint)
})
}

View File

@@ -7,41 +7,84 @@ import (
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
dbpb "github.com/prysmaticlabs/prysm/proto/beacon/db"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
)
var errMissingParentBlockInDatabase = errors.New("missing block in database")
var previousFinalizedCheckpointKey = []byte("previous-finalized-checkpoint")
func updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, checkpoint *ethpb.Checkpoint) error {
// Blocks from the recent finalized epoch are not part of the finalized and canonical chain in this
// index. These containers will be removed on the next update of finalized checkpoint. Note that
// these block roots may be considered canonical in the "head view" of the beacon chain, but not so
// in this index.
var containerFinalizedButNotCanonical = []byte("recent block needs reindexing to determine canonical")
// The finalized block roots index tracks beacon blocks which are finalized in the canonical chain.
// The finalized checkpoint contains the the epoch which was finalized and the highest beacon block
// root where block.slot <= start_slot(epoch). As a result, we cannot index the finalized canonical
// beacon block chain using the finalized root alone as this would exclude all other blocks in the
// finalized epoch from being indexed as "final and canonical".
//
// The algorithm for building the index works as follows:
// - De-index all finalized beacon block roots from previous_finalized_epoch to
// new_finalized_epoch. (I.e. delete these roots from the index, to be re-indexed.)
// - Build the canonical finalized chain by walking up the ancestry chain from the finalized block
// root until a parent is found in the index or the parent is genesis.
// - Add all block roots in the database where epoch(block.slot) == checkpoint.epoch.
//
// This method ensures that all blocks from the current finalized epoch are considered "final" while
// maintaining only canonical and finalized blocks older than the current finalized epoch.
func (kv *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, checkpoint *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.updateFinalizedBlockRoots")
defer span.End()
bkt := tx.Bucket(finalizedBlockRootsIndexBucket)
blocks := tx.Bucket(blocksBucket)
root := checkpoint.Root
var previousRoot []byte
genesisRoot := blocks.Get(genesisBlockRootKey)
genesisRoot := tx.Bucket(blocksBucket).Get(genesisBlockRootKey)
// De-index recent finalized block roots, to be re-indexed.
prevousFinalizedCheckpoint := &ethpb.Checkpoint{}
if b := bkt.Get(previousFinalizedCheckpointKey); b != nil {
if err := proto.Unmarshal(b, prevousFinalizedCheckpoint); err != nil {
traceutil.AnnotateError(span, err)
return err
}
}
previousFinalizedCheckpointStartSlot := helpers.StartSlot(prevousFinalizedCheckpoint.Epoch)
recentFinalizedCheckpointEndSlot := helpers.StartSlot(checkpoint.Epoch + 1)
blockRoots, err := kv.BlockRoots(ctx, filters.NewFilter().SetStartSlot(previousFinalizedCheckpointStartSlot).SetEndSlot(recentFinalizedCheckpointEndSlot))
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
for _, root := range blockRoots {
if err := bkt.Delete(root[:]); err != nil {
traceutil.AnnotateError(span, err)
return err
}
}
// Walk up the ancestry chain until we reach a block root present in the finalized block roots
// index bucket or genesis block root.
for {
if bytes.Equal(root, genesisRoot) {
return nil
break
}
enc := blocks.Get(root)
if enc == nil {
err := fmt.Errorf("missing block in database: block root=%#x", root)
block, err := kv.Block(ctx, bytesutil.ToBytes32(root))
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
block := &ethpb.BeaconBlock{}
if err := proto.Unmarshal(enc, block); err != nil {
if block == nil {
err := fmt.Errorf("missing block in database: block root=%#x", root)
traceutil.AnnotateError(span, err)
return err
}
@@ -60,6 +103,8 @@ func updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, checkpoint *eth
traceutil.AnnotateError(span, err)
return err
}
// Found parent, loop exit condition.
if parentBytes := bkt.Get(block.ParentRoot); parentBytes != nil {
parent := &dbpb.FinalizedBlockRootContainer{}
if err := proto.Unmarshal(parentBytes, parent); err != nil {
@@ -72,22 +117,58 @@ func updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, checkpoint *eth
traceutil.AnnotateError(span, err)
return err
}
return bkt.Put(block.ParentRoot, enc)
if err := bkt.Put(block.ParentRoot, enc); err != nil {
traceutil.AnnotateError(span, err)
return err
}
break
}
previousRoot = root
root = block.ParentRoot
}
// Upsert blocks from the current finalized epoch.
roots, err := kv.BlockRoots(ctx, filters.NewFilter().SetStartSlot(helpers.StartSlot(checkpoint.Epoch)).SetEndSlot(helpers.StartSlot(checkpoint.Epoch+1)))
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
for _, root := range roots {
root := root[:]
if bytes.Equal(root, checkpoint.Root) || bkt.Get(root) != nil {
continue
}
if err := bkt.Put(root, containerFinalizedButNotCanonical); err != nil {
traceutil.AnnotateError(span, err)
return err
}
}
// Update previous checkpoint
enc, err := proto.Marshal(checkpoint)
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
return bkt.Put(previousFinalizedCheckpointKey, enc)
}
// IsFinalizedBlock returns true if the block root is present in the finalized block root index.
// A beacon block root contained exists in this index if it is considered finalized and canonical.
// Note: beacon blocks from the latest finalized epoch return true, whether or not they are
// considered canonical in the "head view" of the beacon node.
func (kv *Store) IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool {
ctx, span := trace.StartSpan(ctx, "BeaconDB.IsFinalizedBlock")
defer span.End()
var exists bool
kv.db.View(func(tx *bolt.Tx) error {
err := kv.db.View(func(tx *bolt.Tx) error {
exists = tx.Bucket(finalizedBlockRootsIndexBucket).Get(blockRoot[:]) != nil
return nil
})
if err != nil {
traceutil.AnnotateError(span, err)
}
return exists
}

View File

@@ -8,11 +8,13 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
var genesisBlockRoot = bytesutil.ToBytes32([]byte{'G', 'E', 'N', 'E', 'S', 'I', 'S'})
func TestStore_IsFinalizedBlock(t *testing.T) {
slotsPerEpoch := int(params.BeaconConfig().SlotsPerEpoch)
db := setupDB(t)
defer teardownDB(t, db)
ctx := context.Background()
@@ -21,12 +23,12 @@ func TestStore_IsFinalizedBlock(t *testing.T) {
t.Fatal(err)
}
blks := makeBlocks(t, 128)
blks := makeBlocks(t, 0, slotsPerEpoch*3, genesisBlockRoot)
if err := db.SaveBlocks(ctx, blks); err != nil {
t.Fatal(err)
}
root, err := ssz.SigningRoot(blks[64])
root, err := ssz.SigningRoot(blks[slotsPerEpoch])
if err != nil {
t.Fatal(err)
}
@@ -45,8 +47,8 @@ func TestStore_IsFinalizedBlock(t *testing.T) {
t.Fatal(err)
}
// All blocks up to 64 should be in the finalized index.
for i := 0; i <= 64; i++ {
// All blocks up to slotsPerEpoch*2 should be in the finalized index.
for i := 0; i < slotsPerEpoch*2; i++ {
root, err := ssz.SigningRoot(blks[i])
if err != nil {
t.Fatal(err)
@@ -55,7 +57,7 @@ func TestStore_IsFinalizedBlock(t *testing.T) {
t.Errorf("Block at index %d was not considered finalized in the index", i)
}
}
for i := 65; i < len(blks); i++ {
for i := slotsPerEpoch*3; i < len(blks); i++ {
root, err := ssz.SigningRoot(blks[i])
if err != nil {
t.Fatal(err)
@@ -66,18 +68,110 @@ func TestStore_IsFinalizedBlock(t *testing.T) {
}
}
func makeBlocks(t *testing.T, n int) []*ethpb.BeaconBlock {
previousRoot := genesisBlockRoot
// This test scenario is to test a specific edge case where the finalized block root is not part of
// the finalized and canonical chain.
//
// Example:
// 0 1 2 3 4 5 6 slot
// a <- b <-- d <- e <- f <- g roots
// ^- c
// Imagine that epochs are 2 slots and that epoch 1, 2, and 3 are finalized. Checkpoint roots would
// be c, e, and g. In this scenario, c was a finalized checkpoint root but no block built upon it so
// it should not be considered "final and canonical" in the view at slot 6.
func TestStore_IsFinalized_ForkEdgeCase(t *testing.T) {
slotsPerEpoch := int(params.BeaconConfig().SlotsPerEpoch)
blocks0 := makeBlocks(t, slotsPerEpoch*0, slotsPerEpoch, genesisBlockRoot)
blocks1 := append(
makeBlocks(t, slotsPerEpoch*1, 1, bytesutil.ToBytes32(sszRootOrDie(t, blocks0[len(blocks0)-1]))), // No block builds off of the first block in epoch.
makeBlocks(t, slotsPerEpoch*1+1, slotsPerEpoch-1, bytesutil.ToBytes32(sszRootOrDie(t, blocks0[len(blocks0)-1])))...
)
blocks2 := makeBlocks(t, slotsPerEpoch*2, slotsPerEpoch, bytesutil.ToBytes32(sszRootOrDie(t, blocks1[len(blocks1)-1])))
db := setupDB(t)
defer teardownDB(t, db)
ctx := context.Background()
if err := db.SaveGenesisBlockRoot(ctx, genesisBlockRoot); err != nil {
t.Fatal(err)
}
if err := db.SaveBlocks(ctx, blocks0); err != nil {
t.Fatal(err)
}
if err := db.SaveBlocks(ctx, blocks1); err != nil {
t.Fatal(err)
}
if err := db.SaveBlocks(ctx, blocks2); err != nil {
t.Fatal(err)
}
// First checkpoint
checkpoint1 := &ethpb.Checkpoint{
Root: sszRootOrDie(t, blocks1[0]),
Epoch: 1,
}
// A state is required to save checkpoint
if err := db.SaveState(ctx, &pb.BeaconState{}, bytesutil.ToBytes32(checkpoint1.Root)); err != nil {
t.Fatal(err)
}
if err := db.SaveFinalizedCheckpoint(ctx, checkpoint1); err != nil {
t.Fatal(err)
}
// All blocks in blocks0 and blocks1 should be finalized and canonical.
for i, block := range append(blocks0, blocks1...) {
root := sszRootOrDie(t, block)
if !db.IsFinalizedBlock(ctx, bytesutil.ToBytes32(root)) {
t.Errorf("%d - Expected block %#x to be finalized", i, root)
}
}
// Second checkpoint
checkpoint2 := &ethpb.Checkpoint{
Root: sszRootOrDie(t, blocks2[0]),
Epoch: 2,
}
// A state is required to save checkpoint
if err := db.SaveState(ctx, &pb.BeaconState{}, bytesutil.ToBytes32(checkpoint2.Root)); err != nil {
t.Fatal(err)
}
if err := db.SaveFinalizedCheckpoint(ctx, checkpoint2); err != nil {
t.Error(err)
}
// All blocks in blocks0 and blocks2 should be finalized and canonical.
for i, block := range append(blocks0, blocks2...) {
root := sszRootOrDie(t, block)
if !db.IsFinalizedBlock(ctx, bytesutil.ToBytes32(root)) {
t.Errorf("%d - Expected block %#x to be finalized", i, root)
}
}
// All blocks in blocks1 should be finalized and canonical, except blocks1[0].
for i, block := range blocks1 {
root := sszRootOrDie(t, block)
if db.IsFinalizedBlock(ctx, bytesutil.ToBytes32(root)) == (i == 0) {
t.Errorf("Expected db.IsFinalizedBlock(ctx, blocks1[%d]) to be %v", i, i != 0)
}
}
}
func sszRootOrDie(t *testing.T, block *ethpb.BeaconBlock) []byte {
root, err := ssz.SigningRoot(block)
if err != nil {
t.Fatal(err)
}
return root[:]
}
func makeBlocks(t *testing.T, i, n int, previousRoot [32]byte) []*ethpb.BeaconBlock {
blocks := make([]*ethpb.BeaconBlock, n)
for i := 0; i < n; i++ {
for j := i; j < n+i; j++ {
parentRoot := make([]byte, 32)
copy(parentRoot, previousRoot[:])
blocks[i] = &ethpb.BeaconBlock{
Slot: uint64(i + 1),
blocks[j-i] = &ethpb.BeaconBlock{
Slot: uint64(j + 1),
ParentRoot: parentRoot,
}
var err error
previousRoot, err = ssz.SigningRoot(blocks[i])
previousRoot, err = ssz.SigningRoot(blocks[j-i])
if err != nil {
t.Fatal(err)
}

View File

@@ -73,6 +73,11 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
// Handle block large block ranges of skipped slots.
start += count * uint64(lastEmptyRequests*len(peers))
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
if start > helpers.StartSlot(highestFinalizedEpoch()+1) {
return nil, errors.New("attempted to ask for a start slot greater than the next highest epoch")
}
atomic.AddInt32(&p2pRequestCount, int32(len(peers)))
for i, pid := range peers {
if ctx.Err() != nil {

View File

@@ -85,7 +85,13 @@ func (r *RegularSync) beaconBlocksByRangeRPCHandler(ctx context.Context, msg int
return err
}
for i, blk := range blks {
if blk != nil && (blk.Slot-startSlot)%m.Step == 0 && (blk.Slot > helpers.StartSlot(checkpoint.Epoch+1) || r.db.IsFinalizedBlock(ctx, roots[i])) {
if blk == nil {
continue
}
isRequestedSlotStep := (blk.Slot-startSlot)%m.Step == 0
isRecentUnfinalizedSlot := blk.Slot >= helpers.StartSlot(checkpoint.Epoch+1) || checkpoint.Epoch == 0
if isRequestedSlotStep && (isRecentUnfinalizedSlot || r.db.IsFinalizedBlock(ctx, roots[i])) {
if err := r.chunkWriter(stream, blk); err != nil {
log.WithError(err).Error("Failed to send a chunked response")
return err