Compare commits

...

7 Commits

Author SHA1 Message Date
terence tsao
f16e4600a8 Add performance profiling to capture GetDutiesV2 operations taking over 2s 2025-08-03 07:37:29 -07:00
Bastin
ae4b982a6c Fix finality update bugs & Move broadcast logic to LC Store (#15540)
* fix IsBetterFinalityUpdate and add tests

fix finality update bugs

* Update lightclient.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/core/light-client/lightclient.go

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-08-01 12:35:21 +00:00
Radosław Kapka
f330021785 Do not compare liveness response with LH (#15556)
* Do not compare liveness response with LH

* changelog <3
2025-07-31 14:37:36 +00:00
james-prysm
bd6b4ecd5b wrapping goodbye messages in goroutine to speed up node shutdown (#15542)
* wrapping goodbye messages in goroutine to speed up node shutdown

* fixing requirement
2025-07-31 12:52:31 +00:00
Potuz
d7d8764a91 Trigger payload attribute event on early blocks (#15541)
Currently the payload attribute events is triggered on
`forkchoiceUpodateWithExecution`. However when we import an early block,
we do not call this function, we make two calls to FCU, the first one is
on a locked path at the end of `postBlockProcess` and this call is made
without any payload attributes to avoid updating the shuffling caches.

The second call is made on `handleSecondFCUCall` which calls directly
`notifyForkchoiceUpdate` bypassing the call to
`forkchoiceUpdateWithExecution`, but this call is the one that actually
computes the payload attributes. So the event handler is never called
with the new attributes.

This PR moves the event trigger to the same place where we actually call
FCU with the computed payload attributes.

Some considerations with forkchoice locking logic: since the calls are
always in a go routine, anyway the routine will wait to forkchoice to be
unlocked to proceed.

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2025-07-30 19:34:34 +00:00
Muzry
9b7f91d947 bugfix: submitPoolSyncCommitteeSignatures response inconsistent (#15516)
* fix: submitPoolSyncCommitteeSignatures reponse inconsistent

* update: bazel build file

* update: add changelog fragment file

* update api/server/structs/BUILD.bazel format

* update the unit test

* update: the error format

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2025-07-29 16:08:28 +00:00
terence
57e27199bd Fix builder bid version compatibility to support Electra bids with Fulu blocks (#15536) 2025-07-29 14:16:05 +00:00
41 changed files with 992 additions and 139 deletions

View File

@@ -36,6 +36,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//consensus-types/validator:go_default_library",
"//container/slice:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/consensus-types/validator"
"github.com/OffchainLabs/prysm/v6/container/slice"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/math"
enginev1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
@@ -699,6 +700,11 @@ func (m *SyncCommitteeMessage) ToConsensus() (*eth.SyncCommitteeMessage, error)
if err != nil {
return nil, server.NewDecodeError(err, "Signature")
}
// Add validation to check if the signature is valid BLS format
_, err = bls.SignatureFromBytes(sig)
if err != nil {
return nil, server.NewDecodeError(err, "Signature")
}
return &eth.SyncCommitteeMessage{
Slot: primitives.Slot(slot),

View File

@@ -174,6 +174,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
}).Info("Forkchoice updated with payload attributes for proposal")
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
go s.firePayloadAttributesEvent(s.cfg.StateNotifier.StateFeed(), arg.headBlock, arg.headRoot, nextSlot)
} else if hasAttr && payloadID == nil && !features.Get().PrepareAllPayloads {
log.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),

View File

@@ -102,8 +102,6 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
log.WithError(err).Error("Could not save head")
}
go s.firePayloadAttributesEvent(s.cfg.StateNotifier.StateFeed(), args.headBlock, args.headRoot, s.CurrentSlot()+1)
// Only need to prune attestations from pool if the head has changed.
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)
return nil

View File

@@ -36,7 +36,7 @@ func WithMaxGoroutines(x int) Option {
// WithLCStore for light client store access.
func WithLCStore() Option {
return func(s *Service) error {
s.lcStore = lightclient.NewLightClientStore(s.cfg.BeaconDB)
s.lcStore = lightclient.NewLightClientStore(s.cfg.BeaconDB, s.cfg.P2P, s.cfg.StateNotifier.StateFeed())
return nil
}
}

View File

@@ -1,7 +1,6 @@
package blockchain
import (
"bytes"
"context"
"fmt"
"strings"
@@ -198,8 +197,7 @@ func (s *Service) processLightClientFinalityUpdate(
finalizedCheckpoint := attestedState.FinalizedCheckpoint()
// Check if the finalized checkpoint has changed
if finalizedCheckpoint == nil || bytes.Equal(finalizedCheckpoint.GetRoot(), postState.FinalizedCheckpoint().Root) {
if finalizedCheckpoint == nil {
return nil
}
@@ -224,17 +222,7 @@ func (s *Service) processLightClientFinalityUpdate(
return nil
}
log.Debug("Saving new light client finality update")
s.lcStore.SetLastFinalityUpdate(newUpdate)
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientFinalityUpdate,
Data: newUpdate,
})
if err = s.cfg.P2P.BroadcastLightClientFinalityUpdate(ctx, newUpdate); err != nil {
return errors.Wrap(err, "could not broadcast light client finality update")
}
s.lcStore.SetLastFinalityUpdate(newUpdate, true)
return nil
}
@@ -266,17 +254,7 @@ func (s *Service) processLightClientOptimisticUpdate(ctx context.Context, signed
return nil
}
log.Debug("Saving new light client optimistic update")
s.lcStore.SetLastOptimisticUpdate(newUpdate)
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientOptimisticUpdate,
Data: newUpdate,
})
if err = s.cfg.P2P.BroadcastLightClientOptimisticUpdate(ctx, newUpdate); err != nil {
return errors.Wrap(err, "could not broadcast light client optimistic update")
}
s.lcStore.SetLastOptimisticUpdate(newUpdate, true)
return nil
}

View File

@@ -3170,7 +3170,7 @@ func TestProcessLightClientOptimisticUpdate(t *testing.T) {
t.Run(version.String(testVersion)+"_"+tc.name, func(t *testing.T) {
s.genesisTime = time.Unix(time.Now().Unix()-(int64(forkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
s.lcStore = &lightClient.Store{}
s.lcStore = lightClient.NewLightClientStore(s.cfg.BeaconDB, s.cfg.P2P, s.cfg.StateNotifier.StateFeed())
var oldActualUpdate interfaces.LightClientOptimisticUpdate
var err error
@@ -3246,39 +3246,39 @@ func TestProcessLightClientFinalityUpdate(t *testing.T) {
expectReplace: true,
},
{
name: "Old update is better - age - no supermajority",
name: "Old update is better - finalized slot is higher",
oldOptions: []util.LightClientOption{util.WithIncreasedFinalizedSlot(1)},
newOptions: []util.LightClientOption{},
expectReplace: false,
},
{
name: "Old update is better - age - both supermajority",
oldOptions: []util.LightClientOption{util.WithIncreasedFinalizedSlot(1), util.WithSupermajority()},
newOptions: []util.LightClientOption{util.WithSupermajority()},
expectReplace: false,
},
{
name: "Old update is better - supermajority",
oldOptions: []util.LightClientOption{util.WithSupermajority()},
name: "Old update is better - attested slot is higher",
oldOptions: []util.LightClientOption{util.WithIncreasedAttestedSlot(1)},
newOptions: []util.LightClientOption{},
expectReplace: false,
},
{
name: "New update is better - age - both supermajority",
oldOptions: []util.LightClientOption{util.WithSupermajority()},
newOptions: []util.LightClientOption{util.WithIncreasedFinalizedSlot(1), util.WithSupermajority()},
name: "Old update is better - signature slot is higher",
oldOptions: []util.LightClientOption{util.WithIncreasedSignatureSlot(1)},
newOptions: []util.LightClientOption{},
expectReplace: false,
},
{
name: "New update is better - finalized slot is higher",
oldOptions: []util.LightClientOption{},
newOptions: []util.LightClientOption{util.WithIncreasedAttestedSlot(1)},
expectReplace: true,
},
{
name: "New update is better - age - no supermajority",
name: "New update is better - attested slot is higher",
oldOptions: []util.LightClientOption{},
newOptions: []util.LightClientOption{util.WithIncreasedFinalizedSlot(1)},
newOptions: []util.LightClientOption{util.WithIncreasedAttestedSlot(1)},
expectReplace: true,
},
{
name: "New update is better - supermajority",
name: "New update is better - signature slot is higher",
oldOptions: []util.LightClientOption{},
newOptions: []util.LightClientOption{util.WithSupermajority()},
newOptions: []util.LightClientOption{util.WithIncreasedSignatureSlot(1)},
expectReplace: true,
},
}
@@ -3310,7 +3310,7 @@ func TestProcessLightClientFinalityUpdate(t *testing.T) {
t.Run(version.String(testVersion)+"_"+tc.name, func(t *testing.T) {
s.genesisTime = time.Unix(time.Now().Unix()-(int64(forkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
s.lcStore = &lightClient.Store{}
s.lcStore = lightClient.NewLightClientStore(s.cfg.BeaconDB, s.cfg.P2P, s.cfg.StateNotifier.StateFeed())
var actualOldUpdate, actualNewUpdate interfaces.LightClientFinalityUpdate
var err error

View File

@@ -10,8 +10,12 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client",
visibility = ["//visibility:public"],
deps = [
"//async/event:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/execution:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
@@ -39,6 +43,9 @@ go_test(
],
deps = [
":go_default_library",
"//async/event:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",

View File

@@ -750,7 +750,9 @@ func UpdateHasSupermajority(syncAggregate *pb.SyncAggregate) bool {
return numActiveParticipants*3 >= maxActiveParticipants*2
}
func IsBetterFinalityUpdate(newUpdate, oldUpdate interfaces.LightClientFinalityUpdate) bool {
// IsFinalityUpdateValidForBroadcast checks if a finality update needs to be broadcasted.
// It is also used to check if an incoming gossiped finality update is valid for forwarding and saving.
func IsFinalityUpdateValidForBroadcast(newUpdate, oldUpdate interfaces.LightClientFinalityUpdate) bool {
if oldUpdate == nil {
return true
}
@@ -772,6 +774,35 @@ func IsBetterFinalityUpdate(newUpdate, oldUpdate interfaces.LightClientFinalityU
return true
}
// IsBetterFinalityUpdate checks if the new finality update is better than the old one for saving.
// This does not concern broadcasting, but rather the decision of whether to save the new update.
// For broadcasting checks, use IsFinalityUpdateValidForBroadcast.
func IsBetterFinalityUpdate(newUpdate, oldUpdate interfaces.LightClientFinalityUpdate) bool {
if oldUpdate == nil {
return true
}
// Full nodes SHOULD provide the LightClientFinalityUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot)
newFinalizedSlot := newUpdate.FinalizedHeader().Beacon().Slot
newAttestedSlot := newUpdate.AttestedHeader().Beacon().Slot
oldFinalizedSlot := oldUpdate.FinalizedHeader().Beacon().Slot
oldAttestedSlot := oldUpdate.AttestedHeader().Beacon().Slot
if newFinalizedSlot < oldFinalizedSlot {
return false
}
if newFinalizedSlot == oldFinalizedSlot {
if newAttestedSlot < oldAttestedSlot {
return false
}
if newAttestedSlot == oldAttestedSlot && newUpdate.SignatureSlot() <= oldUpdate.SignatureSlot() {
return false
}
}
return true
}
func IsBetterOptimisticUpdate(newUpdate, oldUpdate interfaces.LightClientOptimisticUpdate) bool {
if oldUpdate == nil {
return true

View File

@@ -4,7 +4,11 @@ import (
"context"
"sync"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/iface"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -16,13 +20,17 @@ type Store struct {
mu sync.RWMutex
beaconDB iface.HeadAccessDatabase
lastFinalityUpdate interfaces.LightClientFinalityUpdate
lastOptimisticUpdate interfaces.LightClientOptimisticUpdate
lastFinalityUpdate interfaces.LightClientFinalityUpdate // tracks the best finality update seen so far
lastOptimisticUpdate interfaces.LightClientOptimisticUpdate // tracks the best optimistic update seen so far
p2p p2p.Accessor
stateFeed event.SubscriberSender
}
func NewLightClientStore(db iface.HeadAccessDatabase) *Store {
func NewLightClientStore(db iface.HeadAccessDatabase, p p2p.Accessor, e event.SubscriberSender) *Store {
return &Store{
beaconDB: db,
beaconDB: db,
p2p: p,
stateFeed: e,
}
}
@@ -143,10 +151,23 @@ func (s *Store) SaveLightClientUpdate(ctx context.Context, period uint64, update
return nil
}
func (s *Store) SetLastFinalityUpdate(update interfaces.LightClientFinalityUpdate) {
func (s *Store) SetLastFinalityUpdate(update interfaces.LightClientFinalityUpdate, broadcast bool) {
s.mu.Lock()
defer s.mu.Unlock()
if broadcast && IsFinalityUpdateValidForBroadcast(update, s.lastFinalityUpdate) {
if err := s.p2p.BroadcastLightClientFinalityUpdate(context.Background(), update); err != nil {
log.WithError(err).Error("Could not broadcast light client finality update")
}
}
s.lastFinalityUpdate = update
log.Debug("Saved new light client finality update")
s.stateFeed.Send(&feed.Event{
Type: statefeed.LightClientFinalityUpdate,
Data: update,
})
}
func (s *Store) LastFinalityUpdate() interfaces.LightClientFinalityUpdate {
@@ -155,10 +176,23 @@ func (s *Store) LastFinalityUpdate() interfaces.LightClientFinalityUpdate {
return s.lastFinalityUpdate
}
func (s *Store) SetLastOptimisticUpdate(update interfaces.LightClientOptimisticUpdate) {
func (s *Store) SetLastOptimisticUpdate(update interfaces.LightClientOptimisticUpdate, broadcast bool) {
s.mu.Lock()
defer s.mu.Unlock()
if broadcast {
if err := s.p2p.BroadcastLightClientOptimisticUpdate(context.Background(), update); err != nil {
log.WithError(err).Error("Could not broadcast light client optimistic update")
}
}
s.lastOptimisticUpdate = update
log.Debug("Saved new light client optimistic update")
s.stateFeed.Send(&feed.Event{
Type: statefeed.LightClientOptimisticUpdate,
Data: update,
})
}
func (s *Store) LastOptimisticUpdate() interfaces.LightClientOptimisticUpdate {

View File

@@ -3,7 +3,10 @@ package light_client_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/async/event"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
p2pTesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -21,7 +24,7 @@ func TestLightClientStore(t *testing.T) {
params.OverrideBeaconConfig(cfg)
// Initialize the light client store
lcStore := &lightClient.Store{}
lcStore := lightClient.NewLightClientStore(testDB.SetupDB(t), &p2pTesting.FakeP2P{}, new(event.Feed))
// Create test light client updates for Capella and Deneb
lCapella := util.NewTestLightClient(t, version.Capella)
@@ -45,24 +48,118 @@ func TestLightClientStore(t *testing.T) {
require.IsNil(t, lcStore.LastOptimisticUpdate(), "lastOptimisticUpdate should be nil")
// Set and get finality with Capella update. Optimistic update should be nil
lcStore.SetLastFinalityUpdate(finUpdateCapella)
lcStore.SetLastFinalityUpdate(finUpdateCapella, false)
require.Equal(t, finUpdateCapella, lcStore.LastFinalityUpdate(), "lastFinalityUpdate is wrong")
require.IsNil(t, lcStore.LastOptimisticUpdate(), "lastOptimisticUpdate should be nil")
// Set and get optimistic with Capella update. Finality update should be Capella
lcStore.SetLastOptimisticUpdate(opUpdateCapella)
lcStore.SetLastOptimisticUpdate(opUpdateCapella, false)
require.Equal(t, opUpdateCapella, lcStore.LastOptimisticUpdate(), "lastOptimisticUpdate is wrong")
require.Equal(t, finUpdateCapella, lcStore.LastFinalityUpdate(), "lastFinalityUpdate is wrong")
// Set and get finality and optimistic with Deneb update
lcStore.SetLastFinalityUpdate(finUpdateDeneb)
lcStore.SetLastOptimisticUpdate(opUpdateDeneb)
lcStore.SetLastFinalityUpdate(finUpdateDeneb, false)
lcStore.SetLastOptimisticUpdate(opUpdateDeneb, false)
require.Equal(t, finUpdateDeneb, lcStore.LastFinalityUpdate(), "lastFinalityUpdate is wrong")
require.Equal(t, opUpdateDeneb, lcStore.LastOptimisticUpdate(), "lastOptimisticUpdate is wrong")
// Set and get finality and optimistic with nil update
lcStore.SetLastFinalityUpdate(nil)
lcStore.SetLastOptimisticUpdate(nil)
require.IsNil(t, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should be nil")
require.IsNil(t, lcStore.LastOptimisticUpdate(), "lastOptimisticUpdate should be nil")
}
func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
p2p := p2pTesting.NewTestP2P(t)
lcStore := lightClient.NewLightClientStore(testDB.SetupDB(t), p2p, new(event.Feed))
// update 0 with basic data and no supermajority following an empty lastFinalityUpdate - should save and broadcast
l0 := util.NewTestLightClient(t, version.Altair)
update0, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l0.Ctx, l0.State, l0.Block, l0.AttestedState, l0.AttestedBlock, l0.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update0, lcStore.LastFinalityUpdate()), "update0 should be better than nil")
// update0 should be valid for broadcast - meaning it should be broadcasted
require.Equal(t, true, lightClient.IsFinalityUpdateValidForBroadcast(update0, lcStore.LastFinalityUpdate()), "update0 should be valid for broadcast")
lcStore.SetLastFinalityUpdate(update0, true)
require.Equal(t, update0, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update when previous is nil")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 1 with same finality slot, increased attested slot, and no supermajority - should save but not broadcast
l1 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedAttestedSlot(1))
update1, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l1.Ctx, l1.State, l1.Block, l1.AttestedState, l1.AttestedBlock, l1.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update1, update0), "update1 should be better than update0")
// update1 should not be valid for broadcast - meaning it should not be broadcasted
require.Equal(t, false, lightClient.IsFinalityUpdateValidForBroadcast(update1, lcStore.LastFinalityUpdate()), "update1 should not be valid for broadcast")
lcStore.SetLastFinalityUpdate(update1, true)
require.Equal(t, update1, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called after setting a new last finality update without supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 2 with same finality slot, increased attested slot, and supermajority - should save and broadcast
l2 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedAttestedSlot(2), util.WithSupermajority())
update2, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l2.Ctx, l2.State, l2.Block, l2.AttestedState, l2.AttestedBlock, l2.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update2, update1), "update2 should be better than update1")
// update2 should be valid for broadcast - meaning it should be broadcasted
require.Equal(t, true, lightClient.IsFinalityUpdateValidForBroadcast(update2, lcStore.LastFinalityUpdate()), "update2 should be valid for broadcast")
lcStore.SetLastFinalityUpdate(update2, true)
require.Equal(t, update2, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update with supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 3 with same finality slot, increased attested slot, and supermajority - should save but not broadcast
l3 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedAttestedSlot(3), util.WithSupermajority())
update3, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l3.Ctx, l3.State, l3.Block, l3.AttestedState, l3.AttestedBlock, l3.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update3, update2), "update3 should be better than update2")
// update3 should not be valid for broadcast - meaning it should not be broadcasted
require.Equal(t, false, lightClient.IsFinalityUpdateValidForBroadcast(update3, lcStore.LastFinalityUpdate()), "update3 should not be valid for broadcast")
lcStore.SetLastFinalityUpdate(update3, true)
require.Equal(t, update3, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been when previous was already broadcast")
// update 4 with increased finality slot, increased attested slot, and supermajority - should save and broadcast
l4 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedFinalizedSlot(1), util.WithIncreasedAttestedSlot(1), util.WithSupermajority())
update4, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l4.Ctx, l4.State, l4.Block, l4.AttestedState, l4.AttestedBlock, l4.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update4, update3), "update4 should be better than update3")
// update4 should be valid for broadcast - meaning it should be broadcasted
require.Equal(t, true, lightClient.IsFinalityUpdateValidForBroadcast(update4, lcStore.LastFinalityUpdate()), "update4 should be valid for broadcast")
lcStore.SetLastFinalityUpdate(update4, true)
require.Equal(t, update4, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after a new finality update with increased finality slot")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 5 with the same new finality slot, increased attested slot, and supermajority - should save but not broadcast
l5 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedFinalizedSlot(1), util.WithIncreasedAttestedSlot(2), util.WithSupermajority())
update5, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l5.Ctx, l5.State, l5.Block, l5.AttestedState, l5.AttestedBlock, l5.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update5, update4), "update5 should be better than update4")
// update5 should not be valid for broadcast - meaning it should not be broadcasted
require.Equal(t, false, lightClient.IsFinalityUpdateValidForBroadcast(update5, lcStore.LastFinalityUpdate()), "update5 should not be valid for broadcast")
lcStore.SetLastFinalityUpdate(update5, true)
require.Equal(t, update5, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
// update 6 with the same new finality slot, increased attested slot, and no supermajority - should save but not broadcast
l6 := util.NewTestLightClient(t, version.Altair, util.WithIncreasedFinalizedSlot(1), util.WithIncreasedAttestedSlot(3))
update6, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l6.Ctx, l6.State, l6.Block, l6.AttestedState, l6.AttestedBlock, l6.FinalizedBlock)
require.NoError(t, err, "Failed to create light client finality update")
require.Equal(t, true, lightClient.IsBetterFinalityUpdate(update6, update5), "update6 should be better than update5")
// update6 should not be valid for broadcast - meaning it should not be broadcasted
require.Equal(t, false, lightClient.IsFinalityUpdateValidForBroadcast(update6, lcStore.LastFinalityUpdate()), "update6 should not be valid for broadcast")
lcStore.SetLastFinalityUpdate(update6, true)
require.Equal(t, update6, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
}

View File

@@ -236,7 +236,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
beacon.finalizedStateAtStartUp = nil
if features.Get().EnableLightClient {
beacon.lcStore = lightclient.NewLightClientStore(beacon.db)
beacon.lcStore = lightclient.NewLightClientStore(beacon.db, beacon.fetchP2P(), beacon.StateFeed())
}
return beacon, nil

View File

@@ -147,7 +147,6 @@ go_test(
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db/testing:go_default_library",
@@ -174,7 +173,6 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//proto/testing:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
@@ -24,7 +23,6 @@ import (
"github.com/OffchainLabs/prysm/v6/network/forks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
testpb "github.com/OffchainLabs/prysm/v6/proto/testing"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
@@ -546,8 +544,7 @@ func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) {
}),
}
l := util.NewTestLightClient(t, version.Altair)
msg, err := lightClient.NewLightClientOptimisticUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
msg, err := util.MockOptimisticUpdate()
require.NoError(t, err)
GossipTypeMapping[reflect.TypeOf(msg)] = LightClientOptimisticUpdateTopicFormat
@@ -613,8 +610,7 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
}),
}
l := util.NewTestLightClient(t, version.Altair)
msg, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
msg, err := util.MockFinalityUpdate()
require.NoError(t, err)
GossipTypeMapping[reflect.TypeOf(msg)] = LightClientFinalityUpdateTopicFormat

View File

@@ -1103,9 +1103,9 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(msgsInPool))
assert.Equal(t, primitives.Slot(1), msgsInPool[0].Slot)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(msgsInPool[0].BlockRoot))
assert.Equal(t, "0xbacd20f09da907734434f052bd4c9503aa16bab1960e89ea20610d08d064481c", hexutil.Encode(msgsInPool[0].BlockRoot))
assert.Equal(t, primitives.ValidatorIndex(1), msgsInPool[0].ValidatorIndex)
assert.Equal(t, "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", hexutil.Encode(msgsInPool[0].Signature))
assert.Equal(t, "0xb591bd4ca7d745b6e027879645d7c014fecb8c58631af070f7607acc0c1c948a5102a33267f0e4ba41a85b254b07df91185274375b2e6436e37e81d2fd46cb3751f5a6c86efb7499c1796c0c17e122a54ac067bb0f5ff41f3241659cceb0c21c", hexutil.Encode(msgsInPool[0].Signature))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
})
t.Run("multiple", func(t *testing.T) {
@@ -2497,23 +2497,23 @@ var (
singleSyncCommitteeMsg = `[
{
"slot": "1",
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"beacon_block_root": "0xbacd20f09da907734434f052bd4c9503aa16bab1960e89ea20610d08d064481c",
"validator_index": "1",
"signature": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
"signature": "0xb591bd4ca7d745b6e027879645d7c014fecb8c58631af070f7607acc0c1c948a5102a33267f0e4ba41a85b254b07df91185274375b2e6436e37e81d2fd46cb3751f5a6c86efb7499c1796c0c17e122a54ac067bb0f5ff41f3241659cceb0c21c"
}
]`
multipleSyncCommitteeMsg = `[
{
"slot": "1",
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"beacon_block_root": "0xbacd20f09da907734434f052bd4c9503aa16bab1960e89ea20610d08d064481c",
"validator_index": "1",
"signature": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
"signature": "0xb591bd4ca7d745b6e027879645d7c014fecb8c58631af070f7607acc0c1c948a5102a33267f0e4ba41a85b254b07df91185274375b2e6436e37e81d2fd46cb3751f5a6c86efb7499c1796c0c17e122a54ac067bb0f5ff41f3241659cceb0c21c"
},
{
"slot": "2",
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"beacon_block_root": "0x2757f6fd8590925cd000a86a3e543f98a93eae23781783a33e34504729a8ad0c",
"validator_index": "1",
"signature": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
"signature": "0x99dfe11b6c8b306d2c72eb891926d37922d226ea8e1e7484d6c30fab746494f192b0daa3e40c13f1e335b35238f3362c113455a329b1fab0bc500bc47f643786f49e151d5b5052afb51af57ba5aa34a6051dc90ee4de83a26eb54a895061d89a"
}
]`
// signature is invalid
@@ -2523,6 +2523,18 @@ var (
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"validator_index": "1",
"signature": "foo"
},
{
"slot": "1121",
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"validator_index": "1",
"signature": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
},
{
"slot": "1121",
"beacon_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"validator_index": "2",
"signature": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505"
}
]`
// signatures are invalid

View File

@@ -33,9 +33,11 @@ go_test(
embed = [":go_default_library"],
deps = [
"//api/server/structs:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -11,9 +11,11 @@ import (
"testing"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
lightclient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
dbtesting "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
p2ptesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -53,7 +55,7 @@ func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
require.NoError(t, err)
db := dbtesting.SetupDB(t)
lcStore := lightclient.NewLightClientStore(db)
lcStore := lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed))
err = db.SaveLightClientBootstrap(l.Ctx, blockRoot[:], bootstrap)
require.NoError(t, err)
@@ -97,7 +99,7 @@ func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
require.NoError(t, err)
db := dbtesting.SetupDB(t)
lcStore := lightclient.NewLightClientStore(db)
lcStore := lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed))
err = db.SaveLightClientBootstrap(l.Ctx, blockRoot[:], bootstrap)
require.NoError(t, err)
@@ -141,7 +143,7 @@ func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
t.Run("no bootstrap found", func(t *testing.T) {
s := &Server{
LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t)),
LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed)),
}
request := httptest.NewRequest("GET", "http://foo.com/", nil)
request.SetPathValue("block_root", hexutil.Encode([]byte{0x00, 0x01, 0x02}))
@@ -184,7 +186,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
}
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updatePeriod := startPeriod
@@ -325,7 +327,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updates := make([]interfaces.LightClientUpdate, 2)
@@ -445,7 +447,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updates := make([]interfaces.LightClientUpdate, 3)
@@ -492,7 +494,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updates := make([]interfaces.LightClientUpdate, 3)
@@ -536,7 +538,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
t.Run("start period before altair", func(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
startPeriod := 0
url := fmt.Sprintf("http://foo.com/?count=128&start_period=%d", startPeriod)
@@ -559,7 +561,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
t.Run("missing update in the middle", func(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updates := make([]interfaces.LightClientUpdate, 3)
@@ -603,7 +605,7 @@ func TestLightClientHandler_GetLightClientByRange(t *testing.T) {
t.Run("missing update at the beginning", func(t *testing.T) {
db := dbtesting.SetupDB(t)
s := &Server{
LCStore: lightclient.NewLightClientStore(db),
LCStore: lightclient.NewLightClientStore(db, &p2ptesting.FakeP2P{}, new(event.Feed)),
}
updates := make([]interfaces.LightClientUpdate, 3)
@@ -663,8 +665,8 @@ func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
update, err := lightclient.NewLightClientFinalityUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
require.NoError(t, err)
s := &Server{LCStore: &lightclient.Store{}}
s.LCStore.SetLastFinalityUpdate(update)
s := &Server{LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed))}
s.LCStore.SetLastFinalityUpdate(update, false)
request := httptest.NewRequest("GET", "http://foo.com", nil)
writer := httptest.NewRecorder()
@@ -688,8 +690,8 @@ func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
update, err := lightclient.NewLightClientFinalityUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
require.NoError(t, err)
s := &Server{LCStore: &lightclient.Store{}}
s.LCStore.SetLastFinalityUpdate(update)
s := &Server{LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed))}
s.LCStore.SetLastFinalityUpdate(update, false)
request := httptest.NewRequest("GET", "http://foo.com", nil)
request.Header.Add("Accept", "application/octet-stream")
@@ -727,7 +729,7 @@ func TestLightClientHandler_GetLightClientOptimisticUpdate(t *testing.T) {
helpers.ClearCache()
t.Run("no update", func(t *testing.T) {
s := &Server{LCStore: &lightclient.Store{}}
s := &Server{LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed))}
request := httptest.NewRequest("GET", "http://foo.com", nil)
writer := httptest.NewRecorder()
@@ -743,8 +745,8 @@ func TestLightClientHandler_GetLightClientOptimisticUpdate(t *testing.T) {
update, err := lightclient.NewLightClientOptimisticUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
require.NoError(t, err)
s := &Server{LCStore: &lightclient.Store{}}
s.LCStore.SetLastOptimisticUpdate(update)
s := &Server{LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed))}
s.LCStore.SetLastOptimisticUpdate(update, false)
request := httptest.NewRequest("GET", "http://foo.com", nil)
writer := httptest.NewRecorder()
@@ -767,8 +769,8 @@ func TestLightClientHandler_GetLightClientOptimisticUpdate(t *testing.T) {
update, err := lightclient.NewLightClientOptimisticUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
require.NoError(t, err)
s := &Server{LCStore: &lightclient.Store{}}
s.LCStore.SetLastOptimisticUpdate(update)
s := &Server{LCStore: lightclient.NewLightClientStore(dbtesting.SetupDB(t), &p2ptesting.FakeP2P{}, new(event.Feed))}
s.LCStore.SetLastOptimisticUpdate(update, false)
request := httptest.NewRequest("GET", "http://foo.com", nil)
request.Header.Add("Accept", "application/octet-stream")

View File

@@ -81,6 +81,7 @@ go_library(
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//encoding/ssz:go_default_library",
"//io/file:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//monitoring/tracing/trace:go_default_library",

View File

@@ -2,17 +2,26 @@ package validator
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
coreTime "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -25,7 +34,19 @@ func (vs *Server) GetDutiesV2(ctx context.Context, req *ethpb.DutiesRequest) (*e
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
return vs.dutiesv2(ctx, req)
start := time.Now()
// Start background profiling that will capture if this takes too long
var profileCancel func()
if features.Get().SlowDutiesProfile {
profileCancel = vs.startSlowDutiesProfiler(start, len(req.PublicKeys), req.Epoch)
defer profileCancel()
}
resp, err := vs.dutiesv2(ctx, req)
return resp, err
}
// Compute the validator duties from the head state's corresponding epoch
@@ -270,3 +291,138 @@ func populateCommitteeFields(duty *ethpb.DutiesV2Response_Duty, la *helpers.Lite
duty.ValidatorCommitteeIndex = la.ValidatorCommitteeIndex
duty.AttesterSlot = la.AttesterSlot
}
// startSlowDutiesProfiler starts background profiling that triggers after 2s
// Returns a cancel function that should be called when the operation completes
func (vs *Server) startSlowDutiesProfiler(startTime time.Time, numValidators int, epoch primitives.Epoch) func() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Wait for 2 seconds
select {
case <-time.After(2 * time.Second):
// Operation is taking too long, start profiling
vs.captureSlowDutiesProfile(startTime, numValidators, epoch, ctx)
case <-ctx.Done():
// Operation completed before 2s, no profiling needed
return
}
}()
return cancel
}
// captureSlowDutiesProfile captures CPU and mutex profiles when GetDutiesV2 is slow
func (vs *Server) captureSlowDutiesProfile(startTime time.Time, numValidators int, epoch primitives.Epoch, ctx context.Context) {
timestamp := time.Now().Format("20060102-150405")
// Get the datadir from the database path and create debug subdirectory
// Cast to Database interface to access DatabasePath method
dbWithPath, ok := vs.BeaconDB.(interface{ DatabasePath() string })
if !ok {
log.Error("Cannot access database path for profiling - database does not implement DatabasePath method")
return
}
dbPath := dbWithPath.DatabasePath()
profileDir := filepath.Join(filepath.Dir(dbPath), "debug")
// Create profile directory if it doesn't exist
if err := file.MkdirAll(profileDir); err != nil {
log.WithError(err).Warn("Failed to create profile directory")
return
}
currentDuration := time.Since(startTime)
log.WithFields(logrus.Fields{
"currentDuration": currentDuration,
"numValidators": numValidators,
"epoch": epoch,
"profileDir": profileDir,
}).Warn("GetDutiesV2 taking longer than 2s, capturing profiles")
// Start CPU profiling immediately
cpuFile, err := os.Create(fmt.Sprintf("%s/cpu-duties-%s.prof", profileDir, timestamp))
if err != nil {
log.WithError(err).Warn("Failed to create CPU profile file")
} else {
if err := pprof.StartCPUProfile(cpuFile); err != nil {
log.WithError(err).Warn("Failed to start CPU profile")
if closeErr := cpuFile.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close CPU profile file")
}
} else {
// Profile for up to 10 seconds or until context is cancelled
go func() {
defer func() {
pprof.StopCPUProfile()
if closeErr := cpuFile.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close CPU profile file")
}
log.WithField("file", cpuFile.Name()).Info("CPU profile captured")
}()
select {
case <-time.After(10 * time.Second):
// Stop profiling after 10s max
case <-ctx.Done():
// Stop profiling when operation completes
}
}()
}
}
// Enable mutex profiling
runtime.SetMutexProfileFraction(1)
// Capture snapshot profiles immediately
vs.captureSnapshotProfiles(profileDir, timestamp)
}
// captureSnapshotProfiles captures point-in-time profiles
func (vs *Server) captureSnapshotProfiles(profileDir, timestamp string) {
// Capture mutex profile
mutexFile, err := os.Create(fmt.Sprintf("%s/mutex-duties-%s.prof", profileDir, timestamp))
if err != nil {
log.WithError(err).Warn("Failed to create mutex profile file")
} else {
if err := pprof.Lookup("mutex").WriteTo(mutexFile, 0); err != nil {
log.WithError(err).Warn("Failed to write mutex profile")
} else {
log.WithField("file", mutexFile.Name()).Info("Mutex profile captured")
}
if closeErr := mutexFile.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close mutex profile file")
}
}
// Capture goroutine profile
goroutineFile, err := os.Create(fmt.Sprintf("%s/goroutine-duties-%s.prof", profileDir, timestamp))
if err != nil {
log.WithError(err).Warn("Failed to create goroutine profile file")
} else {
if err := pprof.Lookup("goroutine").WriteTo(goroutineFile, 0); err != nil {
log.WithError(err).Warn("Failed to write goroutine profile")
} else {
log.WithField("file", goroutineFile.Name()).Info("Goroutine profile captured")
}
if closeErr := goroutineFile.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close goroutine profile file")
}
}
// Capture heap profile
heapFile, err := os.Create(fmt.Sprintf("%s/heap-duties-%s.prof", profileDir, timestamp))
if err != nil {
log.WithError(err).Warn("Failed to create heap profile file")
} else {
runtime.GC() // Force GC before heap profile
if err := pprof.Lookup("heap").WriteTo(heapFile, 0); err != nil {
log.WithError(err).Warn("Failed to write heap profile")
} else {
log.WithField("file", heapFile.Name()).Info("Heap profile captured")
}
if closeErr := heapFile.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close heap profile file")
}
}
}

View File

@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"math/big"
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/api/client/builder"
@@ -19,7 +18,6 @@ import (
"github.com/OffchainLabs/prysm/v6/encoding/ssz"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/network/forks"
enginev1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
@@ -220,16 +218,10 @@ func (vs *Server) getPayloadHeaderFromBuilder(
if signedBid == nil || signedBid.IsNil() {
return nil, errors.New("builder returned nil bid")
}
fork, err := forks.Fork(slots.ToEpoch(slot))
if err != nil {
return nil, errors.Wrap(err, "unable to get fork information")
}
forkName, ok := params.BeaconConfig().ForkVersionNames[bytesutil.ToBytes4(fork.CurrentVersion)]
if !ok {
return nil, errors.New("unable to find current fork in schedule")
}
if !strings.EqualFold(version.String(signedBid.Version()), forkName) {
return nil, fmt.Errorf("builder bid response version: %d is different from head block version: %d for epoch %d", signedBid.Version(), b.Version(), slots.ToEpoch(slot))
bidVersion := signedBid.Version()
headBlockVersion := b.Version()
if !isVersionCompatible(bidVersion, headBlockVersion) {
return nil, fmt.Errorf("builder bid response version: %d is not compatible with head block version: %d for epoch %d", bidVersion, headBlockVersion, slots.ToEpoch(slot))
}
bid, err := signedBid.Message()
@@ -466,3 +458,19 @@ func expectedGasLimit(parentGasLimit, proposerGasLimit uint64) uint64 {
}
return proposerGasLimit
}
// isVersionCompatible checks if a builder bid version is compatible with the head block version.
func isVersionCompatible(bidVersion, headBlockVersion int) bool {
// Exact version match is always compatible
if bidVersion == headBlockVersion {
return true
}
// Allow Electra bids for Fulu blocks - they have compatible payload formats
if bidVersion == version.Electra && headBlockVersion == version.Fulu {
return true
}
// For all other cases, require exact version match
return false
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/OffchainLabs/prysm/v6/encoding/ssz"
v1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/OffchainLabs/prysm/v6/time/slots"
@@ -156,7 +157,7 @@ func TestServer_setExecutionData(t *testing.T) {
HasConfigured: true,
Cfg: &builderTest.Config{BeaconDB: beaconDB},
}
wb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockBellatrix())
wb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockCapella())
require.NoError(t, err)
chain := &blockchainTest.ChainService{ForkChoiceStore: doublylinkedtree.New(), Genesis: time.Now(), Block: wb}
vs.ForkchoiceFetcher = chain
@@ -973,7 +974,7 @@ func TestServer_getPayloadHeader(t *testing.T) {
return wb
}(),
},
err: "is different from head block version",
err: "builder bid response version: 3 is not compatible with head block version: 2 for epoch 1",
},
{
name: "different bid version during hard fork",
@@ -982,7 +983,7 @@ func TestServer_getPayloadHeader(t *testing.T) {
},
fetcher: &blockchainTest.ChainService{
Block: func() interfaces.ReadOnlySignedBeaconBlock {
wb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockBellatrix())
wb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockCapella())
require.NoError(t, err)
wb.SetSlot(primitives.Slot(fakeCapellaEpoch) * params.BeaconConfig().SlotsPerEpoch)
return wb
@@ -1005,6 +1006,86 @@ func TestServer_getPayloadHeader(t *testing.T) {
},
err: "incorrect header gas limit 30000000 != 31000000",
},
{
name: "electra bid with fulu head block - compatible",
mock: func() *builderTest.MockBuilderService {
// Create Electra bid
requests := &v1.ExecutionRequests{
Deposits: []*v1.DepositRequest{
{
Pubkey: bytesutil.PadTo([]byte{byte('a')}, fieldparams.BLSPubkeyLength),
WithdrawalCredentials: bytesutil.PadTo([]byte{byte('b')}, fieldparams.RootLength),
Amount: params.BeaconConfig().MinActivationBalance,
Signature: bytesutil.PadTo([]byte{byte('c')}, fieldparams.BLSSignatureLength),
Index: 0,
},
},
Withdrawals: []*v1.WithdrawalRequest{
{
SourceAddress: bytesutil.PadTo([]byte{byte('d')}, common.AddressLength),
ValidatorPubkey: bytesutil.PadTo([]byte{byte('e')}, fieldparams.BLSPubkeyLength),
Amount: params.BeaconConfig().MinActivationBalance,
},
},
Consolidations: []*v1.ConsolidationRequest{
{
SourceAddress: bytesutil.PadTo([]byte{byte('f')}, common.AddressLength),
SourcePubkey: bytesutil.PadTo([]byte{byte('g')}, fieldparams.BLSPubkeyLength),
TargetPubkey: bytesutil.PadTo([]byte{byte('h')}, fieldparams.BLSPubkeyLength),
},
},
}
electraBid := &ethpb.BuilderBidElectra{
Header: &v1.ExecutionPayloadHeaderDeneb{
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: bytesutil.PadTo([]byte{1}, fieldparams.RootLength),
ParentHash: params.BeaconConfig().ZeroHash[:],
Timestamp: uint64(ti.Unix()),
BlockNumber: 2,
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
BlobGasUsed: 123,
ExcessBlobGas: 456,
GasLimit: gasLimit,
},
Pubkey: sk.PublicKey().Marshal(),
Value: bytesutil.PadTo([]byte{1, 2, 3}, 32),
BlobKzgCommitments: [][]byte{bytesutil.PadTo([]byte{2}, fieldparams.BLSPubkeyLength)},
ExecutionRequests: requests,
}
d := params.BeaconConfig().DomainApplicationBuilder
domain, err := signing.ComputeDomain(d, nil, nil)
require.NoError(t, err)
sr, err := signing.ComputeSigningRoot(electraBid, domain)
require.NoError(t, err)
sBidElectra := &ethpb.SignedBuilderBidElectra{
Message: electraBid,
Signature: sk.Sign(sr[:]).Marshal(),
}
return &builderTest.MockBuilderService{
BidElectra: sBidElectra,
}
}(),
fetcher: &blockchainTest.ChainService{
Block: func() interfaces.ReadOnlySignedBeaconBlock {
// Create Fulu head block
wb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockFulu())
require.NoError(t, err)
wb.SetSlot(primitives.Slot(params.BeaconConfig().BellatrixForkEpoch) * params.BeaconConfig().SlotsPerEpoch)
return wb
}(),
},
// Should succeed because Electra bids are compatible with Fulu head blocks
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
@@ -1222,3 +1303,107 @@ func Test_expectedGasLimit(t *testing.T) {
})
}
}
func TestIsVersionCompatible(t *testing.T) {
tests := []struct {
name string
bidVersion int
headBlockVersion int
want bool
}{
{
name: "Exact version match - Bellatrix",
bidVersion: version.Bellatrix,
headBlockVersion: version.Bellatrix,
want: true,
},
{
name: "Exact version match - Capella",
bidVersion: version.Capella,
headBlockVersion: version.Capella,
want: true,
},
{
name: "Exact version match - Deneb",
bidVersion: version.Deneb,
headBlockVersion: version.Deneb,
want: true,
},
{
name: "Exact version match - Electra",
bidVersion: version.Electra,
headBlockVersion: version.Electra,
want: true,
},
{
name: "Exact version match - Fulu",
bidVersion: version.Fulu,
headBlockVersion: version.Fulu,
want: true,
},
{
name: "Electra bid with Fulu head block - Compatible",
bidVersion: version.Electra,
headBlockVersion: version.Fulu,
want: true,
},
{
name: "Fulu bid with Electra head block - Not compatible",
bidVersion: version.Fulu,
headBlockVersion: version.Electra,
want: false,
},
{
name: "Deneb bid with Electra head block - Not compatible",
bidVersion: version.Deneb,
headBlockVersion: version.Electra,
want: false,
},
{
name: "Electra bid with Deneb head block - Not compatible",
bidVersion: version.Electra,
headBlockVersion: version.Deneb,
want: false,
},
{
name: "Capella bid with Deneb head block - Not compatible",
bidVersion: version.Capella,
headBlockVersion: version.Deneb,
want: false,
},
{
name: "Bellatrix bid with Capella head block - Not compatible",
bidVersion: version.Bellatrix,
headBlockVersion: version.Capella,
want: false,
},
{
name: "Phase0 bid with Altair head block - Not compatible",
bidVersion: version.Phase0,
headBlockVersion: version.Altair,
want: false,
},
{
name: "Deneb bid with Fulu head block - Not compatible",
bidVersion: version.Deneb,
headBlockVersion: version.Fulu,
want: false,
},
{
name: "Capella bid with Fulu head block - Not compatible",
bidVersion: version.Capella,
headBlockVersion: version.Fulu,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isVersionCompatible(tt.bidVersion, tt.headBlockVersion)
if got != tt.want {
t.Errorf("isVersionCompatible(%d, %d) = %v, want %v", tt.bidVersion, tt.headBlockVersion, got, tt.want)
}
})
}
}

View File

@@ -212,6 +212,7 @@ go_test(
shard_count = 4,
deps = [
"//async/abool:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
"github.com/OffchainLabs/prysm/v6/async/event"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
db "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
@@ -54,7 +55,7 @@ func TestRPC_LightClientBootstrap(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: lightClient.NewLightClientStore(d),
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
rateLimiter: newRateLimiter(p1),
}
@@ -176,7 +177,7 @@ func TestRPC_LightClientOptimisticUpdate(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: &lightClient.Store{},
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
rateLimiter: newRateLimiter(p1),
}
@@ -202,7 +203,7 @@ func TestRPC_LightClientOptimisticUpdate(t *testing.T) {
update, err := lightClient.NewLightClientOptimisticUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
require.NoError(t, err)
r.lcStore.SetLastOptimisticUpdate(update)
r.lcStore.SetLastOptimisticUpdate(update, false)
var wg sync.WaitGroup
wg.Add(1)
@@ -296,7 +297,7 @@ func TestRPC_LightClientFinalityUpdate(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: &lightClient.Store{},
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
rateLimiter: newRateLimiter(p1),
}
@@ -322,7 +323,7 @@ func TestRPC_LightClientFinalityUpdate(t *testing.T) {
update, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
require.NoError(t, err)
r.lcStore.SetLastFinalityUpdate(update)
r.lcStore.SetLastFinalityUpdate(update, false)
var wg sync.WaitGroup
wg.Add(1)
@@ -416,7 +417,7 @@ func TestRPC_LightClientUpdatesByRange(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: lightClient.NewLightClientStore(d),
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
rateLimiter: newRateLimiter(p1),
}

View File

@@ -287,25 +287,33 @@ func (s *Service) Stop() error {
}
}()
// Say goodbye to all peers.
// Create context with timeout to prevent hanging
goodbyeCtx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
defer cancel()
// Use WaitGroup to ensure all goodbye messages complete
var wg sync.WaitGroup
for _, peerID := range s.cfg.p2p.Peers().Connected() {
if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
}
wg.Add(1)
go func(pid peer.ID) {
defer wg.Done()
if err := s.sendGoodByeAndDisconnect(goodbyeCtx, p2ptypes.GoodbyeCodeClientShutdown, pid); err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to send goodbye message")
}
}(peerID)
}
}
wg.Wait()
log.Debug("All goodbye messages sent successfully")
// Removing RPC Stream handlers.
// Now safe to remove handlers / unsubscribe.
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
s.cfg.p2p.Host().RemoveStreamHandler(p)
}
// Deregister Topic Subscribers.
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
return nil
}

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"sync"
"testing"
"time"
@@ -9,16 +10,22 @@ import (
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
dbTest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
gcache "github.com/patrickmn/go-cache"
)
@@ -227,3 +234,212 @@ func TestSyncService_StopCleanly(t *testing.T) {
require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
}
func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
// Create test peers
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p3 := p2ptest.NewTestP2P(t)
// Connect peers
p1.Connect(p2)
p1.Connect(p3)
// Register peers in the peer status
p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
p1.Peers().Add(nil, p3.BHost.ID(), p3.BHost.Addrs()[0], network.DirOutbound)
p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
p1.Peers().SetConnectionState(p3.BHost.ID(), peers.Connected)
// Create service with connected peers
d := dbTest.SetupDB(t)
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
cfg: &config{
beaconDB: d,
p2p: p1,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
},
ctx: ctx,
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
require.NoError(t, err)
r.ctxMap = ctxMap
// Setup rate limiter for goodbye topic
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
// Track goodbye messages received
var goodbyeMessages sync.Map
var wg sync.WaitGroup
wg.Add(2)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := new(primitives.SSZUint64)
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
goodbyeMessages.Store(p2.BHost.ID().String(), *out)
require.NoError(t, stream.Close())
})
p3.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := new(primitives.SSZUint64)
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
goodbyeMessages.Store(p3.BHost.ID().String(), *out)
require.NoError(t, stream.Close())
})
connectedPeers := r.cfg.p2p.Peers().Connected()
t.Logf("Connected peers before Stop: %d", len(connectedPeers))
assert.Equal(t, 2, len(connectedPeers), "Expected 2 connected peers")
err = r.Stop()
assert.NoError(t, err)
// Wait for goodbye messages
if util.WaitTimeout(&wg, 15*time.Second) {
t.Fatal("Did not receive goodbye messages within timeout")
}
// Verify correct goodbye codes were sent
msg2, ok := goodbyeMessages.Load(p2.BHost.ID().String())
assert.Equal(t, true, ok, "Expected goodbye message to peer 2")
assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg2)
msg3, ok := goodbyeMessages.Load(p3.BHost.ID().String())
assert.Equal(t, true, ok, "Expected goodbye message to peer 3")
assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg3)
}
func TestService_Stop_TimeoutHandling(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
d := dbTest.SetupDB(t)
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
cfg: &config{
beaconDB: d,
p2p: p1,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
},
ctx: ctx,
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
require.NoError(t, err)
r.ctxMap = ctxMap
// Setup rate limiter for goodbye topic
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
// Don't set up stream handler on p2 to simulate unresponsive peer
// Verify peers are connected before stopping
connectedPeers := r.cfg.p2p.Peers().Connected()
t.Logf("Connected peers before Stop: %d", len(connectedPeers))
start := time.Now()
err = r.Stop()
duration := time.Since(start)
t.Logf("Stop completed in %v", duration)
// Stop should complete successfully even when peers don't respond
assert.NoError(t, err)
// Should not hang - completes quickly when goodbye fails
assert.Equal(t, true, duration < 5*time.Second, "Stop() should not hang when peer is unresponsive")
// Test passes - the timeout behavior is working correctly, goodbye attempts fail quickly
}
func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {
// Test that goodbye messages are sent concurrently, not sequentially
const numPeers = 10
p1 := p2ptest.NewTestP2P(t)
testPeers := make([]*p2ptest.TestP2P, numPeers)
// Create and connect multiple peers
for i := 0; i < numPeers; i++ {
testPeers[i] = p2ptest.NewTestP2P(t)
p1.Connect(testPeers[i])
// Register peer in the peer status
p1.Peers().Add(nil, testPeers[i].BHost.ID(), testPeers[i].BHost.Addrs()[0], network.DirOutbound)
p1.Peers().SetConnectionState(testPeers[i].BHost.ID(), peers.Connected)
}
d := dbTest.SetupDB(t)
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
cfg: &config{
beaconDB: d,
p2p: p1,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
},
ctx: ctx,
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
require.NoError(t, err)
r.ctxMap = ctxMap
// Setup rate limiter for goodbye topic
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
// Each peer will have artificial delay in processing goodbye
var wg sync.WaitGroup
wg.Add(numPeers)
for i := 0; i < numPeers; i++ {
idx := i // capture loop variable
testPeers[idx].BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // Artificial delay
out := new(primitives.SSZUint64)
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
require.NoError(t, stream.Close())
})
}
start := time.Now()
err = r.Stop()
duration := time.Since(start)
// If messages were sent sequentially, it would take numPeers * 100ms = 1 second
// If concurrent, should be ~100ms
assert.NoError(t, err)
assert.Equal(t, true, duration < 500*time.Millisecond, "Goodbye messages should be sent concurrently")
require.Equal(t, false, util.WaitTimeout(&wg, 2*time.Second))
}

View File

@@ -28,7 +28,7 @@ func (s *Service) lightClientOptimisticUpdateSubscriber(_ context.Context, msg p
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Saving newly received light client optimistic update.")
s.lcStore.SetLastOptimisticUpdate(update)
s.lcStore.SetLastOptimisticUpdate(update, false)
s.cfg.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientOptimisticUpdate,
@@ -55,7 +55,7 @@ func (s *Service) lightClientFinalityUpdateSubscriber(_ context.Context, msg pro
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Saving newly received light client finality update.")
s.lcStore.SetLastFinalityUpdate(update)
s.lcStore.SetLastFinalityUpdate(update, false)
s.cfg.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientFinalityUpdate,

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
"github.com/OffchainLabs/prysm/v6/async/event"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
@@ -684,7 +685,7 @@ func TestSubscribe_ReceivesLCOptimisticUpdate(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: &lightClient.Store{},
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
}
topic := p2p.LightClientOptimisticUpdateTopicFormat
@@ -751,7 +752,7 @@ func TestSubscribe_ReceivesLCFinalityUpdate(t *testing.T) {
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: &lightClient.Store{},
lcStore: lightClient.NewLightClientStore(d, &p2ptest.FakeP2P{}, new(event.Feed)),
subHandler: newSubTopicHandler(),
}
topic := p2p.LightClientFinalityUpdateTopicFormat

View File

@@ -131,7 +131,7 @@ func (s *Service) validateLightClientFinalityUpdate(ctx context.Context, pid pee
return pubsub.ValidationIgnore, nil
}
if !lightclient.IsBetterFinalityUpdate(newUpdate, s.lcStore.LastFinalityUpdate()) {
if !lightclient.IsFinalityUpdateValidForBroadcast(newUpdate, s.lcStore.LastFinalityUpdate()) {
log.WithFields(logrus.Fields{
"attestedSlot": fmt.Sprintf("%d", newUpdate.AttestedHeader().Beacon().Slot),
"signatureSlot": fmt.Sprintf("%d", newUpdate.SignatureSlot()),

View File

@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/async/event"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
@@ -102,7 +103,7 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) {
// drift back appropriate number of epochs based on fork + 2 slots for signature slot + time for gossip propagation + any extra drift
genesisDrift := v*slotsPerEpoch*secondsPerSlot + 2*secondsPerSlot + secondsPerSlot/slotIntervals + test.genesisDrift
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(genesisDrift), 0)}
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}, lcStore: &lightClient.Store{}}
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}, lcStore: lightClient.NewLightClientStore(nil, &p2ptest.FakeP2P{}, new(event.Feed))}
var oldUpdate interfaces.LightClientOptimisticUpdate
var err error
@@ -111,7 +112,7 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) {
oldUpdate, err = lightClient.NewLightClientOptimisticUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
require.NoError(t, err)
s.lcStore.SetLastOptimisticUpdate(oldUpdate)
s.lcStore.SetLastOptimisticUpdate(oldUpdate, false)
}
l := util.NewTestLightClient(t, v, test.newUpdateOptions...)
@@ -242,7 +243,7 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) {
// drift back appropriate number of epochs based on fork + 2 slots for signature slot + time for gossip propagation + any extra drift
genesisDrift := v*slotsPerEpoch*secondsPerSlot + 2*secondsPerSlot + secondsPerSlot/slotIntervals + test.genesisDrift
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(genesisDrift), 0)}
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}, lcStore: &lightClient.Store{}}
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}, lcStore: lightClient.NewLightClientStore(nil, &p2ptest.FakeP2P{}, new(event.Feed))}
var oldUpdate interfaces.LightClientFinalityUpdate
var err error
@@ -251,7 +252,7 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) {
oldUpdate, err = lightClient.NewLightClientFinalityUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
require.NoError(t, err)
s.lcStore.SetLastFinalityUpdate(oldUpdate)
s.lcStore.SetLastFinalityUpdate(oldUpdate, false)
}
l := util.NewTestLightClient(t, v, test.newUpdateOptions...)

View File

@@ -0,0 +1,3 @@
### Changed
- when shutting down the sync service we now send p2p goodbye messages in parallel to maxmimize changes of propogating goodbyes to all peers before an unsafe shutdown.

View File

@@ -0,0 +1,5 @@
### Changed
- Moved the broadcast and event notifier logic for saving LC updates to the store function.
- Fixed the issue with broadcasting more than twice per LC Finality update, and the if-case bug.
- Separated the finality update validation rules for saving and broadcasting.

View File

@@ -0,0 +1,3 @@
### Fixed
- Fix builder bid version compatibility to support Electra bids with Fulu blocks

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed align submitPoolSyncCommitteeSignatures response with Beacon API specification

View File

@@ -0,0 +1,2 @@
### Fixed
- Trigger payload attribute event as soon as an early block is processed.

View File

@@ -0,0 +1,3 @@
### Changed
- Do not compare liveness response with LH in e2e Beacon API evaluator.

View File

@@ -0,0 +1,3 @@
### Added
- Performance profiling for GetDutiesV2 operations taking over 2 seconds

View File

@@ -79,6 +79,8 @@ type Flags struct {
SaveInvalidBlock bool // SaveInvalidBlock saves invalid block to temp.
SaveInvalidBlob bool // SaveInvalidBlob saves invalid blob to temp.
SlowDutiesProfile bool // SlowDutiesProfile enables performance profiling when GetDutiesV2 is slow.
EnableDiscoveryReboot bool // EnableDiscoveryReboot allows the node to have its local listener to be rebooted in the event of discovery issues.
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
@@ -202,6 +204,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
cfg.SaveInvalidBlob = true
}
if ctx.IsSet(slowDutiesProfileFlag.Name) {
logEnabled(slowDutiesProfileFlag)
cfg.SlowDutiesProfile = true
}
if ctx.IsSet(disableGRPCConnectionLogging.Name) {
logDisabled(disableGRPCConnectionLogging)
cfg.DisableGRPCConnectionLogs = true

View File

@@ -45,6 +45,10 @@ var (
Name: "save-invalid-blob-temp",
Usage: "Writes invalid blobs to temp directory.",
}
slowDutiesProfileFlag = &cli.BoolFlag{
Name: "slow-duties-profile",
Usage: "Enable performance profiling when GetDutiesV2 takes longer than 2s. Saves profiles to <datadir>/debug.",
}
disableGRPCConnectionLogging = &cli.BoolFlag{
Name: "disable-grpc-connection-logging",
Usage: `WARNING: The gRPC API will remain the default and fully supported through v8 (expected in 2026) but will be eventually removed in favor of REST API..
@@ -232,6 +236,7 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
writeSSZStateTransitionsFlag,
saveInvalidBlockTempFlag,
saveInvalidBlobTempFlag,
slowDutiesProfileFlag,
disableGRPCConnectionLogging,
HoleskyTestnet,
SepoliaTestnet,

View File

@@ -322,6 +322,7 @@ var (
}())),
"/validator/liveness/{param1}": newMetadata[structs.GetLivenessResponse](
v1PathTemplate,
withSanityCheckOnly(),
withParams(func(currentEpoch primitives.Epoch) []string {
return []string{fmt.Sprintf("%v", currentEpoch)}
}),

View File

@@ -53,6 +53,7 @@ go_library(
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/light-client:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/trie:go_default_library",
"//crypto/bls:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
consensus_types "github.com/OffchainLabs/prysm/v6/consensus-types"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
lightclienttypes "github.com/OffchainLabs/prysm/v6/consensus-types/light-client"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/ssz"
v11 "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
@@ -28,6 +29,7 @@ type TestLightClient struct {
version int
increaseAttestedSlotBy uint64
increaseFinalizedSlotBy uint64
increaseSignatureSlotBy uint64
T *testing.T
Ctx context.Context
@@ -112,6 +114,13 @@ func WithIncreasedFinalizedSlot(increaseBy uint64) LightClientOption {
}
}
// WithIncreasedSignatureSlot specifies the number of slots to increase the signature slot by. This does not affect the attested/finalized block's slot.
func WithIncreasedSignatureSlot(increaseBy uint64) LightClientOption {
return func(l *TestLightClient) {
l.increaseSignatureSlotBy = increaseBy
}
}
func (l *TestLightClient) setupTestAltair() *TestLightClient {
ctx := context.Background()
@@ -121,6 +130,9 @@ func (l *TestLightClient) setupTestAltair() *TestLightClient {
}
signatureSlot := attestedSlot.Add(1)
if l.increaseSignatureSlotBy > 0 {
signatureSlot = signatureSlot.Add(l.increaseSignatureSlotBy)
}
// Attested State
attestedState, err := NewBeaconStateAltair()
@@ -232,6 +244,9 @@ func (l *TestLightClient) setupTestBellatrix() *TestLightClient {
}
signatureSlot := attestedSlot.Add(1)
if l.increaseSignatureSlotBy > 0 {
signatureSlot = signatureSlot.Add(l.increaseSignatureSlotBy)
}
// Attested State & Block
attestedState, err := NewBeaconStateBellatrix()
@@ -404,6 +419,9 @@ func (l *TestLightClient) setupTestCapella() *TestLightClient {
}
signatureSlot := attestedSlot.Add(1)
if l.increaseSignatureSlotBy > 0 {
signatureSlot = signatureSlot.Add(l.increaseSignatureSlotBy)
}
// Attested State
attestedState, err := NewBeaconStateCapella()
@@ -577,6 +595,9 @@ func (l *TestLightClient) setupTestDeneb() *TestLightClient {
}
signatureSlot := attestedSlot.Add(1)
if l.increaseSignatureSlotBy > 0 {
signatureSlot = signatureSlot.Add(l.increaseSignatureSlotBy)
}
// Attested State
attestedState, err := NewBeaconStateDeneb()
@@ -751,6 +772,9 @@ func (l *TestLightClient) setupTestElectra() *TestLightClient {
}
signatureSlot := attestedSlot.Add(1)
if l.increaseSignatureSlotBy > 0 {
signatureSlot = signatureSlot.Add(l.increaseSignatureSlotBy)
}
// Attested State & Block
attestedState, err := NewBeaconStateElectra()
@@ -1044,3 +1068,55 @@ func (l *TestLightClient) CheckSyncAggregate(sa *ethpb.SyncAggregate) {
require.DeepSSZEqual(l.T, syncAggregate.SyncCommitteeBits, sa.SyncCommitteeBits, "SyncAggregate bits is not equal")
require.DeepSSZEqual(l.T, syncAggregate.SyncCommitteeSignature, sa.SyncCommitteeSignature, "SyncAggregate signature is not equal")
}
func MockOptimisticUpdate() (interfaces.LightClientOptimisticUpdate, error) {
pbUpdate := &ethpb.LightClientOptimisticUpdateAltair{
AttestedHeader: &ethpb.LightClientHeaderAltair{
Beacon: &ethpb.BeaconBlockHeader{
Slot: primitives.Slot(32),
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
SyncAggregate: &ethpb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
SignatureSlot: primitives.Slot(33),
}
return lightclienttypes.NewWrappedOptimisticUpdateAltair(pbUpdate)
}
func MockFinalityUpdate() (interfaces.LightClientFinalityUpdate, error) {
finalityBranch := make([][]byte, fieldparams.FinalityBranchDepth)
for i := 0; i < len(finalityBranch); i++ {
finalityBranch[i] = make([]byte, 32)
}
pbUpdate := &ethpb.LightClientFinalityUpdateAltair{
FinalizedHeader: &ethpb.LightClientHeaderAltair{
Beacon: &ethpb.BeaconBlockHeader{
Slot: primitives.Slot(31),
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
FinalityBranch: finalityBranch,
AttestedHeader: &ethpb.LightClientHeaderAltair{
Beacon: &ethpb.BeaconBlockHeader{
Slot: primitives.Slot(32),
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
SyncAggregate: &ethpb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
SignatureSlot: primitives.Slot(33),
}
return lightclienttypes.NewWrappedFinalityUpdateAltair(pbUpdate)
}