Compare commits

...

6 Commits

Author SHA1 Message Date
terence tsao
fd65297676 Proper timer handling when the context is cancelled in random peer 2025-10-26 12:25:47 -07:00
Manu NALEPA
4fb75d6d0b Add some metrics improvements (#15922)
* Define TCP and QUIC as `InternetProtocol` (no functional change).

* Group types. (No functional changes)

* Rename variables and use range syntax.

* Add `p2pMaxPeers` and `p2pPeerCountDirectionType` metrics

* `p2p_subscribed_topic_peer_total`: Reset to avoid dangling values.

* `validateConfig`:
- Use `Warning` with fields instead of `Warnf`.
- Avoid both modifying the input value in place and returning it.

* Add `p2p_minimum_peers_per_subnet` metric.

* `beaconConfig` => `cfg`.

https://github.com/OffchainLabs/prysm/pull/15880#discussion_r2436826215

* Add changelog

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2025-10-26 15:16:05 +00:00
terence
6d596edea2 Use SlotTicker instead of time.Ticker for attestation pool pruning (#15917)
* Use SlotTicker instead of time.Ticker for attestation pool pruning

* Offset one second before slot start
2025-10-24 15:35:26 +00:00
Bastin
9153c5a202 light client logging (#15927) 2025-10-24 14:42:27 +00:00
james-prysm
26ce94e224 removes misleading keymanager info log (#15926)
* simple change

* fixing test
2025-10-24 14:28:30 +00:00
terence
255ea2fac1 Return optimistic response only when handling blinded blocks (#15925)
* Return optimistic response only when handling blinded blocks in proposer

* Remove blind condition
2025-10-24 03:37:32 +00:00
42 changed files with 224 additions and 157 deletions

View File

@@ -472,8 +472,8 @@ func (s *Service) removeStartupState() {
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSubscribedToAllDataSubnets := flags.Get().SubscribeAllDataSubnets
beaconConfig := params.BeaconConfig()
custodyRequirement := beaconConfig.CustodyRequirement
cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement
// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
@@ -493,7 +493,7 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
// Compute the custody group count.
custodyGroupCount := custodyRequirement
if isSubscribedToAllDataSubnets {
custodyGroupCount = beaconConfig.NumberOfCustodyGroups
custodyGroupCount = cfg.NumberOfCustodyGroups
}
// Safely compute the fulu fork slot.
@@ -536,11 +536,11 @@ func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db d
}
func fuluForkSlot() (primitives.Slot, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
fuluForkEpoch := beaconConfig.FuluForkEpoch
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
return beaconConfig.FarFutureSlot, nil
fuluForkEpoch := cfg.FuluForkEpoch
if fuluForkEpoch == cfg.FarFutureEpoch {
return cfg.FarFutureSlot, nil
}
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

View File

@@ -401,7 +401,7 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
return 0, errors.New("empty active indices list")
}
hashFunc := hash.CustomSHA256Hasher()
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
seedBuffer := make([]byte, len(seed)+8)
copy(seedBuffer, seed[:])
@@ -426,14 +426,14 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
offset := (i % 16) * 2
randomValue := uint64(randomBytes[offset]) | uint64(randomBytes[offset+1])<<8
if effectiveBal*fieldparams.MaxRandomValueElectra >= beaconConfig.MaxEffectiveBalanceElectra*randomValue {
if effectiveBal*fieldparams.MaxRandomValueElectra >= cfg.MaxEffectiveBalanceElectra*randomValue {
return candidateIndex, nil
}
} else {
binary.LittleEndian.PutUint64(seedBuffer[len(seed):], i/32)
randomByte := hashFunc(seedBuffer)[i%32]
if effectiveBal*fieldparams.MaxRandomByte >= beaconConfig.MaxEffectiveBalance*uint64(randomByte) {
if effectiveBal*fieldparams.MaxRandomByte >= cfg.MaxEffectiveBalance*uint64(randomByte) {
return candidateIndex, nil
}
}

View File

@@ -89,14 +89,14 @@ func CustodyGroups(nodeId enode.ID, custodyGroupCount uint64) ([]uint64, error)
// ComputeColumnsForCustodyGroup computes the columns for a given custody group.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#compute_columns_for_custody_group
func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
beaconConfig := params.BeaconConfig()
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
if custodyGroup >= numberOfCustodyGroups {
return nil, ErrCustodyGroupTooLarge
}
numberOfColumns := beaconConfig.NumberOfColumns
numberOfColumns := cfg.NumberOfColumns
columnsPerGroup := numberOfColumns / numberOfCustodyGroups
@@ -112,9 +112,9 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
// ComputeCustodyGroupForColumn computes the custody group for a given column.
// It is the reciprocal function of ComputeColumnsForCustodyGroup.
func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
if columnIndex >= numberOfColumns {
return 0, ErrIndexTooLarge

View File

@@ -84,10 +84,10 @@ func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validat
totalNodeBalance += validator.EffectiveBalance()
}
beaconConfig := params.BeaconConfig()
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
validatorCustodyRequirement := beaconConfig.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := beaconConfig.BalancePerAdditionalCustodyGroup
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
validatorCustodyRequirement := cfg.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := cfg.BalancePerAdditionalCustodyGroup
count := totalNodeBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
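
The clamp on the last line above is easier to read with concrete numbers. A minimal standalone sketch with illustrative values (the real figures come from params.BeaconConfig(); the 128 / 8 / 32 ETH numbers below are assumptions for the example, balances in Gwei):

package main

import "fmt"

func main() {
	// Illustrative config values; the real ones come from params.BeaconConfig().
	const (
		numberOfCustodyGroups            uint64 = 128
		validatorCustodyRequirement      uint64 = 8
		balancePerAdditionalCustodyGroup uint64 = 32_000_000_000 // 32 ETH in Gwei
	)

	// Say the node is attached to 20 validators of 32 ETH effective balance each.
	totalNodeBalance := uint64(20) * 32_000_000_000

	count := totalNodeBalance / balancePerAdditionalCustodyGroup // 20
	// Floor at the validator custody requirement, cap at the total number of groups.
	fmt.Println(min(max(count, validatorCustodyRequirement), numberOfCustodyGroups)) // 20
}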

View File

@@ -196,7 +196,7 @@ func TestAltairCompatible(t *testing.T) {
}
func TestCanUpgradeTo(t *testing.T) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
outerTestCases := []struct {
name string
@@ -205,32 +205,32 @@ func TestCanUpgradeTo(t *testing.T) {
}{
{
name: "Altair",
forkEpoch: &beaconConfig.AltairForkEpoch,
forkEpoch: &cfg.AltairForkEpoch,
upgradeFunc: time.CanUpgradeToAltair,
},
{
name: "Bellatrix",
forkEpoch: &beaconConfig.BellatrixForkEpoch,
forkEpoch: &cfg.BellatrixForkEpoch,
upgradeFunc: time.CanUpgradeToBellatrix,
},
{
name: "Capella",
forkEpoch: &beaconConfig.CapellaForkEpoch,
forkEpoch: &cfg.CapellaForkEpoch,
upgradeFunc: time.CanUpgradeToCapella,
},
{
name: "Deneb",
forkEpoch: &beaconConfig.DenebForkEpoch,
forkEpoch: &cfg.DenebForkEpoch,
upgradeFunc: time.CanUpgradeToDeneb,
},
{
name: "Electra",
forkEpoch: &beaconConfig.ElectraForkEpoch,
forkEpoch: &cfg.ElectraForkEpoch,
upgradeFunc: time.CanUpgradeToElectra,
},
{
name: "Fulu",
forkEpoch: &beaconConfig.FuluForkEpoch,
forkEpoch: &cfg.FuluForkEpoch,
upgradeFunc: time.CanUpgradeToFulu,
},
}
@@ -238,7 +238,7 @@ func TestCanUpgradeTo(t *testing.T) {
for _, otc := range outerTestCases {
params.SetupTestConfigCleanup(t)
*otc.forkEpoch = 5
params.OverrideBeaconConfig(beaconConfig)
params.OverrideBeaconConfig(cfg)
innerTestCases := []struct {
name string

View File

@@ -6,6 +6,7 @@ go_library(
"cache.go",
"helpers.go",
"lightclient.go",
"log.go",
"store.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/light-client",

View File

@@ -0,0 +1,5 @@
package light_client
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "light-client")

View File

@@ -14,7 +14,6 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
var ErrLightClientBootstrapNotFound = errors.New("light client bootstrap not found")

View File

@@ -10,11 +10,13 @@ import (
// pruneExpired prunes attestations pool on every slot interval.
func (s *Service) pruneExpired() {
ticker := time.NewTicker(s.cfg.pruneInterval)
defer ticker.Stop()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
offset := time.Duration(secondsPerSlot-1) * time.Second
slotTicker := slots.NewSlotTickerWithOffset(s.genesisTime, offset, secondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-ticker.C:
case <-slotTicker.C():
s.pruneExpiredAtts()
s.updateMetrics()
case <-s.ctx.Done():
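
The offset computed in the hunk above means the prune tick fires one second before each slot boundary rather than on it. A minimal standalone sketch of that schedule, assuming 12-second slots and an arbitrary genesis time (the real code uses Prysm's slots.NewSlotTickerWithOffset):

package main

import (
	"fmt"
	"time"
)

func main() {
	const secondsPerSlot = 12 // assumed mainnet-style slot length
	offset := time.Duration(secondsPerSlot-1) * time.Second

	genesis := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) // arbitrary genesis for the example
	for slot := 0; slot < 3; slot++ {
		slotStart := genesis.Add(time.Duration(slot*secondsPerSlot) * time.Second)
		tick := slotStart.Add(offset) // one second before slot+1 begins
		fmt.Printf("slot %d starts %s, prune tick fires %s\n",
			slot, slotStart.Format(time.TimeOnly), tick.Format(time.TimeOnly))
	}
}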

View File

@@ -17,7 +17,9 @@ import (
)
func TestPruneExpired_Ticker(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
// Need timeout longer than the offset (secondsPerSlot - 1) + some buffer
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot+5) * time.Second
ctx, cancel := context.WithTimeout(t.Context(), timeout)
defer cancel()
s, err := NewService(ctx, &Config{

View File

@@ -7,6 +7,7 @@ import (
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/sirupsen/logrus"
)
// This is the default queue size used if we have specified an invalid one.
@@ -63,12 +64,17 @@ func (cfg *Config) connManagerLowHigh() (int, int) {
return low, high
}
// validateConfig validates whether the values provided are accurate and will set
// the appropriate values for those that are invalid.
func validateConfig(cfg *Config) *Config {
if cfg.QueueSize == 0 {
log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize)
cfg.QueueSize = defaultPubsubQueueSize
// validateConfig validates whether the provided config has valid values and sets
// the invalid ones to default.
func validateConfig(cfg *Config) {
if cfg.QueueSize > 0 {
return
}
return cfg
log.WithFields(logrus.Fields{
"queueSize": cfg.QueueSize,
"default": defaultPubsubQueueSize,
}).Warning("Invalid pubsub queue size, setting the queue size to the default value")
cfg.QueueSize = defaultPubsubQueueSize
}

View File

@@ -259,11 +259,11 @@ func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 {
}
func fuluForkSlot() (primitives.Slot, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
fuluForkEpoch := beaconConfig.FuluForkEpoch
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
return beaconConfig.FarFutureSlot, nil
fuluForkEpoch := cfg.FuluForkEpoch
if fuluForkEpoch == cfg.FarFutureEpoch {
return cfg.FarFutureSlot, nil
}
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

View File

@@ -3,6 +3,7 @@ package p2p
import (
"strings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
@@ -26,12 +27,25 @@ var (
Help: "The number of peers in a given state.",
},
[]string{"state"})
p2pMaxPeers = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_max_peers",
Help: "The target maximum number of peers.",
})
p2pPeerCountDirectionType = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "p2p_peer_count_direction_type",
Help: "The number of peers in a given direction and type.",
},
[]string{"direction", "type"})
connectedPeersCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers",
Help: "Tracks the total number of connected libp2p peers by agent string",
},
[]string{"agent"},
)
minimumPeersPerSubnet = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_minimum_peers_per_subnet",
Help: "The minimum number of peers to connect to per subnet",
})
avgScoreConnectedClients = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers_average_scores",
Help: "Tracks the overall p2p scores of connected libp2p peers by agent string",
@@ -174,18 +188,26 @@ var (
)
func (s *Service) updateMetrics() {
store := s.Host().Peerstore()
connectedPeers := s.peers.Connected()
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(connectedPeers)))
p2pPeerCount.WithLabelValues("Disconnected").Set(float64(len(s.peers.Disconnected())))
p2pPeerCount.WithLabelValues("Connecting").Set(float64(len(s.peers.Connecting())))
p2pPeerCount.WithLabelValues("Disconnecting").Set(float64(len(s.peers.Disconnecting())))
p2pPeerCount.WithLabelValues("Bad").Set(float64(len(s.peers.Bad())))
store := s.Host().Peerstore()
numConnectedPeersByClient := make(map[string]float64)
upperTCP := strings.ToUpper(string(peers.TCP))
upperQUIC := strings.ToUpper(string(peers.QUIC))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperTCP).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperQUIC).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.QUIC))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperTCP).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperQUIC).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))))
connectedPeersCountByClient := make(map[string]float64)
peerScoresByClient := make(map[string][]float64)
for i := 0; i < len(connectedPeers); i++ {
p := connectedPeers[i]
for _, p := range connectedPeers {
pid, err := peer.Decode(p.String())
if err != nil {
log.WithError(err).Debug("Could not decode peer string")
@@ -193,16 +215,18 @@ func (s *Service) updateMetrics() {
}
foundName := agentFromPid(pid, store)
numConnectedPeersByClient[foundName] += 1
connectedPeersCountByClient[foundName] += 1
// Get peer scoring data.
overallScore := s.peers.Scorers().Score(pid)
peerScoresByClient[foundName] = append(peerScoresByClient[foundName], overallScore)
}
connectedPeersCount.Reset() // Clear out previous results.
for agent, total := range numConnectedPeersByClient {
for agent, total := range connectedPeersCountByClient {
connectedPeersCount.WithLabelValues(agent).Set(total)
}
avgScoreConnectedClients.Reset() // Clear out previous results.
for agent, scoringData := range peerScoresByClient {
avgScore := average(scoringData)

View File

@@ -81,29 +81,31 @@ const (
type InternetProtocol string
const (
TCP = "tcp"
QUIC = "quic"
TCP = InternetProtocol("tcp")
QUIC = InternetProtocol("quic")
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
type (
// Status is the structure holding the peer status information.
Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
type StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
)
// NewStatus creates a new status entity.
func NewStatus(ctx context.Context, config *StatusConfig) *Status {

View File

@@ -345,17 +345,17 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg)
}
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
// Check if the message is to be updated in fulu.
if epoch >= beaconConfig.FuluForkEpoch {
if epoch >= cfg.FuluForkEpoch {
if version, ok := fuluMapping[msg]; ok {
return protocolPrefix + msg + version, nil
}
}
// Check if the message is to be updated in altair.
if epoch >= beaconConfig.AltairForkEpoch {
if epoch >= cfg.AltairForkEpoch {
if version, ok := altairMapping[msg]; ok {
return protocolPrefix + msg + version, nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -106,12 +107,16 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
cfg = validateConfig(cfg)
validateConfig(cfg)
privKey, err := privKey(cfg)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate p2p private key")
}
p2pMaxPeers.Set(float64(cfg.MaxPeers))
minimumPeersPerSubnet.Set(float64(flags.Get().MinimumPeersPerSubnet))
metaData, err := metaDataFromDB(ctx, cfg.DB)
if err != nil {
log.WithError(err).Error("Failed to create peer metadata")

View File

@@ -514,18 +514,18 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
//
// return [compute_subscribed_subnet(node_id, epoch, index) for index in range(SUBNETS_PER_NODE)]
func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeToAllSubnets {
subnets := make([]uint64, 0, beaconConfig.AttestationSubnetCount)
for i := range beaconConfig.AttestationSubnetCount {
subnets := make([]uint64, 0, cfg.AttestationSubnetCount)
for i := range cfg.AttestationSubnetCount {
subnets = append(subnets, i)
}
return subnets, nil
}
subnets := make([]uint64, 0, beaconConfig.SubnetsPerNode)
for i := range beaconConfig.SubnetsPerNode {
subnets := make([]uint64, 0, cfg.SubnetsPerNode)
for i := range cfg.SubnetsPerNode {
sub, err := computeSubscribedSubnet(nodeID, epoch, i)
if err != nil {
return nil, errors.Wrap(err, "compute subscribed subnet")

View File

@@ -524,12 +524,12 @@ func TestSubnetComputation(t *testing.T) {
require.NoError(t, err)
localNode := enode.NewLocalNode(db, convertedKey)
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
t.Run("standard", func(t *testing.T) {
retrievedSubnets, err := computeSubscribedSubnets(localNode.ID(), 1000)
require.NoError(t, err)
require.Equal(t, beaconConfig.SubnetsPerNode, uint64(len(retrievedSubnets)))
require.Equal(t, cfg.SubnetsPerNode, uint64(len(retrievedSubnets)))
require.Equal(t, retrievedSubnets[0]+1, retrievedSubnets[1])
})
@@ -541,8 +541,8 @@ func TestSubnetComputation(t *testing.T) {
retrievedSubnets, err := computeSubscribedSubnets(localNode.ID(), 1000)
require.NoError(t, err)
require.Equal(t, beaconConfig.AttestationSubnetCount, uint64(len(retrievedSubnets)))
for i := range beaconConfig.AttestationSubnetCount {
require.Equal(t, cfg.AttestationSubnetCount, uint64(len(retrievedSubnets)))
for i := range cfg.AttestationSubnetCount {
require.Equal(t, i, retrievedSubnets[i])
}
})

View File

@@ -312,14 +312,14 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
if errors.Is(err, builderapi.ErrBadGateway) {
log.WithError(err).Info("Optimistically proposed block - builder relay temporarily unavailable, block may arrive over P2P")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
if errors.Is(err, builderapi.ErrBadGateway) && block.IsBlinded() {
log.WithError(err).Info("Optimistically proposed block - builder relay temporarily unavailable, block may arrive over P2P")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}

View File

@@ -90,10 +90,10 @@ func (s *Service) updateCustodyInfoIfNeeded() error {
// custodyGroupCount computes the custody group count based on the custody requirement,
// the validators custody requirement, and whether the node is subscribed to all data subnets.
func (s *Service) custodyGroupCount(context.Context) (uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeAllDataSubnets {
return beaconConfig.NumberOfCustodyGroups, nil
return cfg.NumberOfCustodyGroups, nil
}
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
@@ -101,7 +101,7 @@ func (s *Service) custodyGroupCount(context.Context) (uint64, error) {
return 0, errors.Wrap(err, "validators custody requirement")
}
return max(beaconConfig.CustodyRequirement, validatorsCustodyRequirement), nil
return max(cfg.CustodyRequirement, validatorsCustodyRequirement), nil
}
// validatorsCustodyRequirements computes the custody requirements based on the

View File

@@ -116,11 +116,11 @@ func withSubscribeAllDataSubnets(t *testing.T, fn func()) {
func TestUpdateCustodyInfoIfNeeded(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.NumberOfCustodyGroups = 128
beaconConfig.CustodyRequirement = 4
beaconConfig.SamplesPerSlot = 8
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.NumberOfCustodyGroups = 128
cfg.CustodyRequirement = 4
cfg.SamplesPerSlot = 8
params.OverrideBeaconConfig(cfg)
t.Run("Skip update when actual custody count >= target", func(t *testing.T) {
setup := setupCustodyTest(t, false)
@@ -159,7 +159,7 @@ func TestUpdateCustodyInfoIfNeeded(t *testing.T) {
require.NoError(t, err)
const expectedSlot = primitives.Slot(100)
setup.assertCustodyInfo(t, expectedSlot, beaconConfig.NumberOfCustodyGroups)
setup.assertCustodyInfo(t, expectedSlot, cfg.NumberOfCustodyGroups)
})
})
}

View File

@@ -1133,9 +1133,14 @@ func randomPeer(
"delay": waitPeriod,
}).Debug("Waiting for a peer with enough bandwidth for data column sidecars")
timer := time.NewTimer(waitPeriod)
select {
case <-time.After(waitPeriod):
case <-timer.C:
// Timer expired, retry the loop
case <-ctx.Done():
// Context cancelled - stop timer to prevent leak
timer.Stop()
return "", ctx.Err()
}
}
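
For context on the hunk above: a timer created by time.After cannot be stopped, so when the context is cancelled first its underlying timer sticks around until the full wait period elapses. A minimal standalone sketch of the same wait-or-cancel pattern (standard library only, names are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel blocks for waitPeriod unless ctx is cancelled first.
// The explicit timer is stopped on cancellation so it does not linger
// until it would have fired (time.After offers no way to stop it).
func waitOrCancel(ctx context.Context, waitPeriod time.Duration) error {
	timer := time.NewTimer(waitPeriod)
	select {
	case <-timer.C:
		return nil // timer expired, caller retries its loop
	case <-ctx.Done():
		timer.Stop() // release the timer's resources early
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(waitOrCancel(ctx, time.Second)) // prints: context deadline exceeded
}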

View File

@@ -1366,16 +1366,16 @@ func TestFetchSidecars(t *testing.T) {
})
t.Run("Nominal", func(t *testing.T) {
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
samplesPerSlot := beaconConfig.SamplesPerSlot
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
samplesPerSlot := cfg.SamplesPerSlot
// Define "now" to be one epoch after genesis time + retention period.
genesisTime := time.Date(2025, time.August, 10, 0, 0, 0, 0, time.UTC)
secondsPerSlot := beaconConfig.SecondsPerSlot
slotsPerEpoch := beaconConfig.SlotsPerEpoch
secondsPerSlot := cfg.SecondsPerSlot
slotsPerEpoch := cfg.SlotsPerEpoch
secondsPerEpoch := uint64(slotsPerEpoch.Mul(secondsPerSlot))
retentionEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
retentionEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
nowWrtGenesisSecs := retentionEpochs.Add(1).Mul(secondsPerEpoch)
now := genesisTime.Add(time.Duration(nowWrtGenesisSecs) * time.Second)

View File

@@ -530,12 +530,12 @@ func TestOriginOutsideRetention(t *testing.T) {
func TestFetchOriginSidecars(t *testing.T) {
ctx := t.Context()
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
genesisTime := time.Date(2025, time.August, 10, 0, 0, 0, 0, time.UTC)
secondsPerSlot := beaconConfig.SecondsPerSlot
slotsPerEpoch := beaconConfig.SlotsPerEpoch
secondsPerSlot := cfg.SecondsPerSlot
slotsPerEpoch := cfg.SlotsPerEpoch
secondsPerEpoch := uint64(slotsPerEpoch.Mul(secondsPerSlot))
retentionEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
retentionEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
genesisValidatorRoot := [fieldparams.RootLength]byte{}

View File

@@ -286,6 +286,7 @@ func (s *Service) updateMetrics() {
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(formattedTopic))))
}
subscribedTopicPeerCount.Reset()
for _, topic := range s.cfg.p2p.PubSub().GetTopics() {
subscribedTopicPeerCount.WithLabelValues(topic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(topic))))
}
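
The Reset call added above matters because a GaugeVec remembers every label value it has ever been given: a topic the node has unsubscribed from would otherwise keep exporting its last peer count. A minimal sketch of the pattern with the Prometheus client library (metric and label names here are illustrative, not Prysm's):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Hypothetical gauge mirroring the per-topic peer count pattern.
	topicPeers := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "example_topic_peer_total", Help: "Peers per subscribed topic."},
		[]string{"topic"},
	)
	prometheus.MustRegister(topicPeers)

	update := func(counts map[string]float64) {
		topicPeers.Reset() // drop series for topics we are no longer subscribed to
		for topic, n := range counts {
			topicPeers.WithLabelValues(topic).Set(n)
		}
	}

	update(map[string]float64{"beacon_block": 12, "voluntary_exit": 3})
	update(map[string]float64{"beacon_block": 15}) // without Reset, voluntary_exit would stay at 3
	fmt.Println("metrics updated")
}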

View File

@@ -113,19 +113,19 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
}
func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq, slot primitives.Slot) error {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
epoch := slots.ToEpoch(slot)
blobIdentCount := uint64(len(blobIdents))
if epoch >= beaconConfig.ElectraForkEpoch {
if blobIdentCount > beaconConfig.MaxRequestBlobSidecarsElectra {
if epoch >= cfg.ElectraForkEpoch {
if blobIdentCount > cfg.MaxRequestBlobSidecarsElectra {
return types.ErrMaxBlobReqExceeded
}
return nil
}
if blobIdentCount > beaconConfig.MaxRequestBlobSidecars {
if blobIdentCount > cfg.MaxRequestBlobSidecars {
return types.ErrMaxBlobReqExceeded
}

View File

@@ -38,8 +38,8 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
defer cancel()
SetRPCStreamDeadlines(stream)
beaconConfig := params.BeaconConfig()
maxRequestDataColumnSidecars := beaconConfig.MaxRequestDataColumnSidecars
cfg := params.BeaconConfig()
maxRequestDataColumnSidecars := cfg.MaxRequestDataColumnSidecars
remotePeer := stream.Conn().RemotePeer()
log := log.WithFields(logrus.Fields{
@@ -102,7 +102,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
// Once the quota is reached, we're done serving the request.
if maxRequestDataColumnSidecars == 0 {
log.WithField("initialQuota", beaconConfig.MaxRequestDataColumnSidecars).Trace("Reached quota for data column sidecars by range request")
log.WithField("initialQuota", cfg.MaxRequestDataColumnSidecars).Trace("Reached quota for data column sidecars by range request")
break
}
}

View File

@@ -31,9 +31,9 @@ import (
func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctx := context.Background()
t.Run("wrong message type", func(t *testing.T) {

View File

@@ -163,9 +163,9 @@ func dataColumnsRPCMinValidSlot(currentSlot primitives.Slot) (primitives.Slot, e
return primitives.Slot(math.MaxUint64), nil
}
beaconConfig := params.BeaconConfig()
minReqEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := beaconConfig.FuluForkEpoch
cfg := params.BeaconConfig()
minReqEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := cfg.FuluForkEpoch
currEpoch := slots.ToEpoch(currentSlot)
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch {

View File

@@ -28,9 +28,9 @@ import (
func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -43,9 +43,9 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
t.Run("invalid request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 1
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 1
params.OverrideBeaconConfig(cfg)
localP2P := p2ptest.NewTestP2P(t)
service := &Service{cfg: &config{p2p: localP2P}}
@@ -96,9 +96,9 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
}()
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 1
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 1
params.OverrideBeaconConfig(cfg)
localP2P := p2ptest.NewTestP2P(t)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})

View File

@@ -465,8 +465,8 @@ func SendDataColumnSidecarsByRangeRequest(
return nil, nil
}
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
maxRequestDataColumnSidecars := params.BeaconConfig().MaxRequestDataColumnSidecars
// Check if we do not request too many sidecars.

View File

@@ -889,9 +889,9 @@ func TestErrInvalidFetchedDataDistinction(t *testing.T) {
func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -923,9 +923,9 @@ func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(cfg)
request := &ethpb.DataColumnSidecarsByRangeRequest{Count: 1, Columns: []uint64{1, 2, 3}}
_, err := SendDataColumnSidecarsByRangeRequest(DataColumnSidecarsParams{Ctx: t.Context()}, "", request)
@@ -1193,9 +1193,9 @@ func TestIsSidecarIndexRequested(t *testing.T) {
func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -1223,9 +1223,9 @@ func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(cfg)
request := p2ptypes.DataColumnsByRootIdentifiers{
{Columns: []uint64{1, 2, 3}},

View File

@@ -445,7 +445,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
custodyGroupCount = uint64(4)
)
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
ctx := t.Context()
testCases := []struct {
@@ -456,7 +456,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
}{
{
name: "before fulu",
fuluForkEpoch: beaconConfig.FarFutureEpoch,
fuluForkEpoch: cfg.FarFutureEpoch,
topic: "/eth2/beacon_chain/req/status/1/ssz_snappy",
streamHandler: func(service *Service, stream network.Stream, genesisState beaconState.BeaconState, beaconRoot, headRoot, finalizedRoot []byte) {
out := &ethpb.Status{}

View File

@@ -695,10 +695,10 @@ func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool {
// the validators custody requirement, and whether the node is subscribed to all data subnets.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
func (s *Service) samplingSize() (uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeAllDataSubnets {
return beaconConfig.DataColumnSidecarSubnetCount, nil
return cfg.DataColumnSidecarSubnetCount, nil
}
// Compute the validators custody requirement.
@@ -712,7 +712,7 @@ func (s *Service) samplingSize() (uint64, error) {
return 0, errors.Wrap(err, "custody group count")
}
return max(beaconConfig.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
return max(cfg.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
}
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {

View File

@@ -0,0 +1,3 @@
### Ignored
- Add log prefix to the light-client package.

View File

@@ -0,0 +1,3 @@
### Removed
- log mentioning removed flag `--show-deposit-data`

View File

@@ -0,0 +1,4 @@
### Added
- Metrics: Add count of peers per direction (inbound/outbound) and type (TCP/QUIC).
- `p2p_subscribed_topic_peer_total`: Reset to avoid dangling values.
- Add `p2p_minimum_peers_per_subnet` metric.

View File

@@ -0,0 +1,3 @@
### Ignored
- Return optimistic response only when handling blinded blocks in proposer

View File

@@ -0,0 +1,3 @@
### Changed
- Replace `time.After()` with `time.NewTimer()` and explicitly stop the timer when the context is cancelled

View File

@@ -0,0 +1,3 @@
### Ignored
- Use SlotTicker with offset instead of time.Ticker for attestation pool pruning to avoid conflicts with slot boundary operations

View File

@@ -221,10 +221,10 @@ func TestListAccounts_LocalKeymanager(t *testing.T) {
// Expected output format definition
const prologLength = 4
const accountLength = 4
const epilogLength = 2
const nameOffset = 1
const keyOffset = 2
const privkeyOffset = 3
const epilogLength = 1
const keyOffset = 1
const privkeyOffset = 2
// Require the output has correct number of lines
lineCount := prologLength + accountLength*numAccounts + epilogLength
@@ -242,7 +242,7 @@ func TestListAccounts_LocalKeymanager(t *testing.T) {
// Assert that account names are printed on the correct lines
for i, accountName := range accountNames {
lineNumber := prologLength + accountLength*i + nameOffset
lineNumber := prologLength + accountLength*i
accountNameFound := strings.Contains(lines[lineNumber], accountName)
assert.Equal(t, true, accountNameFound, "Account Name %s not found on line number %d", accountName, lineNumber)
}

View File

@@ -402,10 +402,6 @@ func (km *Keymanager) ListKeymanagerAccounts(ctx context.Context, cfg keymanager
} else {
fmt.Printf("Showing %d validator accounts\n", numAccounts)
}
fmt.Println(
au.BrightRed("View the eth1 deposit transaction data for your accounts " +
"by running `validator accounts list --show-deposit-data`"),
)
pubKeys, err := km.FetchValidatingPublicKeys(ctx)
if err != nil {