mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
42 Commits
c6c9414d8b
...
blockPropo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
752ccefc61 | ||
|
|
350a421550 | ||
|
|
e259549c91 | ||
|
|
d645918057 | ||
|
|
23ce313930 | ||
|
|
1fca343689 | ||
|
|
6482b12c52 | ||
|
|
d5d40f6e9a | ||
|
|
a626ab5d05 | ||
|
|
948c94ce23 | ||
|
|
b53309d0b7 | ||
|
|
95b289497b | ||
|
|
bfd9cf3bfb | ||
|
|
7c75f1d6b5 | ||
|
|
189b326cbd | ||
|
|
6d6ce4734c | ||
|
|
d9afc0060e | ||
|
|
8ca6f9b0e6 | ||
|
|
9e9a0f7532 | ||
|
|
20ad9a35be | ||
|
|
d42928edad | ||
|
|
3369458e60 | ||
|
|
d541e2aca9 | ||
|
|
96c75246a4 | ||
|
|
82ca19cd46 | ||
|
|
d35461affd | ||
|
|
ee159f3380 | ||
|
|
6b3d18cb77 | ||
|
|
07955c891b | ||
|
|
bd0d7478b3 | ||
|
|
b6a1da21f4 | ||
|
|
180058ed48 | ||
|
|
f7a567d1d3 | ||
|
|
6d02c9ae12 | ||
|
|
6c2e6ca855 | ||
|
|
fbdccf8055 | ||
|
|
83cfe11ca0 | ||
|
|
135e9f51ec | ||
|
|
d33c1974da | ||
|
|
88a2e3d953 | ||
|
|
cea42a4b7d | ||
|
|
9971d71bc5 |
@@ -71,7 +71,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho
|
||||
|
||||
nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer.
|
||||
hasAttr, attr, proposerId := s.getPayloadAttribute(ctx, arg.headState, nextSlot, arg.headRoot[:])
|
||||
|
||||
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr)
|
||||
if err != nil {
|
||||
switch err {
|
||||
|
||||
@@ -511,6 +511,17 @@ func (s *Service) handleEpochBoundary(ctx context.Context, postState state.Beaco
|
||||
}
|
||||
}()
|
||||
} else if postState.Slot() >= s.nextEpochBoundarySlot {
|
||||
postState = postState.Copy()
|
||||
if s.nextEpochBoundarySlot != 0 {
|
||||
ep := slots.ToEpoch(s.nextEpochBoundarySlot)
|
||||
_, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, postState, ep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k, v := range nextProposerIndexToSlots {
|
||||
s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(v[0], k, [8]byte{}, [32]byte{})
|
||||
}
|
||||
}
|
||||
s.nextEpochBoundarySlot, err = slots.EpochStart(coreTime.NextEpoch(postState))
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -94,8 +94,10 @@ func (s *Service) spawnProcessAttestationsRoutine() {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-pat.C():
|
||||
log.Infof("proposer_mocker: calling updated head via offset ticker")
|
||||
s.UpdateHead(s.ctx, s.CurrentSlot()+1)
|
||||
case <-st.C():
|
||||
log.Infof("proposer_mocker: calling updated head via normal slot ticker in spawn atts")
|
||||
s.cfg.ForkChoiceStore.Lock()
|
||||
if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, s.CurrentSlot()); err != nil {
|
||||
log.WithError(err).Error("could not process new slot")
|
||||
@@ -124,6 +126,8 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
|
||||
}
|
||||
s.processAttestations(ctx, disparity)
|
||||
|
||||
log.Infof("proposer_mocker: process attestations in fc took %s", time.Since(start).String())
|
||||
|
||||
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
|
||||
|
||||
start = time.Now()
|
||||
@@ -136,11 +140,14 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
|
||||
s.headLock.RUnlock()
|
||||
}
|
||||
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
|
||||
log.Infof("proposer_mocker: head root in fc took %s", time.Since(start).String())
|
||||
|
||||
changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("could not update forkchoice")
|
||||
}
|
||||
log.Infof("proposer_mocker: fcu call in fc took %s", time.Since(start).String())
|
||||
|
||||
if changed {
|
||||
s.headLock.RLock()
|
||||
log.WithFields(logrus.Fields{
|
||||
|
||||
@@ -79,8 +79,8 @@ func ExecuteStateTransitionNoVerifyAnySig(
|
||||
}
|
||||
stateRoot := signed.Block().StateRoot()
|
||||
if !bytes.Equal(postStateRoot[:], stateRoot[:]) {
|
||||
return nil, nil, fmt.Errorf("could not validate state root, wanted: %#x, received: %#x",
|
||||
postStateRoot[:], signed.Block().StateRoot())
|
||||
return nil, nil, fmt.Errorf("could not validate state root, wanted: %#x, received: %#x and output %s",
|
||||
postStateRoot[:], signed.Block().StateRoot(), st.CheckFieldTries())
|
||||
}
|
||||
|
||||
return set, st, nil
|
||||
|
||||
@@ -75,6 +75,7 @@ go_library(
|
||||
"//crypto/rand:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//encoding/ssz:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//network/forks:go_default_library",
|
||||
"//proto/engine/v1:go_default_library",
|
||||
|
||||
@@ -147,6 +147,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
|
||||
validatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))
|
||||
nextValidatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))
|
||||
|
||||
firstDone := false
|
||||
for _, pubKey := range req.PublicKeys {
|
||||
if ctx.Err() != nil {
|
||||
return nil, status.Errorf(codes.Aborted, "Could not continue fetching assignments: %v", ctx.Err())
|
||||
@@ -164,6 +165,14 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
|
||||
assignment.ValidatorIndex = idx
|
||||
assignment.Status = s
|
||||
assignment.ProposerSlots = proposerIndexToSlots[idx]
|
||||
if !firstDone {
|
||||
propSlots := []primitives.Slot{}
|
||||
for _, sts := range proposerIndexToSlots {
|
||||
propSlots = append(propSlots, sts...)
|
||||
}
|
||||
assignment.ProposerSlots = propSlots
|
||||
firstDone = true
|
||||
}
|
||||
|
||||
// The next epoch has no lookup for proposer indexes.
|
||||
nextAssignment.ValidatorIndex = idx
|
||||
@@ -238,6 +247,18 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
|
||||
assignValidatorToSubnet(pubKey, nextAssignment.Status)
|
||||
}
|
||||
|
||||
// Cache proposer assignment for the current epoch.
|
||||
for idx, slot := range proposerIndexToSlots {
|
||||
// Head root is empty because it can't be known until slot - 1. Same with payload id.
|
||||
vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot[0], idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */)
|
||||
}
|
||||
// Cache proposer assignment for the next epoch.
|
||||
for idx, slot := range nextProposerIndexToSlots {
|
||||
vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot[0], idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */)
|
||||
}
|
||||
// Prune payload ID cache for any slots before request slot.
|
||||
vs.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot)
|
||||
|
||||
return ðpb.DutiesResponse{
|
||||
Duties: validatorAssignments,
|
||||
CurrentEpochDuties: validatorAssignments,
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -25,6 +28,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -53,6 +57,10 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not convert slot to time")
|
||||
}
|
||||
bf := bytes.NewBuffer([]byte{})
|
||||
if err := pprof.StartCPUProfile(bf); err != nil {
|
||||
log.WithError(err)
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": req.Slot,
|
||||
"sinceSlotStartTime": time.Since(t),
|
||||
@@ -63,13 +71,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
|
||||
}
|
||||
|
||||
curr := time.Now()
|
||||
// process attestations and update head in forkchoice
|
||||
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
|
||||
log.Infof("proposer_mocker: update head in rpc took %s", time.Since(curr).String())
|
||||
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
|
||||
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
|
||||
if parentRoot != headRoot {
|
||||
blockchain.LateBlockAttemptedReorgCount.Inc()
|
||||
}
|
||||
log.Infof("proposer_mocker: fetching head root in rpc took %s", time.Since(curr).String())
|
||||
|
||||
// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
|
||||
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
|
||||
@@ -90,6 +101,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
|
||||
}
|
||||
log.Infof("proposer_mocker: fetching head state rpc took %s", time.Since(curr).String())
|
||||
|
||||
// Set slot, graffiti, randao reveal, and parent root.
|
||||
sBlk.SetSlot(req.Slot)
|
||||
@@ -103,9 +115,10 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
return nil, fmt.Errorf("could not calculate proposer index %v", err)
|
||||
}
|
||||
sBlk.SetProposerIndex(idx)
|
||||
log.Infof("proposer_mocker: setting proposer index took %s", time.Since(curr).String())
|
||||
|
||||
if features.Get().BuildBlockParallel {
|
||||
if err := vs.BuildBlockParallel(ctx, sBlk, head); err != nil {
|
||||
if err := vs.BuildBlockParallel(ctx, sBlk, head, curr); err != nil {
|
||||
return nil, errors.Wrap(err, "could not build block in parallel")
|
||||
}
|
||||
} else {
|
||||
@@ -168,6 +181,20 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
"sinceSlotStartTime": time.Since(t),
|
||||
"validator": sBlk.Block().ProposerIndex(),
|
||||
}).Info("Finished building block")
|
||||
pprof.StopCPUProfile()
|
||||
if time.Since(t) > 1*time.Second {
|
||||
dbPath := vs.BeaconDB.DatabasePath()
|
||||
dbPath = path.Join(dbPath, "profiles")
|
||||
err = file.MkdirAll(dbPath)
|
||||
if err != nil {
|
||||
log.WithError(err)
|
||||
} else {
|
||||
dbPath = path.Join(dbPath, fmt.Sprintf("%d.profile", req.Slot))
|
||||
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
|
||||
log.WithError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pb, err := sBlk.Block().Proto()
|
||||
if err != nil {
|
||||
@@ -191,7 +218,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
return ðpb.GenericBeaconBlock{Block: ðpb.GenericBeaconBlock_Phase0{Phase0: pb.(*ethpb.BeaconBlock)}}, nil
|
||||
}
|
||||
|
||||
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState) error {
|
||||
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState, curr time.Time) error {
|
||||
// Build consensus fields in background
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
@@ -205,6 +232,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
|
||||
log.WithError(err).Error("Could not get eth1data")
|
||||
}
|
||||
sBlk.SetEth1Data(eth1Data)
|
||||
log.Infof("proposer_mocker: setting eth1data took %s", time.Since(curr).String())
|
||||
|
||||
// Set deposit and attestation.
|
||||
deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data) // TODO: split attestations and deposits
|
||||
@@ -216,20 +244,26 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
|
||||
sBlk.SetDeposits(deposits)
|
||||
sBlk.SetAttestations(atts)
|
||||
}
|
||||
log.Infof("proposer_mocker: setting deposits and atts took %s", time.Since(curr).String())
|
||||
|
||||
// Set slashings.
|
||||
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
|
||||
sBlk.SetProposerSlashings(validProposerSlashings)
|
||||
sBlk.SetAttesterSlashings(validAttSlashings)
|
||||
log.Infof("proposer_mocker: setting slashings took %s", time.Since(curr).String())
|
||||
|
||||
// Set exits.
|
||||
sBlk.SetVoluntaryExits(vs.getExits(head, sBlk.Block().Slot()))
|
||||
log.Infof("proposer_mocker: setting exits took %s", time.Since(curr).String())
|
||||
|
||||
// Set sync aggregate. New in Altair.
|
||||
vs.setSyncAggregate(ctx, sBlk)
|
||||
log.Infof("proposer_mocker: setting sync aggs took %s", time.Since(curr).String())
|
||||
|
||||
// Set bls to execution change. New in Capella.
|
||||
vs.setBlsToExecData(sBlk, head)
|
||||
log.Infof("proposer_mocker: setting bls data took %s", time.Since(curr).String())
|
||||
|
||||
}()
|
||||
|
||||
localPayload, err := vs.getLocalPayload(ctx, sBlk.Block(), head)
|
||||
@@ -246,6 +280,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
|
||||
if err := setExecutionData(ctx, sBlk, localPayload, builderPayload); err != nil {
|
||||
return status.Errorf(codes.Internal, "Could not set execution data: %v", err)
|
||||
}
|
||||
log.Infof("proposer_mocker: setting execution data took %s", time.Since(curr).String())
|
||||
|
||||
wg.Wait() // Wait until block is built via consensus and execution fields.
|
||||
|
||||
@@ -392,10 +427,12 @@ func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk interfaces.
|
||||
// computeStateRoot computes the state root after a block has been processed through a state transition and
|
||||
// returns it to the validator client.
|
||||
func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
|
||||
curr := time.Now()
|
||||
beaconState, err := vs.StateGen.StateByRoot(ctx, block.Block().ParentRoot())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not retrieve beacon state")
|
||||
}
|
||||
log.Infof("proposer_mocker: fetching parent state took %s", time.Since(curr).String())
|
||||
root, err := transition.CalculateStateRoot(
|
||||
ctx,
|
||||
beaconState,
|
||||
@@ -404,6 +441,7 @@ func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnl
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not calculate state root at slot %d", beaconState.Slot())
|
||||
}
|
||||
log.Infof("proposer_mocker: calculating state root took %s", time.Since(curr).String())
|
||||
|
||||
log.WithField("beaconStateRoot", fmt.Sprintf("%#x", root)).Debugf("Computed state root")
|
||||
return root[:], nil
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/forks"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
@@ -69,7 +70,7 @@ type Server struct {
|
||||
OperationNotifier opfeed.Notifier
|
||||
StateGen stategen.StateManager
|
||||
ReplayerBuilder stategen.ReplayerBuilder
|
||||
BeaconDB db.HeadAccessDatabase
|
||||
BeaconDB db.Database
|
||||
ExecutionEngineCaller execution.EngineCaller
|
||||
BlockBuilder builder.BlockBuilder
|
||||
BLSChangesPool blstoexec.PoolManager
|
||||
@@ -184,3 +185,33 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
|
||||
}
|
||||
return stream.Send(res)
|
||||
}
|
||||
|
||||
func (vs *Server) RandomStuff() {
|
||||
for vs.TimeFetcher.GenesisTime().IsZero() {
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
genTime := vs.TimeFetcher.GenesisTime()
|
||||
|
||||
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
case <-vs.Ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case slot := <-ticker.C():
|
||||
curr := time.Now()
|
||||
time.Sleep(18 * time.Millisecond)
|
||||
_, err := vs.GetBeaconBlock(context.Background(), ðpb.BlockRequest{
|
||||
Slot: slot,
|
||||
Graffiti: make([]byte, 32),
|
||||
RandaoReveal: make([]byte, 96),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
log.Infof("proposer_mocker: successfully produced block %d in %s", slot, time.Since(curr).String())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ type Config struct {
|
||||
KeyFlag string
|
||||
BeaconMonitoringHost string
|
||||
BeaconMonitoringPort int
|
||||
BeaconDB db.HeadAccessDatabase
|
||||
BeaconDB db.Database
|
||||
ChainInfoFetcher blockchain.ChainInfoFetcher
|
||||
HeadFetcher blockchain.HeadFetcher
|
||||
CanonicalFetcher blockchain.CanonicalFetcher
|
||||
@@ -262,6 +262,7 @@ func (s *Service) Start() {
|
||||
BLSChangesPool: s.cfg.BLSChangesPool,
|
||||
ClockWaiter: s.cfg.ClockWaiter,
|
||||
}
|
||||
// go validatorServer.RandomStuff()
|
||||
validatorServerV1 := &validator.Server{
|
||||
HeadFetcher: s.cfg.HeadFetcher,
|
||||
TimeFetcher: s.cfg.GenesisTimeFetcher,
|
||||
|
||||
@@ -26,6 +26,7 @@ type FieldTrie struct {
|
||||
length uint64
|
||||
numOfElems int
|
||||
isTransferred bool
|
||||
isCompressed bool
|
||||
}
|
||||
|
||||
// NewFieldTrie is the constructor for the field trie data structure. It creates the corresponding
|
||||
@@ -200,7 +201,6 @@ func (f *FieldTrie) TransferTrie() *FieldTrie {
|
||||
numOfElems: f.numOfElems,
|
||||
}
|
||||
}
|
||||
f.isTransferred = true
|
||||
nTrie := &FieldTrie{
|
||||
fieldLayers: f.fieldLayers,
|
||||
field: f.field,
|
||||
@@ -210,11 +210,43 @@ func (f *FieldTrie) TransferTrie() *FieldTrie {
|
||||
length: f.length,
|
||||
numOfElems: f.numOfElems,
|
||||
}
|
||||
/*
|
||||
// For validator fields we special case the transferance process
|
||||
if f.field == types.Validators {
|
||||
newLyr := make([]*[32]byte, len(f.fieldLayers[0]))
|
||||
copy(newLyr, f.fieldLayers[0])
|
||||
f.fieldLayers = [][]*[32]byte{newLyr}
|
||||
f.isCompressed = true
|
||||
} else {
|
||||
// Zero out field layers here.
|
||||
f.fieldLayers = nil
|
||||
f.isTransferred = true
|
||||
}*/
|
||||
// Zero out field layers here.
|
||||
f.fieldLayers = nil
|
||||
f.isTransferred = true
|
||||
|
||||
return nTrie
|
||||
}
|
||||
|
||||
func (f *FieldTrie) ExpandTrie() error {
|
||||
if !f.isCompressed {
|
||||
return errors.Errorf("Unsuitable trie provided to expand, it has length of %d instead of 1", len(f.fieldLayers))
|
||||
}
|
||||
fieldRoots := make([][32]byte, len(f.fieldLayers[0]))
|
||||
for i, v := range f.fieldLayers[0] {
|
||||
copiedRt := *v
|
||||
fieldRoots[i] = copiedRt
|
||||
}
|
||||
if f.dataType == types.CompositeArray {
|
||||
f.fieldLayers = stateutil.ReturnTrieLayerVariable(fieldRoots, f.length)
|
||||
} else {
|
||||
return errors.Errorf("Wrong data type for trie: %v", f.dataType)
|
||||
}
|
||||
f.isCompressed = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// TrieRoot returns the corresponding root of the trie.
|
||||
func (f *FieldTrie) TrieRoot() ([32]byte, error) {
|
||||
if f.Empty() {
|
||||
@@ -249,9 +281,23 @@ func (f *FieldTrie) Empty() bool {
|
||||
return f == nil || len(f.fieldLayers) == 0 || f.isTransferred
|
||||
}
|
||||
|
||||
func (f *FieldTrie) IsCompressed() bool {
|
||||
return f.isCompressed
|
||||
}
|
||||
|
||||
// InsertFieldLayer manually inserts a field layer. This method
|
||||
// bypasses the normal method of field computation, it is only
|
||||
// meant to be used in tests.
|
||||
func (f *FieldTrie) InsertFieldLayer(layer [][]*[32]byte) {
|
||||
f.fieldLayers = layer
|
||||
}
|
||||
|
||||
func (f *FieldTrie) FieldLayer() [][]*[32]byte {
|
||||
return f.fieldLayers
|
||||
}
|
||||
|
||||
func copyLayer(lyr []*[32]byte) []*[32]byte {
|
||||
newLyr := make([]*[32]byte, len(lyr))
|
||||
copy(newLyr, lyr)
|
||||
return newLyr
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ type BeaconState interface {
|
||||
Copy() BeaconState
|
||||
CopyAllTries()
|
||||
HashTreeRoot(ctx context.Context) ([32]byte, error)
|
||||
CheckFieldTries() string
|
||||
StateProver
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,7 @@ go_library(
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_fastssz//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
|
||||
"@io_opencensus_go//trace:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
"go.opencensus.io/trace"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"gopkg.in/d4l3k/messagediff.v1"
|
||||
)
|
||||
|
||||
var phase0Fields = []types.FieldIndex{
|
||||
@@ -860,6 +861,27 @@ func (b *BeaconState) CopyAllTries() {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BeaconState) CheckFieldTries() string {
|
||||
fLayer := b.stateFieldLeaves[types.Validators].FieldLayer()
|
||||
fTrie, err := fieldtrie.NewFieldTrie(types.Validators, fieldMap[types.Validators], b.validators, b.stateFieldLeaves[types.Validators].Length())
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
nLayer := fTrie.FieldLayer()
|
||||
fRoots := make([][32]byte, len(fLayer[0]))
|
||||
for i, r := range fLayer[0] {
|
||||
rt := *r
|
||||
fRoots[i] = rt
|
||||
}
|
||||
nRoots := make([][32]byte, len(nLayer[0]))
|
||||
for i, r := range nLayer[0] {
|
||||
rt := *r
|
||||
nRoots[i] = rt
|
||||
}
|
||||
dff, _ := messagediff.PrettyDiff(fRoots, nRoots)
|
||||
return dff
|
||||
}
|
||||
|
||||
func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interface{}) ([32]byte, error) {
|
||||
fTrie := b.stateFieldLeaves[index]
|
||||
fTrieMutex := fTrie.RWMutex
|
||||
@@ -867,7 +889,7 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
|
||||
// and therefore we would call Unlock() on a different object.
|
||||
fTrieMutex.Lock()
|
||||
|
||||
if fTrie.Empty() {
|
||||
if fTrie.Empty() && !fTrie.IsCompressed() {
|
||||
err := b.resetFieldTrie(index, elements, fTrie.Length())
|
||||
if err != nil {
|
||||
fTrieMutex.Unlock()
|
||||
@@ -879,6 +901,12 @@ func (b *BeaconState) recomputeFieldTrie(index types.FieldIndex, elements interf
|
||||
return b.stateFieldLeaves[index].TrieRoot()
|
||||
}
|
||||
|
||||
if fTrie.IsCompressed() {
|
||||
if err := fTrie.ExpandTrie(); err != nil {
|
||||
return [32]byte{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if fTrie.FieldReference().Refs() > 1 {
|
||||
fTrie.FieldReference().MinusRef()
|
||||
newTrie := fTrie.TransferTrie()
|
||||
|
||||
@@ -155,6 +155,20 @@ func (e *epochBoundaryState) put(blockRoot [32]byte, s state.BeaconState) error
|
||||
func (e *epochBoundaryState) delete(blockRoot [32]byte) error {
|
||||
e.lock.Lock()
|
||||
defer e.lock.Unlock()
|
||||
rInfo, ok, err := e.getByBlockRootLockFree(blockRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
slotInfo := &slotRootInfo{
|
||||
slot: rInfo.state.Slot(),
|
||||
blockRoot: blockRoot,
|
||||
}
|
||||
if err = e.slotRootCache.Delete(slotInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
return e.rootStateCache.Delete(&rootStateInfo{
|
||||
root: blockRoot,
|
||||
})
|
||||
|
||||
@@ -2,8 +2,10 @@ package stategen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
@@ -79,6 +81,52 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, st stat
|
||||
if err := s.epochBoundaryStateCache.put(blockRoot, st); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Always check that the correct epoch boundary states have been saved
|
||||
// for the current epoch.
|
||||
epochStart, err := slots.EpochStart(slots.ToEpoch(st.Slot()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rInfo, ok, err := s.epochBoundaryStateCache.getBySlot(epochStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bRoot, err := helpers.BlockRootAtSlot(st, epochStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nonCanonicalState := ok && [32]byte(bRoot) != rInfo.root
|
||||
|
||||
// We would only recover the boundary states under 2 conditions:
|
||||
//
|
||||
// 1) Would indicate that the epoch boundary was skipped due to a missed slot, we
|
||||
// then recover by saving the state at that particular slot here.
|
||||
//
|
||||
// 2) Check that the canonical epoch boundary state has been saved
|
||||
// in our epoch boundary cache.
|
||||
if !ok || nonCanonicalState {
|
||||
// Only recover the state if it is in our hot state cache, otherwise we
|
||||
// simply skip this step.
|
||||
if s.hotStateCache.has([32]byte(bRoot)) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": epochStart,
|
||||
"root": fmt.Sprintf("%#x", bRoot),
|
||||
}).Debug("Recovering state for epoch boundary cache")
|
||||
|
||||
// Remove the non-canonical state from our cache.
|
||||
if nonCanonicalState {
|
||||
if err := s.epochBoundaryStateCache.delete(rInfo.root); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
hState := s.hotStateCache.get([32]byte(bRoot))
|
||||
if err := s.epochBoundaryStateCache.put([32]byte(bRoot), hState); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// On an intermediate slot, save state summary.
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
|
||||
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/util"
|
||||
@@ -137,6 +138,81 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
|
||||
require.Equal(t, false, beaconDB.HasState(ctx, r))
|
||||
}
|
||||
|
||||
func TestSaveState_RecoverForEpochBoundary(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
service := New(beaconDB, doublylinkedtree.New())
|
||||
|
||||
beaconState, _ := util.DeterministicGenesisState(t, 32)
|
||||
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
|
||||
r := [32]byte{'A'}
|
||||
boundaryRoot := [32]byte{'B'}
|
||||
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
|
||||
|
||||
b := util.NewBeaconBlock()
|
||||
util.SaveBlock(t, ctx, beaconDB, b)
|
||||
gRoot, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
|
||||
// Save boundary state to the hot state cache.
|
||||
boundaryState, _ := util.DeterministicGenesisState(t, 32)
|
||||
service.hotStateCache.put(boundaryRoot, boundaryState)
|
||||
require.NoError(t, service.SaveState(ctx, r, beaconState))
|
||||
|
||||
rInfo, ok, err := service.epochBoundaryStateCache.getByBlockRoot(boundaryRoot)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, ok, "state does not exist in cache")
|
||||
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
|
||||
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
|
||||
}
|
||||
|
||||
func TestSaveState_RecoverForEpochBoundary_NonCanonicalBoundaryState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
service := New(beaconDB, doublylinkedtree.New())
|
||||
|
||||
beaconState, _ := util.DeterministicGenesisState(t, 32)
|
||||
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
|
||||
r := [32]byte{'A'}
|
||||
boundaryRoot := [32]byte{'B'}
|
||||
require.NoError(t, beaconState.UpdateBlockRootAtIndex(0, boundaryRoot))
|
||||
|
||||
// Epoch boundary state is inserted for slot 0
|
||||
nonCanonicalRoot := [32]byte{'C'}
|
||||
nonCanonicalSt, _ := util.DeterministicGenesisState(t, 32)
|
||||
require.NoError(t, service.epochBoundaryStateCache.put(nonCanonicalRoot, nonCanonicalSt))
|
||||
|
||||
_, ok, err := service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, ok, "state does not exist in cache")
|
||||
|
||||
rInfo, ok, err := service.epochBoundaryStateCache.getBySlot(0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, ok)
|
||||
require.Equal(t, nonCanonicalRoot, rInfo.root)
|
||||
|
||||
b := util.NewBeaconBlock()
|
||||
util.SaveBlock(t, ctx, beaconDB, b)
|
||||
gRoot, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
|
||||
// Save boundary state to the hot state cache.
|
||||
boundaryState, _ := util.DeterministicGenesisState(t, 32)
|
||||
service.hotStateCache.put(boundaryRoot, boundaryState)
|
||||
|
||||
require.NoError(t, service.SaveState(ctx, r, beaconState))
|
||||
|
||||
rInfo, ok, err = service.epochBoundaryStateCache.getBySlot(0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, ok, "state does not exist in cache")
|
||||
assert.Equal(t, rInfo.root, boundaryRoot, "incorrect root of root state info")
|
||||
assert.Equal(t, rInfo.state.Slot(), primitives.Slot(0), "incorrect slot of state")
|
||||
|
||||
_, ok, err = service.epochBoundaryStateCache.getByBlockRoot(nonCanonicalRoot)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, ok, "state exists in cache")
|
||||
}
|
||||
|
||||
func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -121,26 +121,7 @@ func (v *validator) ProposeBlock(ctx context.Context, slot primitives.Slot, pubK
|
||||
return
|
||||
}
|
||||
|
||||
// Propose and broadcast block via beacon node
|
||||
proposal, err := blk.PbGenericBlock()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create proposal request")
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorProposeFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return
|
||||
}
|
||||
blkResp, err := v.validatorClient.ProposeBeaconBlock(ctx, proposal)
|
||||
if err != nil {
|
||||
log.WithField("blockSlot", slot).WithError(err).Error("Failed to propose block")
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorProposeFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("blockRoot", fmt.Sprintf("%#x", blkResp.BlockRoot)),
|
||||
trace.Int64Attribute("numDeposits", int64(len(blk.Block().Body().Deposits()))),
|
||||
trace.Int64Attribute("numAttestations", int64(len(blk.Block().Body().Attestations()))),
|
||||
)
|
||||
@@ -177,11 +158,9 @@ func (v *validator) ProposeBlock(ctx context.Context, slot primitives.Slot, pubK
|
||||
}
|
||||
}
|
||||
|
||||
blkRoot := fmt.Sprintf("%#x", bytesutil.Trunc(blkResp.BlockRoot))
|
||||
graffiti := blk.Block().Body().Graffiti()
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": blk.Block().Slot(),
|
||||
"blockRoot": blkRoot,
|
||||
"numAttestations": len(blk.Block().Body().Attestations()),
|
||||
"numDeposits": len(blk.Block().Body().Deposits()),
|
||||
"graffiti": string(graffiti[:]),
|
||||
|
||||
Reference in New Issue
Block a user