Compare commits

...

9 Commits

Author SHA1 Message Date
Manu NALEPA
f17ece0a8c Add p2p_minimum_peers_per_subnet metric. 2025-10-14 00:04:28 +02:00
Manu NALEPA
5f50e620c4 validateConfig:
- Use `Warning` with fields instead of `Warnf`.
- Avoid to both modify in place the input value and return it.
2025-10-14 00:04:28 +02:00
Manu NALEPA
a3dc2bb3b8 p2p_subscribed_topic_peer_total: Reset to avoid dangling values. 2025-10-14 00:04:28 +02:00
Manu NALEPA
e7c865f70e Add p2pMaxPeers and p2pPeerCountDirectionType metrics 2025-10-14 00:04:28 +02:00
Manu NALEPA
8957c45eb9 Rename variables and use range syntax. 2025-10-14 00:04:28 +02:00
Manu NALEPA
bdb6be2601 Group types. (No functional changes) 2025-10-14 00:04:28 +02:00
Manu NALEPA
a2b88359c6 Define TCP and QUIC as InternetProtocol (no functional change). 2025-10-14 00:04:28 +02:00
Manu NALEPA
8bdf9ee3c5 TestVerifyDataColumnSidecar: Refactor using test cases. 2025-10-14 00:04:28 +02:00
Manu NALEPA
de5cddb812 VerifyDataColumnSidecar: Check if there is no too many commitments. 2025-10-14 00:04:28 +02:00
7 changed files with 115 additions and 62 deletions

View File

@@ -43,6 +43,13 @@ func VerifyDataColumnSidecar(sidecar blocks.RODataColumn) error {
return ErrNoKzgCommitments
}
// A sidecar with more commitments than the max blob count for this block is invalid.
slot := sidecar.Slot()
maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlock(slot)
if len(sidecar.KzgCommitments) > maxBlobsPerBlock {
return ErrTooManyCommitments
}
// The column length must be equal to the number of commitments/proofs.
if len(sidecar.Column) != len(sidecar.KzgCommitments) || len(sidecar.Column) != len(sidecar.KzgProofs) {
return ErrMismatchLength

View File

@@ -18,38 +18,46 @@ import (
)
func TestVerifyDataColumnSidecar(t *testing.T) {
t.Run("index too large", func(t *testing.T) {
roSidecar := createTestSidecar(t, 1_000_000, nil, nil, nil)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
require.ErrorIs(t, err, peerdas.ErrIndexTooLarge)
})
testCases := []struct {
name string
index uint64
blobCount int
commitmentCount int
proofCount int
maxBlobsPerBlock uint64
expectedError error
}{
{name: "index too large", index: 1_000_000, expectedError: peerdas.ErrIndexTooLarge},
{name: "no commitments", expectedError: peerdas.ErrNoKzgCommitments},
{name: "too many commitments", blobCount: 10, commitmentCount: 10, proofCount: 10, maxBlobsPerBlock: 2, expectedError: peerdas.ErrTooManyCommitments},
{name: "commitments size mismatch", commitmentCount: 1, maxBlobsPerBlock: 1, expectedError: peerdas.ErrMismatchLength},
{name: "proofs size mismatch", blobCount: 1, commitmentCount: 1, maxBlobsPerBlock: 1, expectedError: peerdas.ErrMismatchLength},
{name: "nominal", blobCount: 1, commitmentCount: 1, proofCount: 1, maxBlobsPerBlock: 1, expectedError: nil},
}
t.Run("no commitments", func(t *testing.T) {
roSidecar := createTestSidecar(t, 0, nil, nil, nil)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
require.ErrorIs(t, err, peerdas.ErrNoKzgCommitments)
})
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
cfg.BlobSchedule = []params.BlobScheduleEntry{{Epoch: 0, MaxBlobsPerBlock: tc.maxBlobsPerBlock}}
params.OverrideBeaconConfig(cfg)
t.Run("KZG commitments size mismatch", func(t *testing.T) {
kzgCommitments := make([][]byte, 1)
roSidecar := createTestSidecar(t, 0, nil, kzgCommitments, nil)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
column := make([][]byte, tc.blobCount)
kzgCommitments := make([][]byte, tc.commitmentCount)
kzgProof := make([][]byte, tc.proofCount)
t.Run("KZG proofs size mismatch", func(t *testing.T) {
column, kzgCommitments := make([][]byte, 1), make([][]byte, 1)
roSidecar := createTestSidecar(t, 0, column, kzgCommitments, nil)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
roSidecar := createTestSidecar(t, tc.index, column, kzgCommitments, kzgProof)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
t.Run("nominal", func(t *testing.T) {
column, kzgCommitments, kzgProofs := make([][]byte, 1), make([][]byte, 1), make([][]byte, 1)
roSidecar := createTestSidecar(t, 0, column, kzgCommitments, kzgProofs)
err := peerdas.VerifyDataColumnSidecar(roSidecar)
require.NoError(t, err)
})
if tc.expectedError != nil {
require.ErrorIs(t, err, tc.expectedError)
return
}
require.NoError(t, err)
})
}
}
func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {

View File

@@ -7,6 +7,7 @@ import (
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/sirupsen/logrus"
)
// This is the default queue size used if we have specified an invalid one.
@@ -63,12 +64,17 @@ func (cfg *Config) connManagerLowHigh() (int, int) {
return low, high
}
// validateConfig validates whether the values provided are accurate and will set
// the appropriate values for those that are invalid.
func validateConfig(cfg *Config) *Config {
if cfg.QueueSize == 0 {
log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize)
cfg.QueueSize = defaultPubsubQueueSize
// validateConfig validates whether the provided config has valid values and sets
// the invalid ones to default.
func validateConfig(cfg *Config) {
if cfg.QueueSize > 0 {
return
}
return cfg
log.WithFields(logrus.Fields{
"queueSize": cfg.QueueSize,
"default": defaultPubsubQueueSize,
}).Warning("Invalid pubsub queue size, setting the queue size to the default value")
cfg.QueueSize = defaultPubsubQueueSize
}

View File

@@ -3,6 +3,7 @@ package p2p
import (
"strings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
@@ -26,12 +27,25 @@ var (
Help: "The number of peers in a given state.",
},
[]string{"state"})
p2pMaxPeers = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_max_peers",
Help: "The target maximum number of peers.",
})
p2pPeerCountDirectionType = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "p2p_peer_count_direction_type",
Help: "The number of peers in a given direction and type.",
},
[]string{"direction", "type"})
connectedPeersCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers",
Help: "Tracks the total number of connected libp2p peers by agent string",
},
[]string{"agent"},
)
minimumPeersPerSubnet = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_minimum_peers_per_subnet",
Help: "The minimum number of peers to connect to per subnet",
})
avgScoreConnectedClients = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers_average_scores",
Help: "Tracks the overall p2p scores of connected libp2p peers by agent string",
@@ -174,18 +188,26 @@ var (
)
func (s *Service) updateMetrics() {
store := s.Host().Peerstore()
connectedPeers := s.peers.Connected()
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(connectedPeers)))
p2pPeerCount.WithLabelValues("Disconnected").Set(float64(len(s.peers.Disconnected())))
p2pPeerCount.WithLabelValues("Connecting").Set(float64(len(s.peers.Connecting())))
p2pPeerCount.WithLabelValues("Disconnecting").Set(float64(len(s.peers.Disconnecting())))
p2pPeerCount.WithLabelValues("Bad").Set(float64(len(s.peers.Bad())))
store := s.Host().Peerstore()
numConnectedPeersByClient := make(map[string]float64)
upperTCP := strings.ToUpper(string(peers.TCP))
upperQUIC := strings.ToUpper(string(peers.QUIC))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperTCP).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperQUIC).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.QUIC))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperTCP).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperQUIC).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))))
connectedPeersCountByClient := make(map[string]float64)
peerScoresByClient := make(map[string][]float64)
for i := 0; i < len(connectedPeers); i++ {
p := connectedPeers[i]
for _, p := range connectedPeers {
pid, err := peer.Decode(p.String())
if err != nil {
log.WithError(err).Debug("Could not decode peer string")
@@ -193,16 +215,18 @@ func (s *Service) updateMetrics() {
}
foundName := agentFromPid(pid, store)
numConnectedPeersByClient[foundName] += 1
connectedPeersCountByClient[foundName] += 1
// Get peer scoring data.
overallScore := s.peers.Scorers().Score(pid)
peerScoresByClient[foundName] = append(peerScoresByClient[foundName], overallScore)
}
connectedPeersCount.Reset() // Clear out previous results.
for agent, total := range numConnectedPeersByClient {
for agent, total := range connectedPeersCountByClient {
connectedPeersCount.WithLabelValues(agent).Set(total)
}
avgScoreConnectedClients.Reset() // Clear out previous results.
for agent, scoringData := range peerScoresByClient {
avgScore := average(scoringData)

View File

@@ -81,29 +81,31 @@ const (
type InternetProtocol string
const (
TCP = "tcp"
QUIC = "quic"
TCP = InternetProtocol("tcp")
QUIC = InternetProtocol("quic")
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
type (
// Status is the structure holding the peer status information.
Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
type StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
)
// NewStatus creates a new status entity.
func NewStatus(ctx context.Context, config *StatusConfig) *Status {

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -106,12 +107,16 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
cfg = validateConfig(cfg)
validateConfig(cfg)
privKey, err := privKey(cfg)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate p2p private key")
}
p2pMaxPeers.Set(float64(cfg.MaxPeers))
minimumPeersPerSubnet.Set(float64(flags.Get().MinimumPeersPerSubnet))
metaData, err := metaDataFromDB(ctx, cfg.DB)
if err != nil {
log.WithError(err).Error("Failed to create peer metadata")

View File

@@ -279,6 +279,7 @@ func (s *Service) updateMetrics() {
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(formattedTopic))))
}
subscribedTopicPeerCount.Reset()
for _, topic := range s.cfg.p2p.PubSub().GetTopics() {
subscribedTopicPeerCount.WithLabelValues(topic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(topic))))
}