Files
prysm/beacon-chain/p2p/service.go
Aarsh Shah 0fc9ab925a feat: add support for detecting and logging per address reachability via libp2p AutoNAT v2 (#16100)
**What type of PR is this?**
Feature

**What does this PR do? Why is it needed?**

This PR adds support for detecting and logging per-address reachability
via libp2p AutoNAT v2. See
https://github.com/libp2p/go-libp2p/releases/tag/v0.42.0 for details.
This PR also upgrades Prysm to libp2p v0.42.0.

**Which issues(s) does this PR fix?**

Fixes https://github.com/OffchainLabs/prysm/issues/16098

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description to this PR with sufficient context for
reviewers to understand this PR.
2025-12-11 11:56:52 +00:00

606 lines
18 KiB
Go

// Package p2p defines the network protocol implementation for Ethereum consensus
// used by beacon nodes, including peer discovery using discv5, gossip-sub
// using libp2p, and handling peer lifecycles and handshakes.
package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
prysmnetwork "github.com/OffchainLabs/prysm/v7/network"
"github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1/metadata"
"github.com/OffchainLabs/prysm/v7/runtime"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Compile-time assertion that *Service implements runtime.Service.
var _ runtime.Service = (*Service)(nil)

const (
	// batchPeriod bounds how long a search for new nodes keeps going when
	// not enough nodes are being found.
	batchPeriod = 2 * time.Second

	// maxBadResponses is how many bad responses we tolerate from a peer
	// before we stop talking to it (used as the bad-responses scorer threshold).
	maxBadResponses = 5
)

var (
	// refreshRate is the ENR refresh interval: two refreshes per slot.
	refreshRate = slots.DivideSlotBy(2)

	// maxDialTimeout caps the duration of any single peer dial.
	maxDialTimeout = params.BeaconConfig().RespTimeoutDuration()

	// pollingPeriod is how long we wait between peer-limit status checks once
	// we are at our peer limit and have stopped looking for new peers.
	pollingPeriod = 6 * time.Second
)
// Service for managing peer to peer (p2p) networking.
//
// NOTE(review): started and isPreGenesis are read by Status/Started without
// any lock; confirm callers never race with Start/Stop.
type Service struct {
	started, isPreGenesis bool

	// pingMethod is set through AddPingMethod, which holds pingMethodLock.
	pingMethod     func(ctx context.Context, id peer.ID) error
	pingMethodLock sync.RWMutex

	cancel context.CancelFunc
	cfg    *Config
	peers  *peers.Status

	addrFilter *multiaddr.Filters
	ipLimiter  *leakybucket.Collector
	privKey    *ecdsa.PrivateKey
	metaData   metadata.Metadata

	pubsub           *pubsub.PubSub
	joinedTopics     map[string]*pubsub.Topic
	joinedTopicsLock sync.RWMutex

	subnetsLock     map[uint64]*sync.RWMutex
	subnetsLockLock sync.Mutex // Lock access to subnetsLock

	// initializationLock is held for the duration of awaitStateInitialized.
	initializationLock sync.Mutex

	dv5Listener ListenerRebooter
	startupErr  error
	ctx         context.Context
	host        host.Host

	genesisTime           time.Time
	genesisValidatorsRoot []byte

	activeValidatorCount     uint64
	activeValidatorCountLock sync.Mutex

	peerDisconnectionTime *cache.Cache

	custodyInfo     *custodyInfo
	custodyInfoLock sync.RWMutex // Lock access to custodyInfo
	custodyInfoSet  chan struct{}

	allForkDigests map[[4]byte]struct{}
}

// custodyInfo pairs the earliest available slot with the custody group count.
// Service guards its *custodyInfo field with custodyInfoLock.
type custodyInfo struct {
	earliestAvailableSlot primitives.Slot
	groupCount            uint64
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
//
// On success the returned Service owns the derived context (cancelled in Stop) and the
// libp2p host. If any construction step fails, the derived context is cancelled and any
// libp2p host already created is closed before the error is returned, so a failed call
// leaves neither a live context nor open listeners behind.
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
	ctx, cancel := context.WithCancel(ctx)

	// Cleanup for the error paths below. s becomes non-nil once the Service
	// value exists; succeeded is flipped only right before the final return.
	var s *Service
	succeeded := false
	defer func() {
		if succeeded {
			return
		}
		if s != nil && s.host != nil {
			if err := s.host.Close(); err != nil {
				log.WithError(err).Debug("Failed to close p2p host after initialization error")
			}
		}
		cancel()
	}()

	validateConfig(cfg)
	privKey, err := privKey(cfg)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to generate p2p private key")
	}
	p2pMaxPeers.Set(float64(cfg.MaxPeers))
	minimumPeersPerSubnet.Set(float64(flags.Get().MinimumPeersPerSubnet))
	metaData, err := metaDataFromDB(ctx, cfg.DB)
	if err != nil {
		log.WithError(err).Error("Failed to create peer metadata")
		return nil, err
	}
	addrFilter, err := configureFilter(cfg)
	if err != nil {
		log.WithError(err).Error("Failed to create address filter")
		return nil, err
	}
	ipLimiter := leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)

	s = &Service{
		ctx:                   ctx,
		cancel:                cancel,
		cfg:                   cfg,
		addrFilter:            addrFilter,
		ipLimiter:             ipLimiter,
		privKey:               privKey,
		metaData:              metaData,
		isPreGenesis:          true,
		joinedTopics:          make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
		subnetsLock:           make(map[uint64]*sync.RWMutex),
		peerDisconnectionTime: cache.New(1*time.Second, 1*time.Minute),
		custodyInfoSet:        make(chan struct{}),
	}

	ipAddr := prysmnetwork.IPAddr()
	opts, err := s.buildOptions(ipAddr, s.privKey)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to build p2p options")
	}

	h, err := libp2p.New(opts...)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to create p2p host")
	}
	// From here on, any error return closes this host via the deferred cleanup.
	s.host = h

	// Gossipsub registration is done before we add in any new peers
	// due to libp2p's gossipsub implementation not taking into
	// account previously added peers when creating the gossipsub
	// object.
	psOpts := s.pubsubOptions()
	// Set the pubsub global parameters that we require.
	setPubSubParameters()
	// Reinitialize them in the event we are running a custom config.
	attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
	syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount

	gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to create p2p pubsub")
	}
	s.pubsub = gs

	s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
		PeerLimit:             int(s.cfg.MaxPeers),
		IPColocationWhitelist: s.cfg.IPColocationWhitelist,
		ScorerParams: &scorers.Config{
			BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
				Threshold:     maxBadResponses,
				DecayInterval: time.Hour,
			},
		},
	})

	// Initialize Data maps.
	types.InitializeDataMaps()

	succeeded = true
	return s, nil
}
// Start the p2p service.
//
// Start waits for genesis data, then optionally dials a relay node, starts
// discv5 and connects to boot nodes (unless discovery is disabled), connects
// to static peers, and schedules periodic maintenance bound to s.ctx.
// Discovery setup failures are recorded in s.startupErr and reported by Status.
func (s *Service) Start() {
	if s.started {
		log.Error("Attempted to start p2p service when it was already started")
		return
	}

	// Waits until the state is initialized via an event feed.
	// Used for fork-related data when connecting peers.
	s.awaitStateInitialized()
	s.setAllForkDigests()
	s.isPreGenesis = false

	var relayNodes []string
	if s.cfg.RelayNodeAddr != "" {
		relayNodes = append(relayNodes, s.cfg.RelayNodeAddr)
		if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil {
			log.WithError(err).Error("Could not dial relay node")
		}
	}

	if !s.cfg.NoDiscovery {
		ipAddr := prysmnetwork.IPAddr()
		listener, err := s.startDiscoveryV5(
			ipAddr,
			s.privKey,
		)
		if err != nil {
			// NOTE(review): Fatal normally terminates the process (logrus
			// default); the lines below only run if the exit func is overridden.
			log.WithError(err).Fatal("Failed to start discovery")
			s.startupErr = err
			return
		}
		if err := s.connectToBootnodes(); err != nil {
			log.WithError(err).Error("Could not connect to boot nodes")
			s.startupErr = err
			// The listener has not been stored in s.dv5Listener yet, so Stop
			// would never close it; release it here instead of leaking it.
			listener.Close()
			return
		}
		s.dv5Listener = listener
		go s.listenForNewNodes()
	}

	s.started = true

	if len(s.cfg.StaticPeers) > 0 {
		addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers)
		if err != nil {
			log.WithError(err).Error("Could not convert ENR to multiaddr")
		}
		// Set trusted peers for those that are provided as static addresses.
		pids := peerIdsFromMultiAddrs(addrs)
		s.peers.SetTrustedPeers(pids)
		s.connectWithAllTrustedPeers(addrs)
	}

	// Initialize metadata according to the
	// current epoch.
	s.RefreshPersistentSubnets()

	if s.cfg.EnableAutoNAT {
		if err := s.subscribeReachabilityEvents(); err != nil {
			log.WithError(err).Error("Failed to subscribe to AutoNAT v2 reachability events")
		} else {
			log.Info("AutoNAT v2 enabled for address reachability detection")
		}
	}

	// Periodic functions.
	async.RunEvery(s.ctx, params.BeaconConfig().TtfbTimeoutDuration(), func() {
		ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
	})
	async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
	async.RunEvery(s.ctx, time.Duration(params.BeaconConfig().RespTimeout)*time.Second, s.updateMetrics)
	async.RunEvery(s.ctx, refreshRate, s.RefreshPersistentSubnets)
	async.RunEvery(s.ctx, 1*time.Minute, func() {
		inboundQUICCount := len(s.peers.InboundConnectedWithProtocol(peers.QUIC))
		inboundTCPCount := len(s.peers.InboundConnectedWithProtocol(peers.TCP))
		outboundQUICCount := len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))
		outboundTCPCount := len(s.peers.OutboundConnectedWithProtocol(peers.TCP))
		total := inboundQUICCount + inboundTCPCount + outboundQUICCount + outboundTCPCount

		fields := logrus.Fields{
			"inboundTCP":  inboundTCPCount,
			"outboundTCP": outboundTCPCount,
			"total":       total,
			"target":      s.cfg.MaxPeers,
		}
		if features.Get().EnableQUIC {
			fields["inboundQUIC"] = inboundQUICCount
			fields["outboundQUIC"] = outboundQUICCount
		}
		log.WithFields(fields).Info("Connected peers")
	})

	multiAddrs := s.host.Network().ListenAddresses()
	logIPAddr(s.host.ID(), multiAddrs...)

	p2pHostAddress := s.cfg.HostAddress
	p2pTCPPort := s.cfg.TCPPort
	p2pQUICPort := s.cfg.QUICPort
	if p2pHostAddress != "" {
		logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort, p2pQUICPort)
		verifyConnectivity(p2pHostAddress, p2pTCPPort, "tcp")
	}

	p2pHostDNS := s.cfg.HostDNS
	if p2pHostDNS != "" {
		logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
	}

	go s.forkWatcher()
}
// Stop the p2p service and terminate all peer connections.
//
// Stop clears the started flag and closes the discovery listener when one
// exists. The service context is cancelled last (via defer). It always
// returns nil.
// NOTE(review): the libp2p host itself is not closed here — confirm intended.
func (s *Service) Stop() error {
	defer s.cancel()

	s.started = false
	if l := s.dv5Listener; l != nil {
		l.Close()
	}
	return nil
}
// Status of the p2p service. Will return an error if the service is considered unhealthy to
// indicate that this node should not serve traffic until the issue has been resolved.
//
// Checks are evaluated in order: pre-genesis is always healthy; otherwise the
// service must be started, have no recorded startup error, and know the
// genesis time.
func (s *Service) Status() error {
	switch {
	case s.isPreGenesis:
		return nil
	case !s.started:
		return errors.New("not running")
	case s.startupErr != nil:
		return s.startupErr
	case s.genesisTime.IsZero():
		return errors.New("no genesis time set")
	default:
		return nil
	}
}
// Started returns true if the p2p service has successfully started.
func (s *Service) Started() bool { return s.started }

// Encoding returns the configured networking encoding (always SSZ).
func (*Service) Encoding() encoder.NetworkEncoding { return new(encoder.SszNetworkEncoder) }

// PubSub returns the p2p pubsub framework.
func (s *Service) PubSub() *pubsub.PubSub { return s.pubsub }

// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host { return s.host }

// SetStreamHandler sets the protocol handler on the p2p host multiplexer.
// This method is a pass through to libp2pcore.Host.SetStreamHandler, with the
// topic string used directly as the protocol ID.
func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler) {
	pid := protocol.ID(topic)
	s.host.SetStreamHandler(pid, handler)
}

// PeerID returns the Peer ID of the local peer.
func (s *Service) PeerID() peer.ID { return s.host.ID() }

// Disconnect from a peer by closing all host connections to it.
func (s *Service) Disconnect(pid peer.ID) error {
	nw := s.host.Network()
	return nw.ClosePeer(pid)
}

// Connect to a specific peer, bounded by the service context.
func (s *Service) Connect(pi peer.AddrInfo) error { return s.host.Connect(s.ctx, pi) }

// Peers returns the peer status interface.
func (s *Service) Peers() *peers.Status { return s.peers }
// ENR returns the local node's current ENR, or nil when discovery is not running.
func (s *Service) ENR() *enr.Record {
	if l := s.dv5Listener; l != nil {
		return l.Self().Record()
	}
	return nil
}

// NodeID returns the local node's node ID for discovery, or the zero ID when
// discovery is not running.
func (s *Service) NodeID() enode.ID {
	var id enode.ID
	if l := s.dv5Listener; l != nil {
		id = l.Self().ID()
	}
	return id
}

// DiscoveryAddresses represents our enr addresses as multiaddresses.
// It returns (nil, nil) when discovery is not running.
func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
	if l := s.dv5Listener; l != nil {
		return convertToUdpMultiAddr(l.Self())
	}
	return nil, nil
}
// Metadata returns a copy of the peer's metadata (via metaData.Copy) rather
// than the stored instance.
func (s *Service) Metadata() metadata.Metadata { return s.metaData.Copy() }

// MetadataSeq returns the metadata sequence number.
func (s *Service) MetadataSeq() uint64 { return s.metaData.SequenceNumber() }

// AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can
// be used to refresh ENR. The method is stored under pingMethodLock.
func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
	s.pingMethodLock.Lock()
	defer s.pingMethodLock.Unlock()
	s.pingMethod = reqFunc
}
// pingPeersAndLogEnr logs the local node record and, if a ping method has been
// registered via AddPingMethod, pings every connected peer on its own
// goroutine. Ping failures are only logged at debug level.
func (s *Service) pingPeersAndLogEnr() {
	// Snapshot the ping method under the read lock. The goroutines below
	// outlive this call, so they must not read s.pingMethod themselves: that
	// read would happen after the lock is released and race with AddPingMethod.
	s.pingMethodLock.RLock()
	ping := s.pingMethod
	s.pingMethodLock.RUnlock()

	// ENR() and NodeID() treat a nil listener (discovery disabled) as valid;
	// do the same here instead of dereferencing nil.
	if s.dv5Listener != nil {
		localENR := s.dv5Listener.Self()
		log.WithFields(logrus.Fields{
			"ENR": localENR,
			"seq": localENR.Seq(),
		}).Info("New node record")
	}

	if ping == nil {
		return
	}
	for _, pid := range s.peers.Connected() {
		go func(id peer.ID) {
			if err := ping(s.ctx, id); err != nil {
				log.WithField("peer", id).WithError(err).Debug("Failed to ping peer")
			}
		}(pid)
	}
}
// awaitStateInitialized blocks until the clock waiter delivers genesis data,
// then records the genesis time and genesis validators root. The p2p service
// needs this genesis information for peering. It is a no-op when that data is
// already present, and it holds initializationLock for its whole duration.
func (s *Service) awaitStateInitialized() {
	s.initializationLock.Lock()
	defer s.initializationLock.Unlock()

	if s.isInitialized() {
		return
	}

	clock, err := s.cfg.ClockWaiter.WaitForClock(s.ctx)
	if err != nil {
		log.WithError(err).Fatal("Failed to receive initial genesis data")
	}

	s.genesisTime = clock.GenesisTime()
	root := clock.GenesisValidatorsRoot()
	s.genesisValidatorsRoot = root[:]

	// Computing the digest now surfaces configuration problems early; the
	// value itself is discarded here.
	if _, err := s.currentForkDigest(); err != nil {
		log.WithError(err).Error("Could not initialize fork digest")
	}
}
// connectWithAllTrustedPeers registers each trusted peer in the peer status
// store and dials it asynchronously. Dial failures are logged at trace level.
func (s *Service) connectWithAllTrustedPeers(multiAddrs []multiaddr.Multiaddr) {
	addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
	if err != nil {
		log.WithError(err).Error("Could not convert to peer address info's from multiaddresses")
		return
	}
	for _, info := range addrInfos {
		// AddrInfosFromP2pAddrs yields an AddrInfo with no transport addresses
		// for a bare /p2p/<id> multiaddr; indexing Addrs[0] would panic.
		if len(info.Addrs) == 0 {
			log.WithField("peer", info.ID).Warn("Skipping trusted peer without a dialable address")
			continue
		}
		// add peer into peer status
		s.peers.Add(nil, info.ID, info.Addrs[0], network.DirUnknown)
		// make each dial non-blocking
		go func(info peer.AddrInfo) {
			if err := s.connectWithPeer(s.ctx, info); err != nil {
				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
			}
		}(info)
	}
}
// connectWithAllPeers groups the given multiaddrs by peer and dials each peer
// on its own goroutine. Dial failures are logged at trace level only.
func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
	infos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
	if err != nil {
		log.WithError(err).Error("Could not convert to peer address info's from multiaddresses")
		return
	}

	dial := func(target peer.AddrInfo) {
		if err := s.connectWithPeer(s.ctx, target); err != nil {
			log.WithError(err).Tracef("Could not connect with peer %s", target.String())
		}
	}
	for i := range infos {
		// Dial concurrently so a slow peer does not hold up the others.
		go dial(infos[i])
	}
}
// connectWithPeer dials a single peer, bounded by maxDialTimeout. It skips
// our own ID, refuses peers the scorer marks as bad, and downscores the peer
// when the dial fails.
func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error {
	ctx, span := trace.StartSpan(ctx, "p2p.connectWithPeer")
	defer span.End()

	// Never dial ourselves.
	if info.ID == s.host.ID() {
		return nil
	}
	if err := s.Peers().IsBad(info.ID); err != nil {
		return errors.Wrap(err, "bad peer")
	}

	dialCtx, cancel := context.WithTimeout(ctx, maxDialTimeout)
	defer cancel()

	err := s.host.Connect(dialCtx, info)
	if err == nil {
		return nil
	}
	s.downscorePeer(info.ID, "connectionError")
	return errors.Wrap(err, "peer connect")
}
// connectToBootnodes parses the configured discv5 bootstrap records and dials
// every boot node whose record carries a "tcp" entry. It returns the first
// parse error encountered; dial failures are handled by connectWithAllPeers.
func (s *Service) connectToBootnodes() error {
	bootNodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddrs))
	for _, raw := range s.cfg.Discv5BootStrapAddrs {
		node, err := enode.Parse(enode.ValidSchemes, raw)
		if err != nil {
			return err
		}

		// Boot nodes without a tcp entry in their record are not dialed.
		var tcpPort enr.TCP
		if err := node.Record().Load(enr.WithEntry("tcp", &tcpPort)); err != nil {
			if !enr.IsNotFound(err) {
				log.WithError(err).Error("Could not retrieve tcp port")
			}
			continue
		}
		bootNodes = append(bootNodes, node)
	}

	s.connectWithAllPeers(convertToMultiAddr(bootNodes))
	return nil
}
// isInitialized reports whether the service knows both the genesis time and a
// 32-byte genesis validators root. This is required for discovery and pubsub
// validation.
func (s *Service) isInitialized() bool {
	if s.genesisTime.IsZero() {
		return false
	}
	return len(s.genesisValidatorsRoot) == 32
}

// downscorePeer records one bad response against peerID and logs the new score
// at debug level, tagged with the given reason.
func (s *Service) downscorePeer(peerID peer.ID, reason string) {
	scorer := s.Peers().Scorers().BadResponsesScorer()
	newScore := scorer.Increment(peerID)
	fields := logrus.Fields{
		"peerID":   peerID,
		"reason":   reason,
		"newScore": newScore,
	}
	log.WithFields(fields).Debug("Downscore peer")
}
// subscribeReachabilityEvents subscribes to the host's AutoNAT v2
// EvtHostReachableAddrsChanged events. It logs each change on a background
// goroutine until the service context is cancelled or the subscription
// channel is closed, and then closes the subscription.
func (s *Service) subscribeReachabilityEvents() error {
	sub, err := s.host.EventBus().Subscribe(new(event.EvtHostReachableAddrsChanged))
	if err != nil {
		return fmt.Errorf("subscribing to reachability events: %w", err)
	}

	go func() {
		defer func() {
			if err := sub.Close(); err != nil {
				log.WithError(err).Debug("Failed to close reachability event subscription")
			}
		}()

		for {
			select {
			case <-s.ctx.Done():
				return
			case ev, open := <-sub.Out():
				if !open {
					// A closed channel would otherwise yield zero values
					// immediately on every iteration and spin this loop.
					return
				}
				// Named so it does not shadow the imported event package.
				changed, isReachability := ev.(event.EvtHostReachableAddrsChanged)
				if !isReachability {
					continue
				}
				log.WithFields(logrus.Fields{
					"reachable":   multiaddrsToStrings(changed.Reachable),
					"unreachable": multiaddrsToStrings(changed.Unreachable),
					"unknown":     multiaddrsToStrings(changed.Unknown),
				}).Info("Address reachability changed")
			}
		}
	}()
	return nil
}
// multiaddrsToStrings renders each multiaddr with its String method, keeping
// the input order. The result always has len(addrs) elements and is non-nil.
func multiaddrsToStrings(addrs []multiaddr.Multiaddr) []string {
	out := make([]string, len(addrs))
	for i := range addrs {
		out[i] = addrs[i].String()
	}
	return out
}