Part 1 of block chain service refactor - move process block (#4670)

This commit is contained in:
terence tsao
2020-01-27 13:48:16 -08:00
committed by GitHub
parent 1fa301c79c
commit eaf7ae3774
12 changed files with 1441 additions and 28 deletions

View File

@@ -7,6 +7,8 @@ go_library(
"info.go",
"log.go",
"metrics.go",
"process_block.go",
"process_block_helpers.go",
"receive_attestation.go",
"receive_block.go",
"service.go",
@@ -23,6 +25,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/protoarray:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
@@ -61,6 +64,7 @@ go_test(
size = "medium",
srcs = [
"chain_info_test.go",
"process_block_test.go",
"receive_attestation_test.go",
"receive_block_test.go",
"service_test.go",

View File

@@ -68,7 +68,7 @@ func (s *Service) TreeHandler(w http.ResponseWriter, _ *http.Request) {
dotNodes[i] = &dotN
}
for i := len(nodes) - 1; i >= 0; i-- {
for i := len(nodes) - 1; i >= 0; i-- {
if nodes[i].Parent != ^uint64(0) && nodes[i].Parent < uint64(len(dotNodes)) {
graph.Edge(*dotNodes[i], *dotNodes[nodes[i].Parent])
}

View File

@@ -1,7 +1,12 @@
package blockchain
import (
"fmt"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
@@ -15,3 +20,30 @@ func logStateTransitionData(b *ethpb.BeaconBlock) {
"deposits": len(b.Body.Deposits),
}).Info("Finished applying state transition")
}
func logEpochData(beaconState *pb.BeaconState) {
log.WithFields(logrus.Fields{
"epoch": helpers.CurrentEpoch(beaconState),
"finalizedEpoch": beaconState.FinalizedCheckpoint.Epoch,
"justifiedEpoch": beaconState.CurrentJustifiedCheckpoint.Epoch,
"previousJustifiedEpoch": beaconState.PreviousJustifiedCheckpoint.Epoch,
}).Info("Starting next epoch")
activeVals, err := helpers.ActiveValidatorIndices(beaconState, helpers.CurrentEpoch(beaconState))
if err != nil {
log.WithError(err).Error("Could not get active validator indices")
return
}
log.WithFields(logrus.Fields{
"totalValidators": len(beaconState.Validators),
"activeValidators": len(activeVals),
"averageBalance": fmt.Sprintf("%.5f ETH", averageBalance(beaconState.Balances)),
}).Info("Validator registry information")
}
func averageBalance(balances []uint64) float64 {
total := uint64(0)
for i := 0; i < len(balances); i++ {
total += balances[i]
}
return float64(total) / float64(len(balances)) / float64(params.BeaconConfig().GweiPerEth)
}

View File

@@ -3,7 +3,10 @@ package blockchain
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
var (
@@ -44,13 +47,73 @@ var (
Help: "The # of processed attestation with pubsub and fork choice, this ususally means attestations from rpc",
})
headFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{
Name: "head_finalized_epoch",
Name: "chain_service_head_finalized_epoch",
Help: "Last finalized epoch of the head state",
})
headFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "head_finalized_root",
Name: "chain_service_head_finalized_root",
Help: "Last finalized root of the head state",
})
beaconFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_finalized_epoch",
Help: "Last finalized epoch of the processed state",
})
beaconFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_finalized_root",
Help: "Last finalized root of the processed state",
})
cacheFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_cache_finalized_epoch",
Help: "Last cached finalized epoch",
})
cacheFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_cache_finalized_root",
Help: "Last cached finalized root",
})
beaconCurrentJustifiedEpoch = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_current_justified_epoch",
Help: "Current justified epoch of the processed state",
})
beaconCurrentJustifiedRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_current_justified_root",
Help: "Current justified root of the processed state",
})
beaconPrevJustifiedEpoch = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_previous_justified_epoch",
Help: "Previous justified epoch of the processed state",
})
beaconPrevJustifiedRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_beacon_previous_justified_root",
Help: "Previous justified root of the processed state",
})
sigFailsToVerify = promauto.NewCounter(prometheus.CounterOpts{
Name: "chain_service_att_signature_failed_to_verify_with_cache",
Help: "Number of attestation signatures that failed to verify with cache on, but succeeded without cache",
})
validatorsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "chain_service_validator_count",
Help: "The total number of validators",
}, []string{"state"})
validatorsBalance = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "chain_service_validators_total_balance",
Help: "The total balance of validators, in GWei",
}, []string{"state"})
validatorsEffectiveBalance = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "chain_service_validators_total_effective_balance",
Help: "The total effective balance of validators, in GWei",
}, []string{"state"})
currentEth1DataDepositCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_current_eth1_data_deposit_count",
Help: "The current eth1 deposit count in the last processed state eth1data field.",
})
totalEligibleBalances = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_total_eligible_balances",
Help: "The total amount of ether, in gwei, that has been used in voting attestation target of previous epoch",
})
totalVotedTargetBalances = promauto.NewGauge(prometheus.GaugeOpts{
Name: "chain_service_total_voted_target_balances",
Help: "The total amount of ether, in gwei, that is eligible for voting of previous epoch",
})
)
func (s *Service) reportSlotMetrics(currentSlot uint64) {
@@ -61,3 +124,91 @@ func (s *Service) reportSlotMetrics(currentSlot uint64) {
headFinalizedRoot.Set(float64(bytesutil.ToLowInt64(s.headState.FinalizedCheckpoint.Root)))
}
}
func reportEpochMetrics(state *pb.BeaconState) {
currentEpoch := state.Slot / params.BeaconConfig().SlotsPerEpoch
// Validator instances
pendingInstances := 0
activeInstances := 0
slashingInstances := 0
slashedInstances := 0
exitingInstances := 0
exitedInstances := 0
// Validator balances
pendingBalance := uint64(0)
activeBalance := uint64(0)
activeEffectiveBalance := uint64(0)
exitingBalance := uint64(0)
exitingEffectiveBalance := uint64(0)
slashingBalance := uint64(0)
slashingEffectiveBalance := uint64(0)
for i, validator := range state.Validators {
if validator.Slashed {
if currentEpoch < validator.ExitEpoch {
slashingInstances++
slashingBalance += state.Balances[i]
slashingEffectiveBalance += validator.EffectiveBalance
} else {
slashedInstances++
}
continue
}
if validator.ExitEpoch != params.BeaconConfig().FarFutureEpoch {
if currentEpoch < validator.ExitEpoch {
exitingInstances++
exitingBalance += state.Balances[i]
exitingEffectiveBalance += validator.EffectiveBalance
} else {
exitedInstances++
}
continue
}
if currentEpoch < validator.ActivationEpoch {
pendingInstances++
pendingBalance += state.Balances[i]
continue
}
activeInstances++
activeBalance += state.Balances[i]
activeEffectiveBalance += validator.EffectiveBalance
}
validatorsCount.WithLabelValues("Pending").Set(float64(pendingInstances))
validatorsCount.WithLabelValues("Active").Set(float64(activeInstances))
validatorsCount.WithLabelValues("Exiting").Set(float64(exitingInstances))
validatorsCount.WithLabelValues("Exited").Set(float64(exitedInstances))
validatorsCount.WithLabelValues("Slashing").Set(float64(slashingInstances))
validatorsCount.WithLabelValues("Slashed").Set(float64(slashedInstances))
validatorsBalance.WithLabelValues("Pending").Set(float64(pendingBalance))
validatorsBalance.WithLabelValues("Active").Set(float64(activeBalance))
validatorsBalance.WithLabelValues("Exiting").Set(float64(exitingBalance))
validatorsBalance.WithLabelValues("Slashing").Set(float64(slashingBalance))
validatorsEffectiveBalance.WithLabelValues("Active").Set(float64(activeEffectiveBalance))
validatorsEffectiveBalance.WithLabelValues("Exiting").Set(float64(exitingEffectiveBalance))
validatorsEffectiveBalance.WithLabelValues("Slashing").Set(float64(slashingEffectiveBalance))
// Last justified slot
if state.CurrentJustifiedCheckpoint != nil {
beaconCurrentJustifiedEpoch.Set(float64(state.CurrentJustifiedCheckpoint.Epoch))
beaconCurrentJustifiedRoot.Set(float64(bytesutil.ToLowInt64(state.CurrentJustifiedCheckpoint.Root)))
}
// Last previous justified slot
if state.PreviousJustifiedCheckpoint != nil {
beaconPrevJustifiedEpoch.Set(float64(state.PreviousJustifiedCheckpoint.Epoch))
beaconPrevJustifiedRoot.Set(float64(bytesutil.ToLowInt64(state.PreviousJustifiedCheckpoint.Root)))
}
// Last finalized slot
if state.FinalizedCheckpoint != nil {
beaconFinalizedEpoch.Set(float64(state.FinalizedCheckpoint.Epoch))
beaconFinalizedRoot.Set(float64(bytesutil.ToLowInt64(state.FinalizedCheckpoint.Root)))
}
if state.Eth1Data != nil {
currentEth1DataDepositCount.Set(float64(state.Eth1Data.DepositCount))
}
if precompute.Balances != nil {
totalEligibleBalances.Set(float64(precompute.Balances.PrevEpoch))
totalVotedTargetBalances.Set(float64(precompute.Balances.PrevEpochTargetAttesters))
}
}

View File

@@ -0,0 +1,232 @@
package blockchain
import (
"context"
"encoding/hex"
"fmt"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// onBlock is called when a gossip block is received. It runs regular state transition on the block.
//
// Spec pseudocode definition:
// def on_block(store: Store, block: BeaconBlock) -> None:
// # Make a copy of the state to avoid mutability issues
// assert block.parent_root in store.block_states
// pre_state = store.block_states[block.parent_root].copy()
// # Blocks cannot be in the future. If they are, their consideration must be delayed until the are in the past.
// assert store.time >= pre_state.genesis_time + block.slot * SECONDS_PER_SLOT
// # Add new block to the store
// store.blocks[signing_root(block)] = block
// # Check block is a descendant of the finalized block
// assert (
// get_ancestor(store, signing_root(block), store.blocks[store.finalized_checkpoint.root].slot) ==
// store.finalized_checkpoint.root
// )
// # Check that block is later than the finalized epoch slot
// assert block.slot > compute_start_slot_of_epoch(store.finalized_checkpoint.epoch)
// # Check the block is valid and compute the post-state
// state = state_transition(pre_state, block)
// # Add new state for this block to the store
// store.block_states[signing_root(block)] = state
//
// # Update justified checkpoint
// if state.current_justified_checkpoint.epoch > store.justified_checkpoint.epoch:
// if state.current_justified_checkpoint.epoch > store.best_justified_checkpoint.epoch:
// store.best_justified_checkpoint = state.current_justified_checkpoint
//
// # Update finalized checkpoint
// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch:
// store.finalized_checkpoint = state.finalized_checkpoint
func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock")
defer span.End()
if signed == nil || signed.Block == nil {
return nil, errors.New("nil block")
}
b := signed.Block
// Retrieve incoming block's pre state.
preState, err := s.getBlockPreState(ctx, b)
if err != nil {
return nil, err
}
preStateValidatorCount := len(preState.Validators)
root, err := ssz.HashTreeRoot(b)
if err != nil {
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
log.WithFields(logrus.Fields{
"slot": b.Slot,
"root": fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]),
}).Info("Executing state transition on block")
postState, err := state.ExecuteStateTransition(ctx, preState, signed)
if err != nil {
return nil, errors.Wrap(err, "could not execute state transition")
}
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
return nil, errors.Wrap(err, "could not save state")
}
// Update justified check point.
if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
return nil, err
}
}
// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
if postState.FinalizedCheckpoint.Epoch > s.finalizedCheckpt.Epoch {
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil {
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}
startSlot := helpers.StartSlot(s.prevFinalizedCheckpt.Epoch)
endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch)
if endSlot > startSlot {
if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil {
return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
startSlot, endSlot)
}
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = postState.FinalizedCheckpoint
}
// Update validator indices in database as needed.
if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil {
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}
// Epoch boundary bookkeeping such as logging epoch summaries.
if postState.Slot >= s.nextEpochBoundarySlot {
logEpochData(postState)
reportEpochMetrics(postState)
// Update committees cache at epoch boundary slot.
if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
}
if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
}
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
}
return postState, nil
}
// onBlockInitialSyncStateTransition is called when an initial sync block is received.
// It runs state transition on the block and without any BLS verification. The excluded BLS verification
// includes attestation's aggregated signature. It also does not save attestations.
func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock")
defer span.End()
if signed == nil || signed.Block == nil {
return nil, errors.New("nil block")
}
b := signed.Block
s.initSyncStateLock.Lock()
defer s.initSyncStateLock.Unlock()
// Retrieve incoming block's pre state.
preState, err := s.cachedPreState(ctx, b)
if err != nil {
return nil, err
}
preStateValidatorCount := len(preState.Validators)
postState, err := state.ExecuteStateTransitionNoVerifyAttSigs(ctx, preState, signed)
if err != nil {
return nil, errors.Wrap(err, "could not execute state transition")
}
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
root, err := ssz.HashTreeRoot(b)
if err != nil {
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
if featureconfig.Get().InitSyncCacheState {
s.initSyncState[root] = postState
} else {
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
return nil, errors.Wrap(err, "could not save state")
}
}
// Update justified check point.
if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
return nil, err
}
}
// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
if postState.FinalizedCheckpoint.Epoch > s.finalizedCheckpt.Epoch {
startSlot := helpers.StartSlot(s.prevFinalizedCheckpt.Epoch)
endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch)
if endSlot > startSlot {
if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil {
return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
startSlot, endSlot)
}
}
if err := s.saveInitState(ctx, postState); err != nil {
return nil, errors.Wrap(err, "could not save init sync finalized state")
}
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil {
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = postState.FinalizedCheckpoint
}
// Update validator indices in database as needed.
if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil {
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}
// Epoch boundary bookkeeping such as logging epoch summaries.
if postState.Slot >= s.nextEpochBoundarySlot {
reportEpochMetrics(postState)
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
// Update committees cache at epoch boundary slot.
if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
}
if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
}
}
return postState, nil
}

View File

@@ -0,0 +1,363 @@
package blockchain
import (
"bytes"
"context"
"fmt"
"time"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
// is in the correct time window.
func (s *Service) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.getBlockPreState")
defer span.End()
// Verify incoming block has a valid pre state.
preState, err := s.verifyBlkPreState(ctx, b)
if err != nil {
return nil, err
}
// Verify block slot time is not from the feature.
if err := helpers.VerifySlotTime(preState.GenesisTime, b.Slot); err != nil {
return nil, err
}
// Verify block is a descendent of a finalized block.
if err := s.verifyBlkDescendant(ctx, bytesutil.ToBytes32(b.ParentRoot), b.Slot); err != nil {
return nil, err
}
// Verify block is later than the finalized epoch slot.
if err := s.verifyBlkFinalizedSlot(b); err != nil {
return nil, err
}
return preState, nil
}
// verifyBlkPreState validates input block has a valid pre-state.
func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
return preState, nil
}
// verifyBlkDescendant validates input block root is a descendant of the
// current finalized block root.
func (s *Service) verifyBlkDescendant(ctx context.Context, root [32]byte, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "forkchoice.verifyBlkDescendant")
defer span.End()
finalizedBlkSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root))
if err != nil || finalizedBlkSigned == nil || finalizedBlkSigned.Block == nil {
return errors.Wrap(err, "could not get finalized block")
}
finalizedBlk := finalizedBlkSigned.Block
bFinalizedRoot, err := s.ancestor(ctx, root[:], finalizedBlk.Slot)
if err != nil {
return errors.Wrap(err, "could not get finalized block root")
}
if !bytes.Equal(bFinalizedRoot, s.finalizedCheckpt.Root) {
err := fmt.Errorf("block from slot %d is not a descendent of the current finalized block slot %d, %#x != %#x",
slot, finalizedBlk.Slot, bytesutil.Trunc(bFinalizedRoot), bytesutil.Trunc(s.finalizedCheckpt.Root))
traceutil.AnnotateError(span, err)
return err
}
return nil
}
// verifyBlkFinalizedSlot validates input block is not less than or equal
// to current finalized slot.
func (s *Service) verifyBlkFinalizedSlot(b *ethpb.BeaconBlock) error {
finalizedSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch)
if finalizedSlot >= b.Slot {
return fmt.Errorf("block is equal or earlier than finalized block, slot %d < slot %d", b.Slot, finalizedSlot)
}
return nil
}
// saveNewValidators saves newly added validator indices from the state to db.
// Does nothing if validator count has not changed.
func (s *Service) saveNewValidators(ctx context.Context, preStateValidatorCount int, postState *pb.BeaconState) error {
postStateValidatorCount := len(postState.Validators)
if preStateValidatorCount != postStateValidatorCount {
indices := make([]uint64, 0)
pubKeys := make([][]byte, 0)
for i := preStateValidatorCount; i < postStateValidatorCount; i++ {
indices = append(indices, uint64(i))
pubKeys = append(pubKeys, postState.Validators[i].PublicKey)
}
if err := s.beaconDB.SaveValidatorIndices(ctx, pubKeys, indices); err != nil {
return errors.Wrapf(err, "could not save activated validators: %v", indices)
}
log.WithFields(logrus.Fields{
"indices": indices,
"totalValidatorCount": postStateValidatorCount - preStateValidatorCount,
}).Info("Validator indices saved in DB")
}
return nil
}
// rmStatesOlderThanLastFinalized deletes the states in db since last finalized check point.
func (s *Service) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot uint64, endSlot uint64) error {
ctx, span := trace.StartSpan(ctx, "forkchoice.rmStatesBySlots")
defer span.End()
// Make sure start slot is not a skipped slot
for i := startSlot; i > 0; i-- {
filter := filters.NewFilter().SetStartSlot(i).SetEndSlot(i)
b, err := s.beaconDB.Blocks(ctx, filter)
if err != nil {
return err
}
if len(b) > 0 {
startSlot = i
break
}
}
// Make sure finalized slot is not a skipped slot.
for i := endSlot; i > 0; i-- {
filter := filters.NewFilter().SetStartSlot(i).SetEndSlot(i)
b, err := s.beaconDB.Blocks(ctx, filter)
if err != nil {
return err
}
if len(b) > 0 {
endSlot = i - 1
break
}
}
// Do not remove genesis state
if startSlot == 0 {
startSlot++
}
// If end slot comes less than start slot
if endSlot < startSlot {
endSlot = startSlot
}
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
roots, err := s.beaconDB.BlockRoots(ctx, filter)
if err != nil {
return err
}
roots, err = s.filterBlockRoots(ctx, roots)
if err != nil {
return err
}
if err := s.beaconDB.DeleteStates(ctx, roots); err != nil {
return err
}
return nil
}
// shouldUpdateCurrentJustified prevents bouncing attack, by only update conflicting justified
// checkpoints in the fork choice if in the early slots of the epoch.
// Otherwise, delay incorporation of new justified checkpoint until next epoch boundary.
// See https://ethresear.ch/t/prevention-of-bouncing-attack-on-ffg/6114 for more detailed analysis and discussion.
func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustifiedCheckpt *ethpb.Checkpoint) (bool, error) {
if helpers.SlotsSinceEpochStarts(s.currentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
return true, nil
}
newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
if err != nil {
return false, err
}
if newJustifiedBlockSigned == nil || newJustifiedBlockSigned.Block == nil {
return false, errors.New("nil new justified block")
}
newJustifiedBlock := newJustifiedBlockSigned.Block
if newJustifiedBlock.Slot <= helpers.StartSlot(s.justifiedCheckpt.Epoch) {
return false, nil
}
justifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root))
if err != nil {
return false, err
}
if justifiedBlockSigned == nil || justifiedBlockSigned.Block == nil {
return false, errors.New("nil justified block")
}
justifiedBlock := justifiedBlockSigned.Block
b, err := s.ancestor(ctx, newJustifiedCheckpt.Root, justifiedBlock.Slot)
if err != nil {
return false, err
}
if !bytes.Equal(b, s.justifiedCheckpt.Root) {
return false, nil
}
return true, nil
}
func (s *Service) updateJustified(ctx context.Context, state *pb.BeaconState) error {
if state.CurrentJustifiedCheckpoint.Epoch > s.bestJustifiedCheckpt.Epoch {
s.bestJustifiedCheckpt = state.CurrentJustifiedCheckpoint
}
canUpdate, err := s.shouldUpdateCurrentJustified(ctx, state.CurrentJustifiedCheckpoint)
if err != nil {
return err
}
if canUpdate {
s.justifiedCheckpt = state.CurrentJustifiedCheckpoint
}
if featureconfig.Get().InitSyncCacheState {
justifiedRoot := bytesutil.ToBytes32(state.CurrentJustifiedCheckpoint.Root)
justifiedState := s.initSyncState[justifiedRoot]
if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil {
return errors.Wrap(err, "could not save justified state")
}
}
return s.beaconDB.SaveJustifiedCheckpoint(ctx, state.CurrentJustifiedCheckpoint)
}
// currentSlot returns the current slot based on time.
func (s *Service) currentSlot() uint64 {
return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
}
// This receives cached state in memory for initial sync only during initial sync.
func (s *Service) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
if featureconfig.Get().InitSyncCacheState {
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
var err error
if preState == nil {
preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
}
return proto.Clone(preState).(*pb.BeaconState), nil
}
preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
return preState, nil
}
// This saves every finalized state in DB during initial sync, needed as part of optimization to
// use cache state during initial sync in case of restart.
func (s *Service) saveInitState(ctx context.Context, state *pb.BeaconState) error {
if !featureconfig.Get().InitSyncCacheState {
return nil
}
finalizedRoot := bytesutil.ToBytes32(state.FinalizedCheckpoint.Root)
fs := s.initSyncState[finalizedRoot]
if err := s.beaconDB.SaveState(ctx, fs, finalizedRoot); err != nil {
return errors.Wrap(err, "could not save state")
}
for r, oldState := range s.initSyncState {
if oldState.Slot < state.FinalizedCheckpoint.Epoch*params.BeaconConfig().SlotsPerEpoch {
delete(s.initSyncState, r)
}
}
return nil
}
// This filters block roots that are not known as head root and finalized root in DB.
// It serves as the last line of defence before we prune states.
func (s *Service) filterBlockRoots(ctx context.Context, roots [][32]byte) ([][32]byte, error) {
f, err := s.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return nil, err
}
fRoot := f.Root
h, err := s.beaconDB.HeadBlock(ctx)
if err != nil {
return nil, err
}
hRoot, err := ssz.SigningRoot(h)
if err != nil {
return nil, err
}
filtered := make([][32]byte, 0, len(roots))
for _, root := range roots {
if bytes.Equal(root[:], fRoot[:]) || bytes.Equal(root[:], hRoot[:]) {
continue
}
filtered = append(filtered, root)
}
return filtered, nil
}
// ancestor returns the block root of an ancestry block from the input block root.
//
// Spec pseudocode definition:
// def get_ancestor(store: Store, root: Hash, slot: Slot) -> Hash:
// block = store.blocks[root]
// if block.slot > slot:
// return get_ancestor(store, block.parent_root, slot)
// elif block.slot == slot:
// return root
// else:
// return Bytes32() # root is older than queried slot: no results.
func (s *Service) ancestor(ctx context.Context, root []byte, slot uint64) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.ancestor")
defer span.End()
// Stop recursive ancestry lookup if context is cancelled.
if ctx.Err() != nil {
return nil, ctx.Err()
}
signed, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(root))
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor block")
}
if signed == nil || signed.Block == nil {
return nil, errors.New("nil block")
}
b := signed.Block
// If we dont have the ancestor in the DB, simply return nil so rest of fork choice
// operation can proceed. This is not an error condition.
if b == nil || b.Slot < slot {
return nil, nil
}
if b.Slot == slot {
return root, nil
}
return s.ancestor(ctx, b.ParentRoot, slot)
}

View File

@@ -0,0 +1,570 @@
package blockchain
import (
"context"
"reflect"
"strings"
"testing"
"time"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/stateutil"
)
func TestStore_OnBlock(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
genesisStateRoot, err := stateutil.HashTreeRootState(&pb.BeaconState{})
if err != nil {
t.Error(err)
}
genesis := blocks.NewGenesisBlock(genesisStateRoot[:])
if err := db.SaveBlock(ctx, genesis); err != nil {
t.Error(err)
}
validGenesisRoot, err := ssz.HashTreeRoot(genesis.Block)
if err != nil {
t.Error(err)
}
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, validGenesisRoot); err != nil {
t.Fatal(err)
}
roots, err := blockTree1(db, validGenesisRoot[:])
if err != nil {
t.Fatal(err)
}
random := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: validGenesisRoot[:]}}
if err := db.SaveBlock(ctx, random); err != nil {
t.Error(err)
}
randomParentRoot, err := ssz.HashTreeRoot(random.Block)
if err != nil {
t.Error(err)
}
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, randomParentRoot); err != nil {
t.Fatal(err)
}
randomParentRoot2 := roots[1]
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, bytesutil.ToBytes32(randomParentRoot2)); err != nil {
t.Fatal(err)
}
tests := []struct {
name string
blk *ethpb.BeaconBlock
s *pb.BeaconState
time uint64
wantErrString string
}{
{
name: "parent block root does not have a state",
blk: &ethpb.BeaconBlock{},
s: &pb.BeaconState{},
wantErrString: "pre state of slot 0 does not exist",
},
{
name: "block is from the feature",
blk: &ethpb.BeaconBlock{ParentRoot: randomParentRoot[:], Slot: params.BeaconConfig().FarFutureEpoch},
s: &pb.BeaconState{},
wantErrString: "could not process slot from the future",
},
{
name: "could not get finalized block",
blk: &ethpb.BeaconBlock{ParentRoot: randomParentRoot[:]},
s: &pb.BeaconState{},
wantErrString: "block from slot 0 is not a descendent of the current finalized block",
},
{
name: "same slot as finalized block",
blk: &ethpb.BeaconBlock{Slot: 0, ParentRoot: randomParentRoot2},
s: &pb.BeaconState{},
wantErrString: "block is equal or earlier than finalized block, slot 0 < slot 0",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
service.justifiedCheckpt = &ethpb.Checkpoint{Root: validGenesisRoot[:]}
service.bestJustifiedCheckpt = &ethpb.Checkpoint{Root: validGenesisRoot[:]}
service.finalizedCheckpt = &ethpb.Checkpoint{Root: validGenesisRoot[:]}
service.prevFinalizedCheckpt = &ethpb.Checkpoint{Root: validGenesisRoot[:]}
service.finalizedCheckpt.Root = roots[0]
_, err := service.onBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk})
if !strings.Contains(err.Error(), tt.wantErrString) {
t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString)
}
})
}
}
func TestStore_SaveNewValidators(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
preCount := 2 // validators 0 and validators 1
s := &pb.BeaconState{Validators: []*ethpb.Validator{
{PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}},
{PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}},
{PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}},
{PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3}},
}}
if err := service.saveNewValidators(ctx, preCount, s); err != nil {
t.Fatal(err)
}
if !db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}) {
t.Error("Wanted validator saved in db")
}
if !db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3}) {
t.Error("Wanted validator saved in db")
}
if db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) {
t.Error("validator not suppose to be saved in db")
}
}
func TestRemoveStateSinceLastFinalized(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
params.UseMinimalConfig()
defer params.UseMainnetConfig()
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
// Save 100 blocks in DB, each has a state.
numBlocks := 100
totalBlocks := make([]*ethpb.SignedBeaconBlock, numBlocks)
blockRoots := make([][32]byte, 0)
for i := 0; i < len(totalBlocks); i++ {
totalBlocks[i] = &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
Slot: uint64(i),
},
}
r, err := ssz.HashTreeRoot(totalBlocks[i].Block)
if err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{Slot: uint64(i)}, r); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, totalBlocks[i]); err != nil {
t.Fatal(err)
}
blockRoots = append(blockRoots, r)
if err := service.beaconDB.SaveHeadBlockRoot(ctx, r); err != nil {
t.Fatal(err)
}
}
// New finalized epoch: 1
finalizedEpoch := uint64(1)
finalizedSlot := finalizedEpoch * params.BeaconConfig().SlotsPerEpoch
endSlot := helpers.StartSlot(finalizedEpoch+1) - 1 // Inclusive
if err := service.rmStatesOlderThanLastFinalized(ctx, 0, endSlot); err != nil {
t.Fatal(err)
}
for _, r := range blockRoots {
s, err := service.beaconDB.State(ctx, r)
if err != nil {
t.Fatal(err)
}
// Also verifies genesis state didnt get deleted
if s != nil && s.Slot != finalizedSlot && s.Slot != 0 && s.Slot < endSlot {
t.Errorf("State with slot %d should not be in DB", s.Slot)
}
}
// New finalized epoch: 5
newFinalizedEpoch := uint64(5)
newFinalizedSlot := newFinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
endSlot = helpers.StartSlot(newFinalizedEpoch+1) - 1 // Inclusive
if err := service.rmStatesOlderThanLastFinalized(ctx, helpers.StartSlot(finalizedEpoch+1)-1, endSlot); err != nil {
t.Fatal(err)
}
for _, r := range blockRoots {
s, err := service.beaconDB.State(ctx, r)
if err != nil {
t.Fatal(err)
}
// Also verifies genesis state didnt get deleted
if s != nil && s.Slot != newFinalizedSlot && s.Slot != finalizedSlot && s.Slot != 0 && s.Slot < endSlot {
t.Errorf("State with slot %d should not be in DB", s.Slot)
}
}
}
func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
params.UseMinimalConfig()
defer params.UseMainnetConfig()
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
service.genesisTime = time.Now()
update, err := service.shouldUpdateCurrentJustified(ctx, &ethpb.Checkpoint{})
if err != nil {
t.Fatal(err)
}
if !update {
t.Error("Should be able to update justified, received false")
}
lastJustifiedBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{ParentRoot: []byte{'G'}}}
lastJustifiedRoot, _ := ssz.HashTreeRoot(lastJustifiedBlk.Block)
newJustifiedBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: lastJustifiedRoot[:]}}
newJustifiedRoot, _ := ssz.HashTreeRoot(newJustifiedBlk.Block)
if err := service.beaconDB.SaveBlock(ctx, newJustifiedBlk); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, lastJustifiedBlk); err != nil {
t.Fatal(err)
}
diff := (params.BeaconConfig().SlotsPerEpoch - 1) * params.BeaconConfig().SecondsPerSlot
service.genesisTime = time.Unix(time.Now().Unix()-int64(diff), 0)
service.justifiedCheckpt = &ethpb.Checkpoint{Root: lastJustifiedRoot[:]}
update, err = service.shouldUpdateCurrentJustified(ctx, &ethpb.Checkpoint{Root: newJustifiedRoot[:]})
if err != nil {
t.Fatal(err)
}
if !update {
t.Error("Should be able to update justified, received false")
}
}
func TestShouldUpdateJustified_ReturnFalse(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
params.UseMinimalConfig()
defer params.UseMainnetConfig()
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
lastJustifiedBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{ParentRoot: []byte{'G'}}}
lastJustifiedRoot, _ := ssz.HashTreeRoot(lastJustifiedBlk.Block)
newJustifiedBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{ParentRoot: lastJustifiedRoot[:]}}
newJustifiedRoot, _ := ssz.HashTreeRoot(newJustifiedBlk.Block)
if err := service.beaconDB.SaveBlock(ctx, newJustifiedBlk); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, lastJustifiedBlk); err != nil {
t.Fatal(err)
}
diff := (params.BeaconConfig().SlotsPerEpoch - 1) * params.BeaconConfig().SecondsPerSlot
service.genesisTime = time.Unix(time.Now().Unix()-int64(diff), 0)
service.justifiedCheckpt = &ethpb.Checkpoint{Root: lastJustifiedRoot[:]}
update, err := service.shouldUpdateCurrentJustified(ctx, &ethpb.Checkpoint{Root: newJustifiedRoot[:]})
if err != nil {
t.Fatal(err)
}
if update {
t.Error("Should not be able to update justified, received true")
}
}
func TestCachedPreState_CanGetFromCache(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
s := &pb.BeaconState{Slot: 1}
r := [32]byte{'A'}
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
service.initSyncState[r] = s
wanted := "pre state of slot 1 does not exist"
if _, err := service.cachedPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
t.Fatal("Not expected error")
}
}
func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
config := &featureconfig.Flags{
InitSyncCacheState: true,
}
featureconfig.Init(config)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
s := &pb.BeaconState{Slot: 1}
r := [32]byte{'A'}
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
service.initSyncState[r] = s
received, err := service.cachedPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(s, received) {
t.Error("cached state not the same")
}
}
func TestCachedPreState_CanGetFromDB(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
r := [32]byte{'A'}
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
_, err = service.cachedPreState(ctx, b)
wanted := "pre state of slot 1 does not exist"
if err.Error() != wanted {
t.Error("Did not get wanted error")
}
s := &pb.BeaconState{Slot: 1}
service.beaconDB.SaveState(ctx, s, r)
received, err := service.cachedPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(s, received) {
t.Error("cached state not the same")
}
}
func TestSaveInitState_CanSaveDelete(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
config := &featureconfig.Flags{
InitSyncCacheState: true,
}
featureconfig.Init(config)
for i := uint64(0); i < 64; i++ {
b := &ethpb.BeaconBlock{Slot: i}
s := &pb.BeaconState{Slot: i}
r, _ := ssz.HashTreeRoot(b)
service.initSyncState[r] = s
}
// Set finalized root as slot 32
finalizedRoot, _ := ssz.HashTreeRoot(&ethpb.BeaconBlock{Slot: 32})
if err := service.saveInitState(ctx, &pb.BeaconState{FinalizedCheckpoint: &ethpb.Checkpoint{
Epoch: 1, Root: finalizedRoot[:]}}); err != nil {
t.Fatal(err)
}
// Verify finalized state is saved in DB
finalizedState, err := service.beaconDB.State(ctx, finalizedRoot)
if err != nil {
t.Fatal(err)
}
if finalizedState == nil {
t.Error("finalized state can't be nil")
}
// Verify cached state is properly pruned
if len(service.initSyncState) != int(params.BeaconConfig().SlotsPerEpoch) {
t.Errorf("wanted: %d, got: %d", len(service.initSyncState), params.BeaconConfig().SlotsPerEpoch)
}
}
func TestUpdateJustified_CouldUpdateBest(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
signedBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
if err := db.SaveBlock(ctx, signedBlock); err != nil {
t.Fatal(err)
}
r, err := ssz.HashTreeRoot(signedBlock.Block)
if err != nil {
t.Fatal(err)
}
service.justifiedCheckpt = &ethpb.Checkpoint{Root: []byte{'A'}}
service.bestJustifiedCheckpt = &ethpb.Checkpoint{Root: []byte{'A'}}
service.initSyncState[r] = &pb.BeaconState{}
if err := db.SaveState(ctx, &pb.BeaconState{}, r); err != nil {
t.Fatal(err)
}
// Could update
s := &pb.BeaconState{CurrentJustifiedCheckpoint: &ethpb.Checkpoint{Epoch: 1, Root: r[:]}}
if err := service.updateJustified(context.Background(), s); err != nil {
t.Fatal(err)
}
if service.bestJustifiedCheckpt.Epoch != s.CurrentJustifiedCheckpoint.Epoch {
t.Error("Incorrect justified epoch in service")
}
// Could not update
service.bestJustifiedCheckpt.Epoch = 2
if err := service.updateJustified(context.Background(), s); err != nil {
t.Fatal(err)
}
if service.bestJustifiedCheckpt.Epoch != 2 {
t.Error("Incorrect justified epoch in service")
}
}
func TestFilterBlockRoots_CanFilter(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
fBlock := &ethpb.BeaconBlock{}
fRoot, _ := ssz.HashTreeRoot(fBlock)
hBlock := &ethpb.BeaconBlock{Slot: 1}
headRoot, _ := ssz.HashTreeRoot(hBlock)
if err := service.beaconDB.SaveBlock(ctx, &ethpb.SignedBeaconBlock{Block: fBlock}); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, fRoot); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveFinalizedCheckpoint(ctx, &ethpb.Checkpoint{Root: fRoot[:]}); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, &ethpb.SignedBeaconBlock{Block: hBlock}); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, headRoot); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveHeadBlockRoot(ctx, headRoot); err != nil {
t.Fatal(err)
}
roots := [][32]byte{{'C'}, {'D'}, headRoot, {'E'}, fRoot, {'F'}}
wanted := [][32]byte{{'C'}, {'D'}, {'E'}, {'F'}}
received, err := service.filterBlockRoots(ctx, roots)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(wanted, received) {
t.Error("Did not filter correctly")
}
}
// blockTree1 constructs the following tree:
// /- B1
// B0 /- B5 - B7
// \- B3 - B4 - B6 - B8
// (B1, and B3 are all from the same slots)
func blockTree1(db db.Database, genesisRoot []byte) ([][]byte, error) {
b0 := &ethpb.BeaconBlock{Slot: 0, ParentRoot: genesisRoot}
r0, _ := ssz.HashTreeRoot(b0)
b1 := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r0[:]}
r1, _ := ssz.HashTreeRoot(b1)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: r0[:]}
r3, _ := ssz.HashTreeRoot(b3)
b4 := &ethpb.BeaconBlock{Slot: 4, ParentRoot: r3[:]}
r4, _ := ssz.HashTreeRoot(b4)
b5 := &ethpb.BeaconBlock{Slot: 5, ParentRoot: r4[:]}
r5, _ := ssz.HashTreeRoot(b5)
b6 := &ethpb.BeaconBlock{Slot: 6, ParentRoot: r4[:]}
r6, _ := ssz.HashTreeRoot(b6)
b7 := &ethpb.BeaconBlock{Slot: 7, ParentRoot: r5[:]}
r7, _ := ssz.HashTreeRoot(b7)
b8 := &ethpb.BeaconBlock{Slot: 8, ParentRoot: r6[:]}
r8, _ := ssz.HashTreeRoot(b8)
for _, b := range []*ethpb.BeaconBlock{b0, b1, b3, b4, b5, b6, b7, b8} {
if err := db.SaveBlock(context.Background(), &ethpb.SignedBeaconBlock{Block: b}); err != nil {
return nil, err
}
if err := db.SaveState(context.Background(), &pb.BeaconState{}, bytesutil.ToBytes32(b.ParentRoot)); err != nil {
return nil, err
}
}
if err := db.SaveState(context.Background(), &pb.BeaconState{}, r1); err != nil {
return nil, err
}
if err := db.SaveState(context.Background(), &pb.BeaconState{}, r7); err != nil {
return nil, err
}
if err := db.SaveState(context.Background(), &pb.BeaconState{}, r8); err != nil {
return nil, err
}
return [][]byte{r0[:], r1[:], nil, r3[:], r4[:], r5[:], r6[:], r7[:], r8[:]}, nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
@@ -70,11 +71,22 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock)
// Apply state transition on the new block.
postState, err := s.forkChoiceStoreOld.OnBlockCacheFilteredTree(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block from fork choice service")
traceutil.AnnotateError(span, err)
return err
var postState *pb.BeaconState
var err error
if featureconfig.Get().ProtoArrayForkChoice {
postState, err = s.onBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
} else {
postState, err = s.forkChoiceStoreOld.OnBlockCacheFilteredTree(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block from fork choice service")
traceutil.AnnotateError(span, err)
return err
}
}
root, err := ssz.HashTreeRoot(blockCopy.Block)
@@ -135,8 +147,8 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}
f := s.forkChoiceStoreOld.FinalizedCheckpt()
j := s.forkChoiceStoreOld.JustifiedCheckpt()
f := s.finalizedCheckpt
j := s.justifiedCheckpt
headRootProtoArray, err := s.forkChoiceStore.Head(
ctx,
f.Epoch,
@@ -148,7 +160,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
return nil
}
if postState.FinalizedCheckpoint.Epoch > s.FinalizedCheckpt().Epoch {
if postState.FinalizedCheckpoint.Epoch > f.Epoch {
if err := s.forkChoiceStore.Prune(ctx, bytesutil.ToBytes32(postState.FinalizedCheckpoint.Root)); err != nil {
return errors.Wrap(err, "could not prune proto array fork choice")
}
@@ -193,12 +205,23 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
defer span.End()
blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock)
// Apply state transition on the incoming newly received block.
postState, err := s.forkChoiceStoreOld.OnBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block from fork choice service")
traceutil.AnnotateError(span, err)
return err
// Apply state transition on the new block.
var postState *pb.BeaconState
var err error
if featureconfig.Get().ProtoArrayForkChoice {
postState, err = s.onBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
} else {
postState, err = s.forkChoiceStoreOld.OnBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block from fork choice service")
traceutil.AnnotateError(span, err)
return err
}
}
root, err := ssz.HashTreeRoot(blockCopy.Block)
@@ -265,10 +288,22 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock)
// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
postState, err := s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy)
if err != nil {
return errors.Wrap(err, "could not process blockCopy from fork choice service")
var postState *pb.BeaconState
var err error
if featureconfig.Get().ProtoArrayForkChoice {
postState, err = s.onBlockInitialSyncStateTransition(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
} else {
postState, err = s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy)
if err != nil {
return errors.Wrap(err, "could not process blockCopy from fork choice service")
}
}
root, err := ssz.HashTreeRoot(blockCopy.Block)
if err != nil {
return errors.Wrap(err, "could not get signing root on received blockCopy")

View File

@@ -36,6 +36,10 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := chainService.beaconDB.SaveGenesisBlockRoot(ctx, genesisBlkRoot); err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, beaconState, genesisBlkRoot); err != nil {
t.Fatal(err)
}
@@ -48,10 +52,6 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) {
t.Fatalf("Could not save block to db: %v", err)
}
if err := db.SaveState(ctx, beaconState, genesisBlkRoot); err != nil {
t.Fatal(err)
}
slot := beaconState.Slot + 1
block, err := testutil.GenerateFullBlock(beaconState, privKeys, nil, slot)
if err != nil {
@@ -169,6 +169,9 @@ func TestReceiveBlockNoPubsubForkchoice_ProcessCorrectly(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := chainService.beaconDB.SaveGenesisBlockRoot(ctx, parentRoot); err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, beaconState, parentRoot); err != nil {
t.Fatal(err)
}

View File

@@ -62,6 +62,14 @@ type Service struct {
epochParticipation map[uint64]*precompute.Balance
epochParticipationLock sync.RWMutex
forkChoiceStore f.ForkChoicer
justifiedCheckpt *ethpb.Checkpoint
bestJustifiedCheckpt *ethpb.Checkpoint
finalizedCheckpt *ethpb.Checkpoint
prevFinalizedCheckpt *ethpb.Checkpoint
nextEpochBoundarySlot uint64
voteLock sync.RWMutex
initSyncState map[[32]byte]*pb.BeaconState
initSyncStateLock sync.RWMutex
}
// Config options for the service.
@@ -98,6 +106,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
stateNotifier: cfg.StateNotifier,
epochParticipation: make(map[uint64]*precompute.Balance),
forkChoiceStore: cfg.ForkChoiceStore,
initSyncState: make(map[[32]byte]*pb.BeaconState),
}, nil
}
@@ -143,9 +152,18 @@ func (s *Service) Start() {
if err := s.forkChoiceStoreOld.GenesisStore(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil {
log.Fatalf("Could not start fork choice service: %v", err)
}
if err := s.resumeForkChoice(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil {
log.Fatalf("Could not resume fork choice: %v", err)
if featureconfig.Get().ProtoArrayForkChoice {
s.justifiedCheckpt = proto.Clone(justifiedCheckpoint).(*ethpb.Checkpoint)
s.bestJustifiedCheckpt = proto.Clone(justifiedCheckpoint).(*ethpb.Checkpoint)
s.finalizedCheckpt = proto.Clone(finalizedCheckpoint).(*ethpb.Checkpoint)
s.prevFinalizedCheckpt = proto.Clone(finalizedCheckpoint).(*ethpb.Checkpoint)
if err := s.resumeForkChoice(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil {
log.Fatalf("Could not resume fork choice: %v", err)
}
}
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
@@ -359,6 +377,11 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *pb.BeaconSt
// Add the genesis block to the fork choice store.
if featureconfig.Get().ProtoArrayForkChoice {
s.justifiedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint)
s.bestJustifiedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint)
s.finalizedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint)
s.prevFinalizedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint)
if err := s.forkChoiceStore.ProcessBlock(ctx,
genesisBlk.Block.Slot,
genesisBlkRoot,

View File

@@ -15,7 +15,7 @@ func TestEndToEnd_DemoConfig(t *testing.T) {
params.UseDemoBeaconConfig()
demoConfig := &end2EndConfig{
beaconFlags: featureconfig.E2EBeaconChainFlags,
beaconFlags: featureconfig.E2EBeaconChainFlags,
validatorFlags: featureconfig.E2EValidatorFlags,
epochsToRun: 5,
numBeaconNodes: 2,

View File

@@ -14,7 +14,7 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
params.UseMinimalConfig()
minimalConfig := &end2EndConfig{
beaconFlags: append(featureconfig.E2EBeaconChainFlags, "--minimal-config"),
beaconFlags: append(featureconfig.E2EBeaconChainFlags, "--minimal-config"),
validatorFlags: append(featureconfig.E2EValidatorFlags, "--minimal-config"),
epochsToRun: 5,
numBeaconNodes: 4,