Compare commits


22 Commits

Author           SHA1        Message                                                                                            Date
nisdas           0ffccc9c98  add in changes                                                                                     2023-06-21 06:47:07 +08:00
nisdas           1b915b51b0  Merge branch 'fcTesting' of https://github.com/prysmaticlabs/geth-sharding into fcTesting2        2023-06-21 06:30:30 +08:00
nisdas           d35461affd  clean up logs                                                                                      2023-06-21 06:30:08 +08:00
nisdas           ee159f3380  remove logs                                                                                        2023-06-21 06:22:33 +08:00
nisdas           6b3d18cb77  remove logs                                                                                        2023-06-21 06:21:56 +08:00
nisdas           07955c891b  Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into fakeProposerBranch  2023-06-21 06:19:16 +08:00
Potuz            57f97feb84  Track optimistic status on head (#12552)                                                           2023-06-20 08:59:48 -07:00
Sanghee Choi     2bf0560dc7  fix typo (beacon-chain/node/node.go) (#12551)                                                      2023-06-20 08:32:34 +00:00
Radosław Kapka   a40f903f76  Fix TestFieldTrie_NativeState_fieldConvertersNative (#12550)                                       2023-06-19 13:49:12 +00:00
nisdas           bd0d7478b3  fix panic                                                                                          2023-05-26 13:23:52 +08:00
nisdas           b6a1da21f4  add logs                                                                                           2023-05-26 13:15:57 +08:00
nisdas           180058ed48  fix corruption                                                                                     2023-05-25 16:27:14 +08:00
nisdas           f7a567d1d3  only have it for late blocks                                                                       2023-05-25 08:21:34 +08:00
nisdas           6d02c9ae12  add new thing                                                                                      2023-05-25 08:15:56 +08:00
nisdas           6c2e6ca855  add error                                                                                          2023-05-24 22:39:08 +08:00
nisdas           fbdccf8055  handle zero                                                                                        2023-05-24 22:38:27 +08:00
nisdas           83cfe11ca0  error                                                                                              2023-05-24 22:29:27 +08:00
nisdas           135e9f51ec  force proposer payloads to be included                                                             2023-05-24 22:26:27 +08:00
nisdas           d33c1974da  add logs                                                                                           2023-05-23 19:06:59 +08:00
nisdas           88a2e3d953  fix panic                                                                                          2023-05-23 18:07:05 +08:00
nisdas           cea42a4b7d  prepare all payloads                                                                               2023-05-23 17:53:54 +08:00
nisdas           9971d71bc5  add changes                                                                                        2023-05-23 17:36:54 +08:00
12 changed files with 145 additions and 25 deletions

View File

@@ -340,7 +340,13 @@ func (s *Service) IsOptimistic(_ context.Context) (bool, error) {
}
s.headLock.RLock()
headRoot := s.head.root
headSlot := s.head.slot
headOptimistic := s.head.optimistic
s.headLock.RUnlock()
// We trust the head package for recent head slots; otherwise fall back to forkchoice.
if headSlot+2 >= s.CurrentSlot() {
return headOptimistic, nil
}
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
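
For readers skimming the diff: the fast path introduced here can be read as the following self-contained sketch. All types and names are stand-ins rather than Prysm's actual API; only the two-slot staleness window and the forkchoice fallback mirror the hunk above.

```go
package main

import (
	"fmt"
	"sync"
)

// Slot stands in for primitives.Slot. The two-slot staleness window mirrors
// the headSlot+2 >= s.CurrentSlot() check in the hunk above.
type Slot uint64

type headCache struct {
	mu         sync.RWMutex
	slot       Slot
	optimistic bool
}

// isOptimistic returns the cached status when the head is at most two slots
// behind the wall clock; otherwise it reports that the caller must fall back
// to forkchoice under the forkchoice store's own lock.
func (h *headCache) isOptimistic(current Slot) (status, fromCache bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if h.slot+2 >= current {
		return h.optimistic, true
	}
	return false, false
}

func main() {
	h := &headCache{slot: 10, optimistic: true}
	status, ok := h.isOptimistic(11)
	fmt.Println(status, ok) // true true: recent head, cache trusted
	_, ok = h.isOptimistic(20)
	fmt.Println(ok) // false: stale head, consult forkchoice instead
}
```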

View File

@@ -422,6 +422,12 @@ func TestService_IsOptimistic(t *testing.T) {
opt, err := c.IsOptimistic(ctx)
require.NoError(t, err)
require.Equal(t, primitives.Slot(0), c.CurrentSlot())
require.Equal(t, false, opt)
c.SetGenesisTime(time.Now().Add(-time.Second * time.Duration(4*params.BeaconConfig().SecondsPerSlot)))
opt, err = c.IsOptimistic(ctx)
require.NoError(t, err)
require.Equal(t, true, opt)
}

View File

@@ -71,7 +71,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho
nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer.
hasAttr, attr, proposerId := s.getPayloadAttribute(ctx, arg.headState, nextSlot, arg.headRoot[:])
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr)
if err != nil {
switch err {

View File

@@ -47,9 +47,11 @@ func (s *Service) UpdateAndSaveHeadWithBalances(ctx context.Context) error {
// This defines the current chain service's view of head.
type head struct {
root [32]byte // current head root.
block interfaces.ReadOnlySignedBeaconBlock // current head block.
state state.BeaconState // current head state.
root [32]byte // current head root.
block interfaces.ReadOnlySignedBeaconBlock // current head block.
state state.BeaconState // current head state.
slot primitives.Slot // the head block slot number
optimistic bool // optimistic status when the head was saved
}
// This saves head info to the local service cache; it also saves the
@@ -94,6 +96,10 @@ func (s *Service) saveHead(ctx context.Context, newHeadRoot [32]byte, headBlock
return errors.Wrap(err, "could not get old head root")
}
oldHeadRoot := bytesutil.ToBytes32(r)
isOptimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(newHeadRoot)
if err != nil {
log.WithError(err).Error("could not check if node is optimistically synced")
}
if headBlock.Block().ParentRoot() != oldHeadRoot {
// A chain re-org occurred, so we fire an event notifying the rest of the services.
commonRoot, forkSlot, err := s.cfg.ForkChoiceStore.CommonAncestor(ctx, oldHeadRoot, newHeadRoot)
@@ -125,10 +131,6 @@ func (s *Service) saveHead(ctx context.Context, newHeadRoot [32]byte, headBlock
reorgDistance.Observe(float64(dis))
reorgDepth.Observe(float64(dep))
isOptimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(newHeadRoot)
if err != nil {
return errors.Wrap(err, "could not check if node is optimistically synced")
}
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Reorg,
Data: &ethpbv1.EventChainReorg{
@@ -150,7 +152,14 @@ func (s *Service) saveHead(ctx context.Context, newHeadRoot [32]byte, headBlock
}
// Cache the new head info.
if err := s.setHead(newHeadRoot, headBlock, headState); err != nil {
newHead := &head{
root: newHeadRoot,
block: headBlock,
state: headState,
optimistic: isOptimistic,
slot: headBlock.Block().Slot(),
}
if err := s.setHead(newHead); err != nil {
return errors.Wrap(err, "could not set head")
}
@@ -195,20 +204,22 @@ func (s *Service) saveHeadNoDB(ctx context.Context, b interfaces.ReadOnlySignedB
return nil
}
// This sets head view object which is used to track the head slot, root, block and state.
func (s *Service) setHead(root [32]byte, block interfaces.ReadOnlySignedBeaconBlock, state state.BeaconState) error {
// This sets the head view object, which is used to track the head slot, root, block, state, and optimistic status.
func (s *Service) setHead(newHead *head) error {
s.headLock.Lock()
defer s.headLock.Unlock()
// This does a full copy of the block and state.
bCp, err := block.Copy()
bCp, err := newHead.block.Copy()
if err != nil {
return err
}
s.head = &head{
root: root,
block: bCp,
state: state.Copy(),
root: newHead.root,
block: bCp,
state: newHead.state.Copy(),
optimistic: newHead.optimistic,
slot: newHead.slot,
}
return nil
}
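
The setHead refactor above replaces three positional arguments with a single head value while keeping the defensive copy of the block and state. A minimal sketch of the same pattern, with illustrative stand-in types (not Prysm's):

```go
package main

import "fmt"

// block is a trimmed stand-in for a signed beacon block.
type block struct{ slot uint64 }

func (b *block) Copy() *block { cp := *b; return &cp }

// head mirrors the field names in the hunk above, minus the state.
type head struct {
	root       [32]byte
	block      *block
	slot       uint64
	optimistic bool
}

type service struct{ head *head }

// setHead stores a defensive copy so later mutations by the caller cannot
// race with readers of s.head.
func (s *service) setHead(newHead *head) {
	s.head = &head{
		root:       newHead.root,
		block:      newHead.block.Copy(),
		slot:       newHead.slot,
		optimistic: newHead.optimistic,
	}
}

func main() {
	s := &service{}
	b := &block{slot: 7}
	s.setHead(&head{block: b, slot: b.slot, optimistic: true})
	b.slot = 8 // caller mutation does not affect the cached head
	fmt.Println(s.head.block.slot, s.head.optimistic) // 7 true
}
```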

View File

@@ -501,7 +501,28 @@ func (s *Service) handleEpochBoundary(ctx context.Context, postState state.Beaco
if err := helpers.UpdateProposerIndicesInCache(ctx, copied); err != nil {
return err
}
if s.nextEpochBoundarySlot != 0 {
ep := slots.ToEpoch(s.nextEpochBoundarySlot)
_, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, copied, ep)
if err != nil {
return err
}
for k, v := range nextProposerIndexToSlots {
s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(v[0], k, [8]byte{}, [32]byte{})
}
}
} else if postState.Slot() >= s.nextEpochBoundarySlot {
postState = postState.Copy()
if s.nextEpochBoundarySlot != 0 {
ep := slots.ToEpoch(s.nextEpochBoundarySlot)
_, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, postState, ep)
if err != nil {
return err
}
for k, v := range nextProposerIndexToSlots {
s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(v[0], k, [8]byte{}, [32]byte{})
}
}
s.nextEpochBoundarySlot, err = slots.EpochStart(coreTime.NextEpoch(postState))
if err != nil {
return err
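
The new branches above warm the proposer cache for the next epoch by seeding each proposer's first assigned slot with a zeroed payload ID. A standalone sketch of that warm-up loop; ValidatorIndex, Slot, and the cache shape below are hypothetical stand-ins for the primitives types and ProposerSlotIndexCache:

```go
package main

import "fmt"

// Hypothetical stand-ins for primitives.ValidatorIndex and primitives.Slot.
type (
	ValidatorIndex uint64
	Slot           uint64
)

type cacheEntry struct {
	proposer  ValidatorIndex
	payloadID [8]byte // zeroed at warm-up time, filled in later by the engine
}

// warmProposerCache mirrors the loop in the hunk above: for each proposer in
// the next epoch's assignments, seed the cache at the first assigned slot
// (v[0] in the diff) with an empty payload ID.
func warmProposerCache(cache map[Slot]cacheEntry, assignments map[ValidatorIndex][]Slot) {
	for idx, assignedSlots := range assignments {
		cache[assignedSlots[0]] = cacheEntry{proposer: idx}
	}
}

func main() {
	cache := map[Slot]cacheEntry{}
	warmProposerCache(cache, map[ValidatorIndex][]Slot{5: {32}, 9: {33}})
	fmt.Println(cache[32].proposer, cache[33].proposer) // 5 9
}
```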

View File

@@ -94,8 +94,10 @@ func (s *Service) spawnProcessAttestationsRoutine() {
case <-s.ctx.Done():
return
case <-pat.C():
log.Infof("proposer_mocker: calling updated head via offset ticker")
s.UpdateHead(s.ctx, s.CurrentSlot()+1)
case <-st.C():
log.Infof("proposer_mocker: calling updated head via normal slot ticker in spawn atts")
s.cfg.ForkChoiceStore.Lock()
if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, s.CurrentSlot()); err != nil {
log.WithError(err).Error("could not process new slot")
@@ -124,6 +126,8 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
}
s.processAttestations(ctx, disparity)
log.Infof("proposer_mocker: process attestations in fc took %s", time.Since(start).String())
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
start = time.Now()
@@ -136,11 +140,14 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
s.headLock.RUnlock()
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
log.Infof("proposer_mocker: head root in fc took %s", time.Since(start).String())
changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot)
if err != nil {
log.WithError(err).Error("could not update forkchoice")
}
log.Infof("proposer_mocker: fcu call in fc took %s", time.Since(start).String())
if changed {
s.headLock.RLock()
log.WithFields(logrus.Fields{

View File

@@ -308,7 +308,13 @@ func (s *Service) initializeHeadFromDB(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "could not get finalized block")
}
if err := s.setHead(finalizedRoot, finalizedBlock, finalizedState); err != nil {
if err := s.setHead(&head{
finalizedRoot,
finalizedBlock,
finalizedState,
finalizedBlock.Block().Slot(),
false,
}); err != nil {
return errors.Wrap(err, "could not set head")
}
@@ -440,7 +446,13 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState state.Beacon
}
s.cfg.ForkChoiceStore.SetGenesisTime(uint64(s.genesisTime.Unix()))
if err := s.setHead(genesisBlkRoot, genesisBlk, genesisState); err != nil {
if err := s.setHead(&head{
genesisBlkRoot,
genesisBlk,
genesisState,
genesisBlk.Block().Slot(),
false,
}); err != nil {
log.WithError(err).Fatal("Could not set head")
}
return nil
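
Both call sites above construct the new head with unkeyed struct literals, which silently break if the head fields are ever reordered. A small illustration of the trade-off; the head type here is a trimmed stand-in:

```go
package main

import "fmt"

type head struct {
	root       [32]byte
	slot       uint64
	optimistic bool
}

func main() {
	// Positional literals (as in the diff) compile against field order;
	// keyed literals pin each value to its field name and survive reorders.
	positional := head{[32]byte{1}, 42, false}
	keyed := head{root: [32]byte{1}, slot: 42, optimistic: false}
	fmt.Println(positional == keyed) // true
}
```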

View File

@@ -230,8 +230,8 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
return nil, err
}
log.Debugln("Registering Determinstic Genesis Service")
if err := beacon.registerDeterminsticGenesisService(); err != nil {
log.Debugln("Registering Deterministic Genesis Service")
if err := beacon.registerDeterministicGenesisService(); err != nil {
return nil, err
}
@@ -924,7 +924,7 @@ func (b *BeaconNode) registerGRPCGateway(router *mux.Router) error {
return b.services.RegisterService(g)
}
func (b *BeaconNode) registerDeterminsticGenesisService() error {
func (b *BeaconNode) registerDeterministicGenesisService() error {
genesisTime := b.cliCtx.Uint64(flags.InteropGenesisTimeFlag.Name)
genesisValidators := b.cliCtx.Uint64(flags.InteropNumValidatorsFlag.Name)

View File

@@ -63,13 +63,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
curr := time.Now()
// process attestations and update head in forkchoice
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
log.Infof("proposer_mocker: update head in rpc took %s", time.Since(curr).String())
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
if parentRoot != headRoot {
blockchain.LateBlockAttemptedReorgCount.Inc()
}
log.Infof("proposer_mocker: fetching head root in rpc took %s", time.Since(curr).String())
// An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain).
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().BellatrixForkEpoch {
@@ -90,6 +93,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
}
log.Infof("proposer_mocker: fetching head state rpc took %s", time.Since(curr).String())
// Set slot, graffiti, randao reveal, and parent root.
sBlk.SetSlot(req.Slot)
@@ -103,9 +107,10 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, fmt.Errorf("could not calculate proposer index %v", err)
}
sBlk.SetProposerIndex(idx)
log.Infof("proposer_mocker: setting proposer index took %s", time.Since(curr).String())
if features.Get().BuildBlockParallel {
if err := vs.BuildBlockParallel(ctx, sBlk, head); err != nil {
if err := vs.BuildBlockParallel(ctx, sBlk, head, curr); err != nil {
return nil, errors.Wrap(err, "could not build block in parallel")
}
} else {
@@ -191,7 +196,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Phase0{Phase0: pb.(*ethpb.BeaconBlock)}}, nil
}
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState) error {
func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.SignedBeaconBlock, head state.BeaconState, curr time.Time) error {
// Build consensus fields in background
var wg sync.WaitGroup
wg.Add(1)
@@ -205,6 +210,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
log.WithError(err).Error("Could not get eth1data")
}
sBlk.SetEth1Data(eth1Data)
log.Infof("proposer_mocker: setting eth1data took %s", time.Since(curr).String())
// Set deposit and attestation.
deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data) // TODO: split attestations and deposits
@@ -216,20 +222,26 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
sBlk.SetDeposits(deposits)
sBlk.SetAttestations(atts)
}
log.Infof("proposer_mocker: setting deposits and atts took %s", time.Since(curr).String())
// Set slashings.
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
sBlk.SetProposerSlashings(validProposerSlashings)
sBlk.SetAttesterSlashings(validAttSlashings)
log.Infof("proposer_mocker: setting slashings took %s", time.Since(curr).String())
// Set exits.
sBlk.SetVoluntaryExits(vs.getExits(head, sBlk.Block().Slot()))
log.Infof("proposer_mocker: setting exits took %s", time.Since(curr).String())
// Set sync aggregate. New in Altair.
vs.setSyncAggregate(ctx, sBlk)
log.Infof("proposer_mocker: setting sync aggs took %s", time.Since(curr).String())
// Set bls to execution change. New in Capella.
vs.setBlsToExecData(sBlk, head)
log.Infof("proposer_mocker: setting bls data took %s", time.Since(curr).String())
}()
localPayload, err := vs.getLocalPayload(ctx, sBlk.Block(), head)
@@ -246,6 +258,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
if err := setExecutionData(ctx, sBlk, localPayload, builderPayload); err != nil {
return status.Errorf(codes.Internal, "Could not set execution data: %v", err)
}
log.Infof("proposer_mocker: setting execution data took %s", time.Since(curr).String())
wg.Wait() // Wait until block is built via consensus and execution fields.
@@ -392,10 +405,12 @@ func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk interfaces.
// computeStateRoot computes the state root after a block has been processed through a state transition and
// returns it to the validator client.
func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
curr := time.Now()
beaconState, err := vs.StateGen.StateByRoot(ctx, block.Block().ParentRoot())
if err != nil {
return nil, errors.Wrap(err, "could not retrieve beacon state")
}
log.Infof("proposer_mocker: fetching parent state took %s", time.Since(curr).String())
root, err := transition.CalculateStateRoot(
ctx,
beaconState,
@@ -404,6 +419,7 @@ func (vs *Server) computeStateRoot(ctx context.Context, block interfaces.ReadOnl
if err != nil {
return nil, errors.Wrapf(err, "could not calculate state root at slot %d", beaconState.Slot())
}
log.Infof("proposer_mocker: calculating state root took %s", time.Since(curr).String())
log.WithField("beaconStateRoot", fmt.Sprintf("%#x", root)).Debugf("Computed state root")
return root[:], nil
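
BuildBlockParallel's concurrency shape, visible across the hunks above, is: consensus fields (eth1 data, deposits, attestations, slashings, exits, sync aggregate, BLS changes) are built in a background goroutine while the execution payload is fetched inline, and wg.Wait() joins the two halves before the block is complete. A minimal sketch of that shape; everything here is illustrative, not the server's actual types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// blockParts stands in for the signed block being assembled.
type blockParts struct {
	consensus string
	payload   string
}

func buildBlock() *blockParts {
	b := &blockParts{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // stand-in for atts, deposits, exits
		b.consensus = "consensus fields"
	}()
	// The payload fetch proceeds concurrently with the goroutine above.
	time.Sleep(15 * time.Millisecond) // stand-in for getLocalPayload
	b.payload = "execution payload"
	wg.Wait() // wg.Wait() also makes the goroutine's write visible here
	return b
}

func main() {
	b := buildBlock()
	fmt.Println(b.consensus, "+", b.payload)
}
```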

View File

@@ -30,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
@@ -184,3 +185,32 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
}
return stream.Send(res)
}
func (vs *Server) RandomStuff() {
for vs.TimeFetcher.GenesisTime().IsZero() {
time.Sleep(5 * time.Second)
}
genTime := vs.TimeFetcher.GenesisTime()
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-vs.Ctx.Done():
ticker.Done()
return
case slot := <-ticker.C():
curr := time.Now()
_, err := vs.GetBeaconBlock(context.Background(), &ethpb.BlockRequest{
Slot: slot,
Graffiti: make([]byte, 32),
RandaoReveal: make([]byte, 96),
})
if err != nil {
log.Error(err)
continue
}
log.Infof("proposer_mocker: successfully produced block %d in %s", slot, time.Since(curr).String())
}
}
}
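
RandomStuff drives GetBeaconBlock once per slot from a ticker aligned to genesis. A minimal, self-contained ticker in the spirit of slots.NewSlotTicker; the implementation below is an assumption for illustration, not Prysm's:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slotTicker delivers a monotonically increasing slot number once per
// secondsPerSlot, aligned to genesis. Illustrative only.
func slotTicker(ctx context.Context, genesis time.Time, secondsPerSlot uint64) <-chan uint64 {
	c := make(chan uint64)
	go func() {
		defer close(c)
		d := time.Duration(secondsPerSlot) * time.Second
		for slot := uint64(1); ; slot++ {
			target := genesis.Add(time.Duration(slot) * d)
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Until(target)):
				c <- slot
			}
		}
	}()
	return c
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2500*time.Millisecond)
	defer cancel()
	for slot := range slotTicker(ctx, time.Now(), 1) {
		fmt.Println("slot", slot) // prints slots 1 and 2, then the context expires
	}
}
```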

View File

@@ -250,6 +250,7 @@ func (s *Service) Start() {
BLSChangesPool: s.cfg.BLSChangesPool,
ClockWaiter: s.cfg.ClockWaiter,
}
go validatorServer.RandomStuff()
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,

View File

@@ -251,6 +251,10 @@ func TestFieldTrie_NativeState_fieldConvertersNative(t *testing.T) {
field: types.FieldIndex(9),
indices: []uint64{1},
elements: []*ethpb.Eth1Data{
{
DepositRoot: make([]byte, fieldparams.RootLength),
DepositCount: 2,
},
{
DepositRoot: make([]byte, fieldparams.RootLength),
DepositCount: 1,
@@ -321,11 +325,14 @@ func TestFieldTrie_NativeState_fieldConvertersNative(t *testing.T) {
wantHex: []string{"0x7d7696e7f12593934afcd87a0d38e1a981bee63cb4cf0568ba36a6e0596eeccb"},
},
{
name: "Attestations",
name: "Attestations convertAll false",
args: &args{
field: types.FieldIndex(15),
indices: []uint64{1},
elements: []*ethpb.PendingAttestation{
{
ProposerIndex: 0,
},
{
ProposerIndex: 1,
},
@@ -352,8 +359,12 @@ func TestFieldTrie_NativeState_fieldConvertersNative(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
roots, err := fieldConverters(tt.args.field, tt.args.indices, tt.args.elements, tt.args.convertAll)
if err != nil && tt.errMsg != "" {
require.ErrorContains(t, tt.errMsg, err)
if err != nil {
if tt.errMsg != "" {
require.ErrorContains(t, tt.errMsg, err)
} else {
t.Error("Unexpected error: " + err.Error())
}
} else {
for i, root := range roots {
hex := hexutil.Encode(root[:])
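
The test refactor above stops swallowing unexpected errors: an error now fails the test unless the case declared an expected message. The same shape in isolation; check and its names are hypothetical, with strings.Contains standing in for require.ErrorContains:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// check accepts an error only when the test case declared an expected
// message, mirroring the branch structure of the hunk above.
func check(err error, wantMsg string) string {
	if err != nil {
		if wantMsg != "" {
			if strings.Contains(err.Error(), wantMsg) {
				return "expected error matched"
			}
			return "wrong error: " + err.Error()
		}
		return "unexpected error: " + err.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(check(nil, ""))                        // ok
	fmt.Println(check(errors.New("bad input"), "bad")) // expected error matched
	fmt.Println(check(errors.New("boom"), ""))         // unexpected error: boom
}
```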