From cd0821d0262b38be34b0dd63f2e354dcb5860470 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 18 Jul 2025 19:19:15 +0200 Subject: [PATCH] Subnets subscription: Avoid dynamic subscribing blocking in case not enough peers per subnets are found. (#15471) * Subnets subscription: Avoid dynamic subscribing blocking in case not enough peers per subnets are found. * `subscribeWithParameters`: Use struct to avoid too many function parameters (no functional changes). * Optimise subnets search. Currently, when we are looking for peers in, let's say, data column sidecars subnets 3, 6 and 7, we first look for peers in subnet 3. If, during the crawling, we meet some peers with subnet 6, we discard them (because we are exclusively looking for peers with subnet 3). Once we are done, we start again, this time looking for peers with subnet 6. This commit optimizes that by looking for peers which satisfy our constraints in one pass. * Fix James' comment. * Fix James' comment. * Fix James' comment. * Fix James' comment. * Fix James' comment. * Fix James' comment. * Fix James's comment. * Simplify following James' comment. * Fix James' comment. * Update beacon-chain/sync/rpc_goodbye.go Co-authored-by: Preston Van Loon * Update config/params/config.go Co-authored-by: Preston Van Loon * Update beacon-chain/sync/subscriber.go Co-authored-by: Preston Van Loon * Fix Preston's comment. * Fix Preston's comment. * `TestService_BroadcastDataColumn`: Re-add sleep 50 ms. * Fix Preston's comment. * Update beacon-chain/p2p/subnets.go Co-authored-by: Preston Van Loon --------- Co-authored-by: Preston Van Loon --- beacon-chain/blockchain/setup_test.go | 2 +- beacon-chain/builder/service.go | 2 +- beacon-chain/p2p/broadcaster.go | 75 +-- beacon-chain/p2p/broadcaster_test.go | 71 +-- beacon-chain/p2p/connection_gater.go | 5 +- beacon-chain/p2p/discovery.go | 293 ++++++---- beacon-chain/p2p/discovery_test.go | 10 +- beacon-chain/p2p/handshake.go | 23 +- beacon-chain/p2p/interfaces.go | 4 +- beacon-chain/p2p/peers/status.go | 2 +- beacon-chain/p2p/service.go | 33 +- beacon-chain/p2p/service_test.go | 89 +-- beacon-chain/p2p/subnets.go | 444 +++++++++------ beacon-chain/p2p/subnets_test.go | 87 ++- beacon-chain/p2p/testing/fuzz_p2p.go | 8 +- beacon-chain/p2p/testing/mock_broadcaster.go | 2 +- beacon-chain/p2p/testing/mock_peermanager.go | 7 +- beacon-chain/p2p/testing/p2p.go | 8 +- beacon-chain/sync/error.go | 14 +- beacon-chain/sync/metrics.go | 2 +- beacon-chain/sync/rpc_goodbye.go | 9 +- beacon-chain/sync/rpc_status.go | 1 + beacon-chain/sync/subscriber.go | 538 ++++++++++-------- .../sync/subscriber_beacon_attestation.go | 16 +- beacon-chain/sync/subscriber_test.go | 17 +- changelog/manu-subscriptions.md | 2 + config/params/config.go | 2 + 27 files changed, 988 insertions(+), 778 deletions(-) create mode 100644 changelog/manu-subscriptions.md diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 70645d89d7..c953695453 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -87,7 +87,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, return nil } -func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { +func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error { mb.broadcastCalled = true return nil } diff --git a/beacon-chain/builder/service.go 
b/beacon-chain/builder/service.go index c7f9a06d04..7cb86d60c6 100644 --- a/beacon-chain/builder/service.go +++ b/beacon-chain/builder/service.go @@ -68,7 +68,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { log.WithError(err).Error("Failed to check builder status") } else { log.WithField("endpoint", s.c.NodeURL()).Info("Builder has been configured") - log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " + + log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " + "Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!") } } diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index dc12bdbefe..e0faf2e415 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -24,6 +24,8 @@ import ( "google.golang.org/protobuf/proto" ) +const minimumPeersPerSubnetForBroadcast = 1 + // ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the // GossipTypeMapping. var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic") @@ -124,15 +126,13 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6 if err := func() error { s.subnetLocker(subnet).Lock() defer s.subnetLocker(subnet).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, attestationToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + + if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - savedAttestationBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + savedAttestationBroadcasts.Inc() + return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -183,15 +183,12 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs if err := func() error { s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - savedSyncCommitteeBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + savedSyncCommitteeBroadcasts.Inc() + return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -250,15 +247,13 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob if err := func() error { s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, blobSubnetToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + + if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - blobSidecarBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + blobSidecarBroadcasts.Inc() + 
return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -329,7 +324,6 @@ func (s *Service) BroadcastDataColumn( root [fieldparams.RootLength]byte, dataColumnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, - peersCheckedChans ...chan<- bool, // Used for testing purposes to signal when peers are checked. ) error { // Add tracing to the function. ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumn") @@ -349,7 +343,7 @@ func (s *Service) BroadcastDataColumn( } // Non-blocking broadcast, with attempts to discover a column subnet peer if none available. - go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest, peersCheckedChans) + go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest) return nil } @@ -360,7 +354,6 @@ func (s *Service) internalBroadcastDataColumn( columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, forkDigest [fieldparams.VersionLength]byte, - peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. ) { // Add tracing to the function. _, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn") @@ -382,7 +375,7 @@ func (s *Service) internalBroadcastDataColumn( wrappedSubIdx := columnSubnet + dataColumnSubnetVal // Find peers if needed. - if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic, columnSubnet, peersCheckedChans); err != nil { + if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, columnSubnet); err != nil { log.WithError(err).Error("Failed to find peers for data column subnet") tracing.AnnotateError(span, err) } @@ -416,35 +409,19 @@ func (s *Service) internalBroadcastDataColumn( func (s *Service) findPeersIfNeeded( ctx context.Context, wrappedSubIdx uint64, - topic string, + topicFormat string, + forkDigest [fieldparams.VersionLength]byte, subnet uint64, - peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. ) error { + // Sending a data column sidecar to only one peer is not ideal, + // but it ensures at least one peer receives it. s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - // Sending a data column sidecar to only one peer is not ideal, - // but it ensures at least one peer receives it. - const peerCount = 1 - - if s.hasPeerWithSubnet(topic) { - // Exit early if we already have peers with this subnet. - return nil - } - - // Used for testing purposes. - if len(peersCheckedChans) > 0 { - peersCheckedChans[0] <- true - } - // No peers found, attempt to find peers with this subnet. - ok, err := s.FindPeersWithSubnet(ctx, topic, subnet, peerCount) - if err != nil { + if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { return errors.Wrap(err, "find peers with subnet") } - if !ok { - return errors.Errorf("failed to find peers for topic %s with subnet %d", topic, subnet) - } return nil } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 058b5e489d..4e53bd398f 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -216,9 +216,10 @@ func TestService_BroadcastAttestation(t *testing.T) { } func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { + const port = uint(2000) + // Setup bootnode. 
cfg := &Config{PingInterval: testPingInterval} - port := 2000 cfg.UDPPort = uint(port) _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") @@ -245,7 +246,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { PingInterval: testPingInterval, } // Setup 2 different hosts - for i := 1; i <= 2; i++ { + for i := uint(1); i <= 2; i++ { h, pkey, ipAddr := createHost(t, port+i) cfg.UDPPort = uint(port + i) cfg.TCPPort = uint(port + i) @@ -687,7 +688,7 @@ func TestService_BroadcastDataColumn(t *testing.T) { // Create a host. _, pkey, ipAddr := createHost(t, port) - p := &Service{ + service := &Service{ ctx: t.Context(), host: p1.BHost, pubsub: p1.PubSub(), @@ -701,56 +702,44 @@ func TestService_BroadcastDataColumn(t *testing.T) { } // Create a listener. - listener, err := p.startDiscoveryV5(ipAddr, pkey) + listener, err := service.startDiscoveryV5(ipAddr, pkey) require.NoError(t, err) - p.dv5Listener = listener + service.dv5Listener = listener - digest, err := p.currentForkDigest() + digest, err := service.currentForkDigest() require.NoError(t, err) subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex) - topic := fmt.Sprintf(topicFormat, digest, subnet) + topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix() roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}}) sidecar := roSidecars[0].DataColumnSidecar - // Async listen for the pubsub, must be before the broadcast. - var wg sync.WaitGroup - wg.Add(1) - - peersChecked := make(chan bool, 0) - - go func(tt *testing.T) { - defer wg.Done() - - ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) - defer cancel() - - // Wait for the peers to be checked. - <-peersChecked - - // External peer subscribes to the topic. - topic += p.Encoding().ProtocolSuffix() - sub, err := p2.SubscribeToTopic(topic) - require.NoError(tt, err) - - msg, err := sub.Next(ctx) - require.NoError(tt, err) - - var result ethpb.DataColumnSidecar - require.NoError(tt, p.Encoding().DecodeGossip(msg.Data, &result)) - require.DeepEqual(tt, &result, sidecar) - }(t) - - var emptyRoot [fieldparams.RootLength]byte - // Attempt to broadcast nil object should fail. - err = p.BroadcastDataColumn(emptyRoot, subnet, nil) + var emptyRoot [fieldparams.RootLength]byte + err = service.BroadcastDataColumn(emptyRoot, subnet, nil) require.ErrorContains(t, "attempted to broadcast nil", err) - // Broadcast to peers and wait. - err = p.BroadcastDataColumn(emptyRoot, subnet, sidecar, peersChecked) + // Subscribe to the topic. + sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Minute), "Failed to receive pubsub within 1s") + + // libp2p fails without this delay + time.Sleep(50 * time.Millisecond) + + // Broadcast to peers and wait. + err = service.BroadcastDataColumn(emptyRoot, subnet, sidecar) + require.NoError(t, err) + + // Receive the message. 
+ ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + msg, err := sub.Next(ctx) + require.NoError(t, err) + + var result ethpb.DataColumnSidecar + require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result)) + require.DeepEqual(t, &result, sidecar) } diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index 8147d07b6c..eb27bff432 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -19,8 +19,7 @@ const ( // Burst limit for inbound dials. ipBurst = 8 - // High watermark buffer signifies the buffer till which - // we will handle inbound requests. + // High watermark buffer signifies the buffer till which we will handle inbound requests. highWatermarkBuffer = 20 ) @@ -53,7 +52,7 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) { "reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address") return false } - if s.isPeerAtLimit(true /* inbound */) { + if s.isPeerAtLimit(inbound) { log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(), "reason": "at peer limit"}).Trace("Not accepting inbound dial") return false diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 882b531999..2fd791591a 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -2,7 +2,9 @@ package p2p import ( "bytes" + "context" "crypto/ecdsa" + "math" "net" "sync" "time" @@ -23,45 +25,56 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" - "github.com/sirupsen/logrus" ) -type ListenerRebooter interface { - Listener - RebootListener() error -} +type ( + // ListenerRebooter is an interface that extends the Listener interface + // with the `RebootListener` method. + ListenerRebooter interface { + Listener + RebootListener() error + } -// Listener defines the discovery V5 network interface that is used -// to communicate with other peers. -type Listener interface { - Self() *enode.Node - Close() - Lookup(enode.ID) []*enode.Node - Resolve(*enode.Node) *enode.Node - RandomNodes() enode.Iterator - Ping(*enode.Node) error - RequestENR(*enode.Node) (*enode.Node, error) - LocalNode() *enode.LocalNode -} + // Listener defines the discovery V5 network interface that is used + // to communicate with other peers. + Listener interface { + Self() *enode.Node + Close() + Lookup(enode.ID) []*enode.Node + Resolve(*enode.Node) *enode.Node + RandomNodes() enode.Iterator + Ping(*enode.Node) error + RequestENR(*enode.Node) (*enode.Node, error) + LocalNode() *enode.LocalNode + } -const ( - udp4 = iota - udp6 + quicProtocol uint16 + + listenerWrapper struct { + mu sync.RWMutex + listener *discover.UDPv5 + listenerCreator func() (*discover.UDPv5, error) + } + + connectivityDirection int + udpVersion int ) const quickProtocolEnrKey = "quic" -type quicProtocol uint16 +const ( + udp4 udpVersion = iota + udp6 +) + +const ( + inbound connectivityDirection = iota + all +) // quicProtocol is the "quic" key, which holds the QUIC port of the node. 
func (quicProtocol) ENRKey() string { return quickProtocolEnrKey } -type listenerWrapper struct { - mu sync.RWMutex - listener *discover.UDPv5 - listenerCreator func() (*discover.UDPv5, error) -} - func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) { rawListener, err := listenerCreator() if err != nil { @@ -276,29 +289,10 @@ func (s *Service) RefreshPersistentSubnets() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { const ( - minLogInterval = 1 * time.Minute thresholdLimit = 5 + searchPeriod = 20 * time.Second ) - peersSummary := func(threshold uint) (uint, uint) { - // Retrieve how many active peers we have. - activePeers := s.Peers().Active() - activePeerCount := uint(len(activePeers)) - - // Compute how many peers we are missing to reach the threshold. - if activePeerCount >= threshold { - return activePeerCount, 0 - } - - missingPeerCount := threshold - activePeerCount - - return activePeerCount, missingPeerCount - } - - var lastLogTime time.Time - - iterator := s.dv5Listener.RandomNodes() - defer iterator.Close() connectivityTicker := time.NewTicker(1 * time.Minute) thresholdCount := 0 @@ -330,74 +324,148 @@ func (s *Service) listenForNewNodes() { continue } - iterator = s.dv5Listener.RandomNodes() thresholdCount = 0 } default: - if s.isPeerAtLimit(false /* inbound */) { - // Pause the main loop for a period to stop looking - // for new peers. + if s.isPeerAtLimit(all) { + // Pause the main loop for a period to stop looking for new peers. log.Trace("Not looking for peers, at peer limit") time.Sleep(pollingPeriod) continue } - // Compute the number of new peers we want to dial. - activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers) - - fields := logrus.Fields{ - "currentPeerCount": activePeerCount, - "targetPeerCount": s.cfg.MaxPeers, + // Return early if the discovery listener isn't set. + if s.dv5Listener == nil { + return } - if missingPeerCount == 0 { - log.Trace("Not looking for peers, at peer limit") - time.Sleep(pollingPeriod) - continue - } + func() { + ctx, cancel := context.WithTimeout(s.ctx, searchPeriod) + defer cancel() - if time.Since(lastLogTime) > minLogInterval { - lastLogTime = time.Now() - log.WithFields(fields).Debug("Searching for new active peers") - } - - // Restrict dials if limit is applied. - if flags.MaxDialIsActive() { - maxConcurrentDials := uint(flags.Get().MaxConcurrentDials) - missingPeerCount = min(missingPeerCount, maxConcurrentDials) - } - - // Search for new peers. - wantedNodes := searchForPeers(iterator, batchPeriod, missingPeerCount, s.filterPeer) - - wg := new(sync.WaitGroup) - for i := 0; i < len(wantedNodes); i++ { - node := wantedNodes[i] - peerInfo, _, err := convertToAddrInfo(node) - if err != nil { - log.WithError(err).Error("Could not convert to peer info") - continue + if err := s.findAndDialPeers(ctx); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Error("Failed to find and dial peers") } - - if peerInfo == nil { - continue - } - - // Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. 
- s.Peers().RandomizeBackOff(peerInfo.ID) - wg.Add(1) - go func(info *peer.AddrInfo) { - if err := s.connectWithPeer(s.ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) - } - wg.Done() - }(peerInfo) - } - wg.Wait() + }() } } } +// findAndDialPeers ensures that our node is connected to enough peers. +// If the threshold is met, then this function immediately returns. +// Otherwise, it searches for new peers and dials them. +// If `ctx` is canceled while searching for peers, the search is stopped, but newly found peers are still dialed. +// In this case, the function returns an error. +func (s *Service) findAndDialPeers(ctx context.Context) error { + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt + if flags.MaxDialIsActive() { + maxConcurrentDials = flags.Get().MaxConcurrentDials + } + + missingPeerCount := s.missingPeerCount(s.cfg.MaxPeers) + for missingPeerCount > 0 { + // Stop the search/dialing loop if the context is canceled. + if err := ctx.Err(); err != nil { + return err + } + + peersToDial, err := func() ([]*enode.Node, error) { + ctx, cancel := context.WithTimeout(ctx, batchPeriod) + defer cancel() + + peersToDial, err := s.findPeers(ctx, missingPeerCount) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return nil, errors.Wrap(err, "find peers") + } + + return peersToDial, nil + }() + + if err != nil { + return err + } + + dialedPeerCount := s.dialPeers(s.ctx, maxConcurrentDials, peersToDial) + + if dialedPeerCount > missingPeerCount { + missingPeerCount = 0 + continue + } + + missingPeerCount -= dialedPeerCount + } + + return nil +} + +// findPeers finds new peers until `missingPeerCount` peers are found, `batchPeriod` is over, +// the peers iterator is exhausted or the context is canceled. +func (s *Service) findPeers(ctx context.Context, missingPeerCount uint) ([]*enode.Node, error) { + // Create a discovery iterator to find new peers. + iterator := s.dv5Listener.RandomNodes() + + // `iterator.Next` can block indefinitely. `iterator.Close` unblocks it. + // So it is important to close the iterator when the context is done to ensure + // that the search does not hang indefinitely. + go func() { + <-ctx.Done() + iterator.Close() + }() + + // Crawl the network for peers subscribed to the defective subnets. + nodeByNodeID := make(map[enode.ID]*enode.Node) + for missingPeerCount > 0 && iterator.Next() { + if ctx.Err() != nil { + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, ctx.Err() + } + + // Skip peers not matching the filter. + node := iterator.Node() + if !s.filterPeer(node) { + continue + } + + // Remove duplicates, keeping the node with higher seq. + existing, ok := nodeByNodeID[node.ID()] + if ok && existing.Seq() > node.Seq() { + continue + } + nodeByNodeID[node.ID()] = node + + // We found a new peer. Decrease the missing peer count. + missingPeerCount-- + } + + // Convert the map to a slice. + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, nil +} + +// missingPeerCount computes how many peers we are missing to reach the target peer count. +func (s *Service) missingPeerCount(targetCount uint) uint { + // Retrieve how many active peers we have. 
+ activePeers := s.Peers().Active() + activePeerCount := uint(len(activePeers)) + + // Compute how many peers we are missing to reach the threshold. + missingPeerCount := uint(0) + if targetCount > activePeerCount { + missingPeerCount = targetCount - activePeerCount + } + + return missingPeerCount +} + func (s *Service) createListener( ipAddr net.IP, privKey *ecdsa.PrivateKey, @@ -562,8 +630,7 @@ func (s *Service) startDiscoveryV5( // 2. Peer hasn't been marked as 'bad'. // 3. Peer is not currently active or connected. // 4. Peer is ready to receive incoming connections. -// 5. Peer's fork digest in their ENR matches that of -// our localnodes. +// 5. Peer's fork digest in their ENR matches that of our localnodes. func (s *Service) filterPeer(node *enode.Node) bool { // Ignore nil node entries passed in. if node == nil { @@ -628,22 +695,24 @@ func (s *Service) filterPeer(node *enode.Node) bool { // This checks our set max peers in our config, and // determines whether our currently connected and // active peers are above our set max peer limit. -func (s *Service) isPeerAtLimit(inbound bool) bool { - numOfConns := len(s.host.Network().Peers()) +func (s *Service) isPeerAtLimit(direction connectivityDirection) bool { maxPeers := int(s.cfg.MaxPeers) - // If we are measuring the limit for inbound peers - // we apply the high watermark buffer. - if inbound { + + // If we are measuring the limit for inbound peers we apply the high watermark buffer. + if direction == inbound { maxPeers += highWatermarkBuffer maxInbound := s.peers.InboundLimit() + highWatermarkBuffer - currInbound := len(s.peers.InboundConnected()) - // Exit early if we are at the inbound limit. - if currInbound >= maxInbound { + inboundCount := len(s.peers.InboundConnected()) + + // Return early if we are at the inbound limit. 
+ if inboundCount >= maxInbound { return true } } - activePeers := len(s.Peers().Active()) - return activePeers >= maxPeers || numOfConns >= maxPeers + + peerCount := len(s.host.Network().Peers()) + activePeerCount := len(s.Peers().Active()) + return activePeerCount >= maxPeers || peerCount >= maxPeers } // isBelowOutboundPeerThreshold checks if the number of outbound peers that @@ -901,7 +970,7 @@ func multiAddrFromString(address string) (ma.Multiaddr, error) { return ma.NewMultiaddr(address) } -func udpVersionFromIP(ipAddr net.IP) int { +func udpVersionFromIP(ipAddr net.IP) udpVersion { if ipAddr.To4() != nil { return udp4 } diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 3e29e69c0d..66e9007b3d 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -323,16 +323,16 @@ func TestMultiAddrConversion_OK(t *testing.T) { } func TestStaticPeering_PeersAreAdded(t *testing.T) { + const port = uint(6000) cs := startup.NewClockSynchronizer() cfg := &Config{ MaxPeers: 30, ClockWaiter: cs, } - port := 6000 var staticPeers []string var hosts []host.Host // setup other nodes - for i := 1; i <= 5; i++ { + for i := uint(1); i <= 5; i++ { h, _, ipaddr := createHost(t, port+i) staticPeers = append(staticPeers, fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, port+i, h.ID())) hosts = append(hosts, h) @@ -406,14 +406,14 @@ func TestInboundPeerLimit(t *testing.T) { _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } - require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers") - require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers") + require.Equal(t, true, s.isPeerAtLimit(all), "not at limit for outbound peers") + require.Equal(t, false, s.isPeerAtLimit(inbound), "at limit for inbound peers") for i := 0; i < highWatermarkBuffer; i++ { _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } - require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers") + require.Equal(t, true, s.isPeerAtLimit(inbound), "not at limit for inbound peers") } func TestOutboundPeerThreshold(t *testing.T) { diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index e9c0a82eae..c6910b15e9 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -10,6 +10,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" prysmTime "github.com/OffchainLabs/prysm/v6/time" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -17,6 +18,8 @@ import ( ) const ( + agentVersionKey = "AgentVersion" + // The time to wait for a status request. timeForStatus = 10 * time.Second ) @@ -28,12 +31,15 @@ func peerMultiaddrString(conn network.Conn) string { } func (s *Service) connectToPeer(conn network.Conn) { - s.peers.SetConnectionState(conn.RemotePeer(), peers.Connected) + remotePeer := conn.RemotePeer() + + s.peers.SetConnectionState(remotePeer, peers.Connected) // Go through the handshake process. 
log.WithFields(logrus.Fields{ "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), + "agent": agentString(remotePeer, s.Host()), }).Debug("Initiate peer connection") } @@ -61,6 +67,7 @@ func (s *Service) disconnectFromPeerOnError( "multiaddr": peerMultiaddrString(conn), "direction": conn.Stat().Direction.String(), "remainingActivePeers": len(s.peers.Active()), + "agent": agentString(remotePeerID, s.Host()), }). Debug("Initiate peer disconnection") @@ -189,9 +196,10 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p DisconnectedF: func(net network.Network, conn network.Conn) { peerID := conn.RemotePeer() - log.WithFields(logrus.Fields{ + log := log.WithFields(logrus.Fields{ "multiAddr": peerMultiaddrString(conn), "direction": conn.Stat().Direction.String(), + "agent": agentString(peerID, s.Host()), }) // Must be handled in a goroutine as this callback cannot be blocking. go func() { @@ -222,3 +230,14 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p }, }) } + +func agentString(pid peer.ID, hst host.Host) string { + rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey) + + result, ok := rawVersion.(string) + if storeErr != nil || !ok { + result = "" + } + + return result +} diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 640bca6371..8f285acc12 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -49,7 +49,7 @@ type ( BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error - BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error + BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error } // SetStreamHandler configures p2p to handle streams of a certain topic ID. @@ -98,7 +98,7 @@ type ( NodeID() enode.ID DiscoveryAddresses() ([]multiaddr.Multiaddr, error) RefreshPersistentSubnets() - FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) + FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 59ee1fb370..3240bb35dc 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -393,7 +393,7 @@ func (p *Status) SetNextValidTime(pid peer.ID, nextTime time.Time) { peerData.NextValidTime = nextTime } -// RandomizeBackOff adds extra backoff period during which peer will not be dialed. +// RandomizeBackOff adds extra backoff period during which peer won't be dialed. 
func (p *Status) RandomizeBackOff(pid peer.ID) { p.store.Lock() defer p.store.Unlock() diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 1ceda55002..2191a269dd 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -37,24 +37,28 @@ import ( var _ runtime.Service = (*Service)(nil) -// In the event that we are at our peer limit, we -// stop looking for new peers and instead poll -// for the current peer limit status for the time period -// defined below. -var pollingPeriod = 6 * time.Second +const ( + // When looking for new nodes, if not enough nodes are found, + // we stop after this spent time. + batchPeriod = 2 * time.Second -// When looking for new nodes, if not enough nodes are found, -// we stop after this spent time. -var batchPeriod = 2 * time.Second + // maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it. + maxBadResponses = 5 +) -// Refresh rate of ENR set at twice per slot. -var refreshRate = slots.DivideSlotBy(2) +var ( + // Refresh rate of ENR set at twice per slot. + refreshRate = slots.DivideSlotBy(2) -// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it. -const maxBadResponses = 5 + // maxDialTimeout is the timeout for a single peer dial. + maxDialTimeout = params.BeaconConfig().RespTimeoutDuration() -// maxDialTimeout is the timeout for a single peer dial. -var maxDialTimeout = params.BeaconConfig().RespTimeoutDuration() + // In the event that we are at our peer limit, we + // stop looking for new peers and instead poll + // for the current peer limit status for the time period + // defined below. + pollingPeriod = 6 * time.Second +) // Service for managing peer to peer (p2p) networking. type Service struct { @@ -251,6 +255,7 @@ func (s *Service) Start() { "inboundTCP": inboundTCPCount, "outboundTCP": outboundTCPCount, "total": total, + "target": s.cfg.MaxPeers, } if features.Get().EnableQUIC { diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index f2e65c6053..ea08be2b24 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -13,6 +13,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" "github.com/OffchainLabs/prysm/v6/network/forks" @@ -72,7 +73,7 @@ func (mockListener) RandomNodes() enode.Iterator { func (mockListener) RebootListener() error { panic("implement me") } -func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) { +func createHost(t *testing.T, port uint) (host.Host, *ecdsa.PrivateKey, net.IP) { _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) @@ -185,21 +186,33 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) { } func TestListenForNewNodes(t *testing.T) { + const ( + port = uint(2000) + testPollingPeriod = 1 * time.Second + peerCount = 5 + ) + params.SetupTestConfigCleanup(t) + // Setup bootnode. 
- notifier := &mock.MockStateNotifier{} - cfg := &Config{StateNotifier: notifier, PingInterval: testPingInterval, DisableLivenessCheck: true} - port := 2000 - cfg.UDPPort = uint(port) + cfg := &Config{ + StateNotifier: &mock.MockStateNotifier{}, + PingInterval: testPingInterval, + DisableLivenessCheck: true, + UDPPort: port, + } + _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") genesisTime := prysmTime.Now() - var gvr [32]byte + var gvr [fieldparams.RootLength]byte + s := &Service{ cfg: cfg, genesisTime: genesisTime, genesisValidatorsRoot: gvr[:], } + bootListener, err := s.createListener(ipAddr, pkey) require.NoError(t, err) defer bootListener.Close() @@ -210,35 +223,40 @@ func TestListenForNewNodes(t *testing.T) { // Use shorter period for testing. currentPeriod := pollingPeriod - pollingPeriod = 1 * time.Second + pollingPeriod = testPollingPeriod defer func() { pollingPeriod = currentPeriod }() bootNode := bootListener.Self() - var listeners []*listenerWrapper - var hosts []host.Host - // setup other nodes. + // Setup other nodes. cs := startup.NewClockSynchronizer() - cfg = &Config{ - Discv5BootStrapAddrs: []string{bootNode.String()}, - PingInterval: testPingInterval, - DisableLivenessCheck: true, - MaxPeers: 30, - ClockWaiter: cs, - } - for i := 1; i <= 5; i++ { + listeners := make([]*listenerWrapper, 0, peerCount) + hosts := make([]host.Host, 0, peerCount) + + for i := uint(1); i <= peerCount; i++ { + cfg = &Config{ + Discv5BootStrapAddrs: []string{bootNode.String()}, + PingInterval: testPingInterval, + DisableLivenessCheck: true, + MaxPeers: peerCount, + ClockWaiter: cs, + UDPPort: port + i, + TCPPort: port + i, + } + h, pkey, ipAddr := createHost(t, port+i) - cfg.UDPPort = uint(port + i) - cfg.TCPPort = uint(port + i) + s := &Service{ cfg: cfg, genesisTime: genesisTime, genesisValidatorsRoot: gvr[:], } + listener, err := s.startDiscoveryV5(ipAddr, pkey) - assert.NoError(t, err, "Could not start discovery for node") + require.NoError(t, err, "Could not start discovery for node") + listeners = append(listeners, listener) hosts = append(hosts, h) } @@ -263,19 +281,26 @@ func TestListenForNewNodes(t *testing.T) { s, err = NewService(t.Context(), cfg) require.NoError(t, err) - exitRoutine := make(chan bool) - go func() { - s.Start() - <-exitRoutine - }() - time.Sleep(1 * time.Second) - require.NoError(t, cs.SetClock(startup.NewClock(genesisTime, gvr))) + go s.Start() - time.Sleep(4 * time.Second) - assert.Equal(t, 5, len(s.host.Network().Peers()), "Not all peers added to peerstore") - require.NoError(t, s.Stop()) - exitRoutine <- true + err = cs.SetClock(startup.NewClock(genesisTime, gvr)) + require.NoError(t, err, "Could not set clock in service") + + actualPeerCount := len(s.host.Network().Peers()) + for range 40 { + if actualPeerCount == peerCount { + break + } + + time.Sleep(100 * time.Millisecond) + actualPeerCount = len(s.host.Network().Peers()) + } + + assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore") + + err = s.Stop() + require.NoError(t, err, "Failed to stop service") } func TestPeer_Disconnect(t *testing.T) { diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 68e357bf0e..7addca6950 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "fmt" "math" "strings" "sync" @@ -11,6 +12,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" 
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" @@ -23,7 +25,6 @@ import ( "github.com/holiman/uint256" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" - "github.com/sirupsen/logrus" ) var ( @@ -57,249 +58,297 @@ const blobSubnetLockerVal = 110 const dataColumnSubnetVal = 150 // nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. -func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { +func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) { switch { case strings.Contains(topic, GossipAttestationMessage): - return s.filterPeerForAttSubnet(index), nil + return s.filterPeerForAttSubnet(indices), nil case strings.Contains(topic, GossipSyncCommitteeMessage): - return s.filterPeerForSyncSubnet(index), nil + return s.filterPeerForSyncSubnet(indices), nil case strings.Contains(topic, GossipBlobSidecarMessage): - return s.filterPeerForBlobSubnet(), nil + return s.filterPeerForBlobSubnet(indices), nil case strings.Contains(topic, GossipDataColumnSidecarMessage): - return s.filterPeerForDataColumnsSubnet(index), nil + return s.filterPeerForDataColumnsSubnet(indices), nil default: return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) } } -// searchForPeers performs a network search for peers subscribed to a particular subnet. -// It exits as soon as one of these conditions is met: -// - It looped through `batchSize` nodes. -// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. -// - Iterator is exhausted. -func searchForPeers( - iterator enode.Iterator, - batchPeriod time.Duration, - peersToFindCount uint, - filter func(node *enode.Node) bool, -) []*enode.Node { - nodeFromNodeID := make(map[enode.ID]*enode.Node) - start := time.Now() +// FindAndDialPeersWithSubnets ensures that our node is connected to at least `minimumPeersPerSubnet` +// peers for each subnet listed in `subnets`. +// If, for all subnets, the threshold is met, then this function immediately returns. +// Otherwise, it searches for new peers for defective subnets, and dials them. +// If `ctx“ is canceled while searching for peers, search is stopped, but new found peers are still dialed. +// In this case, the function returns an error. +func (s *Service) FindAndDialPeersWithSubnets( + ctx context.Context, + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + subnets map[uint64]bool, +) error { + ctx, span := trace.StartSpan(ctx, "p2p.FindAndDialPeersWithSubnet") + defer span.End() - for time.Since(start) < batchPeriod && uint(len(nodeFromNodeID)) < peersToFindCount && iterator.Next() { + // Return early if the discovery listener isn't set. + if s.dv5Listener == nil { + return nil + } + + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt + if flags.MaxDialIsActive() { + maxConcurrentDials = flags.Get().MaxConcurrentDials + } + + defectiveSubnets := s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets) + for len(defectiveSubnets) > 0 { + // Stop the search/dialing loop if the context is canceled. 
+ if err := ctx.Err(); err != nil { + return err + } + + peersToDial, err := func() ([]*enode.Node, error) { + ctx, cancel := context.WithTimeout(ctx, batchPeriod) + defer cancel() + + peersToDial, err := s.findPeersWithSubnets(ctx, topicFormat, digest, minimumPeersPerSubnet, defectiveSubnets) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return nil, errors.Wrap(err, "find peers with subnets") + } + + return peersToDial, nil + }() + + if err != nil { + return err + } + + // Dial new peers in batches. + s.dialPeers(s.ctx, maxConcurrentDials, peersToDial) + + defectiveSubnets = s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets) + } + + return nil +} + +// findPeersWithSubnets finds peers subscribed to defective subnets in batches +// until enough peers are found or the context is canceled. +// It returns new peers found during the search. +func (s *Service) findPeersWithSubnets( + ctx context.Context, + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + defectiveSubnetsOrigin map[uint64]int, +) ([]*enode.Node, error) { + // Copy the defective subnets map to avoid modifying the original map. + defectiveSubnets := make(map[uint64]int, len(defectiveSubnetsOrigin)) + for k, v := range defectiveSubnetsOrigin { + defectiveSubnets[k] = v + } + + // Create an discovery iterator to find new peers. + iterator := s.dv5Listener.RandomNodes() + + // `iterator.Next` can block indefinitely. `iterator.Close` unblocks it. + // So it is important to close the iterator when the context is done to ensure + // that the search does not hang indefinitely. + go func() { + <-ctx.Done() + iterator.Close() + }() + + // Retrieve the filter function that will be used to filter nodes based on the defective subnets. + filter, err := s.nodeFilter(topicFormat, defectiveSubnets) + if err != nil { + return nil, errors.Wrap(err, "node filter") + } + + // Crawl the network for peers subscribed to the defective subnets. + nodeByNodeID := make(map[enode.ID]*enode.Node) + for len(defectiveSubnets) > 0 && iterator.Next() { + if err := ctx.Err(); err != nil { + // Convert the map to a slice. + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, err + } + + // Get all needed subnets that the node is subscribed to. + // Skip nodes that are not subscribed to any of the defective subnets. node := iterator.Node() - - // Filter out nodes that do not meet the criteria. - if !filter(node) { + nodeSubnets, err := filter(node) + if err != nil { + return nil, errors.Wrap(err, "filter node") + } + if len(nodeSubnets) == 0 { continue } // Remove duplicates, keeping the node with higher seq. - prevNode, ok := nodeFromNodeID[node.ID()] - if ok && prevNode.Seq() > node.Seq() { + existing, ok := nodeByNodeID[node.ID()] + if ok && existing.Seq() > node.Seq() { continue } + nodeByNodeID[node.ID()] = node - nodeFromNodeID[node.ID()] = node + // We found a new peer. Modify the defective subnets map + // and the filter accordingly. + for subnet := range defectiveSubnets { + if !nodeSubnets[subnet] { + continue + } + + defectiveSubnets[subnet]-- + + if defectiveSubnets[subnet] == 0 { + delete(defectiveSubnets, subnet) + } + + filter, err = s.nodeFilter(topicFormat, defectiveSubnets) + if err != nil { + return nil, errors.Wrap(err, "node filter") + } + } } // Convert the map to a slice. 
- nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) - for _, node := range nodeFromNodeID { - nodes = append(nodes, node) + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) } - return nodes + return peersToDial, nil } -// dialPeer dials a peer in a separate goroutine. -func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { - info, _, err := convertToAddrInfo(node) - if err != nil { - return - } - - if info == nil { - return - } - - wg.Add(1) - go func() { - if err := s.connectWithPeer(ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) +// defectiveSubnets returns a map of subnets that have fewer than the minimum peer count. +func (s *Service) defectiveSubnets( + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + subnets map[uint64]bool, +) map[uint64]int { + missingCountPerSubnet := make(map[uint64]int, len(subnets)) + for subnet := range subnets { + topic := fmt.Sprintf(topicFormat, digest, subnet) + s.Encoding().ProtocolSuffix() + peers := s.pubsub.ListPeers(topic) + peerCount := len(peers) + if peerCount < minimumPeersPerSubnet { + missingCountPerSubnet[subnet] = minimumPeersPerSubnet - peerCount } + } - wg.Done() - }() + return missingCountPerSubnet } -// FindPeersWithSubnet performs a network search for peers -// subscribed to a particular subnet. Then it tries to connect -// with those peers. This method will block until either: -// - the required amount of peers are found, or -// - the context is terminated. -// On some edge cases, this method may hang indefinitely while peers -// are actually found. In such a case, the user should cancel the context -// and re-run the method again. -func (s *Service) FindPeersWithSubnet( - ctx context.Context, - topic string, - index uint64, - threshold int, -) (bool, error) { - const minLogInterval = 1 * time.Minute +// dialPeers dials multiple peers concurrently up to `maxConcurrentDials` at a time. +// In case of a dial failure, it logs the error but continues dialing other peers. +func (s *Service) dialPeers(ctx context.Context, maxConcurrentDials int, nodes []*enode.Node) uint { + var mut sync.Mutex - ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") - defer span.End() - - span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. - - if s.dv5Listener == nil { - // Return if discovery isn't set - return false, nil - } - - topic += s.Encoding().ProtocolSuffix() - iterator := s.dv5Listener.RandomNodes() - defer iterator.Close() - - filter, err := s.nodeFilter(topic, index) - if err != nil { - return false, errors.Wrap(err, "node filter") - } - - peersSummary := func(topic string, threshold int) (int, int) { - // Retrieve how many peers we have for this topic. - peerCountForTopic := len(s.pubsub.ListPeers(topic)) - - // Compute how many peers we are missing to reach the threshold. - missingPeerCountForTopic := max(0, threshold-peerCountForTopic) - - return peerCountForTopic, missingPeerCountForTopic - } - - // Compute how many peers we are missing to reach the threshold. - peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) - - // Exit early if we have enough peers. 
- if missingPeerCountForTopic == 0 { - return true, nil - } - - log := log.WithFields(logrus.Fields{ - "topic": topic, - "targetPeerCount": threshold, - }) - - log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start") - - lastLogTime := time.Now() - - wg := new(sync.WaitGroup) - for { - // If the context is done, we can exit the loop. This is the unhappy path. - if err := ctx.Err(); err != nil { - return false, errors.Errorf( - "unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", - topic, peerCountForTopic, threshold, - ) + counter := uint(0) + for start := 0; start < len(nodes); start += maxConcurrentDials { + if ctx.Err() != nil { + return counter } - // Search for new peers in the network. - nodes := searchForPeers(iterator, batchPeriod, uint(missingPeerCountForTopic), filter) - - // Restrict dials if limit is applied. - maxConcurrentDials := math.MaxInt - if flags.MaxDialIsActive() { - maxConcurrentDials = flags.Get().MaxConcurrentDials - } - - // Dial the peers in batches. - for start := 0; start < len(nodes); start += maxConcurrentDials { - stop := min(start+maxConcurrentDials, len(nodes)) - for _, node := range nodes[start:stop] { - s.dialPeer(ctx, wg, node) + var wg sync.WaitGroup + stop := min(start+maxConcurrentDials, len(nodes)) + for _, node := range nodes[start:stop] { + log := log.WithField("nodeID", node.ID()) + info, _, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Debug("Could not convert node to addr info") + continue } - // Wait for all dials to be completed. - wg.Wait() + if info == nil { + log.Debug("Nil addr info") + continue + } + + wg.Add(1) + go func() { + defer wg.Done() + if err := s.connectWithPeer(ctx, *info); err != nil { + log.WithError(err).WithField("info", info.String()).Debug("Could not connect with peer") + return + } + + mut.Lock() + defer mut.Unlock() + counter++ + }() } - peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) - - // If we have enough peers, we can exit the loop. This is the happy path. - if missingPeerCountForTopic == 0 { - break - } - - if time.Since(lastLogTime) > minLogInterval { - lastLogTime = time.Now() - log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue") - } + wg.Wait() } - log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success") - return true, nil + return counter } -// returns a method with filters peers specifically for a particular attestation subnet. -func (s *Service) filterPeerForAttSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +// filterPeerForAttSubnet returns a method with filters peers specifically for a particular attestation subnet. +func (s *Service) filterPeerForAttSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } - subnets, err := attSubnets(node.Record()) + subnets, err := attestationSubnets(node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "attestation subnets") } - return subnets[index] + return intersect(indices, subnets), nil } } // returns a method with filters peers specifically for a particular sync subnet. 
-func (s *Service) filterPeerForSyncSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +func (s *Service) filterPeerForSyncSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } + subnets, err := syncSubnets(node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "sync subnets") } - indExists := false - for _, comIdx := range subnets { - if comIdx == index { - indExists = true - break - } - } - return indExists + + return intersect(indices, subnets), nil } } // returns a method with filters peers specifically for a particular blob subnet. // All peers are supposed to be subscribed to all blob subnets. -func (s *Service) filterPeerForBlobSubnet() func(_ *enode.Node) bool { - return func(_ *enode.Node) bool { - return true +func (s *Service) filterPeerForBlobSubnet(indices map[uint64]int) func(_ *enode.Node) (map[uint64]bool, error) { + result := make(map[uint64]bool, len(indices)) + for i := range indices { + result[i] = true + } + + return func(_ *enode.Node) (map[uint64]bool, error) { + return result, nil } } // returns a method with filters peers specifically for a particular data column subnet. -func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +func (s *Service) filterPeerForDataColumnsSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } subnets, err := dataColumnSubnets(node.ID(), node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "data column subnets") } - return subnets[index] + return intersect(indices, subnets), nil } } @@ -475,43 +524,47 @@ func initializeSyncCommSubnets(node *enode.LocalNode) *enode.LocalNode { // Reads the attestation subnets entry from a node's ENR and determines // the committee indices of the attestation subnets the node is subscribed to. -func attSubnets(record *enr.Record) (map[uint64]bool, error) { +func attestationSubnets(record *enr.Record) (map[uint64]bool, error) { bitV, err := attBitvector(record) if err != nil { - return nil, err - } - committeeIdxs := make(map[uint64]bool) - // lint:ignore uintcast -- subnet count can be safely cast to int. - if len(bitV) != byteCount(int(attestationSubnetCount)) { - return committeeIdxs, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + return nil, errors.Wrap(err, "att bit vector") } - for i := uint64(0); i < attestationSubnetCount; i++ { + // lint:ignore uintcast -- subnet count can be safely cast to int. + if len(bitV) != byteCount(int(attestationSubnetCount)) { + return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + } + + indices := make(map[uint64]bool, attestationSubnetCount) + for i := range attestationSubnetCount { if bitV.BitAt(i) { - committeeIdxs[i] = true + indices[i] = true } } - return committeeIdxs, nil + + return indices, nil } // Reads the sync subnets entry from a node's ENR and determines // the committee indices of the sync subnets the node is subscribed to. 
-func syncSubnets(record *enr.Record) ([]uint64, error) { +func syncSubnets(record *enr.Record) (map[uint64]bool, error) { bitV, err := syncBitvector(record) if err != nil { - return nil, err + return nil, errors.Wrap(err, "sync bit vector") } + // lint:ignore uintcast -- subnet count can be safely cast to int. if len(bitV) != byteCount(int(syncCommsSubnetCount)) { - return []uint64{}, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) } - var committeeIdxs []uint64 - for i := uint64(0); i < syncCommsSubnetCount; i++ { + + indices := make(map[uint64]bool, syncCommsSubnetCount) + for i := range syncCommsSubnetCount { if bitV.BitAt(i) { - committeeIdxs = append(committeeIdxs, i) + indices[i] = true } } - return committeeIdxs, nil + return indices, nil } // Retrieve the data columns subnets from a node's ENR and node ID. @@ -585,3 +638,16 @@ func byteCount(bitCount int) int { } return numOfBytes } + +// interesect intersects two maps and returns a new map containing only the keys +// that are present in both maps. +func intersect(left map[uint64]int, right map[uint64]bool) map[uint64]bool { + result := make(map[uint64]bool, min(len(left), len(right))) + for i := range left { + if right[i] { + result[i] = true + } + } + + return result +} diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index faa6b17ed9..537eecb07f 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "encoding/hex" "fmt" - "reflect" "testing" "time" @@ -22,7 +21,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" ) -func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { +func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) { // Topology of this test: // // @@ -37,7 +36,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // In our case: The node i is subscribed to subnet i, with i = 1, 2, 3 // Define the genesis validators root, to ensure everybody is on the same network. - const genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe" + const ( + genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe" + subnetCount = 3 + minimumPeersPerSubnet = 1 + ) + genesisValidatorsRoot, err := hex.DecodeString(genesisValidatorRootStr[2:]) require.NoError(t, err) @@ -87,13 +91,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // Create 3 nodes, each subscribed to a different subnet. // Each node is connected to the bootstrap node. - services := make([]*Service, 0, 3) + services := make([]*Service, 0, subnetCount) - for i := 1; i <= 3; i++ { - subnet := uint64(i) + for i := uint64(1); i <= subnetCount; i++ { service, err := NewService(ctx, &Config{ Discv5BootStrapAddrs: []string{bootNodeENR}, - MaxPeers: 30, + MaxPeers: 0, // Set to 0 to ensure that peers are discovered via subnets search, and not generic peers discovery. UDPPort: uint(2000 + i), TCPPort: uint(3000 + i), QUICPort: uint(3000 + i), @@ -115,12 +118,13 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // Set the ENR `attnets`, used by Prysm to filter peers by subnet. bitV := bitfield.NewBitvector64() - bitV.SetBitAt(subnet, true) + bitV.SetBitAt(i, true) entry := enr.WithEntry(attSubnetEnrKey, &bitV) service.dv5Listener.LocalNode().Set(entry) // Join and subscribe to the subnet, needed by libp2p. 
- topic, err := service.pubsub.Join(fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + "/ssz_snappy") + topicName := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, i) + "/ssz_snappy" + topic, err := service.pubsub.Join(topicName) require.NoError(t, err) _, err = topic.Subscribe() @@ -160,37 +164,18 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { require.NoError(t, err) }() - // Look up 3 different subnets. - exists := make([]bool, 0, 3) - for i := 1; i <= 3; i++ { - subnet := uint64(i) - topic := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + subnets := map[uint64]bool{1: true, 2: true, 3: true} + defectiveSubnets := service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.Equal(t, subnetCount, len(defectiveSubnets)) - exist := false + ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - // This for loop is used to ensure we don't get stuck in `FindPeersWithSubnet`. - // Read the documentation of `FindPeersWithSubnet` for more details. - for j := 0; j < 3; j++ { - ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.NoError(t, err) - exist, err = service.FindPeersWithSubnet(ctxWithTimeOut, topic, subnet, 1) - require.NoError(t, err) - - if exist { - break - } - } - - require.NoError(t, err) - exists = append(exists, exist) - - } - - // Check if all peers are found. - for _, exist := range exists { - require.Equal(t, true, exist, "Peer with subnet doesn't exist") - } + defectiveSubnets = service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.Equal(t, 0, len(defectiveSubnets)) } func Test_AttSubnets(t *testing.T) { @@ -305,37 +290,34 @@ func Test_AttSubnets(t *testing.T) { wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { db, err := enode.OpenDB("") - assert.NoError(t, err) + require.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) - assert.NoError(t, err) + require.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) - assert.NoError(t, err) + require.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) record := tt.record(localNode) - got, err := attSubnets(record) + got, err := attestationSubnets(record) if (err != nil) != tt.wantErr { - t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("attestationSubnets() error = %v, wantErr %v", err, tt.wantErr) return } if tt.wantErr { - assert.ErrorContains(t, tt.errContains, err) + require.ErrorContains(t, tt.errContains, err) } - want := make(map[uint64]bool, len(tt.want)) + require.Equal(t, len(tt.want), len(got)) for _, subnet := range tt.want { - want[subnet] = true - } - - if !reflect.DeepEqual(got, want) { - t.Errorf("syncSubnets() got = %v, want %v", got, want) + require.Equal(t, true, got[subnet]) } }) } @@ -494,11 +476,14 @@ func Test_SyncSubnets(t *testing.T) { t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) return } + if tt.wantErr { assert.ErrorContains(t, tt.errContains, err) } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("syncSubnets() got = %v, want %v", got, tt.want) + + require.Equal(t, len(tt.want), len(got)) + for _, subnet := range tt.want { + require.Equal(t, true, 
got[subnet]) } }) } diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 9d2f8cd3f5..f73bbbd97c 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -68,9 +68,9 @@ func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil } -// FindPeersWithSubnet mocks the p2p func. -func (*FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return false, nil +// FindAndDialPeersWithSubnets mocks the p2p func. +func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // RefreshPersistentSubnets mocks the p2p func. @@ -167,7 +167,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac } // BroadcastDataColumn -- fake. -func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { +func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error { return nil } diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index 209cefc564..abdb811d4f 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, } // BroadcastDataColumn broadcasts a data column for mock. -func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { +func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error { m.BroadcastCalled.Store(true) return nil } diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index a23341ff69..fed8926c67 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/host" @@ -56,9 +57,9 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { // RefreshPersistentSubnets . func (*MockPeerManager) RefreshPersistentSubnets() {} -// FindPeersWithSubnet . -func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return true, nil +// FindAndDialPeersWithSubnet . +func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // AddPingMethod . diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 0eab911c06..25814844e5 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -224,7 +224,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf } // BroadcastDataColumn broadcasts a data column for mock. 
-func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { +func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error { p.BroadcastCalled.Store(true) return nil } @@ -408,9 +408,9 @@ func (p *TestP2P) Peers() *peers.Status { return p.peers } -// FindPeersWithSubnet mocks the p2p func. -func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return false, nil +// FindAndDialPeersWithSubnets mocks the p2p func. +func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // RefreshPersistentSubnets mocks the p2p func. diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 301cdb9b49..c67af0417f 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -109,7 +109,12 @@ func isValidStreamError(err error) bool { func closeStream(stream network.Stream, log *logrus.Entry) { if err := stream.Close(); isValidStreamError(err) { - log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) + log.WithError(err). + WithFields(logrus.Fields{ + "protocol": stream.Protocol(), + "peer": stream.Conn().RemotePeer(), + }). + Debug("Could not close stream") } } @@ -118,7 +123,12 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) { _err := stream.Reset() _ = _err if isValidStreamError(err) { - log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) + log.WithError(err). + WithFields(logrus.Fields{ + "protocol": stream.Protocol(), + "peer": stream.Conn().RemotePeer(), + }). + Debug("Could not reset stream") } return } diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 186e80d192..64b4c2bb9d 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -236,7 +236,7 @@ func (s *Service) updateMetrics() { if err != nil { log.WithError(err).Debugf("Could not compute fork digest") } - indices := s.aggregatorSubnetIndices(s.cfg.clock.CurrentSlot()) + indices := aggregatorSubnetIndices(s.cfg.clock.CurrentSlot()) syncIndices := cache.SyncSubnetIDs.GetAllSubnets(slots.ToEpoch(s.cfg.clock.CurrentSlot())) attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})] diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 73bf93508b..a1e053997a 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -42,7 +42,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg) } if err := s.rateLimiter.validateRequest(stream, 1); err != nil { - log.WithError(err).Debug("Goodbye message from rate-limited peer.") + log.WithError(err).Debug("Goodbye message from rate-limited peer") } else { s.rateLimiter.add(stream, 1) } @@ -65,7 +65,12 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr log.WithError(err).Debug("Error when disconnecting with bad peer") } - log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection") + log.WithError(badPeerErr). + WithFields(logrus.Fields{ + "peerID": id, + "agent": agentString(id, s.cfg.p2p.Host()), + }). 
+ Debug("Sent peer disconnection") } // A custom goodbye method that is used by our connection handler, in the diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index fbc41bbd44..6f297cc69b 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -217,6 +217,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream log.WithFields(logrus.Fields{ "peer": remotePeer, "error": err, + "agent": agentString(remotePeer, s.cfg.p2p.Host()), }).Debug("Invalid status message from peer") var respCode byte diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 51d59bd837..845a8a46fe 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -2,11 +2,9 @@ package sync import ( "context" - "errors" "fmt" "reflect" "runtime/debug" - "slices" "strings" "time" @@ -21,7 +19,6 @@ import ( fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" - "github.com/OffchainLabs/prysm/v6/container/slice" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" "github.com/OffchainLabs/prysm/v6/network/forks" @@ -32,17 +29,50 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) const pubsubMessageTimeout = 30 * time.Second -// wrappedVal represents a gossip validator which also returns an error along with the result. -type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error) +type ( + // wrappedVal represents a gossip validator which also returns an error along with the result. + wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error) -// subHandler represents handler for a given subscription. -type subHandler func(context.Context, proto.Message) error + // subHandler represents handler for a given subscription. + subHandler func(context.Context, proto.Message) error + + // parameters used for the `subscribeWithParameters` function. + subscribeParameters struct { + topicFormat string + validate wrappedVal + handle subHandler + digest [4]byte + + // getSubnetsToJoin is a function that returns all subnets the node should join. + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool + + // getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found + // but for which no subscriptions are needed. + getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool + } + + // parameters used for the `subscribeToSubnets` function. + subscribeToSubnetsParameters struct { + subscriptionBySubnet map[uint64]*pubsub.Subscription + topicFormat string + digest [4]byte + genesisValidatorsRoot [fieldparams.RootLength]byte + genesisTime time.Time + currentSlot primitives.Slot + validate wrappedVal + handle subHandler + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool + } +) + +var errInvalidDigest = errors.New("invalid digest") // noopValidator is a no-op that only decodes the message, but does not check its contents. 
func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { @@ -55,28 +85,36 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag return pubsub.ValidationAccept, nil } -func sliceFromCount(count uint64) []uint64 { - result := make([]uint64, 0, count) - +func mapFromCount(count uint64) map[uint64]bool { + result := make(map[uint64]bool, count) for item := range count { - result = append(result, item) + result[item] = true } return result } -func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 { - if flags.Get().SubscribeToAllSubnets { - return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount) +func mapFromSlice(slices ...[]uint64) map[uint64]bool { + result := make(map[uint64]bool) + + for _, slice := range slices { + for _, item := range slice { + result[item] = true + } + } + + return result +} + +func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { + if flags.Get().SubscribeToAllSubnets { + return mapFromCount(params.BeaconConfig().SyncCommitteeSubnetCount) } - // Get the current epoch. currentEpoch := slots.ToEpoch(currentSlot) + subscriptions := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch) - // Retrieve the subnets we want to subscribe to. - subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch) - - return slice.SetUint64(subs) + return mapFromSlice(subscriptions) } // Register PubSub subscribers @@ -111,14 +149,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.attesterSlashingSubscriber, digest, ) - s.subscribeWithParameters( - p2p.AttestationSubnetTopicFormat, - s.validateCommitteeIndexBeaconAttestation, - s.committeeIndexBeaconAttestationSubscriber, - digest, - s.persistentAndAggregatorSubnetIndices, - s.attesterSubnetIndices, - ) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.AttestationSubnetTopicFormat, + validate: s.validateCommitteeIndexBeaconAttestation, + handle: s.committeeIndexBeaconAttestationSubscriber, + digest: digest, + getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices, + getSubnetsRequiringPeers: attesterSubnetIndices, + }) // New gossip topic in Altair if params.BeaconConfig().AltairForkEpoch <= epoch { @@ -128,14 +166,15 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.syncContributionAndProofSubscriber, digest, ) - s.subscribeWithParameters( - p2p.SyncCommitteeSubnetTopicFormat, - s.validateSyncCommitteeMessage, - s.syncCommitteeMessageSubscriber, - digest, - s.activeSyncSubnetIndices, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + validate: s.validateSyncCommitteeMessage, + handle: s.syncCommitteeMessageSubscriber, + digest: digest, + getSubnetsToJoin: s.activeSyncSubnetIndices, + }) + if features.Get().EnableLightClient { s.subscribe( p2p.LightClientOptimisticUpdateTopicFormat, @@ -164,42 +203,39 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { // New gossip topic in Deneb, removed in Electra if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch { - s.subscribeWithParameters( - p2p.BlobSubnetTopicFormat, - s.validateBlob, - s.blobSubscriber, - digest, - func(currentSlot primitives.Slot) []uint64 { - return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) + 
s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.BlobSubnetTopicFormat, + validate: s.validateBlob, + handle: s.blobSubscriber, + digest: digest, + getSubnetsToJoin: func(primitives.Slot) map[uint64]bool { + return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount) }, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + }) } // New gossip topic in Electra, removed in Fulu if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch { - s.subscribeWithParameters( - p2p.BlobSubnetTopicFormat, - s.validateBlob, - s.blobSubscriber, - digest, - func(currentSlot primitives.Slot) []uint64 { - return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.BlobSubnetTopicFormat, + validate: s.validateBlob, + handle: s.blobSubscriber, + digest: digest, + getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool { + return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) }, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + }) } // New gossip topic in Fulu. if params.BeaconConfig().FuluForkEpoch <= epoch { - s.subscribeWithParameters( - p2p.DataColumnSubnetTopicFormat, - s.validateDataColumn, - s.dataColumnSubscriber, - digest, - s.dataColumnSubnetIndices, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.DataColumnSubnetTopicFormat, + validate: s.validateDataColumn, + handle: s.dataColumnSubscriber, + digest: digest, + getSubnetsToJoin: s.dataColumnSubnetIndices, + }) } } @@ -379,197 +415,185 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p } } -// pruneSubscriptions unsubscribe from topics we are currently subscribed to but that are +// pruneSubscriptions unsubscribes from topics we are currently subscribed to but that are // not in the list of wanted subnets. +// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. func (s *Service) pruneSubscriptions( - subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, + subscriptionBySubnet map[uint64]*pubsub.Subscription, + wantedSubnets map[uint64]bool, topicFormat string, digest [4]byte, ) { - for k, v := range subscriptions { - var wanted bool - for _, idx := range wantedSubs { - if k == idx { - wanted = true - break - } - } - - if !wanted && v != nil { - v.Cancel() - fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() - s.unSubscribeFromTopic(fullTopic) - delete(subscriptions, k) - } - } -} - -// searchForPeers searches for peers in the given subnets. -func (s *Service) searchForPeers( - ctx context.Context, - topicFormat string, - digest [4]byte, - currentSlot primitives.Slot, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, - getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, -) { - // Retrieve the subnets we want to subscribe to. - subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) - - // Retrieve the subnets we want to find peers for. - subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot) - - // Combine the subnets to subscribe and the subnets to find peers for. - subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...)) - - // Find new peers for wanted subnets if needed. 
- for _, subnetIndex := range subnetsToFindPeersIndex { - topic := fmt.Sprintf(topicFormat, digest, subnetIndex) - - // Check if we have enough peers in the subnet. Skip if we do. - if s.enoughPeersAreConnected(topic) { + for subnet, subscription := range subscriptionBySubnet { + if subscription == nil { + // Should not happen, but just in case. + delete(subscriptionBySubnet, subnet) continue } - // Not enough peers in the subnet, we need to search for more. - _, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") + if wantedSubnets[subnet] { + // Nothing to prune. + continue } + + // We are subscribed to a subnet that is no longer wanted. + subscription.Cancel() + fullTopic := fmt.Sprintf(topicFormat, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix() + s.unSubscribeFromTopic(fullTopic) + delete(subscriptionBySubnet, subnet) } } -// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed. -// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. -func (s *Service) subscribeToSubnets( - topicFormat string, - digest [4]byte, - genesisValidatorsRoot [fieldparams.RootLength]byte, - genesisTime time.Time, - subscriptions map[uint64]*pubsub.Subscription, - currentSlot primitives.Slot, - validate wrappedVal, - handle subHandler, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, -) bool { +// subscribeToSubnets subscribes to needed subnets and unsubscribe from unneeded ones. +// This functions mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. +func (s *Service) subscribeToSubnets(p subscribeToSubnetsParameters) error { // Do not subscribe if not synced. if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - return true + return nil } // Check the validity of the digest. - valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + valid, err := isDigestValid(p.digest, p.genesisTime, p.genesisValidatorsRoot) if err != nil { - log.Error(err) - return true + return errors.Wrap(err, "is digest valid") } - // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + // Unsubscribe from all subnets if digest is not valid. It's likely to be the case after a hard fork. if !valid { - description := topicFormat - if pos := strings.LastIndex(topicFormat, "/"); pos != -1 { - description = topicFormat[pos+1:] - } - - if pos := strings.LastIndex(description, "_"); pos != -1 { - description = description[:pos] - } - - log.WithFields(logrus.Fields{ - "digest": fmt.Sprintf("%#x", digest), - "subnets": description, - }).Debug("Subnets with this digest are no longer valid, unsubscribing from all of them") - s.pruneSubscriptions(subscriptions, []uint64{}, topicFormat, digest) - return false + wantedSubnets := map[uint64]bool{} + s.pruneSubscriptions(p.subscriptionBySubnet, wantedSubnets, p.topicFormat, p.digest) + return errInvalidDigest } - // Retrieve the subnets we want to subscribe to. - subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) + // Retrieve the subnets we want to join. + subnetsToJoin := p.getSubnetsToJoin(p.currentSlot) // Remove subscriptions that are no longer wanted. 
- s.pruneSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest) + s.pruneSubscriptions(p.subscriptionBySubnet, subnetsToJoin, p.topicFormat, p.digest) - // Subscribe to wanted subnets. - for _, subnetIndex := range subnetsToSubscribeIndex { - subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex) + // Subscribe to wanted and not already registered subnets. + for subnet := range subnetsToJoin { + subnetTopic := fmt.Sprintf(p.topicFormat, p.digest, subnet) - // Check if subscription exists. - if _, exists := subscriptions[subnetIndex]; exists { - continue + if _, exists := p.subscriptionBySubnet[subnet]; !exists { + subscription := s.subscribeWithBase(subnetTopic, p.validate, p.handle) + p.subscriptionBySubnet[subnet] = subscription } - - // We need to subscribe to the subnet. - subscription := s.subscribeWithBase(subnetTopic, validate, handle) - subscriptions[subnetIndex] = subscription } - return true + + return nil } // subscribeWithParameters subscribes to a list of subnets. -func (s *Service) subscribeWithParameters( - topicFormat string, - validate wrappedVal, - handle subHandler, - digest [4]byte, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, - getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, -) { - // Initialize the subscriptions map. - subscriptions := make(map[uint64]*pubsub.Subscription) - - // Retrieve the genesis validators root. +func (s *Service) subscribeWithParameters(p subscribeParameters) { + minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet + subscriptionBySubnet := make(map[uint64]*pubsub.Subscription) genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() - - // Retrieve the epoch of the fork corresponding to the digest. - _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) - if err != nil { - panic(err) // lint:nopanic -- Impossible condition. - } - - // Retrieve the base protobuf message. - base := p2p.GossipTopicMappings(topicFormat, epoch) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition. - } - - // Retrieve the genesis time. genesisTime := s.cfg.clock.GenesisTime() - - // Define a ticker ticking every slot. secondsPerSlot := params.BeaconConfig().SecondsPerSlot - ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) - - // Retrieve the current slot. + secondsPerSlotDuration := time.Duration(secondsPerSlot) * time.Second currentSlot := s.cfg.clock.CurrentSlot() + neededSubnets := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers) - // Subscribe to subnets. - s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) + shortTopicFormat := p.topicFormat + shortTopicFormatLen := len(shortTopicFormat) + if shortTopicFormatLen >= 3 && shortTopicFormat[shortTopicFormatLen-3:] == "_%d" { + shortTopicFormat = shortTopicFormat[:shortTopicFormatLen-3] + } - // Derive a new context and cancel function. 
- ctx, cancel := context.WithCancel(s.ctx) + shortTopic := fmt.Sprintf(shortTopicFormat, p.digest) + parameters := subscribeToSubnetsParameters{ + subscriptionBySubnet: subscriptionBySubnet, + topicFormat: p.topicFormat, + digest: p.digest, + genesisValidatorsRoot: genesisValidatorsRoot, + genesisTime: genesisTime, + currentSlot: currentSlot, + validate: p.validate, + handle: p.handle, + getSubnetsToJoin: p.getSubnetsToJoin, + } + + err := s.subscribeToSubnets(parameters) + if err != nil { + log.WithError(err).Error("Could not subscribe to subnets") + } + + // Subscribe to expected subnets and search for peers if needed at every slot. go func() { - // Search for peers. - s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) + func() { + ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration) + defer cancel() + + if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Debug("Could not find peers with subnets") + } + }() + + slotTicker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + defer slotTicker.Done() for { select { - case currentSlot := <-ticker.C(): - isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) + case currentSlot := <-slotTicker.C(): + parameters.currentSlot = currentSlot + if err := s.subscribeToSubnets(parameters); err != nil { + if errors.Is(err, errInvalidDigest) { + log.WithField("topics", shortTopic).Debug("Digest is invalid, stopping subscription") + return + } - // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. - if !isDigestValid { - ticker.Done() - return + log.WithError(err).Error("Could not subscribe to subnets") + continue } - // Search for peers. - s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) + func() { + ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration) + defer cancel() + + if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Debug("Could not find peers with subnets") + } + }() + + case <-s.ctx.Done(): + return + } + } + }() + + // Warn the user if we are not subscribed to enough peers in the subnets. + go func() { + log := log.WithField("minimum", minimumPeersPerSubnet) + logTicker := time.NewTicker(5 * time.Minute) + defer logTicker.Stop() + + for { + select { + case <-logTicker.C: + subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers) + + isSubnetWithMissingPeers := false + // Find new peers for wanted subnets if needed. + for index := range subnetsToFindPeersIndex { + topic := fmt.Sprintf(p.topicFormat, p.digest, index) + + // Check if we have enough peers in the subnet. Skip if we do. 
+ if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet { + isSubnetWithMissingPeers = true + log.WithFields(logrus.Fields{ + "topic": topic, + "actual": count, + }).Warning("Not enough connected peers") + } + } + + if !isSubnetWithMissingPeers { + log.WithField("topic", shortTopic).Info("All subnets have enough connected peers") + } case <-s.ctx.Done(): - cancel() - ticker.Done() return } } @@ -591,76 +615,82 @@ func (s *Service) unSubscribeFromTopic(topic string) { } } -// enoughPeersAreConnected checks if we have enough peers which are subscribed to the same subnet. -func (s *Service) enoughPeersAreConnected(subnetTopic string) bool { +// connectedPeersCount counts how many peer for a given topic are connected to the node. +func (s *Service) connectedPeersCount(subnetTopic string) int { topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() - threshold := flags.Get().MinimumPeersPerSubnet - peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) - peersWithSubnetCount := len(peersWithSubnet) - - return peersWithSubnetCount >= threshold + return len(peersWithSubnet) } -func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 { +func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool { nodeID := s.cfg.p2p.NodeID() custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target) nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount) if err != nil { log.WithError(err).Error("Could not retrieve peer info") - return []uint64{} + return nil } - return sliceFromMap(nodeInfo.DataColumnsSubnets, true /*sorted*/) + return nodeInfo.DataColumnsSubnets } -func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { +func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { if flags.Get().SubscribeToAllSubnets { - return sliceFromCount(params.BeaconConfig().AttestationSubnetCount) + return mapFromCount(params.BeaconConfig().AttestationSubnetCount) } - persistentSubnetIndices := s.persistentSubnetIndices() - aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot) + persistentSubnetIndices := persistentSubnetIndices() + aggregatorSubnetIndices := aggregatorSubnetIndices(currentSlot) // Combine subscriptions to get all requested subscriptions. - return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...)) + return mapFromSlice(persistentSubnetIndices, aggregatorSubnetIndices) } // filters out required peers for the node to function, not // pruning peers who are in our attestation subnets. func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { + minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet + currentSlot := s.cfg.clock.CurrentSlot() + // Exit early if nothing to filter. 
if len(pids) == 0 { return pids } + digest, err := s.currentForkDigest() if err != nil { log.WithError(err).Error("Could not compute fork digest") return pids } - currSlot := s.cfg.clock.CurrentSlot() - wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot) - wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...)) + + wantedSubnets := make(map[uint64]bool) + for subnet := range s.persistentAndAggregatorSubnetIndices(currentSlot) { + wantedSubnets[subnet] = true + } + + for subnet := range attesterSubnetIndices(currentSlot) { + wantedSubnets[subnet] = true + } + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] // Map of peers in subnets peerMap := make(map[peer.ID]bool) - - for _, sub := range wantedSubs { - subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.p2p.Encoding().ProtocolSuffix() - ps := s.cfg.p2p.PubSub().ListPeers(subnetTopic) - if len(ps) > flags.Get().MinimumPeersPerSubnet { + for subnet := range wantedSubnets { + subnetTopic := fmt.Sprintf(topic, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix() + peers := s.cfg.p2p.PubSub().ListPeers(subnetTopic) + if len(peers) > minimumPeersPerSubnet { // In the event we have more than the minimum, we can // mark the remaining as viable for pruning. - ps = ps[:flags.Get().MinimumPeersPerSubnet] + peers = peers[:minimumPeersPerSubnet] } + // Add peer to peer map. - for _, p := range ps { - // Even if the peer id has - // already been seen we still set - // it, as the outcome is the same. - peerMap[p] = true + for _, peer := range peers { + // Even if the peer ID has already been seen we still set it, + // as the outcome is the same. + peerMap[peer] = true } } @@ -716,6 +746,34 @@ func isDigestValid(digest [4]byte, genesis time.Time, genValRoot [32]byte) (bool return retDigest == digest, nil } +// computeAllNeededSubnets computes the subnets we want to join +// and the subnets for which we want to find peers. +func computeAllNeededSubnets( + currentSlot primitives.Slot, + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool, + getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool, +) map[uint64]bool { + // Retrieve the subnets we want to join. + subnetsToJoin := getSubnetsToJoin(currentSlot) + + // Retrieve the subnets we want to find peers into. + subnetsRequiringPeers := make(map[uint64]bool) + if getSubnetsRequiringPeers != nil { + subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot) + } + + // Combine the two maps to get all needed subnets. + neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers)) + for subnet := range subnetsToJoin { + neededSubnets[subnet] = true + } + for subnet := range subnetsRequiringPeers { + neededSubnets[subnet] = true + } + + return neededSubnets +} + func agentString(pid peer.ID, hst host.Host) string { rawVersion, storeErr := hst.Peerstore().Get(pid, "AgentVersion") agString, ok := rawVersion.(string) @@ -742,17 +800,3 @@ func errorIsIgnored(err error) bool { } return false } - -// sliceFromMap returns a sorted list of keys from a map. 
-func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 { - result := make([]uint64, 0, len(m)) - for k := range m { - result = append(result, k) - } - - if len(sorted) > 0 && sorted[0] { - slices.Sort(result) - } - - return result -} diff --git a/beacon-chain/sync/subscriber_beacon_attestation.go b/beacon-chain/sync/subscriber_beacon_attestation.go index 8fb2d588fa..d5bd9158b9 100644 --- a/beacon-chain/sync/subscriber_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_beacon_attestation.go @@ -35,11 +35,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m } } -func (*Service) persistentSubnetIndices() []uint64 { +func persistentSubnetIndices() []uint64 { return cache.SubnetIDs.GetAllSubnets() } -func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { +func aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) var commIds []uint64 @@ -49,12 +49,16 @@ func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { return slice.SetUint64(commIds) } -func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 { +func attesterSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) - var commIds []uint64 + + subnets := make(map[uint64]bool, int(endSlot-currentSlot+1)) for i := currentSlot; i <= endSlot; i++ { - commIds = append(commIds, cache.SubnetIDs.GetAttesterSubnetIDs(i)...) + for _, subnetId := range cache.SubnetIDs.GetAttesterSubnetIDs(i) { + subnets[subnetId] = true + } } - return slice.SetUint64(commIds) + + return subnets } diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 04156dc6af..d36996f8d1 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -310,7 +310,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { subscriptions[2], err = r.cfg.p2p.SubscribeToTopic(fullTopic) require.NoError(t, err) - r.pruneSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest) + r.pruneSubscriptions(subscriptions, map[uint64]bool{2: true}, defaultTopic, digest) require.LogsDoNotContain(t, hook, "Could not unregister topic validator") } @@ -482,8 +482,7 @@ func TestFilterSubnetPeers(t *testing.T) { p2 := createPeer(t, subnet10, subnet20) p3 := createPeer(t) - // Connect to all - // peers. + // Connect to all peers. 
p.Connect(p1) p.Connect(p2) p.Connect(p3) @@ -540,7 +539,11 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) + r.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + digest: digest, + getSubnetsToJoin: r.activeSyncSubnetIndices, + }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} @@ -589,7 +592,11 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) { digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) + r.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + digest: digest, + getSubnetsToJoin: r.activeSyncSubnetIndices, + }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} diff --git a/changelog/manu-subscriptions.md b/changelog/manu-subscriptions.md new file mode 100644 index 0000000000..45931a0065 --- /dev/null +++ b/changelog/manu-subscriptions.md @@ -0,0 +1,2 @@ +### Fixed +- Subnets subscription: Avoid dynamic subscribing blocking in case not enough peers per subnets are found. \ No newline at end of file diff --git a/config/params/config.go b/config/params/config.go index 1600c60629..ff777832f4 100644 --- a/config/params/config.go +++ b/config/params/config.go @@ -427,6 +427,7 @@ func (b *BeaconChainConfig) TargetBlobsPerBlock(slot primitives.Slot) int { return b.DeprecatedMaxBlobsPerBlock / 2 } +// MaxBlobsPerBlock returns the maximum number of blobs per block for the given slot. func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int { epoch := primitives.Epoch(slot.DivSlot(b.SlotsPerEpoch)) @@ -449,6 +450,7 @@ func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int { if epoch >= b.ElectraForkEpoch { return b.DeprecatedMaxBlobsPerBlockElectra } + return b.DeprecatedMaxBlobsPerBlock }
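
Editor's note for readers skimming the diff: the following standalone Go sketch (not part of the patch) restates the map-based subnet matching that beacon-chain/p2p/subnets.go now relies on. intersect keeps only the subnets a discovered node actually advertises, so a single crawl can serve several wanted subnets at once instead of one subnet at a time. The map[uint64]int values are assumed here to be the number of peers still missing per subnet, and mapFromCount mirrors the helper of the same name in subscriber.go; everything else is a stand-in, not real Prysm code.

package main

import "fmt"

// mapFromCount expands a subnet count into the set {0, ..., count-1}.
func mapFromCount(count uint64) map[uint64]bool {
	result := make(map[uint64]bool, count)
	for i := uint64(0); i < count; i++ {
		result[i] = true
	}
	return result
}

// intersect keeps only the subnets that are both still wanted (left) and
// advertised by the candidate node (right), in one pass.
func intersect(wanted map[uint64]int, advertised map[uint64]bool) map[uint64]bool {
	result := make(map[uint64]bool, len(wanted))
	for subnet := range wanted {
		if advertised[subnet] {
			result[subnet] = true
		}
	}
	return result
}

func main() {
	// Suppose we still need one peer in each of subnets 3, 6 and 7 ...
	wanted := map[uint64]int{3: 1, 6: 1, 7: 1}
	// ... and a discovered node advertises subnets 2, 6 and 7 in its ENR.
	advertised := map[uint64]bool{2: true, 6: true, 7: true}

	// One pass tells us the node is useful for subnets 6 and 7, so it is
	// dialed once rather than being discarded and rediscovered per subnet.
	fmt.Println(intersect(wanted, advertised)) // map[6:true 7:true]
	fmt.Println(len(mapFromCount(4)))          // 4
}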
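
A second standalone sketch, this time of the bounded per-slot search loop that subscriber.go moves to: each round of peer discovery is given at most one slot before the loop carries on, so a subnet with too few peers can no longer wedge dynamic subscribing. The peerFinder interface, slowFinder stub, topic string and slot values below are illustrative assumptions only (the real method also takes a fork digest of fieldparams.VersionLength bytes, and the real loop additionally re-evaluates and prunes subscriptions every slot).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// peerFinder is a stand-in for the narrow slice of the p2p service used here.
type peerFinder interface {
	FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [4]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error
}

// searchOncePerSlot runs one bounded search per slot tick. A deadline of one
// slot means a scarce subnet can at worst delay the next round, never stall
// the subscription loop entirely.
func searchOncePerSlot(ctx context.Context, finder peerFinder, slotTicks <-chan uint64, slotDuration time.Duration, digest [4]byte, subnets map[uint64]bool) {
	for {
		select {
		case slot := <-slotTicks:
			func() {
				searchCtx, cancel := context.WithTimeout(ctx, slotDuration)
				defer cancel()
				err := finder.FindAndDialPeersWithSubnets(searchCtx, "/eth2/%x/beacon_attestation_%d", digest, 1, subnets)
				// Hitting the per-slot deadline is expected when peers are
				// scarce, so it is not treated as an error.
				if err != nil && !errors.Is(err, context.DeadlineExceeded) {
					fmt.Printf("slot %d: could not find peers with subnets: %v\n", slot, err)
				}
			}()
		case <-ctx.Done():
			return
		}
	}
}

// slowFinder simulates a crawl that never finds enough peers: it blocks until
// the per-slot deadline fires instead of returning.
type slowFinder struct{}

func (slowFinder) FindAndDialPeersWithSubnets(ctx context.Context, _ string, _ [4]byte, _ int, _ map[uint64]bool) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ticks := make(chan uint64, 1)
	ticks <- 42 // pretend slot 42 just started

	go searchOncePerSlot(ctx, slowFinder{}, ticks, 50*time.Millisecond, [4]byte{0xde, 0xad, 0xbe, 0xef}, map[uint64]bool{3: true, 6: true})

	// Even though the search itself times out, the caller is never blocked,
	// which is the behaviour this patch is after.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("subscription loop was never blocked by the failed search")
}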