Update validators db during epoch boundary (#3307)

This commit is contained in:
terence tsao
2019-08-26 09:02:17 -07:00
committed by Raul Jordan
parent 5828278807
commit 0b5b3865ef
3 changed files with 192 additions and 45 deletions

View File

@@ -4,9 +4,13 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/sirupsen/logrus"
@@ -96,6 +100,13 @@ func (c *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.Be
return errors.Wrap(err, "could not clean up block deposits, attestations, and other operations")
}
// Update validator's public key to indices mapping in DB.
if helpers.IsEpochStart(block.Slot) {
if err := c.updateValidatorsDB(ctx, root); err != nil {
return errors.Wrap(err, "could not update validators db")
}
}
processedBlkNoPubsub.Inc()
return nil
}
@@ -128,6 +139,13 @@ func (c *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block
return errors.Wrap(err, "could not clean up block deposits, attestations, and other operations")
}
// Update validator's public key to indices mapping in DB.
if helpers.IsEpochStart(block.Slot) {
if err := c.updateValidatorsDB(ctx, root); err != nil {
return errors.Wrap(err, "could not update validators db")
}
}
processedBlkNoPubsubForkchoice.Inc()
return nil
}
@@ -149,6 +167,64 @@ func (c *ChainService) cleanupBlockOperations(ctx context.Context, block *ethpb.
return nil
}
// this updates validator's public key to indices mapping stored in DB, due to the frequent
// validator activation and exit, we should check this every epoch.
func (c *ChainService) updateValidatorsDB(ctx context.Context, r [32]byte) error {
s, err := c.beaconDB.State(ctx, r)
if err != nil {
return errors.Wrap(err, "could not retrieve latest processed state in DB")
}
if err := c.saveValidatorIdx(ctx, s); err != nil {
return errors.Wrap(err, "could not save validator index")
}
if err := c.deleteValidatorIdx(ctx, s); err != nil {
return errors.Wrap(err, "could not delete validator index")
}
return nil
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
// validators were activated from current epoch. After it saves, current epoch key
// is deleted from ActivatedValidators mapping.
func (c *ChainService) saveValidatorIdx(ctx context.Context, state *pb.BeaconState) error {
nextEpoch := helpers.CurrentEpoch(state) + 1
activatedValidators := validators.ActivatedValFromEpoch(nextEpoch)
var idxNotInState []uint64
fmt.Println(activatedValidators)
for _, idx := range activatedValidators {
// If for some reason the activated validator indices is not in state,
// we skip them and save them to process for next epoch.
if int(idx) >= len(state.Validators) {
idxNotInState = append(idxNotInState, idx)
continue
}
pubKey := state.Validators[idx].PublicKey
if err := c.beaconDB.SaveValidatorIndex(ctx, bytesutil.ToBytes48(pubKey), idx); err != nil {
return errors.Wrap(err, "could not save validator index")
}
}
// Since we are processing next epoch, save the can't processed validator indices
// to the epoch after that.
validators.InsertActivatedIndices(nextEpoch+1, idxNotInState)
validators.DeleteActivatedVal(helpers.CurrentEpoch(state))
return nil
}
// deleteValidatorIdx deletes the validators public key to index mapping in DB, the
// validators were exited from current epoch. After it deletes, current epoch key
// is deleted from ExitedValidators mapping.
func (c *ChainService) deleteValidatorIdx(ctx context.Context, state *pb.BeaconState) error {
exitedValidators := validators.ExitedValFromEpoch(helpers.CurrentEpoch(state) + 1)
for _, idx := range exitedValidators {
pubKey := state.Validators[idx].PublicKey
if err := c.beaconDB.DeleteValidatorIndex(ctx, bytesutil.ToBytes48(pubKey)); err != nil {
return errors.Wrap(err, "could not delete validator index")
}
}
validators.DeleteExitedVal(helpers.CurrentEpoch(state))
return nil
}
// This checks if the block is from a competing chain, emits warning and updates metrics.
func isCompetingBlock(root []byte, slot uint64, headRoot []byte, headSlot uint64) {
if !bytes.Equal(root[:], headRoot) {

View File

@@ -116,7 +116,7 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) {
testutil.AssertLogsContain(t, hook, "Finished applying fork choice for block")
}
func TestReceiveBlock_CanSaveHeadInfo(t *testing.T) {
func TestReceiveReceiveBlockNoPubsub_CanSaveHeadInfo(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
ctx := context.Background()
@@ -133,7 +133,9 @@ func TestReceiveBlock_CanSaveHeadInfo(t *testing.T) {
}
chainService.forkChoiceStore = &store{headRoot: r[:]}
if err := chainService.ReceiveBlockNoPubsub(ctx, &ethpb.BeaconBlock{Body: &ethpb.BeaconBlockBody{}}); err != nil {
if err := chainService.ReceiveBlockNoPubsub(ctx, &ethpb.BeaconBlock{
Slot: 1,
Body: &ethpb.BeaconBlockBody{}}); err != nil {
t.Fatal(err)
}
@@ -146,6 +148,62 @@ func TestReceiveBlock_CanSaveHeadInfo(t *testing.T) {
}
}
func TestReceiveBlockNoPubsub_CanUpdateValidatorDB(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, db)
b := &ethpb.BeaconBlock{
Slot: params.BeaconConfig().SlotsPerEpoch,
Body: &ethpb.BeaconBlockBody{}}
bRoot, err := ssz.SigningRoot(b)
if err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, &pb.BeaconState{
Validators: []*ethpb.Validator{
{PublicKey: []byte{'A'}},
{PublicKey: []byte{'B'}},
{PublicKey: []byte{'C'}},
{PublicKey: []byte{'D'}},
},
}, bRoot); err != nil {
t.Fatal(err)
}
headBlk := &ethpb.BeaconBlock{Slot: 100}
if err := db.SaveBlock(ctx, headBlk); err != nil {
t.Fatal(err)
}
r, err := ssz.SigningRoot(headBlk)
if err != nil {
t.Fatal(err)
}
chainService.forkChoiceStore = &store{headRoot: r[:]}
v.InsertActivatedIndices(1, []uint64{1, 2})
if err := chainService.ReceiveBlockNoPubsub(ctx, b); err != nil {
t.Fatal(err)
}
index, _, _ := db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'B'}))
if index != 1 {
t.Errorf("Wanted: %d, got: %d", 1, index)
}
index, _, _ = db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'C'}))
if index != 2 {
t.Errorf("Wanted: %d, got: %d", 2, index)
}
_, e, _ := db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'D'}))
if e == true {
t.Error("Index should not exist in DB")
}
}
func TestReceiveBlockNoPubsubForkchoice_ProcessCorrectly(t *testing.T) {
hook := logTest.NewGlobal()
db := testDB.SetupDB(t)
@@ -240,6 +298,62 @@ func TestReceiveBlockNoPubsubForkchoice_ProcessCorrectly(t *testing.T) {
testutil.AssertLogsDoNotContain(t, hook, "Finished fork choice")
}
func TestReceiveBlockNoPubsubForkchoice_CanUpdateValidatorDB(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, db)
b := &ethpb.BeaconBlock{
Slot: params.BeaconConfig().SlotsPerEpoch,
Body: &ethpb.BeaconBlockBody{}}
bRoot, err := ssz.SigningRoot(b)
if err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, &pb.BeaconState{
Validators: []*ethpb.Validator{
{PublicKey: []byte{'X'}},
{PublicKey: []byte{'Y'}},
{PublicKey: []byte{'Z'}},
},
}, bRoot); err != nil {
t.Fatal(err)
}
headBlk := &ethpb.BeaconBlock{Slot: 100}
if err := db.SaveBlock(ctx, headBlk); err != nil {
t.Fatal(err)
}
r, err := ssz.SigningRoot(headBlk)
if err != nil {
t.Fatal(err)
}
chainService.forkChoiceStore = &store{headRoot: r[:]}
v.DeleteActivatedVal(1)
v.InsertActivatedIndices(1, []uint64{0})
if err := chainService.ReceiveBlockNoPubsub(ctx, b); err != nil {
t.Fatal(err)
}
index, _, _ := db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'X'}))
if index != 0 {
t.Errorf("Wanted: %d, got: %d", 1, index)
}
_, e, _ := db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'Y'}))
if e == true {
t.Error("Index should not exist in DB")
}
_, e, _ = db.ValidatorIndex(ctx, bytesutil.ToBytes48([]byte{'Z'}))
if e == true {
t.Error("Index should not exist in DB")
}
}
func TestSaveValidatorIdx_SaveRetrieveWorks(t *testing.T) {
db := internal.SetupDBDeprecated(t)
defer internal.TeardownDBDeprecated(t, db)

View File

@@ -16,9 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain/forkchoice"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -190,47 +188,6 @@ func (c *ChainService) StateInitializedFeed() *event.Feed {
return c.stateInitializedFeed
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
// validators were activated from current epoch. After it saves, current epoch key
// is deleted from ActivatedValidators mapping.
func (c *ChainService) saveValidatorIdx(ctx context.Context, state *pb.BeaconState) error {
nextEpoch := helpers.CurrentEpoch(state) + 1
activatedValidators := validators.ActivatedValFromEpoch(nextEpoch)
var idxNotInState []uint64
for _, idx := range activatedValidators {
// If for some reason the activated validator indices is not in state,
// we skip them and save them to process for next epoch.
if int(idx) >= len(state.Validators) {
idxNotInState = append(idxNotInState, idx)
continue
}
pubKey := state.Validators[idx].PublicKey
if err := c.beaconDB.SaveValidatorIndex(ctx, bytesutil.ToBytes48(pubKey), idx); err != nil {
return errors.Wrap(err, "could not save validator index")
}
}
// Since we are processing next epoch, save the can't processed validator indices
// to the epoch after that.
validators.InsertActivatedIndices(nextEpoch+1, idxNotInState)
validators.DeleteActivatedVal(helpers.CurrentEpoch(state))
return nil
}
// deleteValidatorIdx deletes the validators public key to index mapping in DB, the
// validators were exited from current epoch. After it deletes, current epoch key
// is deleted from ExitedValidators mapping.
func (c *ChainService) deleteValidatorIdx(ctx context.Context, state *pb.BeaconState) error {
exitedValidators := validators.ExitedValFromEpoch(helpers.CurrentEpoch(state) + 1)
for _, idx := range exitedValidators {
pubKey := state.Validators[idx].PublicKey
if err := c.beaconDB.DeleteValidatorIndex(ctx, bytesutil.ToBytes48(pubKey)); err != nil {
return errors.Wrap(err, "could not delete validator index")
}
}
validators.DeleteExitedVal(helpers.CurrentEpoch(state))
return nil
}
// This gets called to update canonical root mapping.
func (c *ChainService) saveHead(ctx context.Context, b *ethpb.BeaconBlock, r [32]byte) error {