diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 2326fbfcc9..1c8f3af5f3 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -7,6 +7,8 @@ go_library( "info.go", "log.go", "metrics.go", + "process_block.go", + "process_block_helpers.go", "receive_attestation.go", "receive_block.go", "service.go", @@ -23,6 +25,7 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", "//beacon-chain/db:go_default_library", + "//beacon-chain/db/filters:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/protoarray:go_default_library", "//beacon-chain/operations/attestations:go_default_library", @@ -61,6 +64,7 @@ go_test( size = "medium", srcs = [ "chain_info_test.go", + "process_block_test.go", "receive_attestation_test.go", "receive_block_test.go", "service_test.go", diff --git a/beacon-chain/blockchain/info.go b/beacon-chain/blockchain/info.go index 6e0eee2569..13c40040e6 100644 --- a/beacon-chain/blockchain/info.go +++ b/beacon-chain/blockchain/info.go @@ -68,7 +68,7 @@ func (s *Service) TreeHandler(w http.ResponseWriter, _ *http.Request) { dotNodes[i] = &dotN } - for i := len(nodes) - 1; i >= 0; i-- { + for i := len(nodes) - 1; i >= 0; i-- { if nodes[i].Parent != ^uint64(0) && nodes[i].Parent < uint64(len(dotNodes)) { graph.Edge(*dotNodes[i], *dotNodes[nodes[i].Parent]) } diff --git a/beacon-chain/blockchain/log.go b/beacon-chain/blockchain/log.go index a40d8ed830..2d55004fb5 100644 --- a/beacon-chain/blockchain/log.go +++ b/beacon-chain/blockchain/log.go @@ -1,7 +1,12 @@ package blockchain import ( + "fmt" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -15,3 +20,30 @@ func logStateTransitionData(b *ethpb.BeaconBlock) { "deposits": len(b.Body.Deposits), }).Info("Finished applying state transition") } + +func logEpochData(beaconState *pb.BeaconState) { + log.WithFields(logrus.Fields{ + "epoch": helpers.CurrentEpoch(beaconState), + "finalizedEpoch": beaconState.FinalizedCheckpoint.Epoch, + "justifiedEpoch": beaconState.CurrentJustifiedCheckpoint.Epoch, + "previousJustifiedEpoch": beaconState.PreviousJustifiedCheckpoint.Epoch, + }).Info("Starting next epoch") + activeVals, err := helpers.ActiveValidatorIndices(beaconState, helpers.CurrentEpoch(beaconState)) + if err != nil { + log.WithError(err).Error("Could not get active validator indices") + return + } + log.WithFields(logrus.Fields{ + "totalValidators": len(beaconState.Validators), + "activeValidators": len(activeVals), + "averageBalance": fmt.Sprintf("%.5f ETH", averageBalance(beaconState.Balances)), + }).Info("Validator registry information") +} + +func averageBalance(balances []uint64) float64 { + total := uint64(0) + for i := 0; i < len(balances); i++ { + total += balances[i] + } + return float64(total) / float64(len(balances)) / float64(params.BeaconConfig().GweiPerEth) +} diff --git a/beacon-chain/blockchain/metrics.go b/beacon-chain/blockchain/metrics.go index 378063e835..4f08881e57 100644 --- a/beacon-chain/blockchain/metrics.go +++ b/beacon-chain/blockchain/metrics.go @@ -3,7 +3,10 @@ package blockchain import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" ) var ( @@ -44,13 +47,73 @@ var ( Help: "The # of processed attestation with pubsub and fork choice, this ususally means attestations from rpc", }) headFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "head_finalized_epoch", + Name: "chain_service_head_finalized_epoch", Help: "Last finalized epoch of the head state", }) headFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "head_finalized_root", + Name: "chain_service_head_finalized_root", Help: "Last finalized root of the head state", }) + beaconFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_finalized_epoch", + Help: "Last finalized epoch of the processed state", + }) + beaconFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_finalized_root", + Help: "Last finalized root of the processed state", + }) + cacheFinalizedEpoch = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_cache_finalized_epoch", + Help: "Last cached finalized epoch", + }) + cacheFinalizedRoot = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_cache_finalized_root", + Help: "Last cached finalized root", + }) + beaconCurrentJustifiedEpoch = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_current_justified_epoch", + Help: "Current justified epoch of the processed state", + }) + beaconCurrentJustifiedRoot = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_current_justified_root", + Help: "Current justified root of the processed state", + }) + beaconPrevJustifiedEpoch = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_previous_justified_epoch", + Help: "Previous justified epoch of the processed state", + }) + beaconPrevJustifiedRoot = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_beacon_previous_justified_root", + Help: "Previous justified root of the processed state", + }) + sigFailsToVerify = promauto.NewCounter(prometheus.CounterOpts{ + Name: "chain_service_att_signature_failed_to_verify_with_cache", + Help: "Number of attestation signatures that failed to verify with cache on, but succeeded without cache", + }) + validatorsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "chain_service_validator_count", + Help: "The total number of validators", + }, []string{"state"}) + validatorsBalance = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "chain_service_validators_total_balance", + Help: "The total balance of validators, in GWei", + }, []string{"state"}) + validatorsEffectiveBalance = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "chain_service_validators_total_effective_balance", + Help: "The total effective balance of validators, in GWei", + }, []string{"state"}) + currentEth1DataDepositCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_current_eth1_data_deposit_count", + Help: "The current eth1 deposit count in the last processed state eth1data field.", + }) + totalEligibleBalances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_total_eligible_balances", + Help: "The total amount of ether, in gwei, that has been used in voting attestation target of previous epoch", + }) + totalVotedTargetBalances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "chain_service_total_voted_target_balances", + Help: "The total amount of ether, in gwei, that is eligible for voting of previous epoch", + }) ) func (s *Service) reportSlotMetrics(currentSlot uint64) { @@ -61,3 +124,91 @@ func (s *Service) reportSlotMetrics(currentSlot uint64) { headFinalizedRoot.Set(float64(bytesutil.ToLowInt64(s.headState.FinalizedCheckpoint.Root))) } } + +func reportEpochMetrics(state *pb.BeaconState) { + currentEpoch := state.Slot / params.BeaconConfig().SlotsPerEpoch + + // Validator instances + pendingInstances := 0 + activeInstances := 0 + slashingInstances := 0 + slashedInstances := 0 + exitingInstances := 0 + exitedInstances := 0 + // Validator balances + pendingBalance := uint64(0) + activeBalance := uint64(0) + activeEffectiveBalance := uint64(0) + exitingBalance := uint64(0) + exitingEffectiveBalance := uint64(0) + slashingBalance := uint64(0) + slashingEffectiveBalance := uint64(0) + + for i, validator := range state.Validators { + if validator.Slashed { + if currentEpoch < validator.ExitEpoch { + slashingInstances++ + slashingBalance += state.Balances[i] + slashingEffectiveBalance += validator.EffectiveBalance + } else { + slashedInstances++ + } + continue + } + if validator.ExitEpoch != params.BeaconConfig().FarFutureEpoch { + if currentEpoch < validator.ExitEpoch { + exitingInstances++ + exitingBalance += state.Balances[i] + exitingEffectiveBalance += validator.EffectiveBalance + } else { + exitedInstances++ + } + continue + } + if currentEpoch < validator.ActivationEpoch { + pendingInstances++ + pendingBalance += state.Balances[i] + continue + } + activeInstances++ + activeBalance += state.Balances[i] + activeEffectiveBalance += validator.EffectiveBalance + } + validatorsCount.WithLabelValues("Pending").Set(float64(pendingInstances)) + validatorsCount.WithLabelValues("Active").Set(float64(activeInstances)) + validatorsCount.WithLabelValues("Exiting").Set(float64(exitingInstances)) + validatorsCount.WithLabelValues("Exited").Set(float64(exitedInstances)) + validatorsCount.WithLabelValues("Slashing").Set(float64(slashingInstances)) + validatorsCount.WithLabelValues("Slashed").Set(float64(slashedInstances)) + validatorsBalance.WithLabelValues("Pending").Set(float64(pendingBalance)) + validatorsBalance.WithLabelValues("Active").Set(float64(activeBalance)) + validatorsBalance.WithLabelValues("Exiting").Set(float64(exitingBalance)) + validatorsBalance.WithLabelValues("Slashing").Set(float64(slashingBalance)) + validatorsEffectiveBalance.WithLabelValues("Active").Set(float64(activeEffectiveBalance)) + validatorsEffectiveBalance.WithLabelValues("Exiting").Set(float64(exitingEffectiveBalance)) + validatorsEffectiveBalance.WithLabelValues("Slashing").Set(float64(slashingEffectiveBalance)) + + // Last justified slot + if state.CurrentJustifiedCheckpoint != nil { + beaconCurrentJustifiedEpoch.Set(float64(state.CurrentJustifiedCheckpoint.Epoch)) + beaconCurrentJustifiedRoot.Set(float64(bytesutil.ToLowInt64(state.CurrentJustifiedCheckpoint.Root))) + } + // Last previous justified slot + if state.PreviousJustifiedCheckpoint != nil { + beaconPrevJustifiedEpoch.Set(float64(state.PreviousJustifiedCheckpoint.Epoch)) + beaconPrevJustifiedRoot.Set(float64(bytesutil.ToLowInt64(state.PreviousJustifiedCheckpoint.Root))) + } + // Last finalized slot + if state.FinalizedCheckpoint != nil { + beaconFinalizedEpoch.Set(float64(state.FinalizedCheckpoint.Epoch)) + beaconFinalizedRoot.Set(float64(bytesutil.ToLowInt64(state.FinalizedCheckpoint.Root))) + } + if state.Eth1Data != nil { + currentEth1DataDepositCount.Set(float64(state.Eth1Data.DepositCount)) + } + + if precompute.Balances != nil { + totalEligibleBalances.Set(float64(precompute.Balances.PrevEpoch)) + totalVotedTargetBalances.Set(float64(precompute.Balances.PrevEpochTargetAttesters)) + } +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go new file mode 100644 index 0000000000..6c62ba6916 --- /dev/null +++ b/beacon-chain/blockchain/process_block.go @@ -0,0 +1,232 @@ +package blockchain + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +// onBlock is called when a gossip block is received. It runs regular state transition on the block. +// +// Spec pseudocode definition: +// def on_block(store: Store, block: BeaconBlock) -> None: +// # Make a copy of the state to avoid mutability issues +// assert block.parent_root in store.block_states +// pre_state = store.block_states[block.parent_root].copy() +// # Blocks cannot be in the future. If they are, their consideration must be delayed until the are in the past. +// assert store.time >= pre_state.genesis_time + block.slot * SECONDS_PER_SLOT +// # Add new block to the store +// store.blocks[signing_root(block)] = block +// # Check block is a descendant of the finalized block +// assert ( +// get_ancestor(store, signing_root(block), store.blocks[store.finalized_checkpoint.root].slot) == +// store.finalized_checkpoint.root +// ) +// # Check that block is later than the finalized epoch slot +// assert block.slot > compute_start_slot_of_epoch(store.finalized_checkpoint.epoch) +// # Check the block is valid and compute the post-state +// state = state_transition(pre_state, block) +// # Add new state for this block to the store +// store.block_states[signing_root(block)] = state +// +// # Update justified checkpoint +// if state.current_justified_checkpoint.epoch > store.justified_checkpoint.epoch: +// if state.current_justified_checkpoint.epoch > store.best_justified_checkpoint.epoch: +// store.best_justified_checkpoint = state.current_justified_checkpoint +// +// # Update finalized checkpoint +// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch: +// store.finalized_checkpoint = state.finalized_checkpoint +func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock") + defer span.End() + + if signed == nil || signed.Block == nil { + return nil, errors.New("nil block") + } + + b := signed.Block + + // Retrieve incoming block's pre state. + preState, err := s.getBlockPreState(ctx, b) + if err != nil { + return nil, err + } + preStateValidatorCount := len(preState.Validators) + + root, err := ssz.HashTreeRoot(b) + if err != nil { + return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot) + } + log.WithFields(logrus.Fields{ + "slot": b.Slot, + "root": fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]), + }).Info("Executing state transition on block") + + postState, err := state.ExecuteStateTransition(ctx, preState, signed) + if err != nil { + return nil, errors.Wrap(err, "could not execute state transition") + } + + if err := s.beaconDB.SaveBlock(ctx, signed); err != nil { + return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot) + } + if err := s.beaconDB.SaveState(ctx, postState, root); err != nil { + return nil, errors.Wrap(err, "could not save state") + } + + // Update justified check point. + if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch { + if err := s.updateJustified(ctx, postState); err != nil { + return nil, err + } + } + + // Update finalized check point. Prune the block cache and helper caches on every new finalized epoch. + if postState.FinalizedCheckpoint.Epoch > s.finalizedCheckpt.Epoch { + if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil { + return nil, errors.Wrap(err, "could not save finalized checkpoint") + } + + startSlot := helpers.StartSlot(s.prevFinalizedCheckpt.Epoch) + endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch) + if endSlot > startSlot { + if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil { + return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d", + startSlot, endSlot) + } + } + + s.prevFinalizedCheckpt = s.finalizedCheckpt + s.finalizedCheckpt = postState.FinalizedCheckpoint + } + + // Update validator indices in database as needed. + if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil { + return nil, errors.Wrap(err, "could not save finalized checkpoint") + } + + // Epoch boundary bookkeeping such as logging epoch summaries. + if postState.Slot >= s.nextEpochBoundarySlot { + logEpochData(postState) + reportEpochMetrics(postState) + + // Update committees cache at epoch boundary slot. + if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil { + return nil, err + } + if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil { + return nil, err + } + + s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState)) + } + + return postState, nil +} + +// onBlockInitialSyncStateTransition is called when an initial sync block is received. +// It runs state transition on the block and without any BLS verification. The excluded BLS verification +// includes attestation's aggregated signature. It also does not save attestations. +func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock") + defer span.End() + + if signed == nil || signed.Block == nil { + return nil, errors.New("nil block") + } + + b := signed.Block + + s.initSyncStateLock.Lock() + defer s.initSyncStateLock.Unlock() + + // Retrieve incoming block's pre state. + preState, err := s.cachedPreState(ctx, b) + if err != nil { + return nil, err + } + preStateValidatorCount := len(preState.Validators) + + postState, err := state.ExecuteStateTransitionNoVerifyAttSigs(ctx, preState, signed) + if err != nil { + return nil, errors.Wrap(err, "could not execute state transition") + } + + if err := s.beaconDB.SaveBlock(ctx, signed); err != nil { + return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot) + } + root, err := ssz.HashTreeRoot(b) + if err != nil { + return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot) + } + + if featureconfig.Get().InitSyncCacheState { + s.initSyncState[root] = postState + } else { + if err := s.beaconDB.SaveState(ctx, postState, root); err != nil { + return nil, errors.Wrap(err, "could not save state") + } + } + + // Update justified check point. + if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch { + if err := s.updateJustified(ctx, postState); err != nil { + return nil, err + } + } + + // Update finalized check point. Prune the block cache and helper caches on every new finalized epoch. + if postState.FinalizedCheckpoint.Epoch > s.finalizedCheckpt.Epoch { + startSlot := helpers.StartSlot(s.prevFinalizedCheckpt.Epoch) + endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch) + if endSlot > startSlot { + if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil { + return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d", + startSlot, endSlot) + } + } + + if err := s.saveInitState(ctx, postState); err != nil { + return nil, errors.Wrap(err, "could not save init sync finalized state") + } + + if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil { + return nil, errors.Wrap(err, "could not save finalized checkpoint") + } + + s.prevFinalizedCheckpt = s.finalizedCheckpt + s.finalizedCheckpt = postState.FinalizedCheckpoint + } + + // Update validator indices in database as needed. + if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil { + return nil, errors.Wrap(err, "could not save finalized checkpoint") + } + + // Epoch boundary bookkeeping such as logging epoch summaries. + if postState.Slot >= s.nextEpochBoundarySlot { + reportEpochMetrics(postState) + s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState)) + + // Update committees cache at epoch boundary slot. + if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil { + return nil, err + } + if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil { + return nil, err + } + } + + return postState, nil +} diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go new file mode 100644 index 0000000000..a20f8f169b --- /dev/null +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -0,0 +1,363 @@ +package blockchain + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/traceutil" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block +// to retrieve the state in DB. It verifies the pre state's validity and the incoming block +// is in the correct time window. +func (s *Service) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "forkchoice.getBlockPreState") + defer span.End() + + // Verify incoming block has a valid pre state. + preState, err := s.verifyBlkPreState(ctx, b) + if err != nil { + return nil, err + } + + // Verify block slot time is not from the feature. + if err := helpers.VerifySlotTime(preState.GenesisTime, b.Slot); err != nil { + return nil, err + } + + // Verify block is a descendent of a finalized block. + if err := s.verifyBlkDescendant(ctx, bytesutil.ToBytes32(b.ParentRoot), b.Slot); err != nil { + return nil, err + } + + // Verify block is later than the finalized epoch slot. + if err := s.verifyBlkFinalizedSlot(b); err != nil { + return nil, err + } + + return preState, nil +} + +// verifyBlkPreState validates input block has a valid pre-state. +func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) { + preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) + } + if preState == nil { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + return preState, nil +} + +// verifyBlkDescendant validates input block root is a descendant of the +// current finalized block root. +func (s *Service) verifyBlkDescendant(ctx context.Context, root [32]byte, slot uint64) error { + ctx, span := trace.StartSpan(ctx, "forkchoice.verifyBlkDescendant") + defer span.End() + + finalizedBlkSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root)) + if err != nil || finalizedBlkSigned == nil || finalizedBlkSigned.Block == nil { + return errors.Wrap(err, "could not get finalized block") + } + finalizedBlk := finalizedBlkSigned.Block + + bFinalizedRoot, err := s.ancestor(ctx, root[:], finalizedBlk.Slot) + if err != nil { + return errors.Wrap(err, "could not get finalized block root") + } + if !bytes.Equal(bFinalizedRoot, s.finalizedCheckpt.Root) { + err := fmt.Errorf("block from slot %d is not a descendent of the current finalized block slot %d, %#x != %#x", + slot, finalizedBlk.Slot, bytesutil.Trunc(bFinalizedRoot), bytesutil.Trunc(s.finalizedCheckpt.Root)) + traceutil.AnnotateError(span, err) + return err + } + return nil +} + +// verifyBlkFinalizedSlot validates input block is not less than or equal +// to current finalized slot. +func (s *Service) verifyBlkFinalizedSlot(b *ethpb.BeaconBlock) error { + finalizedSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch) + if finalizedSlot >= b.Slot { + return fmt.Errorf("block is equal or earlier than finalized block, slot %d < slot %d", b.Slot, finalizedSlot) + } + return nil +} + +// saveNewValidators saves newly added validator indices from the state to db. +// Does nothing if validator count has not changed. +func (s *Service) saveNewValidators(ctx context.Context, preStateValidatorCount int, postState *pb.BeaconState) error { + postStateValidatorCount := len(postState.Validators) + if preStateValidatorCount != postStateValidatorCount { + indices := make([]uint64, 0) + pubKeys := make([][]byte, 0) + for i := preStateValidatorCount; i < postStateValidatorCount; i++ { + indices = append(indices, uint64(i)) + pubKeys = append(pubKeys, postState.Validators[i].PublicKey) + } + if err := s.beaconDB.SaveValidatorIndices(ctx, pubKeys, indices); err != nil { + return errors.Wrapf(err, "could not save activated validators: %v", indices) + } + log.WithFields(logrus.Fields{ + "indices": indices, + "totalValidatorCount": postStateValidatorCount - preStateValidatorCount, + }).Info("Validator indices saved in DB") + } + return nil +} + +// rmStatesOlderThanLastFinalized deletes the states in db since last finalized check point. +func (s *Service) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot uint64, endSlot uint64) error { + ctx, span := trace.StartSpan(ctx, "forkchoice.rmStatesBySlots") + defer span.End() + + // Make sure start slot is not a skipped slot + for i := startSlot; i > 0; i-- { + filter := filters.NewFilter().SetStartSlot(i).SetEndSlot(i) + b, err := s.beaconDB.Blocks(ctx, filter) + if err != nil { + return err + } + if len(b) > 0 { + startSlot = i + break + } + } + + // Make sure finalized slot is not a skipped slot. + for i := endSlot; i > 0; i-- { + filter := filters.NewFilter().SetStartSlot(i).SetEndSlot(i) + b, err := s.beaconDB.Blocks(ctx, filter) + if err != nil { + return err + } + if len(b) > 0 { + endSlot = i - 1 + break + } + } + + // Do not remove genesis state + if startSlot == 0 { + startSlot++ + } + // If end slot comes less than start slot + if endSlot < startSlot { + endSlot = startSlot + } + + filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot) + roots, err := s.beaconDB.BlockRoots(ctx, filter) + if err != nil { + return err + } + + roots, err = s.filterBlockRoots(ctx, roots) + if err != nil { + return err + } + + if err := s.beaconDB.DeleteStates(ctx, roots); err != nil { + return err + } + + return nil +} + +// shouldUpdateCurrentJustified prevents bouncing attack, by only update conflicting justified +// checkpoints in the fork choice if in the early slots of the epoch. +// Otherwise, delay incorporation of new justified checkpoint until next epoch boundary. +// See https://ethresear.ch/t/prevention-of-bouncing-attack-on-ffg/6114 for more detailed analysis and discussion. +func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustifiedCheckpt *ethpb.Checkpoint) (bool, error) { + if helpers.SlotsSinceEpochStarts(s.currentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified { + return true, nil + } + newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root)) + if err != nil { + return false, err + } + if newJustifiedBlockSigned == nil || newJustifiedBlockSigned.Block == nil { + return false, errors.New("nil new justified block") + } + newJustifiedBlock := newJustifiedBlockSigned.Block + if newJustifiedBlock.Slot <= helpers.StartSlot(s.justifiedCheckpt.Epoch) { + return false, nil + } + justifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root)) + if err != nil { + return false, err + } + if justifiedBlockSigned == nil || justifiedBlockSigned.Block == nil { + return false, errors.New("nil justified block") + } + justifiedBlock := justifiedBlockSigned.Block + b, err := s.ancestor(ctx, newJustifiedCheckpt.Root, justifiedBlock.Slot) + if err != nil { + return false, err + } + if !bytes.Equal(b, s.justifiedCheckpt.Root) { + return false, nil + } + return true, nil +} + +func (s *Service) updateJustified(ctx context.Context, state *pb.BeaconState) error { + if state.CurrentJustifiedCheckpoint.Epoch > s.bestJustifiedCheckpt.Epoch { + s.bestJustifiedCheckpt = state.CurrentJustifiedCheckpoint + } + canUpdate, err := s.shouldUpdateCurrentJustified(ctx, state.CurrentJustifiedCheckpoint) + if err != nil { + return err + } + if canUpdate { + s.justifiedCheckpt = state.CurrentJustifiedCheckpoint + } + + if featureconfig.Get().InitSyncCacheState { + justifiedRoot := bytesutil.ToBytes32(state.CurrentJustifiedCheckpoint.Root) + justifiedState := s.initSyncState[justifiedRoot] + if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil { + return errors.Wrap(err, "could not save justified state") + } + } + + return s.beaconDB.SaveJustifiedCheckpoint(ctx, state.CurrentJustifiedCheckpoint) +} + +// currentSlot returns the current slot based on time. +func (s *Service) currentSlot() uint64 { + return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot +} + +// This receives cached state in memory for initial sync only during initial sync. +func (s *Service) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) { + if featureconfig.Get().InitSyncCacheState { + preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)] + var err error + if preState == nil { + preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) + } + if preState == nil { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + } + return proto.Clone(preState).(*pb.BeaconState), nil + } + + preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) + } + if preState == nil { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + + return preState, nil +} + +// This saves every finalized state in DB during initial sync, needed as part of optimization to +// use cache state during initial sync in case of restart. +func (s *Service) saveInitState(ctx context.Context, state *pb.BeaconState) error { + if !featureconfig.Get().InitSyncCacheState { + return nil + } + finalizedRoot := bytesutil.ToBytes32(state.FinalizedCheckpoint.Root) + fs := s.initSyncState[finalizedRoot] + + if err := s.beaconDB.SaveState(ctx, fs, finalizedRoot); err != nil { + return errors.Wrap(err, "could not save state") + } + for r, oldState := range s.initSyncState { + if oldState.Slot < state.FinalizedCheckpoint.Epoch*params.BeaconConfig().SlotsPerEpoch { + delete(s.initSyncState, r) + } + } + return nil +} + +// This filters block roots that are not known as head root and finalized root in DB. +// It serves as the last line of defence before we prune states. +func (s *Service) filterBlockRoots(ctx context.Context, roots [][32]byte) ([][32]byte, error) { + f, err := s.beaconDB.FinalizedCheckpoint(ctx) + if err != nil { + return nil, err + } + fRoot := f.Root + h, err := s.beaconDB.HeadBlock(ctx) + if err != nil { + return nil, err + } + hRoot, err := ssz.SigningRoot(h) + if err != nil { + return nil, err + } + + filtered := make([][32]byte, 0, len(roots)) + for _, root := range roots { + if bytes.Equal(root[:], fRoot[:]) || bytes.Equal(root[:], hRoot[:]) { + continue + } + filtered = append(filtered, root) + } + + return filtered, nil +} + +// ancestor returns the block root of an ancestry block from the input block root. +// +// Spec pseudocode definition: +// def get_ancestor(store: Store, root: Hash, slot: Slot) -> Hash: +// block = store.blocks[root] +// if block.slot > slot: +// return get_ancestor(store, block.parent_root, slot) +// elif block.slot == slot: +// return root +// else: +// return Bytes32() # root is older than queried slot: no results. +func (s *Service) ancestor(ctx context.Context, root []byte, slot uint64) ([]byte, error) { + ctx, span := trace.StartSpan(ctx, "forkchoice.ancestor") + defer span.End() + + // Stop recursive ancestry lookup if context is cancelled. + if ctx.Err() != nil { + return nil, ctx.Err() + } + + signed, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(root)) + if err != nil { + return nil, errors.Wrap(err, "could not get ancestor block") + } + if signed == nil || signed.Block == nil { + return nil, errors.New("nil block") + } + b := signed.Block + + // If we dont have the ancestor in the DB, simply return nil so rest of fork choice + // operation can proceed. This is not an error condition. + if b == nil || b.Slot < slot { + return nil, nil + } + + if b.Slot == slot { + return root, nil + } + + return s.ancestor(ctx, b.ParentRoot, slot) +} diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go new file mode 100644 index 0000000000..4814ec2a3e --- /dev/null +++ b/beacon-chain/blockchain/process_block_test.go @@ -0,0 +1,570 @@ +package blockchain + +import ( + "context" + "reflect" + "strings" + "testing" + "time" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db" + testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/stateutil" +) + +func TestStore_OnBlock(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + genesisStateRoot, err := stateutil.HashTreeRootState(&pb.BeaconState{}) + if err != nil { + t.Error(err) + } + genesis := blocks.NewGenesisBlock(genesisStateRoot[:]) + if err := db.SaveBlock(ctx, genesis); err != nil { + t.Error(err) + } + validGenesisRoot, err := ssz.HashTreeRoot(genesis.Block) + if err != nil { + t.Error(err) + } + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, validGenesisRoot); err != nil { + t.Fatal(err) + } + roots, err := blockTree1(db, validGenesisRoot[:]) + if err != nil { + t.Fatal(err) + } + random := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1, ParentRoot: validGenesisRoot[:]}} + if err := db.SaveBlock(ctx, random); err != nil { + t.Error(err) + } + randomParentRoot, err := ssz.HashTreeRoot(random.Block) + if err != nil { + t.Error(err) + } + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, randomParentRoot); err != nil { + t.Fatal(err) + } + randomParentRoot2 := roots[1] + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, bytesutil.ToBytes32(randomParentRoot2)); err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + blk *ethpb.BeaconBlock + s *pb.BeaconState + time uint64 + wantErrString string + }{ + { + name: "parent block root does not have a state", + blk: ðpb.BeaconBlock{}, + s: &pb.BeaconState{}, + wantErrString: "pre state of slot 0 does not exist", + }, + { + name: "block is from the feature", + blk: ðpb.BeaconBlock{ParentRoot: randomParentRoot[:], Slot: params.BeaconConfig().FarFutureEpoch}, + s: &pb.BeaconState{}, + wantErrString: "could not process slot from the future", + }, + { + name: "could not get finalized block", + blk: ðpb.BeaconBlock{ParentRoot: randomParentRoot[:]}, + s: &pb.BeaconState{}, + wantErrString: "block from slot 0 is not a descendent of the current finalized block", + }, + { + name: "same slot as finalized block", + blk: ðpb.BeaconBlock{Slot: 0, ParentRoot: randomParentRoot2}, + s: &pb.BeaconState{}, + wantErrString: "block is equal or earlier than finalized block, slot 0 < slot 0", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + service.justifiedCheckpt = ðpb.Checkpoint{Root: validGenesisRoot[:]} + service.bestJustifiedCheckpt = ðpb.Checkpoint{Root: validGenesisRoot[:]} + service.finalizedCheckpt = ðpb.Checkpoint{Root: validGenesisRoot[:]} + service.prevFinalizedCheckpt = ðpb.Checkpoint{Root: validGenesisRoot[:]} + service.finalizedCheckpt.Root = roots[0] + + _, err := service.onBlock(ctx, ðpb.SignedBeaconBlock{Block: tt.blk}) + if !strings.Contains(err.Error(), tt.wantErrString) { + t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString) + } + }) + } +} + +func TestStore_SaveNewValidators(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + preCount := 2 // validators 0 and validators 1 + s := &pb.BeaconState{Validators: []*ethpb.Validator{ + {PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + {PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}}, + {PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}}, + {PublicKey: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3}}, + }} + if err := service.saveNewValidators(ctx, preCount, s); err != nil { + t.Fatal(err) + } + + if !db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}) { + t.Error("Wanted validator saved in db") + } + if !db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3}) { + t.Error("Wanted validator saved in db") + } + if db.HasValidatorIndex(ctx, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) { + t.Error("validator not suppose to be saved in db") + } +} + +func TestRemoveStateSinceLastFinalized(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + params.UseMinimalConfig() + defer params.UseMainnetConfig() + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + // Save 100 blocks in DB, each has a state. + numBlocks := 100 + totalBlocks := make([]*ethpb.SignedBeaconBlock, numBlocks) + blockRoots := make([][32]byte, 0) + for i := 0; i < len(totalBlocks); i++ { + totalBlocks[i] = ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + Slot: uint64(i), + }, + } + r, err := ssz.HashTreeRoot(totalBlocks[i].Block) + if err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{Slot: uint64(i)}, r); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, totalBlocks[i]); err != nil { + t.Fatal(err) + } + blockRoots = append(blockRoots, r) + if err := service.beaconDB.SaveHeadBlockRoot(ctx, r); err != nil { + t.Fatal(err) + } + } + + // New finalized epoch: 1 + finalizedEpoch := uint64(1) + finalizedSlot := finalizedEpoch * params.BeaconConfig().SlotsPerEpoch + endSlot := helpers.StartSlot(finalizedEpoch+1) - 1 // Inclusive + if err := service.rmStatesOlderThanLastFinalized(ctx, 0, endSlot); err != nil { + t.Fatal(err) + } + for _, r := range blockRoots { + s, err := service.beaconDB.State(ctx, r) + if err != nil { + t.Fatal(err) + } + // Also verifies genesis state didnt get deleted + if s != nil && s.Slot != finalizedSlot && s.Slot != 0 && s.Slot < endSlot { + t.Errorf("State with slot %d should not be in DB", s.Slot) + } + } + + // New finalized epoch: 5 + newFinalizedEpoch := uint64(5) + newFinalizedSlot := newFinalizedEpoch * params.BeaconConfig().SlotsPerEpoch + endSlot = helpers.StartSlot(newFinalizedEpoch+1) - 1 // Inclusive + if err := service.rmStatesOlderThanLastFinalized(ctx, helpers.StartSlot(finalizedEpoch+1)-1, endSlot); err != nil { + t.Fatal(err) + } + for _, r := range blockRoots { + s, err := service.beaconDB.State(ctx, r) + if err != nil { + t.Fatal(err) + } + // Also verifies genesis state didnt get deleted + if s != nil && s.Slot != newFinalizedSlot && s.Slot != finalizedSlot && s.Slot != 0 && s.Slot < endSlot { + t.Errorf("State with slot %d should not be in DB", s.Slot) + } + } +} + +func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + params.UseMinimalConfig() + defer params.UseMainnetConfig() + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + service.genesisTime = time.Now() + + update, err := service.shouldUpdateCurrentJustified(ctx, ðpb.Checkpoint{}) + if err != nil { + t.Fatal(err) + } + if !update { + t.Error("Should be able to update justified, received false") + } + + lastJustifiedBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{ParentRoot: []byte{'G'}}} + lastJustifiedRoot, _ := ssz.HashTreeRoot(lastJustifiedBlk.Block) + newJustifiedBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1, ParentRoot: lastJustifiedRoot[:]}} + newJustifiedRoot, _ := ssz.HashTreeRoot(newJustifiedBlk.Block) + if err := service.beaconDB.SaveBlock(ctx, newJustifiedBlk); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, lastJustifiedBlk); err != nil { + t.Fatal(err) + } + + diff := (params.BeaconConfig().SlotsPerEpoch - 1) * params.BeaconConfig().SecondsPerSlot + service.genesisTime = time.Unix(time.Now().Unix()-int64(diff), 0) + service.justifiedCheckpt = ðpb.Checkpoint{Root: lastJustifiedRoot[:]} + update, err = service.shouldUpdateCurrentJustified(ctx, ðpb.Checkpoint{Root: newJustifiedRoot[:]}) + if err != nil { + t.Fatal(err) + } + if !update { + t.Error("Should be able to update justified, received false") + } +} + +func TestShouldUpdateJustified_ReturnFalse(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + params.UseMinimalConfig() + defer params.UseMainnetConfig() + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + lastJustifiedBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{ParentRoot: []byte{'G'}}} + lastJustifiedRoot, _ := ssz.HashTreeRoot(lastJustifiedBlk.Block) + newJustifiedBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{ParentRoot: lastJustifiedRoot[:]}} + newJustifiedRoot, _ := ssz.HashTreeRoot(newJustifiedBlk.Block) + if err := service.beaconDB.SaveBlock(ctx, newJustifiedBlk); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, lastJustifiedBlk); err != nil { + t.Fatal(err) + } + + diff := (params.BeaconConfig().SlotsPerEpoch - 1) * params.BeaconConfig().SecondsPerSlot + service.genesisTime = time.Unix(time.Now().Unix()-int64(diff), 0) + service.justifiedCheckpt = ðpb.Checkpoint{Root: lastJustifiedRoot[:]} + + update, err := service.shouldUpdateCurrentJustified(ctx, ðpb.Checkpoint{Root: newJustifiedRoot[:]}) + if err != nil { + t.Fatal(err) + } + if update { + t.Error("Should not be able to update justified, received true") + } +} + +func TestCachedPreState_CanGetFromCache(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + s := &pb.BeaconState{Slot: 1} + r := [32]byte{'A'} + b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + service.initSyncState[r] = s + + wanted := "pre state of slot 1 does not exist" + if _, err := service.cachedPreState(ctx, b); !strings.Contains(err.Error(), wanted) { + t.Fatal("Not expected error") + } +} + +func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + config := &featureconfig.Flags{ + InitSyncCacheState: true, + } + featureconfig.Init(config) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + s := &pb.BeaconState{Slot: 1} + r := [32]byte{'A'} + b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + service.initSyncState[r] = s + + received, err := service.cachedPreState(ctx, b) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, received) { + t.Error("cached state not the same") + } +} + +func TestCachedPreState_CanGetFromDB(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + r := [32]byte{'A'} + b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + + _, err = service.cachedPreState(ctx, b) + wanted := "pre state of slot 1 does not exist" + if err.Error() != wanted { + t.Error("Did not get wanted error") + } + + s := &pb.BeaconState{Slot: 1} + service.beaconDB.SaveState(ctx, s, r) + + received, err := service.cachedPreState(ctx, b) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, received) { + t.Error("cached state not the same") + } +} + +func TestSaveInitState_CanSaveDelete(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + config := &featureconfig.Flags{ + InitSyncCacheState: true, + } + featureconfig.Init(config) + + for i := uint64(0); i < 64; i++ { + b := ðpb.BeaconBlock{Slot: i} + s := &pb.BeaconState{Slot: i} + r, _ := ssz.HashTreeRoot(b) + service.initSyncState[r] = s + } + + // Set finalized root as slot 32 + finalizedRoot, _ := ssz.HashTreeRoot(ðpb.BeaconBlock{Slot: 32}) + + if err := service.saveInitState(ctx, &pb.BeaconState{FinalizedCheckpoint: ðpb.Checkpoint{ + Epoch: 1, Root: finalizedRoot[:]}}); err != nil { + t.Fatal(err) + } + + // Verify finalized state is saved in DB + finalizedState, err := service.beaconDB.State(ctx, finalizedRoot) + if err != nil { + t.Fatal(err) + } + if finalizedState == nil { + t.Error("finalized state can't be nil") + } + + // Verify cached state is properly pruned + if len(service.initSyncState) != int(params.BeaconConfig().SlotsPerEpoch) { + t.Errorf("wanted: %d, got: %d", len(service.initSyncState), params.BeaconConfig().SlotsPerEpoch) + } +} + +func TestUpdateJustified_CouldUpdateBest(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + signedBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} + if err := db.SaveBlock(ctx, signedBlock); err != nil { + t.Fatal(err) + } + r, err := ssz.HashTreeRoot(signedBlock.Block) + if err != nil { + t.Fatal(err) + } + service.justifiedCheckpt = ðpb.Checkpoint{Root: []byte{'A'}} + service.bestJustifiedCheckpt = ðpb.Checkpoint{Root: []byte{'A'}} + service.initSyncState[r] = &pb.BeaconState{} + if err := db.SaveState(ctx, &pb.BeaconState{}, r); err != nil { + t.Fatal(err) + } + + // Could update + s := &pb.BeaconState{CurrentJustifiedCheckpoint: ðpb.Checkpoint{Epoch: 1, Root: r[:]}} + if err := service.updateJustified(context.Background(), s); err != nil { + t.Fatal(err) + } + + if service.bestJustifiedCheckpt.Epoch != s.CurrentJustifiedCheckpoint.Epoch { + t.Error("Incorrect justified epoch in service") + } + + // Could not update + service.bestJustifiedCheckpt.Epoch = 2 + if err := service.updateJustified(context.Background(), s); err != nil { + t.Fatal(err) + } + + if service.bestJustifiedCheckpt.Epoch != 2 { + t.Error("Incorrect justified epoch in service") + } +} + +func TestFilterBlockRoots_CanFilter(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + fBlock := ðpb.BeaconBlock{} + fRoot, _ := ssz.HashTreeRoot(fBlock) + hBlock := ðpb.BeaconBlock{Slot: 1} + headRoot, _ := ssz.HashTreeRoot(hBlock) + if err := service.beaconDB.SaveBlock(ctx, ðpb.SignedBeaconBlock{Block: fBlock}); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, fRoot); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveFinalizedCheckpoint(ctx, ðpb.Checkpoint{Root: fRoot[:]}); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, ðpb.SignedBeaconBlock{Block: hBlock}); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveState(ctx, &pb.BeaconState{}, headRoot); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveHeadBlockRoot(ctx, headRoot); err != nil { + t.Fatal(err) + } + + roots := [][32]byte{{'C'}, {'D'}, headRoot, {'E'}, fRoot, {'F'}} + wanted := [][32]byte{{'C'}, {'D'}, {'E'}, {'F'}} + + received, err := service.filterBlockRoots(ctx, roots) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, received) { + t.Error("Did not filter correctly") + } +} + +// blockTree1 constructs the following tree: +// /- B1 +// B0 /- B5 - B7 +// \- B3 - B4 - B6 - B8 +// (B1, and B3 are all from the same slots) +func blockTree1(db db.Database, genesisRoot []byte) ([][]byte, error) { + b0 := ðpb.BeaconBlock{Slot: 0, ParentRoot: genesisRoot} + r0, _ := ssz.HashTreeRoot(b0) + b1 := ðpb.BeaconBlock{Slot: 1, ParentRoot: r0[:]} + r1, _ := ssz.HashTreeRoot(b1) + b3 := ðpb.BeaconBlock{Slot: 3, ParentRoot: r0[:]} + r3, _ := ssz.HashTreeRoot(b3) + b4 := ðpb.BeaconBlock{Slot: 4, ParentRoot: r3[:]} + r4, _ := ssz.HashTreeRoot(b4) + b5 := ðpb.BeaconBlock{Slot: 5, ParentRoot: r4[:]} + r5, _ := ssz.HashTreeRoot(b5) + b6 := ðpb.BeaconBlock{Slot: 6, ParentRoot: r4[:]} + r6, _ := ssz.HashTreeRoot(b6) + b7 := ðpb.BeaconBlock{Slot: 7, ParentRoot: r5[:]} + r7, _ := ssz.HashTreeRoot(b7) + b8 := ðpb.BeaconBlock{Slot: 8, ParentRoot: r6[:]} + r8, _ := ssz.HashTreeRoot(b8) + for _, b := range []*ethpb.BeaconBlock{b0, b1, b3, b4, b5, b6, b7, b8} { + if err := db.SaveBlock(context.Background(), ðpb.SignedBeaconBlock{Block: b}); err != nil { + return nil, err + } + if err := db.SaveState(context.Background(), &pb.BeaconState{}, bytesutil.ToBytes32(b.ParentRoot)); err != nil { + return nil, err + } + } + if err := db.SaveState(context.Background(), &pb.BeaconState{}, r1); err != nil { + return nil, err + } + if err := db.SaveState(context.Background(), &pb.BeaconState{}, r7); err != nil { + return nil, err + } + if err := db.SaveState(context.Background(), &pb.BeaconState{}, r8); err != nil { + return nil, err + } + return [][]byte{r0[:], r1[:], nil, r3[:], r4[:], r5[:], r6[:], r7[:], r8[:]}, nil +} diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index c68af8ef4e..739259aef5 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -13,6 +13,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/traceutil" @@ -70,11 +71,22 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock) // Apply state transition on the new block. - postState, err := s.forkChoiceStoreOld.OnBlockCacheFilteredTree(ctx, blockCopy) - if err != nil { - err := errors.Wrap(err, "could not process block from fork choice service") - traceutil.AnnotateError(span, err) - return err + var postState *pb.BeaconState + var err error + if featureconfig.Get().ProtoArrayForkChoice { + postState, err = s.onBlock(ctx, blockCopy) + if err != nil { + err := errors.Wrap(err, "could not process block") + traceutil.AnnotateError(span, err) + return err + } + } else { + postState, err = s.forkChoiceStoreOld.OnBlockCacheFilteredTree(ctx, blockCopy) + if err != nil { + err := errors.Wrap(err, "could not process block from fork choice service") + traceutil.AnnotateError(span, err) + return err + } } root, err := ssz.HashTreeRoot(blockCopy.Block) @@ -135,8 +147,8 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch) } - f := s.forkChoiceStoreOld.FinalizedCheckpt() - j := s.forkChoiceStoreOld.JustifiedCheckpt() + f := s.finalizedCheckpt + j := s.justifiedCheckpt headRootProtoArray, err := s.forkChoiceStore.Head( ctx, f.Epoch, @@ -148,7 +160,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB return nil } - if postState.FinalizedCheckpoint.Epoch > s.FinalizedCheckpt().Epoch { + if postState.FinalizedCheckpoint.Epoch > f.Epoch { if err := s.forkChoiceStore.Prune(ctx, bytesutil.ToBytes32(postState.FinalizedCheckpoint.Root)); err != nil { return errors.Wrap(err, "could not prune proto array fork choice") } @@ -193,12 +205,23 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth defer span.End() blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock) - // Apply state transition on the incoming newly received block. - postState, err := s.forkChoiceStoreOld.OnBlock(ctx, blockCopy) - if err != nil { - err := errors.Wrap(err, "could not process block from fork choice service") - traceutil.AnnotateError(span, err) - return err + // Apply state transition on the new block. + var postState *pb.BeaconState + var err error + if featureconfig.Get().ProtoArrayForkChoice { + postState, err = s.onBlock(ctx, blockCopy) + if err != nil { + err := errors.Wrap(err, "could not process block") + traceutil.AnnotateError(span, err) + return err + } + } else { + postState, err = s.forkChoiceStoreOld.OnBlock(ctx, blockCopy) + if err != nil { + err := errors.Wrap(err, "could not process block from fork choice service") + traceutil.AnnotateError(span, err) + return err + } } root, err := ssz.HashTreeRoot(blockCopy.Block) @@ -265,10 +288,22 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock) // Apply state transition on the incoming newly received blockCopy without verifying its BLS contents. - postState, err := s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy) - if err != nil { - return errors.Wrap(err, "could not process blockCopy from fork choice service") + var postState *pb.BeaconState + var err error + if featureconfig.Get().ProtoArrayForkChoice { + postState, err = s.onBlockInitialSyncStateTransition(ctx, blockCopy) + if err != nil { + err := errors.Wrap(err, "could not process block") + traceutil.AnnotateError(span, err) + return err + } + } else { + postState, err = s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy) + if err != nil { + return errors.Wrap(err, "could not process blockCopy from fork choice service") + } } + root, err := ssz.HashTreeRoot(blockCopy.Block) if err != nil { return errors.Wrap(err, "could not get signing root on received blockCopy") diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 14a16bde98..e725972fb0 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -36,6 +36,10 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) { if err != nil { t.Fatal(err) } + if err := chainService.beaconDB.SaveGenesisBlockRoot(ctx, genesisBlkRoot); err != nil { + t.Fatal(err) + } + if err := db.SaveState(ctx, beaconState, genesisBlkRoot); err != nil { t.Fatal(err) } @@ -48,10 +52,6 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) { t.Fatalf("Could not save block to db: %v", err) } - if err := db.SaveState(ctx, beaconState, genesisBlkRoot); err != nil { - t.Fatal(err) - } - slot := beaconState.Slot + 1 block, err := testutil.GenerateFullBlock(beaconState, privKeys, nil, slot) if err != nil { @@ -169,6 +169,9 @@ func TestReceiveBlockNoPubsubForkchoice_ProcessCorrectly(t *testing.T) { if err != nil { t.Fatal(err) } + if err := chainService.beaconDB.SaveGenesisBlockRoot(ctx, parentRoot); err != nil { + t.Fatal(err) + } if err := db.SaveState(ctx, beaconState, parentRoot); err != nil { t.Fatal(err) } diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index da284b6160..5bdba85601 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -62,6 +62,14 @@ type Service struct { epochParticipation map[uint64]*precompute.Balance epochParticipationLock sync.RWMutex forkChoiceStore f.ForkChoicer + justifiedCheckpt *ethpb.Checkpoint + bestJustifiedCheckpt *ethpb.Checkpoint + finalizedCheckpt *ethpb.Checkpoint + prevFinalizedCheckpt *ethpb.Checkpoint + nextEpochBoundarySlot uint64 + voteLock sync.RWMutex + initSyncState map[[32]byte]*pb.BeaconState + initSyncStateLock sync.RWMutex } // Config options for the service. @@ -98,6 +106,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { stateNotifier: cfg.StateNotifier, epochParticipation: make(map[uint64]*precompute.Balance), forkChoiceStore: cfg.ForkChoiceStore, + initSyncState: make(map[[32]byte]*pb.BeaconState), }, nil } @@ -143,9 +152,18 @@ func (s *Service) Start() { if err := s.forkChoiceStoreOld.GenesisStore(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil { log.Fatalf("Could not start fork choice service: %v", err) } - if err := s.resumeForkChoice(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil { - log.Fatalf("Could not resume fork choice: %v", err) + + if featureconfig.Get().ProtoArrayForkChoice { + s.justifiedCheckpt = proto.Clone(justifiedCheckpoint).(*ethpb.Checkpoint) + s.bestJustifiedCheckpt = proto.Clone(justifiedCheckpoint).(*ethpb.Checkpoint) + s.finalizedCheckpt = proto.Clone(finalizedCheckpoint).(*ethpb.Checkpoint) + s.prevFinalizedCheckpt = proto.Clone(finalizedCheckpoint).(*ethpb.Checkpoint) + + if err := s.resumeForkChoice(ctx, justifiedCheckpoint, finalizedCheckpoint); err != nil { + log.Fatalf("Could not resume fork choice: %v", err) + } } + s.stateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.Initialized, Data: &statefeed.InitializedData{ @@ -359,6 +377,11 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *pb.BeaconSt // Add the genesis block to the fork choice store. if featureconfig.Get().ProtoArrayForkChoice { + s.justifiedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint) + s.bestJustifiedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint) + s.finalizedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint) + s.prevFinalizedCheckpt = proto.Clone(genesisCheckpoint).(*ethpb.Checkpoint) + if err := s.forkChoiceStore.ProcessBlock(ctx, genesisBlk.Block.Slot, genesisBlkRoot, diff --git a/endtoend/demo_e2e_test.go b/endtoend/demo_e2e_test.go index 94eb710159..6e8ba0045c 100644 --- a/endtoend/demo_e2e_test.go +++ b/endtoend/demo_e2e_test.go @@ -15,7 +15,7 @@ func TestEndToEnd_DemoConfig(t *testing.T) { params.UseDemoBeaconConfig() demoConfig := &end2EndConfig{ - beaconFlags: featureconfig.E2EBeaconChainFlags, + beaconFlags: featureconfig.E2EBeaconChainFlags, validatorFlags: featureconfig.E2EValidatorFlags, epochsToRun: 5, numBeaconNodes: 2, diff --git a/endtoend/minimal_e2e_test.go b/endtoend/minimal_e2e_test.go index dcf120c8a0..3cd18d0c12 100644 --- a/endtoend/minimal_e2e_test.go +++ b/endtoend/minimal_e2e_test.go @@ -14,7 +14,7 @@ func TestEndToEnd_MinimalConfig(t *testing.T) { params.UseMinimalConfig() minimalConfig := &end2EndConfig{ - beaconFlags: append(featureconfig.E2EBeaconChainFlags, "--minimal-config"), + beaconFlags: append(featureconfig.E2EBeaconChainFlags, "--minimal-config"), validatorFlags: append(featureconfig.E2EValidatorFlags, "--minimal-config"), epochsToRun: 5, numBeaconNodes: 4,