Compare commits

...

14 Commits

Author SHA1 Message Date
nisdas
7cbb59cd3e Enable Batch Publishing 2025-03-14 21:34:15 +08:00
james-prysm
7cef3b0491 adding omitempty to request object (#15031) 2025-03-11 13:43:00 +00:00
Radosław Kapka
15462844f9 Remove error from signatures of UnaggregatedAttestations and pruneAttsFromPool (#15028) 2025-03-10 22:28:51 +00:00
Potuz
863eee7b40 Add feature flag to start from any beacon block in db (#15000)
* Add feature flag to start from any beacon block in db

The new feature flag called --sync-from takes a string that can take
values:

- `head` or
- a 0x-prefixed hex encoded beacon block root.

The beacon block root or the head block root has to be known in db and
has to be a descendant of the current justified checkpoint.

* Fix Bugs In Sync From Head (#15006)

* Fix Bugs

* Remove log

* missing save

* add tests

* Kasey review #1

* Kasey's review #2

* Kasey's review #3

---------

Co-authored-by: Nishant Das <nishdas93@gmail.com>
2025-03-10 15:51:25 +00:00
Radosław Kapka
6d89373583 Handle unaggregated attestations when decomposing (#15027) 2025-03-10 13:48:43 +00:00
kasey
9a421a2feb payload attribute computations in event handler (#14963)
* payload attribute computations in event handler

* Skip executing nil lazyReaders

* adding in clarifying comments, uncommenting needs fill, adding in happy path unit test for code coverage

* gaz

* fixing ineffectual assignment

* nil check the Reader coming from the lazyReader

* Apply suggestions from code review

Radek's PR suggestion to fix error/log capitalization.

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Radek feedback

* Move fire payload attribute event to after save head

* set mock statenotifier in testServiceOptsWithDB

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: james-prysm <james@prysmaticlabs.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
2025-03-07 23:25:12 +00:00
james-prysm
4e41d5c610 fixing e2e builder gas limit (#15025)
* fixing e2e

* linting
2025-03-07 19:00:16 +00:00
james-prysm
0b6bea43a8 adding optimistic check to /eth/v1/validator/sync_committee_contribution rest endpoint (#15022) 2025-03-06 22:29:11 +00:00
Radosław Kapka
f89afb0fbd Handle some errors from hasSeenBit and insertSeenBit differently (#15018)
* Ignore errors from `hasSeenBit` and don't pack unaggregated attestations

* changelog <3

* Revert "Ignore errors from `hasSeenBit` and don't pack unaggregated attestations"

This reverts commit dc86cb63204d6ff735d9302142068de95a8188fd.
2025-03-06 22:26:20 +00:00
Potuz
3cd2973c92 Return the genesis block root from last validated checkpoint if zero (#15021)
* Return the genesis block root from last validated checkpoint if zero
When starting a node we load the last validated checkpoint. On tests or a new node this checkpoint can have the zero blockroot (it returns the finalized checkpoint). This PR ensures that it returns the genesis block root instead.

It can't affect runnning code since the root is only used at startup in `setup_forkchoice`. But it may affect tests because now `OptimisticForRoot` will error out if there is no genesis block root set on db.

* Terence review

* fix test
2025-03-06 21:42:11 +00:00
james-prysm
d3e5710a63 updated gas limit (#14858) 2025-03-06 16:37:14 +00:00
Preston Van Loon
f40b4f16c2 Beacon API: Broadcasting BLS to execution changes should not use the request context in a go routine (#15019)
* Broadcasting BLS to execution changes should not use the request context in a go routine

* Changelog fragment
2025-03-06 15:25:48 +00:00
kasey
7fd4f746d6 Clean up block-slot-indices on block deletion (#15011)
* clean up block-slot-indices on block deletion

* also remove parent root index entry

* treat parent root index as packed key (like slot idx)

* fix bug where input slice is modified, with test

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2025-03-06 14:43:23 +00:00
james-prysm
2362d9f3c2 blob_sidecar_subnet configs missing from /eth/v1/config/spec endpoint (#15016)
* fixing config display

* fixing test
2025-03-06 14:25:53 +00:00
67 changed files with 1004 additions and 239 deletions

View File

@@ -78,8 +78,8 @@ type GetBlockHeaderResponse struct {
}
type GetValidatorsRequest struct {
Ids []string `json:"ids"`
Statuses []string `json:"statuses"`
Ids []string `json:"ids,omitempty"`
Statuses []string `json:"statuses,omitempty"`
}
type GetValidatorsResponse struct {

View File

@@ -128,6 +128,7 @@ go_test(
"receive_block_test.go",
"service_norace_test.go",
"service_test.go",
"setup_forkchoice_test.go",
"setup_test.go",
"weak_subjectivity_checks_test.go",
],

View File

@@ -582,6 +582,7 @@ func TestService_IsOptimisticForRoot_StateSummaryRecovered(t *testing.T) {
br, err := b.Block.HashTreeRoot()
require.NoError(t, err)
util.SaveBlock(t, context.Background(), beaconDB, b)
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, [32]byte{}))
_, err = c.IsOptimisticForRoot(ctx, br)
assert.NoError(t, err)
summ, err := beaconDB.StateSummary(ctx, br)

View File

@@ -72,7 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
if arg.attributes == nil {
arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version())
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg)
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes)
if err != nil {
switch {
@@ -159,6 +158,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])),
"headSlot": headBlk.Slot(),
"nextSlot": nextSlot,
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
}).Info("Forkchoice updated with payload attributes for proposal")
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
@@ -166,40 +166,19 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
log.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),
"slot": headBlk.Slot(),
"nextSlot": nextSlot,
}).Error("Received nil payload ID on VALID engine response")
}
return payloadID, nil
}
func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) {
pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState)
if err != nil {
log.WithError(err).
WithField("head_root", cfg.headRoot[:]).
Error("Could not get proposer index for PayloadAttributes event")
return
}
evd := payloadattribute.EventData{
ProposerIndex: pidx,
ProposalSlot: cfg.headState.Slot(),
ParentBlockRoot: cfg.headRoot[:],
Attributer: cfg.attributes,
HeadRoot: cfg.headRoot,
HeadState: cfg.headState,
HeadBlock: cfg.headBlock,
}
if cfg.headBlock != nil && !cfg.headBlock.IsNil() {
headPayload, err := cfg.headBlock.Block().Body().Execution()
if err != nil {
log.WithError(err).Error("Could not get execution payload for head block")
return
}
evd.ParentBlockHash = headPayload.BlockHash()
evd.ParentBlockNumber = headPayload.BlockNumber()
}
func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) {
// the fcu args have differing amounts of completeness based on the code path,
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
// temporary solution: just fire a blank event and fill in the details in the api handler.
f.Send(&feed.Event{
Type: statefeed.PayloadAttributes,
Data: evd,
Data: payloadattribute.EventData{ProposalSlot: nextSlot},
})
}

View File

@@ -102,10 +102,10 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
log.WithError(err).Error("could not save head")
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")
}
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)
return nil
}

View File

@@ -3,6 +3,7 @@ package blockchain
import (
"testing"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
@@ -18,6 +19,7 @@ func testServiceOptsWithDB(t *testing.T) []Option {
WithStateGen(stategen.New(beaconDB, fcs)),
WithForkChoiceStore(fcs),
WithClockSynchronizer(cs),
WithStateNotifier(&mock.MockStateNotifier{RecordEvents: true}),
}
}

View File

@@ -423,13 +423,12 @@ func (s *Service) savePostStateInfo(ctx context.Context, r [32]byte, b interface
// pruneAttsFromPool removes these attestations from the attestation pool
// which are covered by attestations from the received block.
func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconState, headBlock interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconState, headBlock interfaces.ReadOnlySignedBeaconBlock) {
for _, att := range headBlock.Block().Body().Attestations() {
if err := s.pruneCoveredAttsFromPool(ctx, headState, att); err != nil {
log.WithError(err).Warn("Could not prune attestations covered by a received block's attestation")
}
}
return nil
}
func (s *Service) pruneCoveredAttsFromPool(ctx context.Context, headState state.BeaconState, att ethpb.Att) error {
@@ -503,6 +502,10 @@ func (s *Service) pruneCoveredElectraAttsFromPool(ctx context.Context, headState
if err = s.cfg.AttestationCache.DeleteCovered(a); err != nil {
return errors.Wrap(err, "could not delete covered attestation")
}
} else if !a.IsAggregated() {
if err = s.cfg.AttPool.DeleteUnaggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete unaggregated attestation")
}
} else if err = s.cfg.AttPool.DeleteAggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete aggregated attestation")
}
@@ -723,13 +726,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
// return early if we are not proposing next slot
if attribute.IsEmpty() {
fcuArgs := &fcuConfig{
headState: headState,
headRoot: headRoot,
headBlock: nil,
attributes: attribute,
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs)
// notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't
// call notifyForkchoiceUpdate, so the event is fired here.
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
return
}

View File

@@ -50,6 +50,7 @@ import (
func Test_pruneAttsFromPool_Electra(t *testing.T) {
ctx := context.Background()
logHook := logTest.NewGlobal()
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
@@ -71,7 +72,7 @@ func Test_pruneAttsFromPool_Electra(t *testing.T) {
cb := primitives.NewAttestationCommitteeBits()
cb.SetBitAt(0, true)
att1 := &ethpb.AttestationElectra{
AggregationBits: bitfield.Bitlist{0b11110111, 0b00000001},
AggregationBits: bitfield.Bitlist{0b10000000, 0b00000001},
Data: data,
Signature: make([]byte, 96),
CommitteeBits: cb,
@@ -95,16 +96,15 @@ func Test_pruneAttsFromPool_Electra(t *testing.T) {
CommitteeBits: cb,
}
require.NoError(t, s.cfg.AttPool.SaveAggregatedAttestation(att1))
require.NoError(t, s.cfg.AttPool.SaveUnaggregatedAttestation(att1))
require.NoError(t, s.cfg.AttPool.SaveAggregatedAttestation(att2))
require.NoError(t, s.cfg.AttPool.SaveAggregatedAttestation(att3))
require.Equal(t, 3, len(s.cfg.AttPool.AggregatedAttestations()))
cb = primitives.NewAttestationCommitteeBits()
cb.SetBitAt(0, true)
cb.SetBitAt(1, true)
onChainAtt := &ethpb.AttestationElectra{
AggregationBits: bitfield.Bitlist{0b11110111, 0b11110111, 0b00000001},
AggregationBits: bitfield.Bitlist{0b10000000, 0b11110111, 0b00000001},
Data: data,
Signature: make([]byte, 96),
CommitteeBits: cb,
@@ -126,8 +126,12 @@ func Test_pruneAttsFromPool_Electra(t *testing.T) {
// into the correct number of aggregates.
require.Equal(t, 4, len(committees))
require.NoError(t, s.pruneAttsFromPool(ctx, st, rob))
attsInPool := s.cfg.AttPool.AggregatedAttestations()
s.pruneAttsFromPool(ctx, st, rob)
require.LogsDoNotContain(t, logHook, "Could not prune attestations")
attsInPool := s.cfg.AttPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(attsInPool))
attsInPool = s.cfg.AttPool.AggregatedAttestations()
require.Equal(t, 1, len(attsInPool))
assert.DeepEqual(t, att3, attsInPool[0])
}
@@ -908,6 +912,8 @@ func TestInsertFinalizedDeposits_MultipleFinalizedRoutines(t *testing.T) {
}
func TestRemoveBlockAttestationsInPool(t *testing.T) {
logHook := logTest.NewGlobal()
genesis, keys := util.DeterministicGenesisState(t, 64)
b, err := util.GenerateFullBlock(genesis, keys, util.DefaultBlockGenConfig(), 1)
assert.NoError(t, err)
@@ -927,7 +933,8 @@ func TestRemoveBlockAttestationsInPool(t *testing.T) {
require.NoError(t, service.cfg.AttPool.SaveAggregatedAttestations(atts))
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, service.pruneAttsFromPool(context.Background(), nil /* state not needed pre-Electra */, wsb))
service.pruneAttsFromPool(context.Background(), nil /* state not needed pre-Electra */, wsb)
require.LogsDoNotContain(t, logHook, "Could not prune attestations")
require.Equal(t, 0, service.cfg.AttPool.AggregatedAttestationCount())
}
@@ -1983,6 +1990,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, genesisState, genesisRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, genesisRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot), "Could not save genesis state")
for i := 1; i < 6; i++ {
driftGenesisTime(service, int64(i), 0)
@@ -2117,6 +2125,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, genesisState, jroot))
service.cfg.ForkChoiceStore.SetBalancesByRooter(service.cfg.StateGen.ActiveNonSlashedBalancesByRoot)
require.NoError(t, service.StartFromSavedState(genesisState))
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot))
// Forkchoice has the genesisRoot loaded at startup
require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot()))
@@ -2126,7 +2135,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.Equal(t, genesisRoot, bytesutil.ToBytes32(headRoot))
optimistic, err := service.IsOptimistic(ctx)
require.NoError(t, err)
require.Equal(t, true, optimistic)
require.Equal(t, false, optimistic)
// Check that the node's justified checkpoint does not agree with the
// last valid state's justified checkpoint

View File

@@ -39,6 +39,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)
// Service represents a service that handles the internal
@@ -316,32 +317,36 @@ func (s *Service) originRootFromSavedState(ctx context.Context) ([32]byte, error
return genesisBlkRoot, nil
}
// initializeHeadFromDB uses the finalized checkpoint and head block found in the database to set the current head.
// initializeHeadFromDB uses the finalized checkpoint and head block root from forkchoice to set the current head.
// Note that this may block until stategen replays blocks between the finalized and head blocks
// if the head sync flag was specified and the gap between the finalized and head blocks is at least 128 epochs long.
func (s *Service) initializeHeadFromDB(ctx context.Context, finalizedState state.BeaconState) error {
func (s *Service) initializeHead(ctx context.Context, st state.BeaconState) error {
cp := s.FinalizedCheckpt()
fRoot := [32]byte(cp.Root)
finalizedRoot := s.ensureRootNotZeros(fRoot)
if finalizedState == nil || finalizedState.IsNil() {
fRoot := s.ensureRootNotZeros([32]byte(cp.Root))
if st == nil || st.IsNil() {
return errors.New("finalized state can't be nil")
}
finalizedBlock, err := s.getBlock(ctx, finalizedRoot)
s.cfg.ForkChoiceStore.RLock()
root := s.cfg.ForkChoiceStore.HighestReceivedBlockRoot()
s.cfg.ForkChoiceStore.RUnlock()
blk, err := s.cfg.BeaconDB.Block(ctx, root)
if err != nil {
return errors.Wrap(err, "could not get finalized block")
return errors.Wrap(err, "could not get head block")
}
if err := s.setHead(&head{
finalizedRoot,
finalizedBlock,
finalizedState,
finalizedBlock.Block().Slot(),
false,
}); err != nil {
if root != fRoot {
st, err = s.cfg.StateGen.StateByRoot(ctx, root)
if err != nil {
return errors.Wrap(err, "could not get head state")
}
}
if err := s.setHead(&head{root, blk, st, blk.Block().Slot(), false}); err != nil {
return errors.Wrap(err, "could not set head")
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": blk.Block().Slot(),
}).Info("Initialized head block from DB")
return nil
}

View File

@@ -2,28 +2,119 @@ package blockchain
import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
func (s *Service) setupForkchoice(st state.BeaconState) error {
if err := s.setupForkchoiceCheckpoints(); err != nil {
return errors.Wrap(err, "could not set up forkchoice checkpoints")
}
if err := s.setupForkchoiceRoot(st); err != nil {
if err := s.setupForkchoiceTree(st); err != nil {
return errors.Wrap(err, "could not set up forkchoice root")
}
if err := s.initializeHeadFromDB(s.ctx, st); err != nil {
if err := s.initializeHead(s.ctx, st); err != nil {
return errors.Wrap(err, "could not initialize head from db")
}
return nil
}
func (s *Service) startupHeadRoot() [32]byte {
headStr := features.Get().ForceHead
cp := s.FinalizedCheckpt()
fRoot := s.ensureRootNotZeros([32]byte(cp.Root))
if headStr == "" {
return fRoot
}
if headStr == "head" {
root, err := s.cfg.BeaconDB.HeadBlockRoot()
if err != nil {
log.WithError(err).Error("could not get head block root, starting with finalized block as head")
return fRoot
}
log.Infof("Using Head root of %#x", root)
return root
}
root, err := bytesutil.DecodeHexWithLength(headStr, 32)
if err != nil {
log.WithError(err).Error("could not parse head root, starting with finalized block as head")
return fRoot
}
return [32]byte(root)
}
func (s *Service) setupForkchoiceTree(st state.BeaconState) error {
headRoot := s.startupHeadRoot()
cp := s.FinalizedCheckpt()
fRoot := s.ensureRootNotZeros([32]byte(cp.Root))
if err := s.setupForkchoiceRoot(st); err != nil {
return errors.Wrap(err, "could not set up forkchoice root")
}
if headRoot == fRoot {
return nil
}
blk, err := s.cfg.BeaconDB.Block(s.ctx, headRoot)
if err != nil {
log.WithError(err).Error("could not get head block, starting with finalized block as head")
return nil
}
if slots.ToEpoch(blk.Block().Slot()) < cp.Epoch {
log.WithField("headRoot", fmt.Sprintf("%#x", headRoot)).Error("head block is older than finalized block, starting with finalized block as head")
return nil
}
chain, err := s.buildForkchoiceChain(s.ctx, blk)
if err != nil {
log.WithError(err).Error("could not build forkchoice chain, starting with finalized block as head")
return nil
}
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
return s.cfg.ForkChoiceStore.InsertChain(s.ctx, chain)
}
func (s *Service) buildForkchoiceChain(ctx context.Context, head interfaces.ReadOnlySignedBeaconBlock) ([]*forkchoicetypes.BlockAndCheckpoints, error) {
chain := []*forkchoicetypes.BlockAndCheckpoints{}
cp := s.FinalizedCheckpt()
fRoot := s.ensureRootNotZeros([32]byte(cp.Root))
jp := s.CurrentJustifiedCheckpt()
root, err := head.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get head block root")
}
for {
roblock, err := blocks.NewROBlockWithRoot(head, root)
if err != nil {
return nil, err
}
// This chain sets the justified checkpoint for every block, including some that are older than jp.
// This should be however safe for forkchoice at startup. An alternative would be to hook during the
// block processing pipeline when setting the head state, to compute the right states for the justified
// checkpoint.
chain = append(chain, &forkchoicetypes.BlockAndCheckpoints{Block: roblock, JustifiedCheckpoint: jp, FinalizedCheckpoint: cp})
root = head.Block().ParentRoot()
if root == fRoot {
break
}
head, err = s.cfg.BeaconDB.Block(s.ctx, root)
if err != nil {
return nil, errors.Wrap(err, "could not get block")
}
if slots.ToEpoch(head.Block().Slot()) < cp.Epoch {
return nil, errors.New("head block is not a descendant of the finalized checkpoint")
}
}
return chain, nil
}
func (s *Service) setupForkchoiceRoot(st state.BeaconState) error {
cp := s.FinalizedCheckpt()
fRoot := s.ensureRootNotZeros([32]byte(cp.Root))

View File

@@ -0,0 +1,128 @@
package blockchain
import (
"testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func Test_startupHeadRoot(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx
hook := logTest.NewGlobal()
cp := service.FinalizedCheckpt()
require.DeepEqual(t, cp.Root, params.BeaconConfig().ZeroHash[:])
gr := [32]byte{'r', 'o', 'o', 't'}
service.originBlockRoot = gr
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, gr))
t.Run("start from finalized", func(t *testing.T) {
require.Equal(t, service.startupHeadRoot(), gr)
})
t.Run("head requested, error path", func(t *testing.T) {
resetCfg := features.InitWithReset(&features.Flags{
ForceHead: "head",
})
defer resetCfg()
require.Equal(t, service.startupHeadRoot(), gr)
require.LogsContain(t, hook, "could not get head block root, starting with finalized block as head")
})
st, _ := util.DeterministicGenesisState(t, 64)
hr := [32]byte{'h', 'e', 'a', 'd'}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, hr), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, hr), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, hr))
t.Run("start from head", func(t *testing.T) {
resetCfg := features.InitWithReset(&features.Flags{
ForceHead: "head",
})
defer resetCfg()
require.Equal(t, service.startupHeadRoot(), hr)
})
}
func Test_setupForkchoiceTree_Finalized(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx
st, _ := util.DeterministicGenesisState(t, 64)
stateRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err, "Could not hash genesis state")
require.NoError(t, service.saveGenesisData(ctx, st))
genesis := blocks.NewGenesisBlock(stateRoot[:])
wsb, err := consensusblocks.NewSignedBeaconBlock(genesis)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb), "Could not save genesis block")
parentRoot, err := genesis.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root")
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, parentRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, parentRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx, &ethpb.Checkpoint{Root: parentRoot[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveFinalizedCheckpoint(ctx, &ethpb.Checkpoint{Root: parentRoot[:]}))
require.NoError(t, service.setupForkchoiceTree(st))
require.Equal(t, 1, service.cfg.ForkChoiceStore.NodeCount())
}
func Test_setupForkchoiceTree_Head(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx
resetCfg := features.InitWithReset(&features.Flags{
ForceHead: "head",
})
defer resetCfg()
genesisState, keys := util.DeterministicGenesisState(t, 64)
stateRoot, err := genesisState.HashTreeRoot(ctx)
require.NoError(t, err, "Could not hash genesis state")
genesis := blocks.NewGenesisBlock(stateRoot[:])
wsb, err := consensusblocks.NewSignedBeaconBlock(genesis)
require.NoError(t, err)
genesisRoot, err := genesis.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root")
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb), "Could not save genesis block")
require.NoError(t, service.saveGenesisData(ctx, genesisState))
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, genesisState, genesisRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, genesisRoot), "Could not save genesis state")
st, err := service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlock(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(1))
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
b, err = util.GenerateFullBlock(postState, keys, util.DefaultBlockGenConfig(), primitives.Slot(2))
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err = b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, preState))
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, root))
cp := service.FinalizedCheckpt()
fRoot := service.ensureRootNotZeros([32]byte(cp.Root))
require.NotEqual(t, fRoot, root)
require.Equal(t, root, service.startupHeadRoot())
require.NoError(t, service.setupForkchoiceTree(st))
require.Equal(t, 2, service.cfg.ForkChoiceStore.NodeCount())
}

View File

@@ -110,6 +110,7 @@ type HeadAccessDatabase interface {
// Block related methods.
HeadBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, error)
HeadBlockRoot() ([32]byte, error)
SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error
// Genesis operations.

View File

@@ -30,22 +30,32 @@ var errInvalidSlotRange = errors.New("invalid end slot and start slot provided")
func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Block")
defer span.End()
// Return block from cache if it exists.
blk, err := s.getBlock(ctx, blockRoot, nil)
if errors.Is(err, ErrNotFound) {
return nil, nil
}
return blk, err
}
func (s *Store) getBlock(ctx context.Context, blockRoot [32]byte, tx *bolt.Tx) (interfaces.ReadOnlySignedBeaconBlock, error) {
if v, ok := s.blockCache.Get(string(blockRoot[:])); v != nil && ok {
return v.(interfaces.ReadOnlySignedBeaconBlock), nil
}
var blk interfaces.ReadOnlySignedBeaconBlock
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
enc := bkt.Get(blockRoot[:])
if enc == nil {
return nil
}
// This method allows the caller to pass in its tx if one is already open.
// Or if a nil value is used, a transaction will be managed intenally.
if tx == nil {
var err error
blk, err = unmarshalBlock(ctx, enc)
return err
})
return blk, err
tx, err = s.db.Begin(false)
if err != nil {
return nil, err
}
defer func() {
if err := tx.Rollback(); err != nil {
log.WithError(err).Error("could not rollback read-only getBlock transaction")
}
}()
}
return unmarshalBlock(ctx, tx.Bucket(blocksBucket).Get(blockRoot[:]))
}
// OriginCheckpointBlockRoot returns the value written to the db in SaveOriginCheckpointBlockRoot
@@ -70,6 +80,21 @@ func (s *Store) OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
return root, err
}
// HeadBlockRoot returns the latest canonical block root in the Ethereum Beacon Chain.
func (s *Store) HeadBlockRoot() ([32]byte, error) {
var root [32]byte
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
headRoot := bkt.Get(headBlockRootKey)
if len(headRoot) == 0 {
return errors.New("no head block root found")
}
copy(root[:], headRoot)
return nil
})
return root, err
}
// HeadBlock returns the latest canonical block in the Ethereum Beacon Chain.
func (s *Store) HeadBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HeadBlock")
@@ -227,6 +252,21 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
return ErrDeleteJustifiedAndFinalized
}
// Look up the block to find its slot; needed to remove the slot index entry.
blk, err := s.getBlock(ctx, root, tx)
if err != nil {
// getBlock can return ErrNotFound, in which case we won't even try to delete it.
if errors.Is(err, ErrNotFound) {
return nil
}
return err
}
if err := s.deleteSlotIndexEntry(tx, blk.Block().Slot(), root); err != nil {
return err
}
if err := s.deleteMatchingParentIndex(tx, blk.Block().ParentRoot(), root); err != nil {
return err
}
if err := s.deleteBlock(tx, root[:]); err != nil {
return err
}
@@ -899,6 +939,9 @@ func createBlockIndicesFromFilters(ctx context.Context, f *filters.QueryFilter)
// unmarshal block from marshaled proto beacon block bytes to versioned beacon block struct type.
func unmarshalBlock(_ context.Context, enc []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
if len(enc) == 0 {
return nil, errors.Wrap(ErrNotFound, "empty block bytes in db")
}
var err error
enc, err = snappy.Decode(nil, enc)
if err != nil {
@@ -1050,6 +1093,47 @@ func (s *Store) deleteBlock(tx *bolt.Tx, root []byte) error {
return nil
}
func (s *Store) deleteMatchingParentIndex(tx *bolt.Tx, parent, child [32]byte) error {
bkt := tx.Bucket(blockParentRootIndicesBucket)
if err := deleteRootIndexEntry(bkt, parent[:], child); err != nil {
return errors.Wrap(err, "could not delete parent root index entry")
}
return nil
}
func (s *Store) deleteSlotIndexEntry(tx *bolt.Tx, slot primitives.Slot, root [32]byte) error {
key := bytesutil.SlotToBytesBigEndian(slot)
bkt := tx.Bucket(blockSlotIndicesBucket)
if err := deleteRootIndexEntry(bkt, key, root); err != nil {
return errors.Wrap(err, "could not delete slot index entry")
}
return nil
}
func deleteRootIndexEntry(bkt *bolt.Bucket, key []byte, root [32]byte) error {
packed := bkt.Get(key)
if len(packed) == 0 {
return nil
}
updated, err := removeRoot(packed, root)
if err != nil {
return err
}
// Don't update the value if the root was not found.
if bytes.Equal(updated, packed) {
return nil
}
// If there are no other roots in the key, just delete it.
if len(updated) == 0 {
if err := bkt.Delete(key); err != nil {
return err
}
return nil
}
// Update the key with the root removed.
return bkt.Put(key, updated)
}
func (s *Store) deleteValidatorHashes(tx *bolt.Tx, root []byte) error {
ok, err := s.isStateValidatorMigrationOver()
if err != nil {

View File

@@ -196,9 +196,13 @@ func TestStore_BlocksCRUD(t *testing.T) {
blockRoot, err := blk.Block().HashTreeRoot()
require.NoError(t, err)
_, err = db.getBlock(ctx, blockRoot, nil)
require.ErrorIs(t, err, ErrNotFound)
retrievedBlock, err := db.Block(ctx, blockRoot)
require.NoError(t, err)
assert.DeepEqual(t, nil, retrievedBlock, "Expected nil block")
_, err = db.getBlock(ctx, blockRoot, nil)
require.ErrorIs(t, err, ErrNotFound)
require.NoError(t, db.SaveBlock(ctx, blk))
assert.Equal(t, true, db.HasBlock(ctx, blockRoot), "Expected block to exist in the db")
@@ -214,10 +218,34 @@ func TestStore_BlocksCRUD(t *testing.T) {
retrievedPb, err := retrievedBlock.Proto()
require.NoError(t, err)
assert.Equal(t, true, proto.Equal(wantedPb, retrievedPb), "Wanted: %v, received: %v", wanted, retrievedBlock)
// Check that the block is in the slot->block index
found, roots, err := db.BlockRootsBySlot(ctx, blk.Block().Slot())
require.NoError(t, err)
require.Equal(t, true, found)
require.Equal(t, 1, len(roots))
require.Equal(t, blockRoot, roots[0])
// Delete the block, then check that it is no longer in the index.
parent := blk.Block().ParentRoot()
testCheckParentIndices(t, db.db, parent, true)
require.NoError(t, db.DeleteBlock(ctx, blockRoot))
require.NoError(t, err)
testCheckParentIndices(t, db.db, parent, false)
found, roots, err = db.BlockRootsBySlot(ctx, blk.Block().Slot())
require.NoError(t, err)
require.Equal(t, false, found)
require.Equal(t, 0, len(roots))
})
}
}
func testCheckParentIndices(t *testing.T, db *bolt.DB, parent [32]byte, expected bool) {
require.NoError(t, db.View(func(tx *bolt.Tx) error {
require.Equal(t, expected, tx.Bucket(blockParentRootIndicesBucket).Get(parent[:]) != nil)
return nil
}))
}
func TestStore_BlocksHandleZeroCase(t *testing.T) {
for _, tt := range blockTests {
t.Run(tt.name, func(t *testing.T) {

View File

@@ -114,3 +114,27 @@ func splitRoots(b []byte) ([][32]byte, error) {
}
return rl, nil
}
func removeRoot(roots []byte, root [32]byte) ([]byte, error) {
if len(roots) == 0 {
return []byte{}, nil
}
if len(roots) == 32 && bytes.Equal(roots, root[:]) {
return []byte{}, nil
}
if len(roots)%32 != 0 {
return nil, errors.Wrapf(errMisalignedRootList, "root list len=%d", len(roots))
}
search := root[:]
for i := 0; i <= len(roots)-32; i += 32 {
if bytes.Equal(roots[i:i+32], search) {
result := make([]byte, len(roots)-32)
copy(result, roots[:i])
copy(result[i:], roots[i+32:])
return result, nil
}
}
return roots, nil
}

View File

@@ -1,6 +1,7 @@
package kv
import (
"bytes"
"context"
"crypto/rand"
"testing"
@@ -195,3 +196,85 @@ func TestSplitRoots(t *testing.T) {
})
}
}
func tPad(p ...[]byte) []byte {
r := make([]byte, 32*len(p))
for i, b := range p {
copy(r[i*32:], b)
}
return r
}
func TestRemoveRoot(t *testing.T) {
cases := []struct {
name string
roots []byte
root [32]byte
expect []byte
err error
}{
{
name: "empty",
roots: []byte{},
root: [32]byte{0xde, 0xad, 0xbe, 0xef},
expect: []byte{},
},
{
name: "single",
roots: tPad([]byte{0xde, 0xad, 0xbe, 0xef}),
root: [32]byte{0xde, 0xad, 0xbe, 0xef},
expect: []byte{},
},
{
name: "single, different",
roots: tPad([]byte{0xde, 0xad, 0xbe, 0xef}),
root: [32]byte{0xde, 0xad, 0xbe, 0xee},
expect: tPad([]byte{0xde, 0xad, 0xbe, 0xef}),
},
{
name: "multi",
roots: tPad([]byte{0xde, 0xad, 0xbe, 0xef}, []byte{0xac, 0x1d, 0xfa, 0xce}),
root: [32]byte{0xac, 0x1d, 0xfa, 0xce},
expect: tPad([]byte{0xde, 0xad, 0xbe, 0xef}),
},
{
name: "multi, reordered",
roots: tPad([]byte{0xac, 0x1d, 0xfa, 0xce}, []byte{0xde, 0xad, 0xbe, 0xef}),
root: [32]byte{0xac, 0x1d, 0xfa, 0xce},
expect: tPad([]byte{0xde, 0xad, 0xbe, 0xef}),
},
{
name: "multi, 3",
roots: tPad([]byte{0xac, 0x1d, 0xfa, 0xce}, []byte{0xbe, 0xef, 0xca, 0xb5}, []byte{0xde, 0xad, 0xbe, 0xef}),
root: [32]byte{0xac, 0x1d, 0xfa, 0xce},
expect: tPad([]byte{0xbe, 0xef, 0xca, 0xb5}, []byte{0xde, 0xad, 0xbe, 0xef}),
},
{
name: "multi, different",
roots: tPad([]byte{0xde, 0xad, 0xbe, 0xef}, []byte{0xac, 0x1d, 0xfa, 0xce}),
root: [32]byte{0xac, 0x1d, 0xbe, 0xa7},
expect: tPad([]byte{0xde, 0xad, 0xbe, 0xef}, []byte{0xac, 0x1d, 0xfa, 0xce}),
},
{
name: "misaligned",
roots: make([]byte, 61),
root: [32]byte{0xac, 0x1d, 0xbe, 0xa7},
err: errMisalignedRootList,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
before := make([]byte, len(c.roots))
copy(before, c.roots)
r, err := removeRoot(c.roots, c.root)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
require.Equal(t, len(c.expect), len(r))
require.Equal(t, true, bytes.Equal(c.expect, r))
require.Equal(t, true, bytes.Equal(before, c.roots))
})
}
}

View File

@@ -1,8 +1,10 @@
package kv
import (
"bytes"
"context"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
@@ -19,7 +21,17 @@ func (s *Store) LastValidatedCheckpoint(ctx context.Context) (*ethpb.Checkpoint,
if enc == nil {
var finErr error
checkpoint, finErr = s.FinalizedCheckpoint(ctx)
return finErr
if finErr != nil {
return finErr
}
if bytes.Equal(checkpoint.Root, params.BeaconConfig().ZeroHash[:]) {
bkt = tx.Bucket(blocksBucket)
r := bkt.Get(genesisBlockRootKey)
if r != nil {
checkpoint.Root = r
}
}
return nil
}
checkpoint = &ethpb.Checkpoint{}
return decode(ctx, enc, checkpoint)

View File

@@ -252,6 +252,13 @@ func (s *Store) tips() ([][32]byte, []primitives.Slot) {
return roots, slots
}
func (f *ForkChoice) HighestReceivedBlockRoot() [32]byte {
if f.store.highestReceivedNode == nil {
return [32]byte{}
}
return f.store.highestReceivedNode.root
}
// HighestReceivedBlockSlot returns the highest slot received by the forkchoice
func (f *ForkChoice) HighestReceivedBlockSlot() primitives.Slot {
if f.store.highestReceivedNode == nil {

View File

@@ -65,6 +65,7 @@ type FastGetter interface {
FinalizedPayloadBlockHash() [32]byte
HasNode([32]byte) bool
HighestReceivedBlockSlot() primitives.Slot
HighestReceivedBlockRoot() [32]byte
HighestReceivedBlockDelay() primitives.Slot
IsCanonical(root [32]byte) bool
IsOptimistic(root [32]byte) (bool, error)

View File

@@ -114,6 +114,13 @@ func (ro *ROForkChoice) HighestReceivedBlockSlot() primitives.Slot {
return ro.getter.HighestReceivedBlockSlot()
}
// HighestReceivedBlockRoot delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) HighestReceivedBlockRoot() [32]byte {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.HighestReceivedBlockRoot()
}
// HighestReceivedBlockDelay delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) HighestReceivedBlockDelay() primitives.Slot {
ro.l.RLock()

View File

@@ -29,6 +29,7 @@ const (
unrealizedJustifiedPayloadBlockHashCalled
nodeCountCalled
highestReceivedBlockSlotCalled
highestReceivedBlockRootCalled
highestReceivedBlockDelayCalled
receivedBlocksLastEpochCalled
weightCalled
@@ -252,6 +253,11 @@ func (ro *mockROForkchoice) HighestReceivedBlockSlot() primitives.Slot {
return 0
}
func (ro *mockROForkchoice) HighestReceivedBlockRoot() [32]byte {
ro.calls = append(ro.calls, highestReceivedBlockRootCalled)
return [32]byte{}
}
func (ro *mockROForkchoice) HighestReceivedBlockDelay() primitives.Slot {
ro.calls = append(ro.calls, highestReceivedBlockDelayCalled)
return 0

View File

@@ -23,10 +23,7 @@ import (
func (c *AttCaches) AggregateUnaggregatedAttestations(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestations")
defer span.End()
unaggregatedAtts, err := c.UnaggregatedAttestations()
if err != nil {
return err
}
unaggregatedAtts := c.UnaggregatedAttestations()
return c.aggregateUnaggregatedAtts(ctx, unaggregatedAtts)
}

View File

@@ -9,6 +9,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
log "github.com/sirupsen/logrus"
)
// SaveUnaggregatedAttestation saves an unaggregated attestation in cache.
@@ -52,7 +53,7 @@ func (c *AttCaches) SaveUnaggregatedAttestations(atts []ethpb.Att) error {
}
// UnaggregatedAttestations returns all the unaggregated attestations in cache.
func (c *AttCaches) UnaggregatedAttestations() ([]ethpb.Att, error) {
func (c *AttCaches) UnaggregatedAttestations() []ethpb.Att {
c.unAggregateAttLock.RLock()
defer c.unAggregateAttLock.RUnlock()
unAggregatedAtts := c.unAggregatedAtt
@@ -60,13 +61,14 @@ func (c *AttCaches) UnaggregatedAttestations() ([]ethpb.Att, error) {
for _, att := range unAggregatedAtts {
seen, err := c.hasSeenBit(att)
if err != nil {
return nil, err
log.WithError(err).Debug("Could not check if unaggregated attestation's bit has been seen. Attestation will not be returned")
continue
}
if !seen {
atts = append(atts, att.Clone())
}
}
return atts, nil
return atts
}
// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
@@ -137,7 +139,7 @@ func (c *AttCaches) DeleteUnaggregatedAttestation(att ethpb.Att) error {
}
if err := c.insertSeenBit(att); err != nil {
return err
log.WithError(err).Debug("Could not insert seen bit of unaggregated attestation. Attestation will be deleted")
}
id, err := attestation.NewId(att, attestation.Full)
@@ -163,7 +165,12 @@ func (c *AttCaches) DeleteSeenUnaggregatedAttestations() (int, error) {
if att == nil || att.IsNil() || att.IsAggregated() {
continue
}
if seen, err := c.hasSeenBit(att); err == nil && seen {
seen, err := c.hasSeenBit(att)
if err != nil {
log.WithError(err).Debug("Could not check if unaggregated attestation's bit has been seen. Attestation will be deleted")
seen = true
}
if seen {
delete(c.unAggregatedAtt, r)
count++
}

View File

@@ -17,6 +17,23 @@ import (
"github.com/prysmaticlabs/prysm/v5/testing/util"
)
func TestKV_Unaggregated_UnaggregatedAttestations(t *testing.T) {
t.Run("not returned when hasSeenBit fails", func(t *testing.T) {
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101}})
id, err := attestation.NewId(att, attestation.Data)
require.NoError(t, err)
cache := NewAttCaches()
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
cache.seenAtt.Delete(id.String())
// cache a bitlist whose length is different from the attestation bitlist's length
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0b1001}}, c.DefaultExpiration)
atts := cache.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts))
})
}
func TestKV_Unaggregated_SaveUnaggregatedAttestation(t *testing.T) {
tests := []struct {
name string
@@ -151,10 +168,24 @@ func TestKV_Unaggregated_DeleteUnaggregatedAttestation(t *testing.T) {
for _, att := range atts {
assert.NoError(t, cache.DeleteUnaggregatedAttestation(att))
}
returned, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
returned := cache.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{}, returned)
})
t.Run("deleted when insertSeenBit fails", func(t *testing.T) {
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101}})
id, err := attestation.NewId(att, attestation.Data)
require.NoError(t, err)
cache := NewAttCaches()
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
cache.seenAtt.Delete(id.String())
// cache a bitlist whose length is different from the attestation bitlist's length
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0b1001}}, c.DefaultExpiration)
require.NoError(t, cache.DeleteUnaggregatedAttestation(att))
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Attestation was not deleted")
})
}
func TestKV_Unaggregated_DeleteSeenUnaggregatedAttestations(t *testing.T) {
@@ -201,11 +232,10 @@ func TestKV_Unaggregated_DeleteSeenUnaggregatedAttestations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 2, cache.UnaggregatedAttestationCount())
returned, err := cache.UnaggregatedAttestations()
returned := cache.UnaggregatedAttestations()
sort.Slice(returned, func(i, j int) bool {
return bytes.Compare(returned[i].GetAggregationBits(), returned[j].GetAggregationBits()) < 0
})
require.NoError(t, err)
assert.DeepEqual(t, []ethpb.Att{atts[0], atts[2]}, returned)
})
@@ -228,10 +258,26 @@ func TestKV_Unaggregated_DeleteSeenUnaggregatedAttestations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, count)
assert.Equal(t, 0, cache.UnaggregatedAttestationCount())
returned, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
returned := cache.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{}, returned)
})
t.Run("deleted when hasSeenBit fails", func(t *testing.T) {
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101}})
id, err := attestation.NewId(att, attestation.Data)
require.NoError(t, err)
cache := NewAttCaches()
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
cache.seenAtt.Delete(id.String())
// cache a bitlist whose length is different from the attestation bitlist's length
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0b1001}}, c.DefaultExpiration)
count, err := cache.DeleteSeenUnaggregatedAttestations()
require.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Attestation was not deleted")
})
}
func TestKV_Unaggregated_UnaggregatedAttestationsBySlotIndex(t *testing.T) {

View File

@@ -79,8 +79,8 @@ func (m *PoolMock) SaveUnaggregatedAttestations(atts []ethpb.Att) error {
}
// UnaggregatedAttestations --
func (m *PoolMock) UnaggregatedAttestations() ([]ethpb.Att, error) {
return m.UnaggregatedAtts, nil
func (m *PoolMock) UnaggregatedAttestations() []ethpb.Att {
return m.UnaggregatedAtts
}
// UnaggregatedAttestationsBySlotIndex --

View File

@@ -26,7 +26,7 @@ type Pool interface {
// For unaggregated attestations.
SaveUnaggregatedAttestation(att ethpb.Att) error
SaveUnaggregatedAttestations(atts []ethpb.Att) error
UnaggregatedAttestations() ([]ethpb.Att, error)
UnaggregatedAttestations() []ethpb.Att
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.Attestation
UnaggregatedAttestationsBySlotIndexElectra(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.AttestationElectra
DeleteUnaggregatedAttestation(att ethpb.Att) error

View File

@@ -61,12 +61,8 @@ func (s *Service) pruneExpiredAtts() {
if _, err := s.cfg.Pool.DeleteSeenUnaggregatedAttestations(); err != nil {
log.WithError(err).Error("Cannot delete seen attestations")
}
unAggregatedAtts, err := s.cfg.Pool.UnaggregatedAttestations()
if err != nil {
log.WithError(err).Error("Could not get unaggregated attestations")
return
}
for _, att := range unAggregatedAtts {
for _, att := range s.cfg.Pool.UnaggregatedAttestations() {
if s.expired(att.GetData().Slot) {
if err := s.cfg.Pool.DeleteUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired unaggregated attestation")

View File

@@ -54,9 +54,7 @@ func TestPruneExpired_Ticker(t *testing.T) {
done := make(chan struct{}, 1)
async.RunEvery(ctx, 500*time.Millisecond, func() {
atts, err := s.cfg.Pool.UnaggregatedAttestations()
require.NoError(t, err)
for _, attestation := range atts {
for _, attestation := range s.cfg.Pool.UnaggregatedAttestations() {
if attestation.GetData().Slot == 0 {
return
}

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
@@ -27,7 +28,7 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
// Broadcast a message to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
func (s *Service) Broadcast(ctx context.Context, msg proto.Message, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
@@ -51,7 +52,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
}
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest), pubOpts...)
}
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
@@ -209,7 +210,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input subnet.
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error {
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()
if blob == nil {
@@ -223,12 +224,12 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
}
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest)
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest, pubOpts...)
return nil
}
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte, pubOpts ...pubsub.PubOpt) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -262,14 +263,14 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
}
}
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest), pubOpts...); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}
}
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -289,7 +290,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes(), pubOpts...); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err

View File

@@ -32,10 +32,10 @@ type P2P interface {
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
Broadcast(context.Context, proto.Message, ...pubsub.PubOpt) error
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -55,11 +55,7 @@ func (s *Server) ListAttestations(w http.ResponseWriter, r *http.Request) {
attestations = s.AttestationCache.GetAll()
} else {
attestations = s.AttestationsPool.AggregatedAttestations()
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
unaggAtts := s.AttestationsPool.UnaggregatedAttestations()
attestations = append(attestations, unaggAtts...)
}
@@ -114,11 +110,7 @@ func (s *Server) ListAttestationsV2(w http.ResponseWriter, r *http.Request) {
attestations = s.AttestationCache.GetAll()
} else {
attestations = s.AttestationsPool.AggregatedAttestations()
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
unaggAtts := s.AttestationsPool.UnaggregatedAttestations()
attestations = append(attestations, unaggAtts...)
}
@@ -633,7 +625,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
toBroadcast = append(toBroadcast, sbls)
}
}
go s.broadcastBLSChanges(ctx, toBroadcast)
go s.broadcastBLSChanges(context.Background(), toBroadcast)
if len(failures) > 0 {
failuresErr := &server.IndexedVerificationFailureError{
Code: http.StatusBadRequest,

View File

@@ -160,6 +160,8 @@ func TestGetSpec(t *testing.T) {
config.MaxTransactionsPerPayload = 99
config.FieldElementsPerBlob = 100
config.KzgCommitmentInclusionProofDepth = 101
config.BlobsidecarSubnetCount = 102
config.BlobsidecarSubnetCountElectra = 103
var dbp [4]byte
copy(dbp[:], []byte{'0', '0', '0', '1'})
@@ -198,7 +200,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)
assert.Equal(t, 168, len(data))
assert.Equal(t, 170, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -559,6 +561,10 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "100", v)
case "KZG_COMMITMENT_INCLUSION_PROOF_DEPTH":
assert.Equal(t, "101", v)
case "BLOB_SIDECAR_SUBNET_COUNT":
assert.Equal(t, "102", v)
case "BLOB_SIDECAR_SUBNET_COUNT_ELECTRA":
assert.Equal(t, "103", v)
default:
t.Errorf("Incorrect key: %s", k)
}

View File

@@ -18,7 +18,8 @@ go_library(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
@@ -58,6 +59,7 @@ go_test(
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"runtime/debug"
"strconv"
"time"
@@ -20,7 +21,8 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
@@ -352,9 +354,18 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
debug.PrintStack()
}
}()
if lr == nil {
log.Warn("Event stream skipping a nil lazy event reader callback")
return nil
}
r := lr()
if r == nil {
log.Warn("Event stream skipping a nil event reader")
return nil
}
out, err := io.ReadAll(r)
if err != nil {
return err
@@ -600,27 +611,14 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix")
func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) {
v := ev.HeadState.Version()
func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) {
v := st.Version()
if v < version.Bellatrix {
return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v))
}
t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot())
if err != nil {
return nil, errors.Wrap(err, "could not get head state slot time")
}
timestamp := uint64(t.Unix())
prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState))
if err != nil {
return nil, errors.Wrap(err, "could not get head state randao mix")
}
proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState)
if err != nil {
return nil, errors.Wrap(err, "could not get head state proposer index")
}
feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes()
tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex)
tValidator, exists := s.TrackedValidatorsCache.Validator(proposer)
if exists {
feeRecpt = tValidator.FeeRecipient[:]
}
@@ -628,34 +626,30 @@ func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribu
if v == version.Bellatrix {
return payloadattribute.New(&engine.PayloadAttributes{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
})
}
w, _, err := ev.HeadState.ExpectedWithdrawals()
w, _, err := st.ExpectedWithdrawals()
if err != nil {
return nil, errors.Wrap(err, "could not get withdrawals from head state")
}
if v == version.Capella {
return payloadattribute.New(&engine.PayloadAttributesV2{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
})
}
pr, err := ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not compute head block root")
}
return payloadattribute.New(&engine.PayloadAttributesV3{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
ParentBeaconBlockRoot: pr[:],
ParentBeaconBlockRoot: root[:],
})
}
@@ -665,37 +659,75 @@ type asyncPayloadAttrData struct {
err error
}
var zeroRoot [32]byte
// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have
// all of the checked fields empty, so the logical short circuit should hit immediately.
func needsFill(ev payloadattribute.EventData) bool {
return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil ||
ev.HeadBlock == nil || ev.HeadBlock.IsNil() ||
ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 ||
ev.Attributer == nil || ev.Attributer.IsEmpty()
}
func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) {
if ev.HeadBlock == nil || ev.HeadBlock.IsNil() {
hb, err := s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "Could not look up head block")
}
root, err := hb.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "Could not compute head block root")
}
if ev.HeadRoot != root {
return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution")
}
ev.HeadBlock = hb
payload, err := hb.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "Could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
var err error
if !needsFill(ev) {
return ev, nil
}
attr := ev.Attributer
if attr == nil || attr.IsEmpty() {
attr, err := s.computePayloadAttributes(ctx, ev)
if err != nil {
return ev, errors.Wrap(err, "Could not compute payload attributes")
}
ev.Attributer = attr
ev.HeadState, err = s.HeadFetcher.HeadState(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not get head state")
}
return ev, nil
ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not look up head block")
}
ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute head block root")
}
pr := ev.HeadBlock.Block().ParentRoot()
ev.ParentBlockRoot = pr[:]
hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute latest block header root")
}
pse := slots.ToEpoch(ev.ProposalSlot)
st := ev.HeadState
if slots.ToEpoch(st.Slot()) != pse {
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
}
}
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "failed to compute proposer index")
}
randao, err := helpers.RandaoMix(st, pse)
if err != nil {
return ev, errors.Wrap(err, "could not get head state randado")
}
payload, err := ev.HeadBlock.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not get head state slot time")
}
ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao)
return ev, err
}
// This event stream is intended to be used by builders and relays.
@@ -704,10 +736,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout)
edc := make(chan asyncPayloadAttrData)
go func() {
d := asyncPayloadAttrData{
version: version.String(ev.HeadState.Version()),
}
d := asyncPayloadAttrData{}
defer func() {
edc <- d
}()
@@ -716,6 +745,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
d.err = errors.Wrap(err, "Could not fill event data")
return
}
d.version = version.String(ev.HeadBlock.Version())
attributesBytes, err := marshalAttributes(ev.Attributer)
if err != nil {
d.err = errors.Wrap(err, "errors marshaling payload attributes to json")

View File

@@ -2,6 +2,7 @@ package events
import (
"context"
"encoding/binary"
"fmt"
"io"
"math"
@@ -24,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -557,6 +559,110 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
})
}
func TestFillEventData(t *testing.T) {
ctx := context.Background()
t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) {
st, err := util.NewBeaconStateBellatrix()
require.NoError(t, err)
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(&eth.SignedBeaconBlockBellatrix{}))
require.NoError(t, err)
attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{
Timestamp: uint64(time.Now().Unix()),
})
require.NoError(t, err)
alreadyFilled := payloadattribute.EventData{
HeadState: st,
HeadBlock: b,
HeadRoot: [32]byte{1, 2, 3},
Attributer: attributor,
ParentBlockRoot: []byte{1, 2, 3},
ParentBlockHash: []byte{4, 5, 6},
}
srv := &Server{} // No real HeadFetcher needed here since it won't be called.
result, err := srv.fillEventData(ctx, alreadyFilled)
require.NoError(t, err)
require.DeepEqual(t, alreadyFilled, result)
})
t.Run("Electra PartialData_ShouldFetchHeadStateAndBlock", func(t *testing.T) {
st, err := util.NewBeaconStateElectra()
require.NoError(t, err)
valCount := 10
setActiveValidators(t, st, valCount)
inactivityScores := make([]uint64, valCount)
for i := range inactivityScores {
inactivityScores[i] = 10
}
require.NoError(t, st.SetInactivityScores(inactivityScores))
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockElectra(&eth.SignedBeaconBlockElectra{}))
require.NoError(t, err)
attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{
Timestamp: uint64(time.Now().Unix()),
})
require.NoError(t, err)
// Create an event data object missing certain fields:
partial := payloadattribute.EventData{
// The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc.
// will cause fillEventData to try to fill the values.
ProposalSlot: 42, // different epoch from current slot
Attributer: attributor, // Must be Bellatrix or later
}
currentSlot := primitives.Slot(0)
// to avoid slot processing
require.NoError(t, st.SetSlot(currentSlot+1))
mockChainService := &mockChain.ChainService{
Root: make([]byte, 32),
State: st,
Block: b,
Slot: &currentSlot,
}
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
srv := &Server{
StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
HeadFetcher: mockChainService,
ChainInfoFetcher: mockChainService,
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
EventWriteTimeout: testEventWriteTimeout,
}
filled, err := srv.fillEventData(ctx, partial)
require.NoError(t, err, "expected successful fill of partial event data")
// Verify that fields have been updated from the mock data:
require.NotNil(t, filled.HeadState, "HeadState should be assigned")
require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned")
require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero")
require.NotEmpty(t, filled.ParentBlockRoot, "ParentBlockRoot should be filled")
require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled")
require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block")
// Check that a valid Attributer was set:
require.NotNil(t, filled.Attributer, "Should have a valid payload attributes object")
require.Equal(t, false, filled.Attributer.IsEmpty(), "Attributer should not be empty after fill")
})
}
func setActiveValidators(t *testing.T, st state.BeaconState, count int) {
balances := make([]uint64, count)
validators := make([]*eth.Validator, 0, count)
for i := 0; i < count; i++ {
pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.LittleEndian.PutUint64(pubKey, uint64(i))
balances[i] = uint64(i)
validators = append(validators, &eth.Validator{
PublicKey: pubKey,
ActivationEpoch: 0,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
WithdrawalCredentials: make([]byte, 32),
})
}
require.NoError(t, st.SetValidators(validators))
require.NoError(t, st.SetBalances(balances))
}
func TestStuckReaderScenarios(t *testing.T) {
cases := []struct {
name string

View File

@@ -162,11 +162,7 @@ func (s *Server) aggregatedAttestation(w http.ResponseWriter, slot primitives.Sl
return nil
}
atts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return nil
}
atts := s.AttestationsPool.UnaggregatedAttestations()
match, err = matchingAtts(atts, slot, attDataRoot, index)
if err != nil {
httputil.HandleError(w, "Could not get matching attestations: "+err.Error(), http.StatusInternalServerError)
@@ -636,6 +632,16 @@ func (s *Server) ProduceSyncCommitteeContribution(w http.ResponseWriter, r *http
ctx, span := trace.StartSpan(r.Context(), "validator.ProduceSyncCommitteeContribution")
defer span.End()
isOptimistic, err := s.OptimisticModeFetcher.IsOptimistic(ctx)
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
if isOptimistic {
httputil.HandleError(w, "Beacon node is currently syncing and not serving request on that endpoint", http.StatusServiceUnavailable)
return
}
_, index, ok := shared.UintFromQuery(w, r, "subcommittee_index", true)
if !ok {
return

View File

@@ -118,8 +118,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()
@@ -268,8 +267,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2, postElectraAtt}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()
@@ -373,8 +371,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2, preElectraAtt}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()
@@ -1584,7 +1581,8 @@ func TestProduceSyncCommitteeContribution(t *testing.T) {
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
},
SyncCommitteePool: syncCommitteePool,
SyncCommitteePool: syncCommitteePool,
OptimisticModeFetcher: &mockChain.ChainService{},
}
t.Run("ok", func(t *testing.T) {
url := "http://example.com?slot=1&subcommittee_index=1&beacon_block_root=0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"
@@ -1672,7 +1670,8 @@ func TestProduceSyncCommitteeContribution(t *testing.T) {
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
},
SyncCommitteePool: syncCommitteePool,
SyncCommitteePool: syncCommitteePool,
OptimisticModeFetcher: &mockChain.ChainService{},
}
server.ProduceSyncCommitteeContribution(writer, request)
assert.Equal(t, http.StatusNotFound, writer.Code)
@@ -1680,6 +1679,26 @@ func TestProduceSyncCommitteeContribution(t *testing.T) {
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp2))
require.ErrorContains(t, "No subcommittee messages found", errors.New(writer.Body.String()))
})
t.Run("Optimistic returns 503", func(t *testing.T) {
url := "http://example.com?slot=1&subcommittee_index=1&beacon_block_root=0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"
request := httptest.NewRequest(http.MethodGet, url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
syncCommitteePool = synccommittee.NewStore()
server = Server{
CoreService: &core.Service{
HeadFetcher: &mockChain.ChainService{
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
},
SyncCommitteePool: syncCommitteePool,
OptimisticModeFetcher: &mockChain.ChainService{
Optimistic: true,
},
}
server.ProduceSyncCommitteeContribution(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
})
}
func TestServer_RegisterValidator(t *testing.T) {

View File

@@ -58,6 +58,7 @@ go_library(
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
@@ -94,6 +95,8 @@ go_library(
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@@ -1,6 +1,7 @@
package validator
import (
"bytes"
"context"
"fmt"
"strings"
@@ -10,7 +11,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
@@ -21,12 +25,15 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
@@ -297,21 +304,24 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
}
bOpt, err := vs.createBatchOption(block, sidecars)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create option: %v", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
if err := vs.broadcastReceiveBlock(ctx, block, root, bOpt); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
}
errChan <- nil
}()
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root); err != nil {
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root, bOpt); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive blobs: %v", err)
}
@@ -323,6 +333,39 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
func (vs *Server) createBatchOption(block interfaces.SignedBeaconBlock, blobs []*ethpb.BlobSidecar) (pubsub.PubOpt, error) {
sszEnc := &encoder.SszNetworkEncoder{}
pblk, err := block.Proto()
if err != nil {
return nil, err
}
buf := bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, pblk.(ssz.Marshaler)); err != nil {
return nil, err
}
genValRoot := vs.GenesisFetcher.GenesisValidatorsRoot()
currDigest, err := forks.CreateForkDigest(vs.TimeFetcher.GenesisTime(), genValRoot[:])
if err != nil {
return nil, err
}
topicStr := fmt.Sprintf(p2p.BlockSubnetTopicFormat, currDigest) + sszEnc.ProtocolSuffix()
blockID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &topicStr})
bm := pubsub.NewBatchMessage()
bm.AddMessage(blockID)
for i, b := range blobs {
blobTopicStr := fmt.Sprintf(p2p.BlobSubnetTopicFormat, currDigest, i) + sszEnc.ProtocolSuffix()
buf = bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, b); err != nil {
return nil, err
}
blobID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &blobTopicStr})
bm.AddMessage(blobID)
}
return pubsub.WithBatchPublishing(bm), nil
}
// handleBlindedBlock processes blinded beacon blocks.
func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.SignedBeaconBlock) (interfaces.SignedBeaconBlock, []*ethpb.BlobSidecar, error) {
if block.Version() < version.Bellatrix {
@@ -363,12 +406,12 @@ func (vs *Server) blobSidecarsFromUnblindedBlock(block interfaces.SignedBeaconBl
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte) error {
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte, pubOpts ...pubsub.PubOpt) error {
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
}
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
if err := vs.P2P.Broadcast(ctx, protoBlock, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast failed")
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
@@ -379,7 +422,7 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
}
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte) error {
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte, pubOpts ...pubsub.PubOpt) error {
eg, eCtx := errgroup.WithContext(ctx)
for i, sc := range sidecars {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
@@ -387,7 +430,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
subIdx := i
sCar := sc
eg.Go(func() error {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar); err != nil {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast blob failed")
}
readOnlySc, err := blocks.NewROBlobWithRoot(sCar, root)

View File

@@ -40,10 +40,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
atts = vs.AttPool.AggregatedAttestations()
atts = vs.validateAndDeleteAttsInPool(ctx, latestState, atts)
uAtts, err := vs.AttPool.UnaggregatedAttestations()
if err != nil {
return nil, errors.Wrap(err, "could not get unaggregated attestations")
}
uAtts := vs.AttPool.UnaggregatedAttestations()
uAtts = vs.validateAndDeleteAttsInPool(ctx, latestState, uAtts)
atts = append(atts, uAtts...)
}

View File

@@ -2949,8 +2949,7 @@ func TestProposer_DeleteAttsInPool_Aggregated(t *testing.T) {
require.NoError(t, err)
require.NoError(t, s.deleteAttsInPool(context.Background(), append(aa, unaggregatedAtts...)))
assert.Equal(t, 0, len(s.AttPool.AggregatedAttestations()), "Did not delete aggregated attestation")
atts, err := s.AttPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := s.AttPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation")
}

View File

@@ -153,8 +153,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att, atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
@@ -248,8 +247,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAttElectra(t *testing.T) {
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
require.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att.ToAttestationElectra(committee), atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
@@ -457,8 +455,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.cfg.attPool.AggregatedAttestations()[0], "Incorrect saved att")
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
cancel()

View File

@@ -57,7 +57,6 @@ func TestBeaconAggregateProofSubscriber_CanSaveUnaggregatedAttestation(t *testin
}
require.NoError(t, r.beaconAggregateProofSubscriber(context.Background(), a))
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{a.Message.Aggregate}, atts, "Did not save unaggregated attestation")
}

View File

@@ -0,0 +1,3 @@
### Fixed
- /eth/v1/validator/sync_committee_contribution should check for optimistic status and return a 503 if it's optimistic.

View File

@@ -0,0 +1,3 @@
### Fixed
- fixed /eth/v1/config/spec displays BLOB_SIDECAR_SUBNET_COUNT,BLOB_SIDECAR_SUBNET_COUNT_ELECTRA

View File

@@ -0,0 +1,4 @@
### Fixed
- Fixes printing superfluous response.WriteHeader call from error in builder.
- Fixes e2e run with builder having wrong gaslimit header due to not being set on eth1 nodes.

View File

@@ -0,0 +1,3 @@
### Changed
- changed request object for `POST /eth/v1/beacon/states/head/validators` to omit the field if empty for satisfying other clients.

View File

@@ -0,0 +1,3 @@
### Changed
- Updated default gas limit from 30M to 36M

View File

@@ -0,0 +1,2 @@
### Fixed
- Ensure that deleting a block from the database clears its entry in the slot->root db index.

View File

@@ -0,0 +1,2 @@
### Fixed
- Fixed a bug in the event stream handler when processing payload attribute events where the timestamp and slot of the event would be based on the head rather than the current slot.

View File

@@ -0,0 +1,3 @@
### Ignored
- When starting a node, check that the last validated checkpoint has zero as root and return the genesis block root

View File

@@ -0,0 +1,3 @@
### Added
- Added a feature flag to sync from an arbitrary beacon block root at startup.

3
changelog/pvl_bls-ctx.md Normal file
View File

@@ -0,0 +1,3 @@
### Fixed
- Broadcasting BLS to execution changes should not use the request context in a go routine. Use context.Background() for the broadcasting go routine.

View File

@@ -0,0 +1,3 @@
### Changed
- Ignore errors from `hasSeenBit` and don't pack unaggregated attestations.

View File

@@ -0,0 +1,3 @@
### Fixed
- Handle unaggregated attestations when decomposing Electra block attestations.

View File

@@ -86,6 +86,9 @@ type Flags struct {
// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
AggregateIntervals [3]time.Duration
// Feature related flags (alignment forced in the end)
ForceHead string // ForceHead forces the head block to be a specific block root, the last head block, or the last finalized block.
}
var featureConfig *Flags
@@ -268,6 +271,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(enableExperimentalAttestationPool)
cfg.EnableExperimentalAttestationPool = true
}
if ctx.IsSet(forceHeadFlag.Name) {
logEnabled(forceHeadFlag)
cfg.ForceHead = ctx.String(forceHeadFlag.Name)
}
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)

View File

@@ -174,6 +174,12 @@ var (
Name: "enable-experimental-attestation-pool",
Usage: "Enables an experimental attestation pool design.",
}
// forceHeadFlag is a flag to force the head of the beacon chain to a specific block.
forceHeadFlag = &cli.StringFlag{
Name: "sync-from",
Usage: "Forces the head of the beacon chain to a specific block root. Values can be 'head' or a block root." +
" The block root has to be known to the beacon node and correspond to a block newer than the current finalized checkpoint.",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -230,6 +236,7 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
DisableCommitteeAwarePacking,
EnableDiscoveryReboot,
enableExperimentalAttestationPool,
forceHeadFlag,
}, deprecatedBeaconFlags, deprecatedFlags, upcomingDeprecation)
func combinedFlags(flags ...[]cli.Flag) []cli.Flag {

View File

@@ -235,8 +235,8 @@ type BeaconChainConfig struct {
ExecutionEngineTimeoutValue uint64 // ExecutionEngineTimeoutValue defines the seconds to wait before timing out engine endpoints with execution payload execution semantics (newPayload, forkchoiceUpdated).
// Subnet value
BlobsidecarSubnetCount uint64 `yaml:"BLOB_SIDECAR_SUBNET_COUNT"` // BlobsidecarSubnetCount is the number of blobsidecar subnets used in the gossipsub protocol.
BlobsidecarSubnetCountElectra uint64 `yaml:"BLOB_SIDECAR_SUBNET_COUNT_ELECTRA"` // BlobsidecarSubnetCountElectra is the number of blobsidecar subnets used in the gossipsub protocol post Electra hard fork.
BlobsidecarSubnetCount uint64 `yaml:"BLOB_SIDECAR_SUBNET_COUNT" spec:"true"` // BlobsidecarSubnetCount is the number of blobsidecar subnets used in the gossipsub protocol.
BlobsidecarSubnetCountElectra uint64 `yaml:"BLOB_SIDECAR_SUBNET_COUNT_ELECTRA" spec:"true"` // BlobsidecarSubnetCountElectra is the number of blobsidecar subnets used in the gossipsub protocol post Electra hard fork.
// Values introduced in Deneb hard fork
MaxPerEpochActivationChurnLimit uint64 `yaml:"MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT" spec:"true"` // MaxPerEpochActivationChurnLimit is the maximum amount of churn allotted for validator activation.

View File

@@ -267,7 +267,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
BytesPerLogsBloom: 256,
MaxExtraDataBytes: 32,
EthBurnAddressHex: "0x0000000000000000000000000000000000000000",
DefaultBuilderGasLimit: uint64(30000000),
DefaultBuilderGasLimit: uint64(36000000),
// Mevboost circuit breaker
MaxBuilderConsecutiveMissedSlots: 3,

View File

@@ -4,7 +4,7 @@
"fee_recipient": "0x50155530FCE8a85ec7055A5F8b2bE214B3DaeFd3",
"builder": {
"enabled": true,
"gas_limit": "30000000"
"gas_limit": "36000000"
}
},
"0xb057816155ad77931185101128655c0191bd0214c201ca48ed887f6c4c6adf334070efcd75140eada5ac83a92506dd7b": {

View File

@@ -9,4 +9,4 @@ default_config:
fee_recipient: '0x6e35733c5af9B61374A128e6F85f553aF09ff89A'
builder:
enabled: false
gas_limit: '30000000'
gas_limit: '36000000'

View File

@@ -1987,8 +1987,9 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:RmFQ2XAy3zQtbt2iNPy7Tt0/3fwTnHpCQSSnmGnt1Ps=",
version = "v0.13.0",
replace = "github.com/nisdas/go-libp2p-pubsub",
sum = "h1:s0BSHd/oXPBk16u84LYrXmmRXzoK2GqXqr4XD7dzHSU=",
version = "v0.3.3-0.20250312092335-b8aab45386f7",
)
go_repository(
name = "com_github_libp2p_go_libp2p_testing",

View File

@@ -22,12 +22,15 @@ func IsHex(b []byte) bool {
// DecodeHexWithLength takes a string and a length in bytes,
// and validates whether the string is a hex and has the correct length.
func DecodeHexWithLength(s string, length int) ([]byte, error) {
if len(s) > 2*length+2 {
return nil, fmt.Errorf("%s is greather than length %d bytes", s, length)
}
bytes, err := hexutil.Decode(s)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("%s is not a valid hex", s))
}
if len(bytes) != length {
return nil, fmt.Errorf("%s is not length %d bytes", s, length)
return nil, fmt.Errorf("length of %s is not %d bytes", s, length)
}
return bytes, nil
}

2
go.mod
View File

@@ -290,3 +290,5 @@ require (
)
replace github.com/json-iterator/go => github.com/prestonvanloon/go v1.1.7-0.20190722034630-4f2e55fcf87b
replace github.com/libp2p/go-libp2p-pubsub => github.com/nisdas/go-libp2p-pubsub v0.3.3-0.20250312092335-b8aab45386f7

View File

@@ -146,6 +146,7 @@ func (m *Miner) initAttempt(ctx context.Context, attempt int) (*os.File, error)
fmt.Sprintf("--unlock=%s", EthAddress),
"--allow-insecure-unlock",
"--syncmode=full",
fmt.Sprintf("--miner.gaslimit=%d", params.BeaconConfig().DefaultBuilderGasLimit),
fmt.Sprintf("--txpool.locals=%s", EthAddress),
fmt.Sprintf("--password=%s", pwFile),
}

View File

@@ -110,6 +110,7 @@ func (node *Node) Start(ctx context.Context) error {
"--ipcdisable",
"--verbosity=4",
"--syncmode=full",
fmt.Sprintf("--miner.gaslimit=%d", params.BeaconConfig().DefaultBuilderGasLimit),
fmt.Sprintf("--txpool.locals=%s", EthAddress),
}

View File

@@ -399,7 +399,7 @@ func (p *Builder) handleHeaderRequest(w http.ResponseWriter, req *http.Request)
Message: bid,
},
}
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
@@ -408,7 +408,6 @@ func (p *Builder) handleHeaderRequest(w http.ResponseWriter, req *http.Request)
}
p.currVersion = version.Bellatrix
p.currPayload = wObj
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleHeaderRequestCapella(w http.ResponseWriter) {
@@ -477,7 +476,7 @@ func (p *Builder) handleHeaderRequestCapella(w http.ResponseWriter) {
Message: bid,
},
}
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
@@ -486,7 +485,6 @@ func (p *Builder) handleHeaderRequestCapella(w http.ResponseWriter) {
}
p.currVersion = version.Capella
p.currPayload = wObj
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleHeaderRequestDeneb(w http.ResponseWriter) {
@@ -563,7 +561,7 @@ func (p *Builder) handleHeaderRequestDeneb(w http.ResponseWriter) {
Message: bid,
},
}
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
@@ -573,7 +571,6 @@ func (p *Builder) handleHeaderRequestDeneb(w http.ResponseWriter) {
p.currVersion = version.Deneb
p.currPayload = wObj
p.blobBundle = b.BlobsBundle
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleHeaderRequestElectra(w http.ResponseWriter) {
@@ -697,7 +694,7 @@ func (p *Builder) handleHeaderRequestElectra(w http.ResponseWriter) {
Message: bid,
},
}
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
@@ -707,7 +704,6 @@ func (p *Builder) handleHeaderRequestElectra(w http.ResponseWriter) {
p.currVersion = version.Electra
p.currPayload = wObj
p.blobBundle = b.BlobsBundle
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleBlindedBlock(w http.ResponseWriter, req *http.Request) {
@@ -732,13 +728,13 @@ func (p *Builder) handleBlindedBlock(w http.ResponseWriter, req *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(resp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode full payload response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
var errInvalidTypeConversion = errors.New("unable to translate between api and foreign type")