Files
prysm/beacon-chain/sync/subscriber.go
Manu NALEPA fa056c2d21 Move the "Not enough connected peers" (for a given subnet) from WARN to DEBUG (#16087)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
Move the "Not enough connected peers" (for a given subnet) from WARN to
DEBUG

**Rationale:**
The `Not enough connected peers` log shown below is (potentially) printed every 5 minutes:

![Not enough connected peers](https://github.com/user-attachments/assets/44dbdc8d-3e37-42ee-967b-75a7a1fbcafb)
Every 5 minutes, the BN checks whether, for each relevant subnet, the
actual peer count is at least the configured minimum. If it is not, this
log is printed.

When connected validators are selected as aggregators for the next
epoch, the BN needs to subscribe to the corresponding attestation
subnets and find new peers there.
If the "5 min ticker" ticks right after the BN subscribes (but before it
has had time to find peers), this warning is logged, even if the slot
for which the validator is selected as an aggregator is still minutes
away.

For this reason, this log is moved from WARN to DEBUG. A condensed
sketch of the check after this change is shown below.
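The sketch below condenses the relevant part of `logMinimumPeersPerSubnet`
in `subscriber.go` (variable names abridged, surrounding wiring omitted) to
show where the level changes from `Warn` to `Debug`:

```go
logTicker := time.NewTicker(5 * time.Minute)
defer logTicker.Stop()
for {
	select {
	case <-logTicker.C:
		// For every subnet we need peers in, compare the connected peer
		// count against the configured minimum.
		for index := range neededSubnets {
			topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index)
			if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
				// Previously logged at WARN; now logged at DEBUG.
				log.WithFields(logrus.Fields{
					"topic":  topic,
					"actual": count,
				}).Debug("Not enough connected peers")
			}
		}
	case <-ctx.Done():
		return
	}
}
```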

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description to this PR with sufficient context for
reviewers to understand this PR.
2025-12-03 11:07:24 +00:00

873 lines
29 KiB
Go

package sync
import (
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/messagehandler"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/ethereum/go-ethereum/common/hexutil"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
const pubsubMessageTimeout = 30 * time.Second
var errInvalidDigest = errors.New("invalid digest")
// wrappedVal represents a gossip validator which also returns an error along with the result.
type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error)
// subHandler represents a handler for a given subscription.
type subHandler func(context.Context, proto.Message) error
// noopHandler is used for subscriptions that do not require anything to be done.
var noopHandler subHandler = func(ctx context.Context, msg proto.Message) error {
return nil
}
// subscribeParameters holds the parameters that are needed to construct a set of subscription topics for a given
// set of gossipsub subnets.
type subscribeParameters struct {
topicFormat string
validate wrappedVal
handle subHandler
nse params.NetworkScheduleEntry
// getSubnetsToJoin is a function that returns all subnets the node should join.
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
}
// shortTopic is a less verbose version of topic strings used for logging.
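// For example, a topic format like "/eth2/%x/beacon_attestation_%d" (illustrative
// format string) is rendered as "/eth2/<fork digest>/beacon_attestation".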
func (p subscribeParameters) shortTopic() string {
short := p.topicFormat
fmtLen := len(short)
if fmtLen >= 3 && short[fmtLen-3:] == "_%d" {
short = short[:fmtLen-3]
}
return fmt.Sprintf(short, p.nse.ForkDigest)
}
func (p subscribeParameters) logFields() logrus.Fields {
return logrus.Fields{
"topic": p.shortTopic(),
}
}
// fullTopic is the fully qualified topic string, given to gossipsub.
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
}
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
// possible subnets described by a `subscribeParameters`.
type subnetTracker struct {
subscribeParameters
mu sync.RWMutex
subscriptions map[uint64]*pubsub.Subscription
}
func newSubnetTracker(p subscribeParameters) *subnetTracker {
return &subnetTracker{
subscribeParameters: p,
subscriptions: make(map[uint64]*pubsub.Subscription),
}
}
// unwanted takes a list of wanted subnets and returns a list of currently subscribed subnets that are not included.
func (t *subnetTracker) unwanted(wanted map[uint64]bool) []uint64 {
t.mu.RLock()
defer t.mu.RUnlock()
unwanted := make([]uint64, 0, len(t.subscriptions))
for subnet := range t.subscriptions {
if wanted == nil || !wanted[subnet] {
unwanted = append(unwanted, subnet)
}
}
return unwanted
}
// missing takes a list of wanted subnets and returns a list of wanted subnets that are not currently tracked.
func (t *subnetTracker) missing(wanted map[uint64]bool) []uint64 {
t.mu.RLock()
defer t.mu.RUnlock()
missing := make([]uint64, 0, len(wanted))
for subnet := range wanted {
if _, ok := t.subscriptions[subnet]; !ok {
missing = append(missing, subnet)
}
}
return missing
}
// cancelSubscription cancels and removes the subscription for a given subnet.
func (t *subnetTracker) cancelSubscription(subnet uint64) {
t.mu.Lock()
defer t.mu.Unlock()
defer delete(t.subscriptions, subnet)
sub := t.subscriptions[subnet]
if sub == nil {
return
}
sub.Cancel()
}
// track asks the subnetTracker to hold on to the subscription for a given subnet so
// that we can remember that it is tracked and cancel its context when it's time to unsubscribe.
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription) {
if sub == nil {
return
}
t.mu.Lock()
defer t.mu.Unlock()
t.subscriptions[subnet] = sub
}
// noopValidator is a no-op that only decodes the message, but does not check its contents.
func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
m, err := s.decodePubsubMessage(msg)
if err != nil {
log.WithError(err).Debug("Could not decode message")
return pubsub.ValidationReject, nil
}
msg.ValidatorData = m
return pubsub.ValidationAccept, nil
}
func mapFromCount(count uint64) map[uint64]bool {
result := make(map[uint64]bool, count)
for item := range count {
result[item] = true
}
return result
}
func mapFromSlice(slices ...[]uint64) map[uint64]bool {
result := make(map[uint64]bool)
for _, slice := range slices {
for _, item := range slice {
result[item] = true
}
}
return result
}
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {
if flags.Get().SubscribeToAllSubnets {
return mapFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
}
currentEpoch := slots.ToEpoch(currentSlot)
subscriptions := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
return mapFromSlice(subscriptions)
}
// spawn allows the Service to use a custom function for launching goroutines.
// This is useful in tests, where the spawner can be backed by a sync.WaitGroup
// so the test can wait for the spawned goroutines to finish.
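//
// The snippet below is an illustrative sketch (not taken from this package's
// tests) of how a test could install such a WaitGroup-backed spawner:
//
//	var wg sync.WaitGroup
//	s.subscriptionSpawner = func(f func()) {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			f()
//		}()
//	}
//	// ... exercise code paths that call s.spawn(...) ...
//	wg.Wait() // blocks until every spawned goroutine has returned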
func (s *Service) spawn(f func()) {
if s.subscriptionSpawner != nil {
s.subscriptionSpawner(f)
} else {
go f()
}
}
// Register PubSub subscribers
func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// If we have already registered for this fork digest, exit early.
if s.digestActionDone(nse.ForkDigest, registerGossipOnce) {
return false
}
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.AttestationSubnetTopicFormat,
validate: s.validateCommitteeIndexBeaconAttestation,
handle: s.committeeIndexBeaconAttestationSubscriber,
getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices,
getSubnetsRequiringPeers: attesterSubnetIndices,
nse: nse,
})
})
// New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
nse,
)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
validate: s.validateSyncCommitteeMessage,
handle: s.syncCommitteeMessageSubscriber,
getSubnetsToJoin: s.activeSyncSubnetIndices,
nse: nse,
})
})
if features.Get().EnableLightClient {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
noopHandler,
nse,
)
})
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
noopHandler,
nse,
)
})
}
}
// New gossip topic in Capella
if params.BeaconConfig().CapellaForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
nse,
)
})
}
// New gossip topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
},
})
})
}
// New gossip topic in Electra, removed in Fulu
if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
},
})
})
}
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
handle: s.dataColumnSubscriber,
nse: nse,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
})
})
}
return true
}
func (s *Service) subscriptionRequestExpired(nse params.NetworkScheduleEntry) bool {
next := params.NextNetworkScheduleEntry(nse.Epoch)
return next.Epoch != nse.Epoch && s.cfg.clock.CurrentEpoch() > next.Epoch
}
func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEntry) logrus.Fields {
return logrus.Fields{
"topic": topic,
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}
}
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
if err := s.waitForInitialSync(s.ctx); err != nil {
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
return
}
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
return
}
base := p2p.GossipTopicMappings(topic, nse.Epoch)
if base == nil {
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
ok := s.subHandler.topicExists(topic)
if ok {
log.WithField("topic", topic).Error("Provided topic already has an active subscription running")
return nil
}
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
log.WithError(err).Error("Could not register validator for topic")
return nil
}
sub, err := s.cfg.p2p.SubscribeToTopic(topic)
if err != nil {
// Any error subscribing to a PubSub topic would be the result of a misconfiguration of
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
// subscription filter.
log.WithError(err).Error("Could not subscribe topic")
return nil
}
s.subHandler.addTopic(sub.Topic(), sub)
// Pipeline decodes the incoming subscription data, runs the validation, and handles the
// message.
pipeline := func(msg *pubsub.Message) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
ctx, span := trace.StartSpan(ctx, "sync.pubsub")
defer span.End()
defer func() {
if r := recover(); r != nil {
tracing.AnnotateError(span, fmt.Errorf("panic occurred: %v", r))
log.WithField("error", r).
WithField("recoveredAt", "subscribeWithBase").
WithField("stack", string(debug.Stack())).
Error("Panic occurred")
}
}()
span.SetAttributes(trace.StringAttribute("topic", topic))
if msg.ValidatorData == nil {
log.Error("Received nil message on pubsub")
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
return
}
if err := handle(ctx, msg.ValidatorData.(proto.Message)); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Could not handle p2p pubsub")
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
return
}
}
// The main message loop for receiving incoming messages from this subscription.
messageLoop := func() {
for {
msg, err := sub.Next(s.ctx)
if err != nil {
// This should only happen when the context is cancelled or subscription is cancelled.
if !errors.Is(err, pubsub.ErrSubscriptionCancelled) { // Only log a warning on unexpected errors.
log.WithError(err).Warn("Subscription next failed")
}
// Cancel subscription in the event of an error, as we are
// now exiting topic event loop.
sub.Cancel()
return
}
if msg.ReceivedFrom == s.cfg.p2p.PeerID() {
continue
}
go pipeline(msg)
}
}
go messageLoop()
log.WithField("topic", topic).Info("Subscribed to")
return sub
}
// Wrap the pubsub validator with a metric monitoring function. This function increments the
// appropriate counter if the particular message fails to validate.
func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, pubsub.ValidatorEx) {
return topic, func(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer messagehandler.HandlePanic(ctx, msg)
// Default: ignore any message that panics.
res = pubsub.ValidationIgnore // nolint:wastedassign
ctx, cancel := context.WithTimeout(ctx, pubsubMessageTimeout)
defer cancel()
messageReceivedCounter.WithLabelValues(topic).Inc()
if msg.Topic == nil {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
return pubsub.ValidationReject
}
// Ignore any messages received before chainstart.
if s.chainStarted.IsNotSet() {
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
return pubsub.ValidationIgnore
}
retDigest, err := p2p.ExtractGossipDigest(topic)
if err != nil {
log.WithField("topic", topic).Errorf("Invalid topic format of pubsub topic: %v", err)
return pubsub.ValidationIgnore
}
currDigest, err := s.currentForkDigest()
if err != nil {
log.WithField("topic", topic).Errorf("Unable to retrieve fork data: %v", err)
return pubsub.ValidationIgnore
}
if currDigest != retDigest {
log.WithField("topic", topic).Debugf("Received message from outdated fork digest %#x", retDigest)
return pubsub.ValidationIgnore
}
b, err := v(ctx, pid, msg)
// We do not penalize peers if we are hitting pubsub timeouts
// trying to process those messages.
if b == pubsub.ValidationReject && ctx.Err() != nil {
b = pubsub.ValidationIgnore
}
if b == pubsub.ValidationReject {
fields := logrus.Fields{
"topic": topic,
"multiaddress": multiAddr(pid, s.cfg.p2p.Peers()),
"peerID": pid.String(),
"agent": agentString(pid, s.cfg.p2p.Host()),
"gossipScore": s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid),
}
if features.Get().EnableFullSSZDataLogging {
fields["message"] = hexutil.Encode(msg.Data)
}
log.WithError(err).WithFields(fields).Debug("Gossip message was rejected")
messageFailedValidationCounter.WithLabelValues(topic).Inc()
}
if b == pubsub.ValidationIgnore {
if err != nil && !errorIsIgnored(err) {
log.WithError(err).WithFields(logrus.Fields{
"topic": topic,
"multiaddress": multiAddr(pid, s.cfg.p2p.Peers()),
"peerID": pid.String(),
"agent": agentString(pid, s.cfg.p2p.Host()),
"gossipScore": fmt.Sprintf("%.2f", s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)),
}).Debug("Gossip message was ignored")
}
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
}
return b
}
}
// pruneNotWanted unsubscribes from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
}
}
// subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters(p subscribeParameters) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracker := newSubnetTracker(p)
go s.ensurePeers(ctx, tracker)
go s.logMinimumPeersPerSubnet(ctx, p)
if err := s.waitForInitialSync(ctx); err != nil {
log.WithFields(p.logFields()).WithError(err).Debug("Could not subscribe to subnets as initial sync failed")
return
}
s.trySubscribeSubnets(tracker)
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(p.nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(logrus.Fields{
"topic": p.shortTopic(),
"digest": p.nse.ForkDigest,
"epoch": p.nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}).Debug("Exiting topic subnet subscription loop")
return
}
s.trySubscribeSubnets(tracker)
case <-s.ctx.Done():
return
}
}
}
// trySubscribeSubnets attempts to subscribe to any missing subnets that we should be subscribed to.
// It is only called once initial sync is complete.
func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
}
}
func (s *Service) ensurePeers(ctx context.Context, tracker *subnetTracker) {
// Try once immediately so we don't have to wait until the next slot.
s.tryEnsurePeers(ctx, tracker)
oncePerSlot := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer oncePerSlot.Done()
for {
select {
case <-oncePerSlot.C():
s.tryEnsurePeers(ctx, tracker)
case <-ctx.Done():
return
}
}
}
func (s *Service) tryEnsurePeers(ctx context.Context, tracker *subnetTracker) {
timeout := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - 100*time.Millisecond
minPeers := flags.Get().MinimumPeersPerSubnet
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.nse.ForkDigest, minPeers, neededSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithFields(tracker.logFields()).WithError(err).Debug("Could not find peers with subnets")
}
}
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParameters) {
logFields := p.logFields()
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Periodically log (at DEBUG level) when a subnet does not have enough connected peers.
log := log.WithField("minimum", minimumPeersPerSubnet)
logTicker := time.NewTicker(5 * time.Minute)
defer logTicker.Stop()
for {
select {
case <-logTicker.C:
currentSlot := s.cfg.clock.CurrentSlot()
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers)
isSubnetWithMissingPeers := false
// Check whether each needed subnet has enough connected peers.
for index := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index)
// Check if we have enough peers in the subnet. Skip if we do.
if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
isSubnetWithMissingPeers = true
log.WithFields(logrus.Fields{
"topic": topic,
"actual": count,
}).Debug("Not enough connected peers")
}
}
if !isSubnetWithMissingPeers {
log.WithFields(logFields).Debug("All subnets have enough connected peers")
}
case <-ctx.Done():
return
}
}
}
func (s *Service) unSubscribeFromTopic(topic string) {
log.WithField("topic", topic).Info("Unsubscribed from")
if err := s.cfg.p2p.PubSub().UnregisterTopicValidator(topic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
sub := s.subHandler.subForTopic(topic)
if sub != nil {
sub.Cancel()
}
s.subHandler.removeTopic(topic)
if err := s.cfg.p2p.LeaveTopic(topic); err != nil {
log.WithError(err).Error("Unable to leave topic")
}
}
// connectedPeersCount counts how many peers are connected to the node for a given topic.
func (s *Service) connectedPeersCount(subnetTopic string) int {
topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic)
return len(peersWithSubnet)
}
func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool {
nodeID := s.cfg.p2p.NodeID()
samplingSize, err := s.samplingSize()
if err != nil {
log.WithError(err).Error("Could not retrieve sampling size")
return nil
}
// Compute the subnets to subscribe to.
nodeInfo, _, err := peerdas.Info(nodeID, samplingSize)
if err != nil {
log.WithError(err).Error("Could not retrieve peer info")
return nil
}
return nodeInfo.DataColumnsSubnets
}
// samplingSize computes the sampling size based on the samples per slot value,
// the validators custody requirement, and the custody group count.
// The custody group count is the source of truth and already includes supernode/semi-supernode logic.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
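//
// As a purely illustrative example (numbers not taken from this codebase): with
// SamplesPerSlot = 8, a validators custody requirement of 0 (no attached
// validators), and a custody group count of 128 (supernode), the resulting
// sampling size is max(8, 0, 128) = 128.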
func (s *Service) samplingSize() (uint64, error) {
cfg := params.BeaconConfig()
// Compute the validators custody requirement.
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
if err != nil {
return 0, errors.Wrap(err, "validators custody requirement")
}
// Get custody group count - this is the source of truth and already reflects:
// - Supernode mode: NUMBER_OF_CUSTODY_GROUPS
// - Semi-supernode mode: half of NUMBER_OF_CUSTODY_GROUPS (or more if validators require)
// - Regular mode: validator custody requirement
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx)
if err != nil {
return 0, errors.Wrap(err, "custody group count")
}
// Sampling size should match custody to ensure we can serve what we advertise
return max(cfg.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
}
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {
persistentSubnetIndices := persistentSubnetIndices()
aggregatorSubnetIndices := aggregatorSubnetIndices(currentSlot)
// Combine subscriptions to get all requested subscriptions.
return mapFromSlice(persistentSubnetIndices, aggregatorSubnetIndices)
}
// filterNeededPeers filters out peers that the node requires to function, so that
// peers in our attestation subnets are not pruned.
func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
currentSlot := s.cfg.clock.CurrentSlot()
// Exit early if nothing to filter.
if len(pids) == 0 {
return pids
}
digest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not compute fork digest")
return pids
}
wantedSubnets := make(map[uint64]bool)
for subnet := range s.persistentAndAggregatorSubnetIndices(currentSlot) {
wantedSubnets[subnet] = true
}
for subnet := range attesterSubnetIndices(currentSlot) {
wantedSubnets[subnet] = true
}
topic := p2p.GossipTypeMapping[reflect.TypeFor[*ethpb.Attestation]()]
// Map of peers in subnets
peerMap := make(map[peer.ID]bool)
for subnet := range wantedSubnets {
subnetTopic := fmt.Sprintf(topic, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix()
peers := s.cfg.p2p.PubSub().ListPeers(subnetTopic)
if len(peers) > minimumPeersPerSubnet {
// In the event we have more than the minimum, we can
// mark the remaining as viable for pruning.
peers = peers[:minimumPeersPerSubnet]
}
// Add peer to peer map.
for _, peer := range peers {
// Even if the peer ID has already been seen we still set it,
// as the outcome is the same.
peerMap[peer] = true
}
}
// Clear out necessary peers from the peers to prune.
newPeers := make([]peer.ID, 0, len(pids))
for _, pid := range pids {
if peerMap[pid] {
continue
}
newPeers = append(newPeers, pid)
}
return newPeers
}
// Add fork digest to topic.
func (*Service) addDigestToTopic(topic string, digest [4]byte) string {
if !strings.Contains(topic, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return fmt.Sprintf(topic, digest)
}
// Add the digest and index to subnet topic.
func (*Service) addDigestAndIndexToTopic(topic string, digest [4]byte, idx uint64) string {
if !strings.Contains(topic, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return fmt.Sprintf(topic, digest, idx)
}
func (s *Service) currentForkDigest() ([4]byte, error) {
return params.ForkDigest(s.cfg.clock.CurrentEpoch()), nil
}
// Checks if the provided digest matches up with the current supposed digest.
func isDigestValid(digest [4]byte, clock *startup.Clock) (bool, error) {
current := clock.CurrentEpoch()
// In the event there is a fork in the next epoch,
// we skip the check, as we subscribe to subnets an
// epoch in advance.
if params.NextNetworkScheduleEntry(current).Epoch == current+1 {
return true, nil
}
return params.ForkDigest(current) == digest, nil
}
// computeAllNeededSubnets computes the subnets we want to join
// and the subnets for which we want to find peers.
func computeAllNeededSubnets(
currentSlot primitives.Slot,
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool,
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool,
) map[uint64]bool {
// Retrieve the subnets we want to join.
subnetsToJoin := getSubnetsToJoin(currentSlot)
// Retrieve the subnets for which we want to find peers.
subnetsRequiringPeers := make(map[uint64]bool)
if getSubnetsRequiringPeers != nil {
subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot)
}
// Combine the two maps to get all needed subnets.
neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers))
for subnet := range subnetsToJoin {
neededSubnets[subnet] = true
}
for subnet := range subnetsRequiringPeers {
neededSubnets[subnet] = true
}
return neededSubnets
}
func agentString(pid peer.ID, hst host.Host) string {
rawVersion, storeErr := hst.Peerstore().Get(pid, "AgentVersion")
agString, ok := rawVersion.(string)
if storeErr != nil || !ok {
agString = ""
}
return agString
}
func multiAddr(pid peer.ID, stat *peers.Status) string {
addrs, err := stat.Address(pid)
if err != nil || addrs == nil {
return ""
}
return addrs.String()
}
func errorIsIgnored(err error) bool {
if errors.Is(err, helpers.ErrTooLate) {
return true
}
if errors.Is(err, altair.ErrTooLate) {
return true
}
return false
}