Compare commits

...

34 Commits

Author SHA1 Message Date
nisdas 6009cf7e37 Merge branch 'fakeProposerBranch' of https://github.com/prysmaticlabs/geth-sharding into blockProposalExperimentation 2023-06-30 22:14:33 +08:00
nisdas b33fb4b84d Merge branch 'handleEpochBoundaryMisses' of https://github.com/prysmaticlabs/geth-sharding into blockProposalExperimentation 2023-06-30 22:14:01 +08:00
nisdas b53309d0b7 remove it 2023-06-30 22:07:34 +08:00
nisdas 95b289497b Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into addTrieCompression 2023-06-30 22:02:34 +08:00
nisdas bfd9cf3bfb revert delay 2023-06-30 12:50:59 +08:00
nisdas 7c75f1d6b5 Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into fakeProposerBranch 2023-06-30 12:04:18 +08:00
nisdas 189b326cbd increase time 2023-06-30 12:03:38 +08:00
nisdas 6d6ce4734c fix tests 2023-06-29 17:25:56 +08:00
nisdas d9afc0060e add changes 2023-06-29 16:49:16 +08:00
nisdas 8ca6f9b0e6 fix it 2023-06-29 00:14:49 +08:00
nisdas 9e9a0f7532 gaz 2023-06-29 00:11:55 +08:00
nisdas 20ad9a35be write profiles 2023-06-29 00:09:44 +08:00
nisdas d42928edad modify get duties 2023-06-28 23:47:01 +08:00
nisdas 3369458e60 add delay 2023-06-28 23:36:36 +08:00
nisdas d541e2aca9 add in changes 2023-06-28 18:20:04 +08:00
nisdas 96c75246a4 Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into fakeProposerBranch 2023-06-28 18:08:52 +08:00
nisdas 82ca19cd46 add compression 2023-06-22 18:49:01 +08:00
nisdas d35461affd clean up logs 2023-06-21 06:30:08 +08:00
nisdas ee159f3380 remove logs 2023-06-21 06:22:33 +08:00
nisdas 6b3d18cb77 remove logs 2023-06-21 06:21:56 +08:00
nisdas 07955c891b Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into fakeProposerBranch 2023-06-21 06:19:16 +08:00
nisdas bd0d7478b3 fix panic 2023-05-26 13:23:52 +08:00
nisdas b6a1da21f4 add logs 2023-05-26 13:15:57 +08:00
nisdas 180058ed48 fix corruption 2023-05-25 16:27:14 +08:00
nisdas f7a567d1d3 only have it for late blocks 2023-05-25 08:21:34 +08:00
nisdas 6d02c9ae12 add new thing 2023-05-25 08:15:56 +08:00
nisdas 6c2e6ca855 add error 2023-05-24 22:39:08 +08:00
nisdas fbdccf8055 handle zero 2023-05-24 22:38:27 +08:00
nisdas 83cfe11ca0 error 2023-05-24 22:29:27 +08:00
nisdas 135e9f51ec force proposer payloads to be included 2023-05-24 22:26:27 +08:00
nisdas d33c1974da add logs 2023-05-23 19:06:59 +08:00
nisdas 88a2e3d953 fix panic 2023-05-23 18:07:05 +08:00
nisdas cea42a4b7d prepare all payloads 2023-05-23 17:53:54 +08:00
nisdas 9971d71bc5 add changes 2023-05-23 17:36:54 +08:00
16 changed files with 296 additions and 18 deletions

View File

@@ -71,6 +71,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -71,7 +71,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho
nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer.
hasAttr, attr, proposerId := s.getPayloadAttribute(ctx, arg.headState, nextSlot, arg.headRoot[:])
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr)
if err != nil {
switch err {

View File

@@ -511,6 +511,17 @@ func (s *Service) handleEpochBoundary(ctx context.Context, postState state.Beaco
}
}()
} else if postState.Slot() >= s.nextEpochBoundarySlot {
postState = postState.Copy()
if s.nextEpochBoundarySlot != 0 {
ep := slots.ToEpoch(s.nextEpochBoundarySlot)
_, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, postState, ep)
if err != nil {
return err
}
for k, v := range nextProposerIndexToSlots {
s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(v[0], k, [8]byte{}, [32]byte{})
}
}
s.nextEpochBoundarySlot, err = slots.EpochStart(coreTime.NextEpoch(postState))
if err != nil {
return err
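
A minimal sketch of the cache-priming step in this hunk, using hypothetical stand-in types for primitives.ValidatorIndex, primitives.Slot, and the ProposerSlotIndexCache; the map mirrors the shape of the second return value of helpers.CommitteeAssignments.

package main

import "fmt"

// Hypothetical stand-ins for primitives.ValidatorIndex and primitives.Slot.
type ValidatorIndex uint64
type Slot uint64

func main() {
    // Shape of nextProposerIndexToSlots: each proposer index maps to the
    // slots it is scheduled to propose in during the next epoch.
    nextProposerIndexToSlots := map[ValidatorIndex][]Slot{
        7:  {65},
        21: {66, 70}, // a validator may propose more than once per epoch
    }
    // Stand-in for the ProposerSlotIndexCache: proposal slot -> proposer.
    cache := map[Slot]ValidatorIndex{}
    for idx, proposalSlots := range nextProposerIndexToSlots {
        // As in the hunk above, only v[0] (the first assigned slot) is
        // primed, alongside a zeroed payload ID and head root.
        cache[proposalSlots[0]] = idx
    }
    fmt.Println(len(cache)) // 2
}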

View File

@@ -94,8 +94,10 @@ func (s *Service) spawnProcessAttestationsRoutine() {
case <-s.ctx.Done():
return
case <-pat.C():
log.Infof("proposer_mocker: calling updated head via offset ticker")
s.UpdateHead(s.ctx, s.CurrentSlot()+1)
case <-st.C():
log.Infof("proposer_mocker: calling updated head via normal slot ticker in spawn atts")
s.cfg.ForkChoiceStore.Lock()
if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, s.CurrentSlot()); err != nil {
log.WithError(err).Error("could not process new slot")
@@ -124,6 +126,8 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
}
s.processAttestations(ctx, disparity)
log.Infof("proposer_mocker: process attestations in fc took %s", time.Since(start).String())
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
start = time.Now()
@@ -136,11 +140,14 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
s.headLock.RUnlock()
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
log.Infof("proposer_mocker: head root in fc took %s", time.Since(start).String())
changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot)
if err != nil {
log.WithError(err).Error("could not update forkchoice")
}
log.Infof("proposer_mocker: fcu call in fc took %s", time.Since(start).String())
if changed {
s.headLock.RLock()
log.WithFields(logrus.Fields{
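
All of the new proposer_mocker lines follow one stage-timing pattern: stamp time.Now, log time.Since, feed the elapsed milliseconds to a histogram, and re-stamp for the next stage. A self-contained sketch, with a fake histogram standing in for the prometheus collector (processAttsElapsedTime above):

package main

import (
    "log"
    "time"
)

// fakeHistogram stands in for a prometheus histogram such as
// processAttsElapsedTime; Observe records one sample in milliseconds.
type fakeHistogram struct{ samples []float64 }

func (h *fakeHistogram) Observe(v float64) { h.samples = append(h.samples, v) }

func main() {
    hist := &fakeHistogram{}

    // Stage 1: time the work, log it, record it.
    start := time.Now()
    time.Sleep(10 * time.Millisecond) // stands in for processAttestations
    log.Printf("process attestations took %s", time.Since(start))
    hist.Observe(float64(time.Since(start).Milliseconds()))

    // Stage 2: the same variable is re-stamped for the next stage.
    start = time.Now()
    time.Sleep(5 * time.Millisecond) // stands in for the head update
    log.Printf("head update took %s", time.Since(start))
}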

View File

@@ -75,6 +75,7 @@ go_library(
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//encoding/ssz:go_default_library",
"//io/file:go_default_library",
"//monitoring/tracing:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -238,6 +238,18 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
assignValidatorToSubnet(pubKey, nextAssignment.Status)
}
// Cache proposer assignment for the current epoch.
for idx, slot := range proposerIndexToSlots {
// Head root is empty because it can't be known until slot - 1; the same holds for the payload ID.
vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot[0], idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */)
}
// Cache proposer assignment for the next epoch.
for idx, slot := range nextProposerIndexToSlots {
vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot[0], idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */)
}
// Prune the payload ID cache of any slots before the request slot.
vs.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot)
return &ethpb.DutiesResponse{
Duties: validatorAssignments,
CurrentEpochDuties: validatorAssignments,
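
The pruning call above keeps the proposer cache from growing without bound: entries for slots before the current epoch's start are dropped. A toy model of that semantics, keyed by slot only (the real cache also stores proposer index, payload ID, and head root):

package main

import "fmt"

type Slot uint64

// payloadIDs models the slot-keyed part of the ProposerSlotIndexCache.
type payloadIDs map[Slot][8]byte

// prune drops every entry strictly before the given slot, as
// PrunePayloadIDs(epochStartSlot) does above.
func (c payloadIDs) prune(before Slot) {
    for slot := range c {
        if slot < before {
            delete(c, slot)
        }
    }
}

func main() {
    c := payloadIDs{30: {}, 31: {}, 32: {}, 33: {}}
    c.prune(32)
    fmt.Println(len(c)) // 2: only slots 32 and 33 survive
}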

View File

@@ -1,9 +1,12 @@
package validator
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"path"
"runtime/pprof"
"strings"
"sync"
"time"
@@ -25,6 +28,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/io/file"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
@@ -53,6 +57,10 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
if err != nil {
log.WithError(err).Error("Could not convert slot to time")
}
bf := bytes.NewBuffer([]byte{})
if err := pprof.StartCPUProfile(bf); err != nil {
log.WithError(err).Error("Could not start CPU profile")
}
log.WithFields(logrus.Fields{
"slot": req.Slot,
"sinceSlotStartTime": time.Since(t),
@@ -63,13 +71,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
curr := time.Now()
// Process attestations and update the head in forkchoice.
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
log.Infof("proposer_mocker: update head in rpc took %s", time.Since(curr).String())
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
if parentRoot != headRoot {
blockchain.LateBlockAttemptedReorgCount.Inc()
}
log.Infof("proposer_mocker: fetching head root in rpc took %s", time.Since(curr).String())
// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
@@ -90,6 +101,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
}
log.Infof("proposer_mocker: fetching head state rpc took %s", time.Since(curr).String())
// Set slot, graffiti, randao reveal, and parent root.
sBlk.SetSlot(req.Slot)
@@ -103,9 +115,10 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, fmt.Errorf("could not calculate proposer index %v", err)
}
sBlk.SetProposerIndex(idx)
log.Infof("proposer_mocker: setting proposer index took %s", time.Since(curr).String())
if features.Get().BuildBlockParallel {
if err := vs.BuildBlockParallel(ctx, sBlk, head); err != nil {
if err := vs.BuildBlockParallel(ctx, sBlk, head, curr); err != nil {
return nil, errors.Wrap(err, "could not build block in parallel")
}
} else {
@@ -168,6 +181,20 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
"sinceSlotStartTime": time.Since(t),
"validator": sBlk.Block().ProposerIndex(),
}).Info("Finished building block")
pprof.StopCPUProfile()
if time.Since(t) > 1*time.Second {
dbPath := vs.BeaconDB.DatabasePath()
dbPath = path.Join(dbPath, "profiles")
err = file.MkdirAll(dbPath)
if err != nil {
log.WithError(err).Error("Could not create profiles directory")
} else {
dbPath = path.Join(dbPath, fmt.Sprintf("%d.profile", req.Slot))
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
log.WithError(err).Error("Could not write CPU profile to disk")
}
}
}
pb, err := sBlk.Block().Proto()
if err != nil {
@@ -191,7 +218,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Phase0{Phase0: pb.(*ethpb.BeaconBlock)}}, nil
}
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState) error {
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState, curr time.Time) error {
// Build consensus fields in background
var wg sync.WaitGroup
wg.Add(1)
@@ -205,6 +232,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
log.WithError(err).Error("Could not get eth1data")
}
sBlk.SetEth1Data(eth1Data)
log.Infof("proposer_mocker: setting eth1data took %s", time.Since(curr).String())
// Set deposit and attestation.
deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data) // TODO: split attestations and deposits
@@ -216,20 +244,26 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
sBlk.SetDeposits(deposits)
sBlk.SetAttestations(atts)
}
log.Infof("proposer_mocker: setting deposits and atts took %s", time.Since(curr).String())
// Set slashings.
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
sBlk.SetProposerSlashings(validProposerSlashings)
sBlk.SetAttesterSlashings(validAttSlashings)
log.Infof("proposer_mocker: setting slashings took %s", time.Since(curr).String())
// Set exits.
sBlk.SetVoluntaryExits(vs.getExits(head, sBlk.Block().Slot()))
log.Infof("proposer_mocker: setting exits took %s", time.Since(curr).String())
// Set sync aggregate. New in Altair.
vs.setSyncAggregate(ctx, sBlk)
log.Infof("proposer_mocker: setting sync aggs took %s", time.Since(curr).String())
// Set bls to execution change. New in Capella.
vs.setBlsToExecData(sBlk, head)
log.Infof("proposer_mocker: setting bls data took %s", time.Since(curr).String())
}()
localPayload, err := vs.getLocalPayload(ctx, sBlk.Block(), head)
@@ -246,6 +280,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
if err := setExecutionData(ctx, sBlk, localPayload, builderPayload); err != nil {
return status.Errorf(codes.Internal, "Could not set execution data: %v", err)
}
log.Infof("proposer_mocker: setting execution data took %s", time.Since(curr).String())
wg.Wait() // Wait until block is built via consensus and execution fields.
@@ -392,10 +427,12 @@ func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk interfaces.
// computeStateRoot computes the state root after a block has been processed through a state transition and
// returns it to the validator client.
func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
curr := time.Now()
beaconState, err := vs.StateGen.StateByRoot(ctx, block.Block().ParentRoot())
if err != nil {
return nil, errors.Wrap(err, "could not retrieve beacon state")
}
log.Infof("proposer_mocker: fetching parent state took %s", time.Since(curr).String())
root, err := transition.CalculateStateRoot(
ctx,
beaconState,
@@ -404,6 +441,7 @@ func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnl
if err != nil {
return nil, errors.Wrapf(err, "could not calculate state root at slot %d", beaconState.Slot())
}
log.Infof("proposer_mocker: calculating state root took %s", time.Since(curr).String())
log.WithField("beaconStateRoot", fmt.Sprintf("%#x", root)).Debugf("Computed state root")
return root[:], nil
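
The profiling added in this file is a capture-always, persist-when-slow pattern: every call streams its CPU profile into an in-memory buffer, and the buffer is written out only when block building exceeds one second. A standard-library-only sketch of the same idea (the diff uses Prysm's io/file helpers and the beacon DB path instead):

package main

import (
    "bytes"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "runtime/pprof"
    "time"
)

func main() {
    var buf bytes.Buffer
    if err := pprof.StartCPUProfile(&buf); err != nil {
        log.Printf("could not start CPU profile: %v", err)
    }
    start := time.Now()
    doWork() // stands in for building the block
    pprof.StopCPUProfile()

    // Persist the profile only for slow runs, mirroring the 1s threshold.
    if time.Since(start) > time.Second {
        dir := filepath.Join(os.TempDir(), "profiles")
        if err := os.MkdirAll(dir, 0o700); err != nil {
            log.Printf("could not create profile dir: %v", err)
            return
        }
        name := filepath.Join(dir, fmt.Sprintf("%d.profile", start.Unix()))
        if err := os.WriteFile(name, buf.Bytes(), 0o600); err != nil {
            log.Printf("could not write profile: %v", err)
        }
    }
}

func doWork() { time.Sleep(1100 * time.Millisecond) }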

View File

@@ -30,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
@@ -69,7 +70,7 @@ type Server struct {
OperationNotifier opfeed.Notifier
StateGen stategen.StateManager
ReplayerBuilder stategen.ReplayerBuilder
BeaconDB db.HeadAccessDatabase
BeaconDB db.Database
ExecutionEngineCaller execution.EngineCaller
BlockBuilder builder.BlockBuilder
BLSChangesPool blstoexec.PoolManager
@@ -184,3 +185,33 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
}
return stream.Send(res)
}
func (vs *Server) RandomStuff() {
for vs.TimeFetcher.GenesisTime().IsZero() {
time.Sleep(5 * time.Second)
}
genTime := vs.TimeFetcher.GenesisTime()
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-vs.Ctx.Done():
ticker.Done()
return
case slot := <-ticker.C():
curr := time.Now()
time.Sleep(18 * time.Millisecond)
_, err := vs.GetBeaconBlock(context.Background(), &ethpb.BlockRequest{
Slot: slot,
Graffiti: make([]byte, 32),
RandaoReveal: make([]byte, 96),
})
if err != nil {
log.Error(err)
continue
}
log.Infof("proposer_mocker: successfully produced block %d in %s", slot, time.Since(curr).String())
}
}
}
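
RandomStuff drives the proposer path once per slot against the node's own RPC server, sleeping a small offset past each slot boundary before requesting a block. A stripped-down sketch of the loop, with a plain time.Ticker standing in for slots.NewSlotTicker:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // Bounded run for the sketch; the real loop exits via vs.Ctx.Done().
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()

    ticker := time.NewTicker(100 * time.Millisecond) // stands in for the slot ticker
    defer ticker.Stop()

    slot := 0
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            slot++
            time.Sleep(18 * time.Millisecond) // small offset past the slot start
            fmt.Printf("would request a block for slot %d\n", slot)
        }
    }
}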

View File

@@ -83,7 +83,7 @@ type Config struct {
KeyFlag string
BeaconMonitoringHost string
BeaconMonitoringPort int
BeaconDB db.HeadAccessDatabase
BeaconDB db.Database
ChainInfoFetcher blockchain.ChainInfoFetcher
HeadFetcher blockchain.HeadFetcher
CanonicalFetcher blockchain.CanonicalFetcher
@@ -262,6 +262,7 @@ func (s *Service) Start() {
BLSChangesPool: s.cfg.BLSChangesPool,
ClockWaiter: s.cfg.ClockWaiter,
}
go validatorServer.RandomStuff()
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,

View File

@@ -26,6 +26,7 @@ type FieldTrie struct {
length uint64
numOfElems int
isTransferred bool
isCompressed bool
}
// NewFieldTrie is the constructor for the field trie data structure. It creates the corresponding
@@ -210,11 +211,37 @@ func (f *FieldTrie) TransferTrie() *FieldTrie {
length: f.length,
numOfElems: f.numOfElems,
}
// Zero out field layers here.
f.fieldLayers = nil
// For validator fields we special-case the transfer process
if f.field == types.Validators {
newLyr := make([]*[32]byte, len(f.fieldLayers[0]))
copy(newLyr, f.fieldLayers[0])
f.fieldLayers = [][]*[32]byte{newLyr}
f.isCompressed = true
} else {
// Zero out field layers here.
f.fieldLayers = nil
}
return nTrie
}
func (f *FieldTrie) ExpandTrie() error {
if !f.isCompressed {
return errors.Errorf("Unsuitable trie provided to expand, it has length of %d instead of 1", len(f.fieldLayers))
}
fieldRoots := make([][32]byte, len(f.fieldLayers[0]))
for i, v := range f.fieldLayers[0] {
copiedRt := *v
fieldRoots[i] = copiedRt
}
if f.dataType == types.CompositeArray {
f.fieldLayers = stateutil.ReturnTrieLayerVariable(fieldRoots, f.length)
} else {
return errors.Errorf("Wrong data type for trie: %v", f.dataType)
}
f.isCompressed = false
return nil
}
// TrieRoot returns the corresponding root of the trie.
func (f *FieldTrie) TrieRoot() ([32]byte, error) {
if f.Empty() {
@@ -249,9 +276,19 @@ func (f *FieldTrie) Empty() bool {
return f == nil || len(f.fieldLayers) == 0 || f.isTransferred
}
func (f *FieldTrie) IsCompressed() bool {
return f.isCompressed
}
// InsertFieldLayer manually inserts a field layer. This method
// bypasses the normal path of field computation and is only
// meant to be used in tests.
func (f *FieldTrie) InsertFieldLayer(layer [][]*[32]byte) {
f.fieldLayers = layer
}
func copyLayer(lyr []*[32]byte) []*[32]byte {
newLyr := make([]*[32]byte, len(lyr))
copy(newLyr, lyr)
return newLyr
}
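
The compressed form keeps only the bottom (leaf) layer of the validator field trie; ExpandTrie then rebuilds the upper layers on demand via stateutil.ReturnTrieLayerVariable. A toy version of such a rebuild, hashing pairs with sha256 and duplicating odd nodes (the real routine pads with zero-hashes to a fixed depth):

package main

import (
    "crypto/sha256"
    "fmt"
)

// rebuildLayers recomputes every trie layer from the leaf layer upward.
func rebuildLayers(leaves [][32]byte) [][][32]byte {
    layers := [][][32]byte{leaves}
    current := leaves
    for len(current) > 1 {
        next := make([][32]byte, (len(current)+1)/2)
        for i := 0; i < len(current); i += 2 {
            right := current[i] // duplicate an odd node; real code pads with zero-hashes
            if i+1 < len(current) {
                right = current[i+1]
            }
            next[i/2] = sha256.Sum256(append(current[i][:], right[:]...))
        }
        layers = append(layers, next)
        current = next
    }
    return layers
}

func main() {
    leaves := [][32]byte{{1}, {2}, {3}, {4}}
    layers := rebuildLayers(leaves)
    fmt.Printf("layers: %d, root: %x\n", len(layers), layers[len(layers)-1][0][:4])
}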

View File

@@ -219,13 +219,7 @@ func handleValidatorSlice(val []*ethpb.Validator, indices []uint64, convertAll b
return nil
}
if convertAll {
for i := range val {
err := rootCreator(val[i])
if err != nil {
return nil, err
}
}
return roots, nil
return stateutil.OptimizedValidatorRoots(val)
}
if len(val) > 0 {
for _, idx := range indices {

View File

@@ -867,7 +867,7 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
// and therefore we would call Unlock() on a different object.
fTrieMutex.Lock()
if fTrie.Empty() {
if fTrie.Empty() && !fTrie.IsCompressed() {
err := b.resetFieldTrie(index, elements, fTrie.Length())
if err != nil {
fTrieMutex.Unlock()
@@ -879,7 +879,13 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
return b.stateFieldLeaves[index].TrieRoot()
}
if fTrie.FieldReference().Refs() > 1 {
if fTrie.IsCompressed() {
if err := fTrie.ExpandTrie(); err != nil {
return [32]byte{}, err
}
}
if fTrie.FieldReference().Refs() > 1 && !fTrie.IsCompressed() {
fTrie.FieldReference().MinusRef()
newTrie := fTrie.TransferTrie()
b.stateFieldLeaves[index] = newTrie

View File

@@ -155,6 +155,20 @@ func (e *epochBoundaryState) put(blockRoot [32]byte, s state.BeaconState) error
func (e *epochBoundaryState) delete(blockRoot [32]byte) error {
e.lock.Lock()
defer e.lock.Unlock()
rInfo, ok, err := e.getByBlockRootLockFree(blockRoot)
if err != nil {
return err
}
if !ok {
return nil
}
slotInfo := &slotRootInfo{
slot: rInfo.state.Slot(),
blockRoot: blockRoot,
}
if err = e.slotRootCache.Delete(slotInfo); err != nil {
return err
}
return e.rootStateCache.Delete(&rootStateInfo{
root: blockRoot,
})
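
The new delete path has to touch two indexes, because boundary states are cached both by block root and by slot; removing only one would leave a dangling slot-to-root entry. A sketch of the same invariant with two plain maps:

package main

import "fmt"

type Slot uint64

// boundaryCache mirrors the double indexing above: rootStateCache keyed by
// block root, slotRootCache keyed by slot.
type boundaryCache struct {
    byRoot map[[32]byte]Slot
    bySlot map[Slot][32]byte
}

func (c *boundaryCache) delete(root [32]byte) {
    slot, ok := c.byRoot[root]
    if !ok {
        return // nothing cached for this root, mirroring the early return
    }
    delete(c.bySlot, slot) // drop the slot index first
    delete(c.byRoot, root) // then the root index
}

func main() {
    c := &boundaryCache{
        byRoot: map[[32]byte]Slot{{'B'}: 32},
        bySlot: map[Slot][32]byte{32: {'B'}},
    }
    c.delete([32]byte{'B'})
    fmt.Println(len(c.byRoot), len(c.bySlot)) // 0 0
}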

View File

@@ -2,8 +2,10 @@ package stategen
import (
"context"
"fmt"
"math"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -79,6 +81,52 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, st stat
if err := s.epochBoundaryStateCache.put(blockRoot, st); err != nil {
return err
}
} else {
// Always check that the correct epoch boundary states have been saved
// for the current epoch.
epochStart, err := slots.EpochStart(slots.ToEpoch(st.Slot()))
if err != nil {
return err
}
rInfo, ok, err := s.epochBoundaryStateCache.getBySlot(epochStart)
if err != nil {
return err
}
bRoot, err := helpers.BlockRootAtSlot(st, epochStart)
if err != nil {
return err
}
nonCanonicalState := ok && [32]byte(bRoot) != rInfo.root
// We only recover the boundary state under two conditions:
//
// 1) No state is cached for the boundary slot (!ok), which indicates the
// epoch boundary was skipped due to a missed slot; we recover by saving
// the state at that particular slot here.
//
// 2) The state cached for the boundary slot is non-canonical, i.e. its
// root does not match the canonical block root at that slot.
if !ok || nonCanonicalState {
// Only recover the state if it is in our hot state cache, otherwise we
// simply skip this step.
if s.hotStateCache.has([32]byte(bRoot)) {
log.WithFields(logrus.Fields{
"slot": epochStart,
"root": fmt.Sprintf("%#x", bRoot),
}).Debug("Recovering state for epoch boundary cache")
// Remove the non-canonical state from our cache.
if nonCanonicalState {
if err := s.epochBoundaryStateCache.delete(rInfo.root); err != nil {
return err
}
}
hState := s.hotStateCache.get([32]byte(bRoot))
if err := s.epochBoundaryStateCache.put([32]byte(bRoot), hState); err != nil {
return err
}
}
}
}
// On an intermediate slot, save state summary.
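
The recovery check maps the current state's slot back to the first slot of its epoch. A worked sketch of that arithmetic, hard-coding the mainnet slots-per-epoch value for illustration:

package main

import "fmt"

type Slot uint64

const slotsPerEpoch = 32 // mainnet value, hard-coded for illustration

// epochStart mirrors slots.EpochStart(slots.ToEpoch(slot)).
func epochStart(slot Slot) Slot { return (slot / slotsPerEpoch) * slotsPerEpoch }

func main() {
    // A state at slot 63 (last slot of epoch 1) maps to boundary slot 32.
    fmt.Println(epochStart(63)) // 32
    // Recovery fires when no state is cached for slot 32, or when the
    // cached root differs from the canonical block root at slot 32.
}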

View File

@@ -7,6 +7,7 @@ import (
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
@@ -137,6 +138,81 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
require.Equal(t, false, beaconDB.HasState(ctx, r))
}
func TestSaveState_RecoverForEpochBoundary(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
service := New(beaconDB, doublylinkedtree.New())
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
boundaryRoot := [32]byte{'B'}
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
b := util.NewBeaconBlock()
util.SaveBlock(t, ctx, beaconDB, b)
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
// Save boundary state to the hot state cache.
boundaryState, _ := util.DeterministicGenesisState(t, 32)
service.hotStateCache.put(boundaryRoot, boundaryState)
require.NoError(t, service.SaveState(ctx, r, beaconState))
rInfo, ok, err := service.epochBoundaryStateCache.getByBlockRoot(boundaryRoot)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
}
func TestSaveState_RecoverForEpochBoundary_NonCanonicalBoundaryState(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
service := New(beaconDB, doublylinkedtree.New())
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
boundaryRoot := [32]byte{'B'}
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
// Epoch boundary state is inserted for slot 0
nonCanonicalRoot := [32]byte{'C'}
nonCanonicalSt, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.epochBoundaryStateCache.put(nonCanonicalRoot, nonCanonicalSt))
_, ok, err := service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
rInfo, ok, err := service.epochBoundaryStateCache.getBySlot(0)
require.NoError(t, err)
require.Equal(t, true, ok)
require.Equal(t, nonCanonicalRoot, rInfo.root)
b := util.NewBeaconBlock()
util.SaveBlock(t, ctx, beaconDB, b)
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
// Save boundary state to the hot state cache.
boundaryState, _ := util.DeterministicGenesisState(t, 32)
service.hotStateCache.put(boundaryRoot, boundaryState)
require.NoError(t, service.SaveState(ctx, r, beaconState))
rInfo, ok, err = service.epochBoundaryStateCache.getBySlot(0)
assert.NoError(t, err)
assert.Equal(t, true, ok, "state does not exist in cache")
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
_, ok, err = service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
assert.NoError(t, err)
assert.Equal(t, false, ok, "state exists in cache")
}
func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()

View File

@@ -30,7 +30,7 @@ func ValidatorRegistryRoot(vals []*ethpb.Validator) ([32]byte, error) {
}
func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
roots, err := optimizedValidatorRoots(validators)
roots, err := OptimizedValidatorRoots(validators)
if err != nil {
return [32]byte{}, err
}
@@ -51,7 +51,9 @@ func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
return res, nil
}
func optimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error) {
// OptimizedValidatorRoots uses a gohashtree-backed routine to derive a list of
// validator roots from a list of validator objects.
func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error) {
// Exit early if no validators are provided.
if len(validators) == 0 {
return [][32]byte{}, nil
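
With the routine exported, callers such as handleValidatorSlice above can hash the entire registry in one batched call. A hedged usage sketch, assuming the Prysm v4 module is importable and using synthetic zero-filled validators:

package main

import (
    "fmt"
    "log"

    "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stateutil"
    ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

func main() {
    // Two synthetic validators with zeroed keys and credentials.
    vals := []*ethpb.Validator{
        {PublicKey: make([]byte, 48), WithdrawalCredentials: make([]byte, 32)},
        {PublicKey: make([]byte, 48), WithdrawalCredentials: make([]byte, 32)},
    }
    roots, err := stateutil.OptimizedValidatorRoots(vals)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%d validator roots, first: %#x\n", len(roots), roots[0][:8])
}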