diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 70645d89d7..c953695453 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -87,7 +87,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, return nil } -func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { +func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error { mb.broadcastCalled = true return nil } diff --git a/beacon-chain/builder/service.go b/beacon-chain/builder/service.go index c7f9a06d04..7cb86d60c6 100644 --- a/beacon-chain/builder/service.go +++ b/beacon-chain/builder/service.go @@ -68,7 +68,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { log.WithError(err).Error("Failed to check builder status") } else { log.WithField("endpoint", s.c.NodeURL()).Info("Builder has been configured") - log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " + + log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " + "Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!") } } diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index dc12bdbefe..e0faf2e415 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -24,6 +24,8 @@ import ( "google.golang.org/protobuf/proto" ) +const minimumPeersPerSubnetForBroadcast = 1 + // ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the // GossipTypeMapping. 
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic") @@ -124,15 +126,13 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6 if err := func() error { s.subnetLocker(subnet).Lock() defer s.subnetLocker(subnet).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, attestationToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + + if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - savedAttestationBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + savedAttestationBroadcasts.Inc() + return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -183,15 +183,12 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs if err := func() error { s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - savedSyncCommitteeBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + savedSyncCommitteeBroadcasts.Inc() + return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -250,15 +247,13 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob if err := func() error { s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - ok, err := s.FindPeersWithSubnet(ctx, blobSubnetToTopic(subnet, forkDigest), subnet, 1) - if err != nil { - return err + + if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { + return errors.Wrap(err, "find peers with subnets") } - if ok { - blobSidecarBroadcasts.Inc() - return nil - } - return errors.New("failed to find peers for subnet") + + blobSidecarBroadcasts.Inc() + return nil }(); err != nil { log.WithError(err).Error("Failed to find peers") tracing.AnnotateError(span, err) @@ -329,7 +324,6 @@ func (s *Service) BroadcastDataColumn( root [fieldparams.RootLength]byte, dataColumnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, - peersCheckedChans ...chan<- bool, // Used for testing purposes to signal when peers are checked. ) error { // Add tracing to the function. ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumn") @@ -349,7 +343,7 @@ func (s *Service) BroadcastDataColumn( } // Non-blocking broadcast, with attempts to discover a column subnet peer if none available. - go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest, peersCheckedChans) + go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest) return nil } @@ -360,7 +354,6 @@ func (s *Service) internalBroadcastDataColumn( columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, forkDigest [fieldparams.VersionLength]byte, - peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. 
) { // Add tracing to the function. _, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn") @@ -382,7 +375,7 @@ func (s *Service) internalBroadcastDataColumn( wrappedSubIdx := columnSubnet + dataColumnSubnetVal // Find peers if needed. - if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic, columnSubnet, peersCheckedChans); err != nil { + if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, columnSubnet); err != nil { log.WithError(err).Error("Failed to find peers for data column subnet") tracing.AnnotateError(span, err) } @@ -416,35 +409,19 @@ func (s *Service) internalBroadcastDataColumn( func (s *Service) findPeersIfNeeded( ctx context.Context, wrappedSubIdx uint64, - topic string, + topicFormat string, + forkDigest [fieldparams.VersionLength]byte, subnet uint64, - peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. ) error { + // Sending a data column sidecar to only one peer is not ideal, + // but it ensures at least one peer receives it. s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() - // Sending a data column sidecar to only one peer is not ideal, - // but it ensures at least one peer receives it. - const peerCount = 1 - - if s.hasPeerWithSubnet(topic) { - // Exit early if we already have peers with this subnet. - return nil - } - - // Used for testing purposes. - if len(peersCheckedChans) > 0 { - peersCheckedChans[0] <- true - } - // No peers found, attempt to find peers with this subnet. - ok, err := s.FindPeersWithSubnet(ctx, topic, subnet, peerCount) - if err != nil { + if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil { return errors.Wrap(err, "find peers with subnet") } - if !ok { - return errors.Errorf("failed to find peers for topic %s with subnet %d", topic, subnet) - } return nil } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 058b5e489d..4e53bd398f 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -216,9 +216,10 @@ func TestService_BroadcastAttestation(t *testing.T) { } func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { + const port = uint(2000) + // Setup bootnode. cfg := &Config{PingInterval: testPingInterval} - port := 2000 cfg.UDPPort = uint(port) _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") @@ -245,7 +246,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { PingInterval: testPingInterval, } // Setup 2 different hosts - for i := 1; i <= 2; i++ { + for i := uint(1); i <= 2; i++ { h, pkey, ipAddr := createHost(t, port+i) cfg.UDPPort = uint(port + i) cfg.TCPPort = uint(port + i) @@ -687,7 +688,7 @@ func TestService_BroadcastDataColumn(t *testing.T) { // Create a host. _, pkey, ipAddr := createHost(t, port) - p := &Service{ + service := &Service{ ctx: t.Context(), host: p1.BHost, pubsub: p1.PubSub(), @@ -701,56 +702,44 @@ func TestService_BroadcastDataColumn(t *testing.T) { } // Create a listener. 
- listener, err := p.startDiscoveryV5(ipAddr, pkey) + listener, err := service.startDiscoveryV5(ipAddr, pkey) require.NoError(t, err) - p.dv5Listener = listener + service.dv5Listener = listener - digest, err := p.currentForkDigest() + digest, err := service.currentForkDigest() require.NoError(t, err) subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex) - topic := fmt.Sprintf(topicFormat, digest, subnet) + topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix() roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}}) sidecar := roSidecars[0].DataColumnSidecar - // Async listen for the pubsub, must be before the broadcast. - var wg sync.WaitGroup - wg.Add(1) - - peersChecked := make(chan bool, 0) - - go func(tt *testing.T) { - defer wg.Done() - - ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) - defer cancel() - - // Wait for the peers to be checked. - <-peersChecked - - // External peer subscribes to the topic. - topic += p.Encoding().ProtocolSuffix() - sub, err := p2.SubscribeToTopic(topic) - require.NoError(tt, err) - - msg, err := sub.Next(ctx) - require.NoError(tt, err) - - var result ethpb.DataColumnSidecar - require.NoError(tt, p.Encoding().DecodeGossip(msg.Data, &result)) - require.DeepEqual(tt, &result, sidecar) - }(t) - - var emptyRoot [fieldparams.RootLength]byte - // Attempt to broadcast nil object should fail. - err = p.BroadcastDataColumn(emptyRoot, subnet, nil) + var emptyRoot [fieldparams.RootLength]byte + err = service.BroadcastDataColumn(emptyRoot, subnet, nil) require.ErrorContains(t, "attempted to broadcast nil", err) - // Broadcast to peers and wait. - err = p.BroadcastDataColumn(emptyRoot, subnet, sidecar, peersChecked) + // Subscribe to the topic. + sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Minute), "Failed to receive pubsub within 1s") + + // libp2p fails without this delay + time.Sleep(50 * time.Millisecond) + + // Broadcast to peers and wait. + err = service.BroadcastDataColumn(emptyRoot, subnet, sidecar) + require.NoError(t, err) + + // Receive the message. + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + msg, err := sub.Next(ctx) + require.NoError(t, err) + + var result ethpb.DataColumnSidecar + require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result)) + require.DeepEqual(t, &result, sidecar) } diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index 8147d07b6c..eb27bff432 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -19,8 +19,7 @@ const ( // Burst limit for inbound dials. ipBurst = 8 - // High watermark buffer signifies the buffer till which - // we will handle inbound requests. + // High watermark buffer signifies the buffer till which we will handle inbound requests. 
highWatermarkBuffer = 20 ) @@ -53,7 +52,7 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) { "reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address") return false } - if s.isPeerAtLimit(true /* inbound */) { + if s.isPeerAtLimit(inbound) { log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(), "reason": "at peer limit"}).Trace("Not accepting inbound dial") return false diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 882b531999..2fd791591a 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -2,7 +2,9 @@ package p2p import ( "bytes" + "context" "crypto/ecdsa" + "math" "net" "sync" "time" @@ -23,45 +25,56 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" - "github.com/sirupsen/logrus" ) -type ListenerRebooter interface { - Listener - RebootListener() error -} +type ( + // ListenerRebooter is an interface that extends the Listener interface + // with the `RebootListener` method. + ListenerRebooter interface { + Listener + RebootListener() error + } -// Listener defines the discovery V5 network interface that is used -// to communicate with other peers. -type Listener interface { - Self() *enode.Node - Close() - Lookup(enode.ID) []*enode.Node - Resolve(*enode.Node) *enode.Node - RandomNodes() enode.Iterator - Ping(*enode.Node) error - RequestENR(*enode.Node) (*enode.Node, error) - LocalNode() *enode.LocalNode -} + // Listener defines the discovery V5 network interface that is used + // to communicate with other peers. + Listener interface { + Self() *enode.Node + Close() + Lookup(enode.ID) []*enode.Node + Resolve(*enode.Node) *enode.Node + RandomNodes() enode.Iterator + Ping(*enode.Node) error + RequestENR(*enode.Node) (*enode.Node, error) + LocalNode() *enode.LocalNode + } -const ( - udp4 = iota - udp6 + quicProtocol uint16 + + listenerWrapper struct { + mu sync.RWMutex + listener *discover.UDPv5 + listenerCreator func() (*discover.UDPv5, error) + } + + connectivityDirection int + udpVersion int ) const quickProtocolEnrKey = "quic" -type quicProtocol uint16 +const ( + udp4 udpVersion = iota + udp6 +) + +const ( + inbound connectivityDirection = iota + all +) // quicProtocol is the "quic" key, which holds the QUIC port of the node. func (quicProtocol) ENRKey() string { return quickProtocolEnrKey } -type listenerWrapper struct { - mu sync.RWMutex - listener *discover.UDPv5 - listenerCreator func() (*discover.UDPv5, error) -} - func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) { rawListener, err := listenerCreator() if err != nil { @@ -276,29 +289,10 @@ func (s *Service) RefreshPersistentSubnets() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { const ( - minLogInterval = 1 * time.Minute thresholdLimit = 5 + searchPeriod = 20 * time.Second ) - peersSummary := func(threshold uint) (uint, uint) { - // Retrieve how many active peers we have. - activePeers := s.Peers().Active() - activePeerCount := uint(len(activePeers)) - - // Compute how many peers we are missing to reach the threshold. 
- if activePeerCount >= threshold { - return activePeerCount, 0 - } - - missingPeerCount := threshold - activePeerCount - - return activePeerCount, missingPeerCount - } - - var lastLogTime time.Time - - iterator := s.dv5Listener.RandomNodes() - defer iterator.Close() connectivityTicker := time.NewTicker(1 * time.Minute) thresholdCount := 0 @@ -330,74 +324,148 @@ func (s *Service) listenForNewNodes() { continue } - iterator = s.dv5Listener.RandomNodes() thresholdCount = 0 } default: - if s.isPeerAtLimit(false /* inbound */) { - // Pause the main loop for a period to stop looking - // for new peers. + if s.isPeerAtLimit(all) { + // Pause the main loop for a period to stop looking for new peers. log.Trace("Not looking for peers, at peer limit") time.Sleep(pollingPeriod) continue } - // Compute the number of new peers we want to dial. - activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers) - - fields := logrus.Fields{ - "currentPeerCount": activePeerCount, - "targetPeerCount": s.cfg.MaxPeers, + // Return early if the discovery listener isn't set. + if s.dv5Listener == nil { + return } - if missingPeerCount == 0 { - log.Trace("Not looking for peers, at peer limit") - time.Sleep(pollingPeriod) - continue - } + func() { + ctx, cancel := context.WithTimeout(s.ctx, searchPeriod) + defer cancel() - if time.Since(lastLogTime) > minLogInterval { - lastLogTime = time.Now() - log.WithFields(fields).Debug("Searching for new active peers") - } - - // Restrict dials if limit is applied. - if flags.MaxDialIsActive() { - maxConcurrentDials := uint(flags.Get().MaxConcurrentDials) - missingPeerCount = min(missingPeerCount, maxConcurrentDials) - } - - // Search for new peers. - wantedNodes := searchForPeers(iterator, batchPeriod, missingPeerCount, s.filterPeer) - - wg := new(sync.WaitGroup) - for i := 0; i < len(wantedNodes); i++ { - node := wantedNodes[i] - peerInfo, _, err := convertToAddrInfo(node) - if err != nil { - log.WithError(err).Error("Could not convert to peer info") - continue + if err := s.findAndDialPeers(ctx); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Error("Failed to find and dial peers") } - - if peerInfo == nil { - continue - } - - // Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. - s.Peers().RandomizeBackOff(peerInfo.ID) - wg.Add(1) - go func(info *peer.AddrInfo) { - if err := s.connectWithPeer(s.ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) - } - wg.Done() - }(peerInfo) - } - wg.Wait() + }() } } } +// findAndDialPeers ensures that our node is connected to enough peers. +// If the threshold is met, then this function immediately returns. +// Otherwise, it searches for new peers and dials them. +// If `ctx` is canceled while searching for peers, the search is stopped, but newly found peers are still dialed. +// In this case, the function returns an error. +func (s *Service) findAndDialPeers(ctx context.Context) error { + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt + if flags.MaxDialIsActive() { + maxConcurrentDials = flags.Get().MaxConcurrentDials + } + + missingPeerCount := s.missingPeerCount(s.cfg.MaxPeers) + for missingPeerCount > 0 { + // Stop the search/dialing loop if the context is canceled.
+ if err := ctx.Err(); err != nil { + return err + } + + peersToDial, err := func() ([]*enode.Node, error) { + ctx, cancel := context.WithTimeout(ctx, batchPeriod) + defer cancel() + + peersToDial, err := s.findPeers(ctx, missingPeerCount) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return nil, errors.Wrap(err, "find peers") + } + + return peersToDial, nil + }() + + if err != nil { + return err + } + + dialedPeerCount := s.dialPeers(s.ctx, maxConcurrentDials, peersToDial) + + if dialedPeerCount > missingPeerCount { + missingPeerCount = 0 + continue + } + + missingPeerCount -= dialedPeerCount + } + + return nil +} + +// findPeers finds new peers until `missingPeerCount` peers are found, `batchPeriod` is over, +// the peers iterator is exhausted or the context is canceled. +func (s *Service) findPeers(ctx context.Context, missingPeerCount uint) ([]*enode.Node, error) { + // Create a discovery iterator to find new peers. + iterator := s.dv5Listener.RandomNodes() + + // `iterator.Next` can block indefinitely. `iterator.Close` unblocks it. + // So it is important to close the iterator when the context is done to ensure + // that the search does not hang indefinitely. + go func() { + <-ctx.Done() + iterator.Close() + }() + + // Crawl the network for peers that pass the peer filter. + nodeByNodeID := make(map[enode.ID]*enode.Node) + for missingPeerCount > 0 && iterator.Next() { + if ctx.Err() != nil { + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, ctx.Err() + } + + // Skip peers not matching the filter. + node := iterator.Node() + if !s.filterPeer(node) { + continue + } + + // Remove duplicates, keeping the node with higher seq. + existing, ok := nodeByNodeID[node.ID()] + if ok && existing.Seq() > node.Seq() { + continue + } + nodeByNodeID[node.ID()] = node + + // We found a new peer. Decrease the missing peer count. + missingPeerCount-- + } + + // Convert the map to a slice. + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, nil +} + +// missingPeerCount computes how many peers we are missing to reach the target peer count. +func (s *Service) missingPeerCount(targetCount uint) uint { + // Retrieve how many active peers we have. + activePeers := s.Peers().Active() + activePeerCount := uint(len(activePeers)) + + // Compute how many peers we are missing to reach the threshold. + missingPeerCount := uint(0) + if targetCount > activePeerCount { + missingPeerCount = targetCount - activePeerCount + } + + return missingPeerCount +} + func (s *Service) createListener( ipAddr net.IP, privKey *ecdsa.PrivateKey, @@ -562,8 +630,7 @@ func (s *Service) startDiscoveryV5( // 2. Peer hasn't been marked as 'bad'. // 3. Peer is not currently active or connected. // 4. Peer is ready to receive incoming connections. -// 5. Peer's fork digest in their ENR matches that of -// our localnodes. +// 5. Peer's fork digest in their ENR matches that of our localnodes. func (s *Service) filterPeer(node *enode.Node) bool { // Ignore nil node entries passed in. if node == nil { @@ -628,22 +695,24 @@ func (s *Service) filterPeer(node *enode.Node) bool { // This checks our set max peers in our config, and // determines whether our currently connected and // active peers are above our set max peer limit.
-func (s *Service) isPeerAtLimit(inbound bool) bool { - numOfConns := len(s.host.Network().Peers()) +func (s *Service) isPeerAtLimit(direction connectivityDirection) bool { maxPeers := int(s.cfg.MaxPeers) - // If we are measuring the limit for inbound peers - // we apply the high watermark buffer. - if inbound { + + // If we are measuring the limit for inbound peers we apply the high watermark buffer. + if direction == inbound { maxPeers += highWatermarkBuffer maxInbound := s.peers.InboundLimit() + highWatermarkBuffer - currInbound := len(s.peers.InboundConnected()) - // Exit early if we are at the inbound limit. - if currInbound >= maxInbound { + inboundCount := len(s.peers.InboundConnected()) + + // Return early if we are at the inbound limit. + if inboundCount >= maxInbound { return true } } - activePeers := len(s.Peers().Active()) - return activePeers >= maxPeers || numOfConns >= maxPeers + + peerCount := len(s.host.Network().Peers()) + activePeerCount := len(s.Peers().Active()) + return activePeerCount >= maxPeers || peerCount >= maxPeers } // isBelowOutboundPeerThreshold checks if the number of outbound peers that @@ -901,7 +970,7 @@ func multiAddrFromString(address string) (ma.Multiaddr, error) { return ma.NewMultiaddr(address) } -func udpVersionFromIP(ipAddr net.IP) int { +func udpVersionFromIP(ipAddr net.IP) udpVersion { if ipAddr.To4() != nil { return udp4 } diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 3e29e69c0d..66e9007b3d 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -323,16 +323,16 @@ func TestMultiAddrConversion_OK(t *testing.T) { } func TestStaticPeering_PeersAreAdded(t *testing.T) { + const port = uint(6000) cs := startup.NewClockSynchronizer() cfg := &Config{ MaxPeers: 30, ClockWaiter: cs, } - port := 6000 var staticPeers []string var hosts []host.Host // setup other nodes - for i := 1; i <= 5; i++ { + for i := uint(1); i <= 5; i++ { h, _, ipaddr := createHost(t, port+i) staticPeers = append(staticPeers, fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, port+i, h.ID())) hosts = append(hosts, h) @@ -406,14 +406,14 @@ func TestInboundPeerLimit(t *testing.T) { _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } - require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers") - require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers") + require.Equal(t, true, s.isPeerAtLimit(all), "not at limit for outbound peers") + require.Equal(t, false, s.isPeerAtLimit(inbound), "at limit for inbound peers") for i := 0; i < highWatermarkBuffer; i++ { _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } - require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers") + require.Equal(t, true, s.isPeerAtLimit(inbound), "not at limit for inbound peers") } func TestOutboundPeerThreshold(t *testing.T) { diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index e9c0a82eae..c6910b15e9 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -10,6 +10,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" prysmTime "github.com/OffchainLabs/prysm/v6/time" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -17,6 +18,8 @@ import ( ) const ( + 
agentVersionKey = "AgentVersion" + // The time to wait for a status request. timeForStatus = 10 * time.Second ) @@ -28,12 +31,15 @@ func peerMultiaddrString(conn network.Conn) string { } func (s *Service) connectToPeer(conn network.Conn) { - s.peers.SetConnectionState(conn.RemotePeer(), peers.Connected) + remotePeer := conn.RemotePeer() + + s.peers.SetConnectionState(remotePeer, peers.Connected) // Go through the handshake process. log.WithFields(logrus.Fields{ "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), + "agent": agentString(remotePeer, s.Host()), }).Debug("Initiate peer connection") } @@ -61,6 +67,7 @@ func (s *Service) disconnectFromPeerOnError( "multiaddr": peerMultiaddrString(conn), "direction": conn.Stat().Direction.String(), "remainingActivePeers": len(s.peers.Active()), + "agent": agentString(remotePeerID, s.Host()), }). Debug("Initiate peer disconnection") @@ -189,9 +196,10 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p DisconnectedF: func(net network.Network, conn network.Conn) { peerID := conn.RemotePeer() - log.WithFields(logrus.Fields{ + log := log.WithFields(logrus.Fields{ "multiAddr": peerMultiaddrString(conn), "direction": conn.Stat().Direction.String(), + "agent": agentString(peerID, s.Host()), }) // Must be handled in a goroutine as this callback cannot be blocking. go func() { @@ -222,3 +230,14 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p }, }) } + +func agentString(pid peer.ID, hst host.Host) string { + rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey) + + result, ok := rawVersion.(string) + if storeErr != nil || !ok { + result = "" + } + + return result +} diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 640bca6371..8f285acc12 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -49,7 +49,7 @@ type ( BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error - BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error + BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error } // SetStreamHandler configures p2p to handle streams of a certain topic ID. @@ -98,7 +98,7 @@ type ( NodeID() enode.ID DiscoveryAddresses() ([]multiaddr.Multiaddr, error) RefreshPersistentSubnets() - FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) + FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 59ee1fb370..3240bb35dc 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -393,7 +393,7 @@ func (p *Status) SetNextValidTime(pid peer.ID, nextTime time.Time) { peerData.NextValidTime = nextTime } -// RandomizeBackOff adds extra backoff period during which peer will not be dialed. 
+// RandomizeBackOff adds extra backoff period during which peer won't be dialed. func (p *Status) RandomizeBackOff(pid peer.ID) { p.store.Lock() defer p.store.Unlock() diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 1ceda55002..2191a269dd 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -37,24 +37,28 @@ import ( var _ runtime.Service = (*Service)(nil) -// In the event that we are at our peer limit, we -// stop looking for new peers and instead poll -// for the current peer limit status for the time period -// defined below. -var pollingPeriod = 6 * time.Second +const ( + // When looking for new nodes, if not enough nodes are found, + // we stop after this spent time. + batchPeriod = 2 * time.Second -// When looking for new nodes, if not enough nodes are found, -// we stop after this spent time. -var batchPeriod = 2 * time.Second + // maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it. + maxBadResponses = 5 +) -// Refresh rate of ENR set at twice per slot. -var refreshRate = slots.DivideSlotBy(2) +var ( + // Refresh rate of ENR set at twice per slot. + refreshRate = slots.DivideSlotBy(2) -// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it. -const maxBadResponses = 5 + // maxDialTimeout is the timeout for a single peer dial. + maxDialTimeout = params.BeaconConfig().RespTimeoutDuration() -// maxDialTimeout is the timeout for a single peer dial. -var maxDialTimeout = params.BeaconConfig().RespTimeoutDuration() + // In the event that we are at our peer limit, we + // stop looking for new peers and instead poll + // for the current peer limit status for the time period + // defined below. + pollingPeriod = 6 * time.Second +) // Service for managing peer to peer (p2p) networking. type Service struct { @@ -251,6 +255,7 @@ func (s *Service) Start() { "inboundTCP": inboundTCPCount, "outboundTCP": outboundTCPCount, "total": total, + "target": s.cfg.MaxPeers, } if features.Get().EnableQUIC { diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index f2e65c6053..ea08be2b24 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -13,6 +13,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" "github.com/OffchainLabs/prysm/v6/network/forks" @@ -72,7 +73,7 @@ func (mockListener) RandomNodes() enode.Iterator { func (mockListener) RebootListener() error { panic("implement me") } -func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) { +func createHost(t *testing.T, port uint) (host.Host, *ecdsa.PrivateKey, net.IP) { _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) @@ -185,21 +186,33 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) { } func TestListenForNewNodes(t *testing.T) { + const ( + port = uint(2000) + testPollingPeriod = 1 * time.Second + peerCount = 5 + ) + params.SetupTestConfigCleanup(t) + // Setup bootnode. 
- notifier := &mock.MockStateNotifier{} - cfg := &Config{StateNotifier: notifier, PingInterval: testPingInterval, DisableLivenessCheck: true} - port := 2000 - cfg.UDPPort = uint(port) + cfg := &Config{ + StateNotifier: &mock.MockStateNotifier{}, + PingInterval: testPingInterval, + DisableLivenessCheck: true, + UDPPort: port, + } + _, pkey := createAddrAndPrivKey(t) ipAddr := net.ParseIP("127.0.0.1") genesisTime := prysmTime.Now() - var gvr [32]byte + var gvr [fieldparams.RootLength]byte + s := &Service{ cfg: cfg, genesisTime: genesisTime, genesisValidatorsRoot: gvr[:], } + bootListener, err := s.createListener(ipAddr, pkey) require.NoError(t, err) defer bootListener.Close() @@ -210,35 +223,40 @@ func TestListenForNewNodes(t *testing.T) { // Use shorter period for testing. currentPeriod := pollingPeriod - pollingPeriod = 1 * time.Second + pollingPeriod = testPollingPeriod defer func() { pollingPeriod = currentPeriod }() bootNode := bootListener.Self() - var listeners []*listenerWrapper - var hosts []host.Host - // setup other nodes. + // Setup other nodes. cs := startup.NewClockSynchronizer() - cfg = &Config{ - Discv5BootStrapAddrs: []string{bootNode.String()}, - PingInterval: testPingInterval, - DisableLivenessCheck: true, - MaxPeers: 30, - ClockWaiter: cs, - } - for i := 1; i <= 5; i++ { + listeners := make([]*listenerWrapper, 0, peerCount) + hosts := make([]host.Host, 0, peerCount) + + for i := uint(1); i <= peerCount; i++ { + cfg = &Config{ + Discv5BootStrapAddrs: []string{bootNode.String()}, + PingInterval: testPingInterval, + DisableLivenessCheck: true, + MaxPeers: peerCount, + ClockWaiter: cs, + UDPPort: port + i, + TCPPort: port + i, + } + h, pkey, ipAddr := createHost(t, port+i) - cfg.UDPPort = uint(port + i) - cfg.TCPPort = uint(port + i) + s := &Service{ cfg: cfg, genesisTime: genesisTime, genesisValidatorsRoot: gvr[:], } + listener, err := s.startDiscoveryV5(ipAddr, pkey) - assert.NoError(t, err, "Could not start discovery for node") + require.NoError(t, err, "Could not start discovery for node") + listeners = append(listeners, listener) hosts = append(hosts, h) } @@ -263,19 +281,26 @@ func TestListenForNewNodes(t *testing.T) { s, err = NewService(t.Context(), cfg) require.NoError(t, err) - exitRoutine := make(chan bool) - go func() { - s.Start() - <-exitRoutine - }() - time.Sleep(1 * time.Second) - require.NoError(t, cs.SetClock(startup.NewClock(genesisTime, gvr))) + go s.Start() - time.Sleep(4 * time.Second) - assert.Equal(t, 5, len(s.host.Network().Peers()), "Not all peers added to peerstore") - require.NoError(t, s.Stop()) - exitRoutine <- true + err = cs.SetClock(startup.NewClock(genesisTime, gvr)) + require.NoError(t, err, "Could not set clock in service") + + actualPeerCount := len(s.host.Network().Peers()) + for range 40 { + if actualPeerCount == peerCount { + break + } + + time.Sleep(100 * time.Millisecond) + actualPeerCount = len(s.host.Network().Peers()) + } + + assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore") + + err = s.Stop() + require.NoError(t, err, "Failed to stop service") } func TestPeer_Disconnect(t *testing.T) { diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 68e357bf0e..7addca6950 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "fmt" "math" "strings" "sync" @@ -11,6 +12,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" 
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" @@ -23,7 +25,6 @@ import ( "github.com/holiman/uint256" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" - "github.com/sirupsen/logrus" ) var ( @@ -57,249 +58,297 @@ const blobSubnetLockerVal = 110 const dataColumnSubnetVal = 150 // nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. -func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { +func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) { switch { case strings.Contains(topic, GossipAttestationMessage): - return s.filterPeerForAttSubnet(index), nil + return s.filterPeerForAttSubnet(indices), nil case strings.Contains(topic, GossipSyncCommitteeMessage): - return s.filterPeerForSyncSubnet(index), nil + return s.filterPeerForSyncSubnet(indices), nil case strings.Contains(topic, GossipBlobSidecarMessage): - return s.filterPeerForBlobSubnet(), nil + return s.filterPeerForBlobSubnet(indices), nil case strings.Contains(topic, GossipDataColumnSidecarMessage): - return s.filterPeerForDataColumnsSubnet(index), nil + return s.filterPeerForDataColumnsSubnet(indices), nil default: return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) } } -// searchForPeers performs a network search for peers subscribed to a particular subnet. -// It exits as soon as one of these conditions is met: -// - It looped through `batchSize` nodes. -// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. -// - Iterator is exhausted. -func searchForPeers( - iterator enode.Iterator, - batchPeriod time.Duration, - peersToFindCount uint, - filter func(node *enode.Node) bool, -) []*enode.Node { - nodeFromNodeID := make(map[enode.ID]*enode.Node) - start := time.Now() +// FindAndDialPeersWithSubnets ensures that our node is connected to at least `minimumPeersPerSubnet` +// peers for each subnet listed in `subnets`. +// If, for all subnets, the threshold is met, then this function immediately returns. +// Otherwise, it searches for new peers for defective subnets, and dials them. +// If `ctx“ is canceled while searching for peers, search is stopped, but new found peers are still dialed. +// In this case, the function returns an error. +func (s *Service) FindAndDialPeersWithSubnets( + ctx context.Context, + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + subnets map[uint64]bool, +) error { + ctx, span := trace.StartSpan(ctx, "p2p.FindAndDialPeersWithSubnet") + defer span.End() - for time.Since(start) < batchPeriod && uint(len(nodeFromNodeID)) < peersToFindCount && iterator.Next() { + // Return early if the discovery listener isn't set. + if s.dv5Listener == nil { + return nil + } + + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt + if flags.MaxDialIsActive() { + maxConcurrentDials = flags.Get().MaxConcurrentDials + } + + defectiveSubnets := s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets) + for len(defectiveSubnets) > 0 { + // Stop the search/dialing loop if the context is canceled. 
+ if err := ctx.Err(); err != nil { + return err + } + + peersToDial, err := func() ([]*enode.Node, error) { + ctx, cancel := context.WithTimeout(ctx, batchPeriod) + defer cancel() + + peersToDial, err := s.findPeersWithSubnets(ctx, topicFormat, digest, minimumPeersPerSubnet, defectiveSubnets) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return nil, errors.Wrap(err, "find peers with subnets") + } + + return peersToDial, nil + }() + + if err != nil { + return err + } + + // Dial new peers in batches. + s.dialPeers(s.ctx, maxConcurrentDials, peersToDial) + + defectiveSubnets = s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets) + } + + return nil +} + +// findPeersWithSubnets finds peers subscribed to defective subnets in batches +// until enough peers are found or the context is canceled. +// It returns new peers found during the search. +func (s *Service) findPeersWithSubnets( + ctx context.Context, + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + defectiveSubnetsOrigin map[uint64]int, +) ([]*enode.Node, error) { + // Copy the defective subnets map to avoid modifying the original map. + defectiveSubnets := make(map[uint64]int, len(defectiveSubnetsOrigin)) + for k, v := range defectiveSubnetsOrigin { + defectiveSubnets[k] = v + } + + // Create an discovery iterator to find new peers. + iterator := s.dv5Listener.RandomNodes() + + // `iterator.Next` can block indefinitely. `iterator.Close` unblocks it. + // So it is important to close the iterator when the context is done to ensure + // that the search does not hang indefinitely. + go func() { + <-ctx.Done() + iterator.Close() + }() + + // Retrieve the filter function that will be used to filter nodes based on the defective subnets. + filter, err := s.nodeFilter(topicFormat, defectiveSubnets) + if err != nil { + return nil, errors.Wrap(err, "node filter") + } + + // Crawl the network for peers subscribed to the defective subnets. + nodeByNodeID := make(map[enode.ID]*enode.Node) + for len(defectiveSubnets) > 0 && iterator.Next() { + if err := ctx.Err(); err != nil { + // Convert the map to a slice. + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) + } + + return peersToDial, err + } + + // Get all needed subnets that the node is subscribed to. + // Skip nodes that are not subscribed to any of the defective subnets. node := iterator.Node() - - // Filter out nodes that do not meet the criteria. - if !filter(node) { + nodeSubnets, err := filter(node) + if err != nil { + return nil, errors.Wrap(err, "filter node") + } + if len(nodeSubnets) == 0 { continue } // Remove duplicates, keeping the node with higher seq. - prevNode, ok := nodeFromNodeID[node.ID()] - if ok && prevNode.Seq() > node.Seq() { + existing, ok := nodeByNodeID[node.ID()] + if ok && existing.Seq() > node.Seq() { continue } + nodeByNodeID[node.ID()] = node - nodeFromNodeID[node.ID()] = node + // We found a new peer. Modify the defective subnets map + // and the filter accordingly. + for subnet := range defectiveSubnets { + if !nodeSubnets[subnet] { + continue + } + + defectiveSubnets[subnet]-- + + if defectiveSubnets[subnet] == 0 { + delete(defectiveSubnets, subnet) + } + + filter, err = s.nodeFilter(topicFormat, defectiveSubnets) + if err != nil { + return nil, errors.Wrap(err, "node filter") + } + } } // Convert the map to a slice. 
- nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) - for _, node := range nodeFromNodeID { - nodes = append(nodes, node) + peersToDial := make([]*enode.Node, 0, len(nodeByNodeID)) + for _, node := range nodeByNodeID { + peersToDial = append(peersToDial, node) } - return nodes + return peersToDial, nil } -// dialPeer dials a peer in a separate goroutine. -func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { - info, _, err := convertToAddrInfo(node) - if err != nil { - return - } - - if info == nil { - return - } - - wg.Add(1) - go func() { - if err := s.connectWithPeer(ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) +// defectiveSubnets returns a map of subnets that have fewer than the minimum peer count. +func (s *Service) defectiveSubnets( + topicFormat string, + digest [fieldparams.VersionLength]byte, + minimumPeersPerSubnet int, + subnets map[uint64]bool, +) map[uint64]int { + missingCountPerSubnet := make(map[uint64]int, len(subnets)) + for subnet := range subnets { + topic := fmt.Sprintf(topicFormat, digest, subnet) + s.Encoding().ProtocolSuffix() + peers := s.pubsub.ListPeers(topic) + peerCount := len(peers) + if peerCount < minimumPeersPerSubnet { + missingCountPerSubnet[subnet] = minimumPeersPerSubnet - peerCount } + } - wg.Done() - }() + return missingCountPerSubnet } -// FindPeersWithSubnet performs a network search for peers -// subscribed to a particular subnet. Then it tries to connect -// with those peers. This method will block until either: -// - the required amount of peers are found, or -// - the context is terminated. -// On some edge cases, this method may hang indefinitely while peers -// are actually found. In such a case, the user should cancel the context -// and re-run the method again. -func (s *Service) FindPeersWithSubnet( - ctx context.Context, - topic string, - index uint64, - threshold int, -) (bool, error) { - const minLogInterval = 1 * time.Minute +// dialPeers dials multiple peers concurrently up to `maxConcurrentDials` at a time. +// In case of a dial failure, it logs the error but continues dialing other peers. +func (s *Service) dialPeers(ctx context.Context, maxConcurrentDials int, nodes []*enode.Node) uint { + var mut sync.Mutex - ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") - defer span.End() - - span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. - - if s.dv5Listener == nil { - // Return if discovery isn't set - return false, nil - } - - topic += s.Encoding().ProtocolSuffix() - iterator := s.dv5Listener.RandomNodes() - defer iterator.Close() - - filter, err := s.nodeFilter(topic, index) - if err != nil { - return false, errors.Wrap(err, "node filter") - } - - peersSummary := func(topic string, threshold int) (int, int) { - // Retrieve how many peers we have for this topic. - peerCountForTopic := len(s.pubsub.ListPeers(topic)) - - // Compute how many peers we are missing to reach the threshold. - missingPeerCountForTopic := max(0, threshold-peerCountForTopic) - - return peerCountForTopic, missingPeerCountForTopic - } - - // Compute how many peers we are missing to reach the threshold. - peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) - - // Exit early if we have enough peers. 
- if missingPeerCountForTopic == 0 { - return true, nil - } - - log := log.WithFields(logrus.Fields{ - "topic": topic, - "targetPeerCount": threshold, - }) - - log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start") - - lastLogTime := time.Now() - - wg := new(sync.WaitGroup) - for { - // If the context is done, we can exit the loop. This is the unhappy path. - if err := ctx.Err(); err != nil { - return false, errors.Errorf( - "unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", - topic, peerCountForTopic, threshold, - ) + counter := uint(0) + for start := 0; start < len(nodes); start += maxConcurrentDials { + if ctx.Err() != nil { + return counter } - // Search for new peers in the network. - nodes := searchForPeers(iterator, batchPeriod, uint(missingPeerCountForTopic), filter) - - // Restrict dials if limit is applied. - maxConcurrentDials := math.MaxInt - if flags.MaxDialIsActive() { - maxConcurrentDials = flags.Get().MaxConcurrentDials - } - - // Dial the peers in batches. - for start := 0; start < len(nodes); start += maxConcurrentDials { - stop := min(start+maxConcurrentDials, len(nodes)) - for _, node := range nodes[start:stop] { - s.dialPeer(ctx, wg, node) + var wg sync.WaitGroup + stop := min(start+maxConcurrentDials, len(nodes)) + for _, node := range nodes[start:stop] { + log := log.WithField("nodeID", node.ID()) + info, _, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Debug("Could not convert node to addr info") + continue } - // Wait for all dials to be completed. - wg.Wait() + if info == nil { + log.Debug("Nil addr info") + continue + } + + wg.Add(1) + go func() { + defer wg.Done() + if err := s.connectWithPeer(ctx, *info); err != nil { + log.WithError(err).WithField("info", info.String()).Debug("Could not connect with peer") + return + } + + mut.Lock() + defer mut.Unlock() + counter++ + }() } - peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) - - // If we have enough peers, we can exit the loop. This is the happy path. - if missingPeerCountForTopic == 0 { - break - } - - if time.Since(lastLogTime) > minLogInterval { - lastLogTime = time.Now() - log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue") - } + wg.Wait() } - log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success") - return true, nil + return counter } -// returns a method with filters peers specifically for a particular attestation subnet. -func (s *Service) filterPeerForAttSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +// filterPeerForAttSubnet returns a method with filters peers specifically for a particular attestation subnet. +func (s *Service) filterPeerForAttSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } - subnets, err := attSubnets(node.Record()) + subnets, err := attestationSubnets(node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "attestation subnets") } - return subnets[index] + return intersect(indices, subnets), nil } } // returns a method with filters peers specifically for a particular sync subnet. 
-func (s *Service) filterPeerForSyncSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +func (s *Service) filterPeerForSyncSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } + subnets, err := syncSubnets(node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "sync subnets") } - indExists := false - for _, comIdx := range subnets { - if comIdx == index { - indExists = true - break - } - } - return indExists + + return intersect(indices, subnets), nil } } // returns a method with filters peers specifically for a particular blob subnet. // All peers are supposed to be subscribed to all blob subnets. -func (s *Service) filterPeerForBlobSubnet() func(_ *enode.Node) bool { - return func(_ *enode.Node) bool { - return true +func (s *Service) filterPeerForBlobSubnet(indices map[uint64]int) func(_ *enode.Node) (map[uint64]bool, error) { + result := make(map[uint64]bool, len(indices)) + for i := range indices { + result[i] = true + } + + return func(_ *enode.Node) (map[uint64]bool, error) { + return result, nil } } // returns a method with filters peers specifically for a particular data column subnet. -func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.Node) bool { - return func(node *enode.Node) bool { +func (s *Service) filterPeerForDataColumnsSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) { + return func(node *enode.Node) (map[uint64]bool, error) { if !s.filterPeer(node) { - return false + return map[uint64]bool{}, nil } subnets, err := dataColumnSubnets(node.ID(), node.Record()) if err != nil { - return false + return nil, errors.Wrap(err, "data column subnets") } - return subnets[index] + return intersect(indices, subnets), nil } } @@ -475,43 +524,47 @@ func initializeSyncCommSubnets(node *enode.LocalNode) *enode.LocalNode { // Reads the attestation subnets entry from a node's ENR and determines // the committee indices of the attestation subnets the node is subscribed to. -func attSubnets(record *enr.Record) (map[uint64]bool, error) { +func attestationSubnets(record *enr.Record) (map[uint64]bool, error) { bitV, err := attBitvector(record) if err != nil { - return nil, err - } - committeeIdxs := make(map[uint64]bool) - // lint:ignore uintcast -- subnet count can be safely cast to int. - if len(bitV) != byteCount(int(attestationSubnetCount)) { - return committeeIdxs, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + return nil, errors.Wrap(err, "att bit vector") } - for i := uint64(0); i < attestationSubnetCount; i++ { + // lint:ignore uintcast -- subnet count can be safely cast to int. + if len(bitV) != byteCount(int(attestationSubnetCount)) { + return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + } + + indices := make(map[uint64]bool, attestationSubnetCount) + for i := range attestationSubnetCount { if bitV.BitAt(i) { - committeeIdxs[i] = true + indices[i] = true } } - return committeeIdxs, nil + + return indices, nil } // Reads the sync subnets entry from a node's ENR and determines // the committee indices of the sync subnets the node is subscribed to. 
-func syncSubnets(record *enr.Record) ([]uint64, error) { +func syncSubnets(record *enr.Record) (map[uint64]bool, error) { bitV, err := syncBitvector(record) if err != nil { - return nil, err + return nil, errors.Wrap(err, "sync bit vector") } + // lint:ignore uintcast -- subnet count can be safely cast to int. if len(bitV) != byteCount(int(syncCommsSubnetCount)) { - return []uint64{}, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) + return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) } - var committeeIdxs []uint64 - for i := uint64(0); i < syncCommsSubnetCount; i++ { + + indices := make(map[uint64]bool, syncCommsSubnetCount) + for i := range syncCommsSubnetCount { if bitV.BitAt(i) { - committeeIdxs = append(committeeIdxs, i) + indices[i] = true } } - return committeeIdxs, nil + return indices, nil } // Retrieve the data columns subnets from a node's ENR and node ID. @@ -585,3 +638,16 @@ func byteCount(bitCount int) int { } return numOfBytes } + +// interesect intersects two maps and returns a new map containing only the keys +// that are present in both maps. +func intersect(left map[uint64]int, right map[uint64]bool) map[uint64]bool { + result := make(map[uint64]bool, min(len(left), len(right))) + for i := range left { + if right[i] { + result[i] = true + } + } + + return result +} diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index faa6b17ed9..537eecb07f 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "encoding/hex" "fmt" - "reflect" "testing" "time" @@ -22,7 +21,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" ) -func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { +func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) { // Topology of this test: // // @@ -37,7 +36,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // In our case: The node i is subscribed to subnet i, with i = 1, 2, 3 // Define the genesis validators root, to ensure everybody is on the same network. - const genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe" + const ( + genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe" + subnetCount = 3 + minimumPeersPerSubnet = 1 + ) + genesisValidatorsRoot, err := hex.DecodeString(genesisValidatorRootStr[2:]) require.NoError(t, err) @@ -87,13 +91,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // Create 3 nodes, each subscribed to a different subnet. // Each node is connected to the bootstrap node. - services := make([]*Service, 0, 3) + services := make([]*Service, 0, subnetCount) - for i := 1; i <= 3; i++ { - subnet := uint64(i) + for i := uint64(1); i <= subnetCount; i++ { service, err := NewService(ctx, &Config{ Discv5BootStrapAddrs: []string{bootNodeENR}, - MaxPeers: 30, + MaxPeers: 0, // Set to 0 to ensure that peers are discovered via subnets search, and not generic peers discovery. UDPPort: uint(2000 + i), TCPPort: uint(3000 + i), QUICPort: uint(3000 + i), @@ -115,12 +118,13 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { // Set the ENR `attnets`, used by Prysm to filter peers by subnet. bitV := bitfield.NewBitvector64() - bitV.SetBitAt(subnet, true) + bitV.SetBitAt(i, true) entry := enr.WithEntry(attSubnetEnrKey, &bitV) service.dv5Listener.LocalNode().Set(entry) // Join and subscribe to the subnet, needed by libp2p. 
- topic, err := service.pubsub.Join(fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + "/ssz_snappy") + topicName := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, i) + "/ssz_snappy" + topic, err := service.pubsub.Join(topicName) require.NoError(t, err) _, err = topic.Subscribe() @@ -160,37 +164,18 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { require.NoError(t, err) }() - // Look up 3 different subnets. - exists := make([]bool, 0, 3) - for i := 1; i <= 3; i++ { - subnet := uint64(i) - topic := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + subnets := map[uint64]bool{1: true, 2: true, 3: true} + defectiveSubnets := service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.Equal(t, subnetCount, len(defectiveSubnets)) - exist := false + ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - // This for loop is used to ensure we don't get stuck in `FindPeersWithSubnet`. - // Read the documentation of `FindPeersWithSubnet` for more details. - for j := 0; j < 3; j++ { - ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.NoError(t, err) - exist, err = service.FindPeersWithSubnet(ctxWithTimeOut, topic, subnet, 1) - require.NoError(t, err) - - if exist { - break - } - } - - require.NoError(t, err) - exists = append(exists, exist) - - } - - // Check if all peers are found. - for _, exist := range exists { - require.Equal(t, true, exist, "Peer with subnet doesn't exist") - } + defectiveSubnets = service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets) + require.Equal(t, 0, len(defectiveSubnets)) } func Test_AttSubnets(t *testing.T) { @@ -305,37 +290,34 @@ func Test_AttSubnets(t *testing.T) { wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { db, err := enode.OpenDB("") - assert.NoError(t, err) + require.NoError(t, err) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) - assert.NoError(t, err) + require.NoError(t, err) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) - assert.NoError(t, err) + require.NoError(t, err) localNode := enode.NewLocalNode(db, convertedKey) record := tt.record(localNode) - got, err := attSubnets(record) + got, err := attestationSubnets(record) if (err != nil) != tt.wantErr { - t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("attestationSubnets() error = %v, wantErr %v", err, tt.wantErr) return } if tt.wantErr { - assert.ErrorContains(t, tt.errContains, err) + require.ErrorContains(t, tt.errContains, err) } - want := make(map[uint64]bool, len(tt.want)) + require.Equal(t, len(tt.want), len(got)) for _, subnet := range tt.want { - want[subnet] = true - } - - if !reflect.DeepEqual(got, want) { - t.Errorf("syncSubnets() got = %v, want %v", got, want) + require.Equal(t, true, got[subnet]) } }) } @@ -494,11 +476,14 @@ func Test_SyncSubnets(t *testing.T) { t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) return } + if tt.wantErr { assert.ErrorContains(t, tt.errContains, err) } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("syncSubnets() got = %v, want %v", got, tt.want) + + require.Equal(t, len(tt.want), len(got)) + for _, subnet := range tt.want { + require.Equal(t, true, 
got[subnet]) } }) } diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 9d2f8cd3f5..f73bbbd97c 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -68,9 +68,9 @@ func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil } -// FindPeersWithSubnet mocks the p2p func. -func (*FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return false, nil +// FindAndDialPeersWithSubnets mocks the p2p func. +func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // RefreshPersistentSubnets mocks the p2p func. @@ -167,7 +167,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac } // BroadcastDataColumn -- fake. -func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { +func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error { return nil } diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index 209cefc564..abdb811d4f 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, } // BroadcastDataColumn broadcasts a data column for mock. -func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { +func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error { m.BroadcastCalled.Store(true) return nil } diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index a23341ff69..fed8926c67 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/host" @@ -56,9 +57,9 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { // RefreshPersistentSubnets . func (*MockPeerManager) RefreshPersistentSubnets() {} -// FindPeersWithSubnet . -func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return true, nil +// FindAndDialPeersWithSubnet . +func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // AddPingMethod . diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 0eab911c06..25814844e5 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -224,7 +224,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf } // BroadcastDataColumn broadcasts a data column for mock. 
-func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { +func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error { p.BroadcastCalled.Store(true) return nil } @@ -408,9 +408,9 @@ func (p *TestP2P) Peers() *peers.Status { return p.peers } -// FindPeersWithSubnet mocks the p2p func. -func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { - return false, nil +// FindAndDialPeersWithSubnets mocks the p2p func. +func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error { + return nil } // RefreshPersistentSubnets mocks the p2p func. diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 301cdb9b49..c67af0417f 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -109,7 +109,12 @@ func isValidStreamError(err error) bool { func closeStream(stream network.Stream, log *logrus.Entry) { if err := stream.Close(); isValidStreamError(err) { - log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) + log.WithError(err). + WithFields(logrus.Fields{ + "protocol": stream.Protocol(), + "peer": stream.Conn().RemotePeer(), + }). + Debug("Could not close stream") } } @@ -118,7 +123,12 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) { _err := stream.Reset() _ = _err if isValidStreamError(err) { - log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) + log.WithError(err). + WithFields(logrus.Fields{ + "protocol": stream.Protocol(), + "peer": stream.Conn().RemotePeer(), + }). + Debug("Could not reset stream") } return } diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 186e80d192..64b4c2bb9d 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -236,7 +236,7 @@ func (s *Service) updateMetrics() { if err != nil { log.WithError(err).Debugf("Could not compute fork digest") } - indices := s.aggregatorSubnetIndices(s.cfg.clock.CurrentSlot()) + indices := aggregatorSubnetIndices(s.cfg.clock.CurrentSlot()) syncIndices := cache.SyncSubnetIDs.GetAllSubnets(slots.ToEpoch(s.cfg.clock.CurrentSlot())) attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})] diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 73bf93508b..a1e053997a 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -42,7 +42,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg) } if err := s.rateLimiter.validateRequest(stream, 1); err != nil { - log.WithError(err).Debug("Goodbye message from rate-limited peer.") + log.WithError(err).Debug("Goodbye message from rate-limited peer") } else { s.rateLimiter.add(stream, 1) } @@ -65,7 +65,12 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr log.WithError(err).Debug("Error when disconnecting with bad peer") } - log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection") + log.WithError(badPeerErr). + WithFields(logrus.Fields{ + "peerID": id, + "agent": agentString(id, s.cfg.p2p.Host()), + }). 
+ Debug("Sent peer disconnection") } // A custom goodbye method that is used by our connection handler, in the diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index fbc41bbd44..6f297cc69b 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -217,6 +217,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream log.WithFields(logrus.Fields{ "peer": remotePeer, "error": err, + "agent": agentString(remotePeer, s.cfg.p2p.Host()), }).Debug("Invalid status message from peer") var respCode byte diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 51d59bd837..845a8a46fe 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -2,11 +2,9 @@ package sync import ( "context" - "errors" "fmt" "reflect" "runtime/debug" - "slices" "strings" "time" @@ -21,7 +19,6 @@ import ( fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" - "github.com/OffchainLabs/prysm/v6/container/slice" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" "github.com/OffchainLabs/prysm/v6/network/forks" @@ -32,17 +29,50 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) const pubsubMessageTimeout = 30 * time.Second -// wrappedVal represents a gossip validator which also returns an error along with the result. -type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error) +type ( + // wrappedVal represents a gossip validator which also returns an error along with the result. + wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error) -// subHandler represents handler for a given subscription. -type subHandler func(context.Context, proto.Message) error + // subHandler represents handler for a given subscription. + subHandler func(context.Context, proto.Message) error + + // parameters used for the `subscribeWithParameters` function. + subscribeParameters struct { + topicFormat string + validate wrappedVal + handle subHandler + digest [4]byte + + // getSubnetsToJoin is a function that returns all subnets the node should join. + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool + + // getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found + // but for which no subscriptions are needed. + getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool + } + + // parameters used for the `subscribeToSubnets` function. + subscribeToSubnetsParameters struct { + subscriptionBySubnet map[uint64]*pubsub.Subscription + topicFormat string + digest [4]byte + genesisValidatorsRoot [fieldparams.RootLength]byte + genesisTime time.Time + currentSlot primitives.Slot + validate wrappedVal + handle subHandler + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool + } +) + +var errInvalidDigest = errors.New("invalid digest") // noopValidator is a no-op that only decodes the message, but does not check its contents. 
func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { @@ -55,28 +85,36 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag return pubsub.ValidationAccept, nil } -func sliceFromCount(count uint64) []uint64 { - result := make([]uint64, 0, count) - +func mapFromCount(count uint64) map[uint64]bool { + result := make(map[uint64]bool, count) for item := range count { - result = append(result, item) + result[item] = true } return result } -func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 { - if flags.Get().SubscribeToAllSubnets { - return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount) +func mapFromSlice(slices ...[]uint64) map[uint64]bool { + result := make(map[uint64]bool) + + for _, slice := range slices { + for _, item := range slice { + result[item] = true + } + } + + return result +} + +func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { + if flags.Get().SubscribeToAllSubnets { + return mapFromCount(params.BeaconConfig().SyncCommitteeSubnetCount) } - // Get the current epoch. currentEpoch := slots.ToEpoch(currentSlot) + subscriptions := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch) - // Retrieve the subnets we want to subscribe to. - subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch) - - return slice.SetUint64(subs) + return mapFromSlice(subscriptions) } // Register PubSub subscribers @@ -111,14 +149,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.attesterSlashingSubscriber, digest, ) - s.subscribeWithParameters( - p2p.AttestationSubnetTopicFormat, - s.validateCommitteeIndexBeaconAttestation, - s.committeeIndexBeaconAttestationSubscriber, - digest, - s.persistentAndAggregatorSubnetIndices, - s.attesterSubnetIndices, - ) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.AttestationSubnetTopicFormat, + validate: s.validateCommitteeIndexBeaconAttestation, + handle: s.committeeIndexBeaconAttestationSubscriber, + digest: digest, + getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices, + getSubnetsRequiringPeers: attesterSubnetIndices, + }) // New gossip topic in Altair if params.BeaconConfig().AltairForkEpoch <= epoch { @@ -128,14 +166,15 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.syncContributionAndProofSubscriber, digest, ) - s.subscribeWithParameters( - p2p.SyncCommitteeSubnetTopicFormat, - s.validateSyncCommitteeMessage, - s.syncCommitteeMessageSubscriber, - digest, - s.activeSyncSubnetIndices, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + validate: s.validateSyncCommitteeMessage, + handle: s.syncCommitteeMessageSubscriber, + digest: digest, + getSubnetsToJoin: s.activeSyncSubnetIndices, + }) + if features.Get().EnableLightClient { s.subscribe( p2p.LightClientOptimisticUpdateTopicFormat, @@ -164,42 +203,39 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { // New gossip topic in Deneb, removed in Electra if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch { - s.subscribeWithParameters( - p2p.BlobSubnetTopicFormat, - s.validateBlob, - s.blobSubscriber, - digest, - func(currentSlot primitives.Slot) []uint64 { - return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) + 
s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.BlobSubnetTopicFormat, + validate: s.validateBlob, + handle: s.blobSubscriber, + digest: digest, + getSubnetsToJoin: func(primitives.Slot) map[uint64]bool { + return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount) }, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + }) } // New gossip topic in Electra, removed in Fulu if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch { - s.subscribeWithParameters( - p2p.BlobSubnetTopicFormat, - s.validateBlob, - s.blobSubscriber, - digest, - func(currentSlot primitives.Slot) []uint64 { - return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.BlobSubnetTopicFormat, + validate: s.validateBlob, + handle: s.blobSubscriber, + digest: digest, + getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool { + return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) }, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + }) } // New gossip topic in Fulu. if params.BeaconConfig().FuluForkEpoch <= epoch { - s.subscribeWithParameters( - p2p.DataColumnSubnetTopicFormat, - s.validateDataColumn, - s.dataColumnSubscriber, - digest, - s.dataColumnSubnetIndices, - func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, - ) + s.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.DataColumnSubnetTopicFormat, + validate: s.validateDataColumn, + handle: s.dataColumnSubscriber, + digest: digest, + getSubnetsToJoin: s.dataColumnSubnetIndices, + }) } } @@ -379,197 +415,185 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p } } -// pruneSubscriptions unsubscribe from topics we are currently subscribed to but that are +// pruneSubscriptions unsubscribes from topics we are currently subscribed to but that are // not in the list of wanted subnets. +// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. func (s *Service) pruneSubscriptions( - subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, + subscriptionBySubnet map[uint64]*pubsub.Subscription, + wantedSubnets map[uint64]bool, topicFormat string, digest [4]byte, ) { - for k, v := range subscriptions { - var wanted bool - for _, idx := range wantedSubs { - if k == idx { - wanted = true - break - } - } - - if !wanted && v != nil { - v.Cancel() - fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() - s.unSubscribeFromTopic(fullTopic) - delete(subscriptions, k) - } - } -} - -// searchForPeers searches for peers in the given subnets. -func (s *Service) searchForPeers( - ctx context.Context, - topicFormat string, - digest [4]byte, - currentSlot primitives.Slot, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, - getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, -) { - // Retrieve the subnets we want to subscribe to. - subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) - - // Retrieve the subnets we want to find peers for. - subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot) - - // Combine the subnets to subscribe and the subnets to find peers for. - subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...)) - - // Find new peers for wanted subnets if needed. 
- for _, subnetIndex := range subnetsToFindPeersIndex { - topic := fmt.Sprintf(topicFormat, digest, subnetIndex) - - // Check if we have enough peers in the subnet. Skip if we do. - if s.enoughPeersAreConnected(topic) { + for subnet, subscription := range subscriptionBySubnet { + if subscription == nil { + // Should not happen, but just in case. + delete(subscriptionBySubnet, subnet) continue } - // Not enough peers in the subnet, we need to search for more. - _, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") + if wantedSubnets[subnet] { + // Nothing to prune. + continue } + + // We are subscribed to a subnet that is no longer wanted. + subscription.Cancel() + fullTopic := fmt.Sprintf(topicFormat, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix() + s.unSubscribeFromTopic(fullTopic) + delete(subscriptionBySubnet, subnet) } } -// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed. -// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. -func (s *Service) subscribeToSubnets( - topicFormat string, - digest [4]byte, - genesisValidatorsRoot [fieldparams.RootLength]byte, - genesisTime time.Time, - subscriptions map[uint64]*pubsub.Subscription, - currentSlot primitives.Slot, - validate wrappedVal, - handle subHandler, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, -) bool { +// subscribeToSubnets subscribes to needed subnets and unsubscribes from unneeded ones. +// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. +func (s *Service) subscribeToSubnets(p subscribeToSubnetsParameters) error { // Do not subscribe if not synced. if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - return true + return nil } // Check the validity of the digest. - valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + valid, err := isDigestValid(p.digest, p.genesisTime, p.genesisValidatorsRoot) if err != nil { - log.Error(err) - return true + return errors.Wrap(err, "is digest valid") } - // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. if !valid { - description := topicFormat - if pos := strings.LastIndex(topicFormat, "/"); pos != -1 { - description = topicFormat[pos+1:] - } - - if pos := strings.LastIndex(description, "_"); pos != -1 { - description = description[:pos] - } - - log.WithFields(logrus.Fields{ - "digest": fmt.Sprintf("%#x", digest), - "subnets": description, - }).Debug("Subnets with this digest are no longer valid, unsubscribing from all of them") - s.pruneSubscriptions(subscriptions, []uint64{}, topicFormat, digest) - return false + wantedSubnets := map[uint64]bool{} + s.pruneSubscriptions(p.subscriptionBySubnet, wantedSubnets, p.topicFormat, p.digest) + return errInvalidDigest } - // Retrieve the subnets we want to subscribe to. - subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) + // Retrieve the subnets we want to join. + subnetsToJoin := p.getSubnetsToJoin(p.currentSlot) // Remove subscriptions that are no longer wanted.
- s.pruneSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest) + s.pruneSubscriptions(p.subscriptionBySubnet, subnetsToJoin, p.topicFormat, p.digest) - // Subscribe to wanted subnets. - for _, subnetIndex := range subnetsToSubscribeIndex { - subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex) + // Subscribe to wanted and not already registered subnets. + for subnet := range subnetsToJoin { + subnetTopic := fmt.Sprintf(p.topicFormat, p.digest, subnet) - // Check if subscription exists. - if _, exists := subscriptions[subnetIndex]; exists { - continue + if _, exists := p.subscriptionBySubnet[subnet]; !exists { + subscription := s.subscribeWithBase(subnetTopic, p.validate, p.handle) + p.subscriptionBySubnet[subnet] = subscription } - - // We need to subscribe to the subnet. - subscription := s.subscribeWithBase(subnetTopic, validate, handle) - subscriptions[subnetIndex] = subscription } - return true + + return nil } // subscribeWithParameters subscribes to a list of subnets. -func (s *Service) subscribeWithParameters( - topicFormat string, - validate wrappedVal, - handle subHandler, - digest [4]byte, - getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, - getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, -) { - // Initialize the subscriptions map. - subscriptions := make(map[uint64]*pubsub.Subscription) - - // Retrieve the genesis validators root. +func (s *Service) subscribeWithParameters(p subscribeParameters) { + minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet + subscriptionBySubnet := make(map[uint64]*pubsub.Subscription) genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() - - // Retrieve the epoch of the fork corresponding to the digest. - _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) - if err != nil { - panic(err) // lint:nopanic -- Impossible condition. - } - - // Retrieve the base protobuf message. - base := p2p.GossipTopicMappings(topicFormat, epoch) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition. - } - - // Retrieve the genesis time. genesisTime := s.cfg.clock.GenesisTime() - - // Define a ticker ticking every slot. secondsPerSlot := params.BeaconConfig().SecondsPerSlot - ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) - - // Retrieve the current slot. + secondsPerSlotDuration := time.Duration(secondsPerSlot) * time.Second currentSlot := s.cfg.clock.CurrentSlot() + neededSubnets := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers) - // Subscribe to subnets. - s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) + shortTopicFormat := p.topicFormat + shortTopicFormatLen := len(shortTopicFormat) + if shortTopicFormatLen >= 3 && shortTopicFormat[shortTopicFormatLen-3:] == "_%d" { + shortTopicFormat = shortTopicFormat[:shortTopicFormatLen-3] + } - // Derive a new context and cancel function. 
- ctx, cancel := context.WithCancel(s.ctx) + shortTopic := fmt.Sprintf(shortTopicFormat, p.digest) + parameters := subscribeToSubnetsParameters{ + subscriptionBySubnet: subscriptionBySubnet, + topicFormat: p.topicFormat, + digest: p.digest, + genesisValidatorsRoot: genesisValidatorsRoot, + genesisTime: genesisTime, + currentSlot: currentSlot, + validate: p.validate, + handle: p.handle, + getSubnetsToJoin: p.getSubnetsToJoin, + } + + err := s.subscribeToSubnets(parameters) + if err != nil { + log.WithError(err).Error("Could not subscribe to subnets") + } + + // Subscribe to expected subnets and search for peers if needed at every slot. go func() { - // Search for peers. - s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) + func() { + ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration) + defer cancel() + + if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Debug("Could not find peers with subnets") + } + }() + + slotTicker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + defer slotTicker.Done() for { select { - case currentSlot := <-ticker.C(): - isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) + case currentSlot := <-slotTicker.C(): + parameters.currentSlot = currentSlot + if err := s.subscribeToSubnets(parameters); err != nil { + if errors.Is(err, errInvalidDigest) { + log.WithField("topics", shortTopic).Debug("Digest is invalid, stopping subscription") + return + } - // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. - if !isDigestValid { - ticker.Done() - return + log.WithError(err).Error("Could not subscribe to subnets") + continue } - // Search for peers. - s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) + func() { + ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration) + defer cancel() + + if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Debug("Could not find peers with subnets") + } + }() + + case <-s.ctx.Done(): + return + } + } + }() + + // Warn the user if we are not connected to enough peers in the needed subnets. + go func() { + log := log.WithField("minimum", minimumPeersPerSubnet) + logTicker := time.NewTicker(5 * time.Minute) + defer logTicker.Stop() + + for { + select { + case <-logTicker.C: + subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers) + + isSubnetWithMissingPeers := false + // Check whether every needed subnet has enough connected peers. + for index := range subnetsToFindPeersIndex { + topic := fmt.Sprintf(p.topicFormat, p.digest, index) + + // Warn if this subnet does not have enough connected peers.
+ if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet { + isSubnetWithMissingPeers = true + log.WithFields(logrus.Fields{ + "topic": topic, + "actual": count, + }).Warning("Not enough connected peers") + } + } + + if !isSubnetWithMissingPeers { + log.WithField("topic", shortTopic).Info("All subnets have enough connected peers") + } case <-s.ctx.Done(): - cancel() - ticker.Done() return } } @@ -591,76 +615,82 @@ func (s *Service) unSubscribeFromTopic(topic string) { } } -// enoughPeersAreConnected checks if we have enough peers which are subscribed to the same subnet. -func (s *Service) enoughPeersAreConnected(subnetTopic string) bool { +// connectedPeersCount counts how many peers for a given topic are connected to the node. +func (s *Service) connectedPeersCount(subnetTopic string) int { topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() - threshold := flags.Get().MinimumPeersPerSubnet - peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) - peersWithSubnetCount := len(peersWithSubnet) - - return peersWithSubnetCount >= threshold + return len(peersWithSubnet) } -func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 { +func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool { nodeID := s.cfg.p2p.NodeID() custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target) nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount) if err != nil { log.WithError(err).Error("Could not retrieve peer info") - return []uint64{} + return nil } - return sliceFromMap(nodeInfo.DataColumnsSubnets, true /*sorted*/) + return nodeInfo.DataColumnsSubnets } -func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { +func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { if flags.Get().SubscribeToAllSubnets { - return sliceFromCount(params.BeaconConfig().AttestationSubnetCount) + return mapFromCount(params.BeaconConfig().AttestationSubnetCount) } - persistentSubnetIndices := s.persistentSubnetIndices() - aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot) + persistentSubnetIndices := persistentSubnetIndices() + aggregatorSubnetIndices := aggregatorSubnetIndices(currentSlot) // Combine subscriptions to get all requested subscriptions. - return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...)) + return mapFromSlice(persistentSubnetIndices, aggregatorSubnetIndices) } // filters out required peers for the node to function, not // pruning peers who are in our attestation subnets. func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { + minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet + currentSlot := s.cfg.clock.CurrentSlot() + // Exit early if nothing to filter.
if len(pids) == 0 { return pids } + digest, err := s.currentForkDigest() if err != nil { log.WithError(err).Error("Could not compute fork digest") return pids } - currSlot := s.cfg.clock.CurrentSlot() - wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot) - wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...)) + + wantedSubnets := make(map[uint64]bool) + for subnet := range s.persistentAndAggregatorSubnetIndices(currentSlot) { + wantedSubnets[subnet] = true + } + + for subnet := range attesterSubnetIndices(currentSlot) { + wantedSubnets[subnet] = true + } + topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})] // Map of peers in subnets peerMap := make(map[peer.ID]bool) - - for _, sub := range wantedSubs { - subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.p2p.Encoding().ProtocolSuffix() - ps := s.cfg.p2p.PubSub().ListPeers(subnetTopic) - if len(ps) > flags.Get().MinimumPeersPerSubnet { + for subnet := range wantedSubnets { + subnetTopic := fmt.Sprintf(topic, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix() + peers := s.cfg.p2p.PubSub().ListPeers(subnetTopic) + if len(peers) > minimumPeersPerSubnet { // In the event we have more than the minimum, we can // mark the remaining as viable for pruning. - ps = ps[:flags.Get().MinimumPeersPerSubnet] + peers = peers[:minimumPeersPerSubnet] } + // Add peer to peer map. - for _, p := range ps { - // Even if the peer id has - // already been seen we still set - // it, as the outcome is the same. - peerMap[p] = true + for _, peer := range peers { + // Even if the peer ID has already been seen we still set it, + // as the outcome is the same. + peerMap[peer] = true } } @@ -716,6 +746,34 @@ func isDigestValid(digest [4]byte, genesis time.Time, genValRoot [32]byte) (bool return retDigest == digest, nil } +// computeAllNeededSubnets computes the subnets we want to join +// and the subnets for which we want to find peers. +func computeAllNeededSubnets( + currentSlot primitives.Slot, + getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool, + getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool, +) map[uint64]bool { + // Retrieve the subnets we want to join. + subnetsToJoin := getSubnetsToJoin(currentSlot) + + // Retrieve the subnets for which we want to find peers. + subnetsRequiringPeers := make(map[uint64]bool) + if getSubnetsRequiringPeers != nil { + subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot) + } + + // Combine the two maps to get all needed subnets. + neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers)) + for subnet := range subnetsToJoin { + neededSubnets[subnet] = true + } + for subnet := range subnetsRequiringPeers { + neededSubnets[subnet] = true + } + + return neededSubnets +} + func agentString(pid peer.ID, hst host.Host) string { rawVersion, storeErr := hst.Peerstore().Get(pid, "AgentVersion") agString, ok := rawVersion.(string) @@ -742,17 +800,3 @@ func errorIsIgnored(err error) bool { } return false } - -// sliceFromMap returns a sorted list of keys from a map.
-func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 { - result := make([]uint64, 0, len(m)) - for k := range m { - result = append(result, k) - } - - if len(sorted) > 0 && sorted[0] { - slices.Sort(result) - } - - return result -} diff --git a/beacon-chain/sync/subscriber_beacon_attestation.go b/beacon-chain/sync/subscriber_beacon_attestation.go index 8fb2d588fa..d5bd9158b9 100644 --- a/beacon-chain/sync/subscriber_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_beacon_attestation.go @@ -35,11 +35,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m } } -func (*Service) persistentSubnetIndices() []uint64 { +func persistentSubnetIndices() []uint64 { return cache.SubnetIDs.GetAllSubnets() } -func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { +func aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) var commIds []uint64 @@ -49,12 +49,16 @@ func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { return slice.SetUint64(commIds) } -func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 { +func attesterSubnetIndices(currentSlot primitives.Slot) map[uint64]bool { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) - var commIds []uint64 + + subnets := make(map[uint64]bool, int(endSlot-currentSlot+1)) for i := currentSlot; i <= endSlot; i++ { - commIds = append(commIds, cache.SubnetIDs.GetAttesterSubnetIDs(i)...) + for _, subnetId := range cache.SubnetIDs.GetAttesterSubnetIDs(i) { + subnets[subnetId] = true + } } - return slice.SetUint64(commIds) + + return subnets } diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 04156dc6af..d36996f8d1 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -310,7 +310,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { subscriptions[2], err = r.cfg.p2p.SubscribeToTopic(fullTopic) require.NoError(t, err) - r.pruneSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest) + r.pruneSubscriptions(subscriptions, map[uint64]bool{2: true}, defaultTopic, digest) require.LogsDoNotContain(t, hook, "Could not unregister topic validator") } @@ -482,8 +482,7 @@ func TestFilterSubnetPeers(t *testing.T) { p2 := createPeer(t, subnet10, subnet20) p3 := createPeer(t) - // Connect to all - // peers. + // Connect to all peers. 
p.Connect(p1) p.Connect(p2) p.Connect(p3) @@ -540,7 +539,11 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) + r.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + digest: digest, + getSubnetsToJoin: r.activeSyncSubnetIndices, + }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} @@ -589,7 +592,11 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) { digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) + r.subscribeWithParameters(subscribeParameters{ + topicFormat: p2p.SyncCommitteeSubnetTopicFormat, + digest: digest, + getSubnetsToJoin: r.activeSyncSubnetIndices, + }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} diff --git a/changelog/manu-subscriptions.md b/changelog/manu-subscriptions.md new file mode 100644 index 0000000000..45931a0065 --- /dev/null +++ b/changelog/manu-subscriptions.md @@ -0,0 +1,2 @@ +### Fixed +- Subnet subscriptions: avoid blocking dynamic subscriptions when not enough peers per subnet are found. \ No newline at end of file diff --git a/config/params/config.go b/config/params/config.go index 1600c60629..ff777832f4 100644 --- a/config/params/config.go +++ b/config/params/config.go @@ -427,6 +427,7 @@ func (b *BeaconChainConfig) TargetBlobsPerBlock(slot primitives.Slot) int { return b.DeprecatedMaxBlobsPerBlock / 2 } +// MaxBlobsPerBlock returns the maximum number of blobs per block for the given slot. func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int { epoch := primitives.Epoch(slot.DivSlot(b.SlotsPerEpoch)) @@ -449,6 +450,7 @@ func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int { if epoch >= b.ElectraForkEpoch { return b.DeprecatedMaxBlobsPerBlockElectra } + return b.DeprecatedMaxBlobsPerBlock }