Subnets subscription: Avoid blocking dynamic subscribing when not enough peers per subnet are found. (#15471)

* Subnets subscription: Avoid blocking dynamic subscribing when not enough peers per subnet are found.

* `subscribeWithParameters`: Use a struct to avoid too many function parameters (no functional changes).

* Optimise the subnet search.

Currently, when we are looking for peers in, say, data column sidecar subnets 3, 6 and 7, we first look for peers in subnet 3 only.
If, during the crawl, we come across peers subscribed to subnet 6, we discard them (because we are exclusively looking for peers in subnet 3).
Once subnet 3 has enough peers, we start the whole search again for subnet 6, and so on.

This commit optimises the search by looking for peers that satisfy all of our constraints in a single pass, as sketched below.
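A minimal, hypothetical Go sketch of the single-pass idea (the `node` type, the `findPeersForSubnets` helper and the deficit map are illustrative only, not the actual Prysm code): crawl the candidates once, keep any node that covers a still-missing subnet, and stop as soon as every subnet has enough candidates.

```go
package main

import "fmt"

// node is a stand-in for an enode.Node together with the subnets it advertises.
type node struct {
	id      string
	subnets map[uint64]bool
}

// findPeersForSubnets walks the candidates once and keeps every node that
// covers a subnet which is still missing peers, decrementing the per-subnet
// deficit as it goes. It stops as soon as no subnet is missing peers.
func findPeersForSubnets(candidates []node, missing map[uint64]int) []node {
	var picked []node
	for _, n := range candidates {
		if len(missing) == 0 {
			break // every subnet already has enough candidates
		}
		useful := false
		for subnet := range missing {
			if n.subnets[subnet] {
				useful = true
				missing[subnet]--
				if missing[subnet] == 0 {
					delete(missing, subnet)
				}
			}
		}
		if useful {
			picked = append(picked, n)
		}
	}
	return picked
}

func main() {
	candidates := []node{
		{id: "a", subnets: map[uint64]bool{3: true}},
		{id: "b", subnets: map[uint64]bool{6: true, 7: true}},
		{id: "c", subnets: map[uint64]bool{7: true}},
	}
	// One peer is still needed for each of the data column subnets 3, 6 and 7.
	fmt.Println(findPeersForSubnets(candidates, map[uint64]int{3: 1, 6: 1, 7: 1}))
}
```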

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.

* Fix James's comment.

* Simplify following James' comment.

* Fix James' comment.

* Update beacon-chain/sync/rpc_goodbye.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Update config/params/config.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Update beacon-chain/sync/subscriber.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Fix Preston's comment.

* Fix Preston's comment.

* `TestService_BroadcastDataColumn`: Re-add sleep 50 ms.

* Fix Preston's comment.

* Update beacon-chain/p2p/subnets.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

---------

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
Author: Manu NALEPA
Committed: 2025-07-18 19:19:15 +02:00 (by GitHub)
Parent: 8b53887891
Commit: cd0821d026
27 changed files with 988 additions and 778 deletions


@@ -87,7 +87,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
 return nil
 }
-func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error {
+func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
 mb.broadcastCalled = true
 return nil
 }


@@ -68,7 +68,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
log.WithError(err).Error("Failed to check builder status") log.WithError(err).Error("Failed to check builder status")
} else { } else {
log.WithField("endpoint", s.c.NodeURL()).Info("Builder has been configured") log.WithField("endpoint", s.c.NodeURL()).Info("Builder has been configured")
log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " + log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " +
"Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!") "Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!")
} }
} }


@@ -24,6 +24,8 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const minimumPeersPerSubnetForBroadcast = 1
// ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the // ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the
// GossipTypeMapping. // GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic") var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
@@ -124,15 +126,13 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
 if err := func() error {
 s.subnetLocker(subnet).Lock()
 defer s.subnetLocker(subnet).Unlock()
-ok, err := s.FindPeersWithSubnet(ctx, attestationToTopic(subnet, forkDigest), subnet, 1)
-if err != nil {
-return err
+if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
+return errors.Wrap(err, "find peers with subnets")
 }
-if ok {
 savedAttestationBroadcasts.Inc()
 return nil
-}
-return errors.New("failed to find peers for subnet")
 }(); err != nil {
 log.WithError(err).Error("Failed to find peers")
 tracing.AnnotateError(span, err)
@@ -183,15 +183,12 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
 if err := func() error {
 s.subnetLocker(wrappedSubIdx).Lock()
 defer s.subnetLocker(wrappedSubIdx).Unlock()
-ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1)
-if err != nil {
-return err
+if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
+return errors.Wrap(err, "find peers with subnets")
 }
-if ok {
 savedSyncCommitteeBroadcasts.Inc()
 return nil
-}
-return errors.New("failed to find peers for subnet")
 }(); err != nil {
 log.WithError(err).Error("Failed to find peers")
 tracing.AnnotateError(span, err)
@@ -250,15 +247,13 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
 if err := func() error {
 s.subnetLocker(wrappedSubIdx).Lock()
 defer s.subnetLocker(wrappedSubIdx).Unlock()
-ok, err := s.FindPeersWithSubnet(ctx, blobSubnetToTopic(subnet, forkDigest), subnet, 1)
-if err != nil {
-return err
+if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
+return errors.Wrap(err, "find peers with subnets")
 }
-if ok {
 blobSidecarBroadcasts.Inc()
 return nil
-}
-return errors.New("failed to find peers for subnet")
 }(); err != nil {
 log.WithError(err).Error("Failed to find peers")
 tracing.AnnotateError(span, err)
@@ -329,7 +324,6 @@ func (s *Service) BroadcastDataColumn(
 root [fieldparams.RootLength]byte,
 dataColumnSubnet uint64,
 dataColumnSidecar *ethpb.DataColumnSidecar,
-peersCheckedChans ...chan<- bool, // Used for testing purposes to signal when peers are checked.
 ) error {
 // Add tracing to the function.
 ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumn")
@@ -349,7 +343,7 @@ func (s *Service) BroadcastDataColumn(
 }
 // Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
-go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest, peersCheckedChans)
+go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest)
 return nil
 }
@@ -360,7 +354,6 @@ func (s *Service) internalBroadcastDataColumn(
 columnSubnet uint64,
 dataColumnSidecar *ethpb.DataColumnSidecar,
 forkDigest [fieldparams.VersionLength]byte,
-peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked.
 ) {
 // Add tracing to the function.
 _, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn")
@@ -382,7 +375,7 @@ func (s *Service) internalBroadcastDataColumn(
 wrappedSubIdx := columnSubnet + dataColumnSubnetVal
 // Find peers if needed.
-if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic, columnSubnet, peersCheckedChans); err != nil {
+if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, columnSubnet); err != nil {
 log.WithError(err).Error("Failed to find peers for data column subnet")
 tracing.AnnotateError(span, err)
 }
@@ -416,35 +409,19 @@ func (s *Service) internalBroadcastDataColumn(
 func (s *Service) findPeersIfNeeded(
 ctx context.Context,
 wrappedSubIdx uint64,
-topic string,
+topicFormat string,
+forkDigest [fieldparams.VersionLength]byte,
 subnet uint64,
-peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked.
 ) error {
+// Sending a data column sidecar to only one peer is not ideal,
+// but it ensures at least one peer receives it.
 s.subnetLocker(wrappedSubIdx).Lock()
 defer s.subnetLocker(wrappedSubIdx).Unlock()
-// Sending a data column sidecar to only one peer is not ideal,
-// but it ensures at least one peer receives it.
-const peerCount = 1
-if s.hasPeerWithSubnet(topic) {
-// Exit early if we already have peers with this subnet.
-return nil
-}
-// Used for testing purposes.
-if len(peersCheckedChans) > 0 {
-peersCheckedChans[0] <- true
-}
 // No peers found, attempt to find peers with this subnet.
-ok, err := s.FindPeersWithSubnet(ctx, topic, subnet, peerCount)
-if err != nil {
+if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
 return errors.Wrap(err, "find peers with subnet")
 }
-if !ok {
-return errors.Errorf("failed to find peers for topic %s with subnet %d", topic, subnet)
-}
 return nil
 }


@@ -216,9 +216,10 @@ func TestService_BroadcastAttestation(t *testing.T) {
 }
 func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
+const port = uint(2000)
 // Setup bootnode.
 cfg := &Config{PingInterval: testPingInterval}
-port := 2000
 cfg.UDPPort = uint(port)
 _, pkey := createAddrAndPrivKey(t)
 ipAddr := net.ParseIP("127.0.0.1")
@@ -245,7 +246,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
 PingInterval: testPingInterval,
 }
 // Setup 2 different hosts
-for i := 1; i <= 2; i++ {
+for i := uint(1); i <= 2; i++ {
 h, pkey, ipAddr := createHost(t, port+i)
 cfg.UDPPort = uint(port + i)
 cfg.TCPPort = uint(port + i)
@@ -687,7 +688,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 // Create a host.
 _, pkey, ipAddr := createHost(t, port)
-p := &Service{
+service := &Service{
 ctx: t.Context(),
 host: p1.BHost,
 pubsub: p1.PubSub(),
@@ -701,56 +702,44 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 }
 // Create a listener.
-listener, err := p.startDiscoveryV5(ipAddr, pkey)
+listener, err := service.startDiscoveryV5(ipAddr, pkey)
 require.NoError(t, err)
-p.dv5Listener = listener
+service.dv5Listener = listener
-digest, err := p.currentForkDigest()
+digest, err := service.currentForkDigest()
 require.NoError(t, err)
 subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
-topic := fmt.Sprintf(topicFormat, digest, subnet)
+topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()
 roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
 sidecar := roSidecars[0].DataColumnSidecar
-// Async listen for the pubsub, must be before the broadcast.
-var wg sync.WaitGroup
-wg.Add(1)
-peersChecked := make(chan bool, 0)
-go func(tt *testing.T) {
-defer wg.Done()
-ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
-defer cancel()
-// Wait for the peers to be checked.
-<-peersChecked
-// External peer subscribes to the topic.
-topic += p.Encoding().ProtocolSuffix()
-sub, err := p2.SubscribeToTopic(topic)
-require.NoError(tt, err)
-msg, err := sub.Next(ctx)
-require.NoError(tt, err)
-var result ethpb.DataColumnSidecar
-require.NoError(tt, p.Encoding().DecodeGossip(msg.Data, &result))
-require.DeepEqual(tt, &result, sidecar)
-}(t)
-var emptyRoot [fieldparams.RootLength]byte
 // Attempt to broadcast nil object should fail.
-err = p.BroadcastDataColumn(emptyRoot, subnet, nil)
+var emptyRoot [fieldparams.RootLength]byte
+err = service.BroadcastDataColumn(emptyRoot, subnet, nil)
 require.ErrorContains(t, "attempted to broadcast nil", err)
-// Broadcast to peers and wait.
-err = p.BroadcastDataColumn(emptyRoot, subnet, sidecar, peersChecked)
+// Subscribe to the topic.
+sub, err := p2.SubscribeToTopic(topic)
 require.NoError(t, err)
-require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Minute), "Failed to receive pubsub within 1s")
+// libp2p fails without this delay
+time.Sleep(50 * time.Millisecond)
+// Broadcast to peers and wait.
+err = service.BroadcastDataColumn(emptyRoot, subnet, sidecar)
+require.NoError(t, err)
+// Receive the message.
+ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
+defer cancel()
+msg, err := sub.Next(ctx)
+require.NoError(t, err)
+var result ethpb.DataColumnSidecar
+require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
+require.DeepEqual(t, &result, sidecar)
 }


@@ -19,8 +19,7 @@ const (
 // Burst limit for inbound dials.
 ipBurst = 8
-// High watermark buffer signifies the buffer till which
-// we will handle inbound requests.
+// High watermark buffer signifies the buffer till which we will handle inbound requests.
 highWatermarkBuffer = 20
 )
@@ -53,7 +52,7 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
"reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address") "reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address")
return false return false
} }
if s.isPeerAtLimit(true /* inbound */) { if s.isPeerAtLimit(inbound) {
log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(), log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(),
"reason": "at peer limit"}).Trace("Not accepting inbound dial") "reason": "at peer limit"}).Trace("Not accepting inbound dial")
return false return false


@@ -2,7 +2,9 @@ package p2p
 import (
 "bytes"
+"context"
 "crypto/ecdsa"
+"math"
 "net"
 "sync"
 "time"
@@ -23,45 +25,56 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
"github.com/sirupsen/logrus"
) )
-type ListenerRebooter interface {
-Listener
-RebootListener() error
-}
+type (
+// ListenerRebooter is an interface that extends the Listener interface
+// with the `RebootListener` method.
+ListenerRebooter interface {
+Listener
+RebootListener() error
+}
 // Listener defines the discovery V5 network interface that is used
 // to communicate with other peers.
-type Listener interface {
+Listener interface {
 Self() *enode.Node
 Close()
 Lookup(enode.ID) []*enode.Node
 Resolve(*enode.Node) *enode.Node
 RandomNodes() enode.Iterator
 Ping(*enode.Node) error
 RequestENR(*enode.Node) (*enode.Node, error)
 LocalNode() *enode.LocalNode
 }
-const (
-udp4 = iota
-udp6
-)
+quicProtocol uint16
+
+listenerWrapper struct {
+mu sync.RWMutex
+listener *discover.UDPv5
+listenerCreator func() (*discover.UDPv5, error)
+}
+
+connectivityDirection int
+
+udpVersion int
+)
 const quickProtocolEnrKey = "quic"
-type quicProtocol uint16
+const (
+udp4 udpVersion = iota
+udp6
+)
+
+const (
+inbound connectivityDirection = iota
+all
+)
 // quicProtocol is the "quic" key, which holds the QUIC port of the node.
 func (quicProtocol) ENRKey() string { return quickProtocolEnrKey }
-type listenerWrapper struct {
-mu sync.RWMutex
-listener *discover.UDPv5
-listenerCreator func() (*discover.UDPv5, error)
-}
 func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) {
 rawListener, err := listenerCreator()
 if err != nil {
@@ -276,29 +289,10 @@ func (s *Service) RefreshPersistentSubnets() {
// listen for new nodes watches for new nodes in the network and adds them to the peerstore. // listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() { func (s *Service) listenForNewNodes() {
const ( const (
minLogInterval = 1 * time.Minute
thresholdLimit = 5 thresholdLimit = 5
searchPeriod = 20 * time.Second
) )
peersSummary := func(threshold uint) (uint, uint) {
// Retrieve how many active peers we have.
activePeers := s.Peers().Active()
activePeerCount := uint(len(activePeers))
// Compute how many peers we are missing to reach the threshold.
if activePeerCount >= threshold {
return activePeerCount, 0
}
missingPeerCount := threshold - activePeerCount
return activePeerCount, missingPeerCount
}
var lastLogTime time.Time
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
connectivityTicker := time.NewTicker(1 * time.Minute) connectivityTicker := time.NewTicker(1 * time.Minute)
thresholdCount := 0 thresholdCount := 0
@@ -330,74 +324,148 @@ func (s *Service) listenForNewNodes() {
continue continue
} }
iterator = s.dv5Listener.RandomNodes()
thresholdCount = 0 thresholdCount = 0
} }
default: default:
if s.isPeerAtLimit(false /* inbound */) { if s.isPeerAtLimit(all) {
// Pause the main loop for a period to stop looking // Pause the main loop for a period to stop looking for new peers.
// for new peers.
log.Trace("Not looking for peers, at peer limit") log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod) time.Sleep(pollingPeriod)
continue continue
} }
// Compute the number of new peers we want to dial. // Return early if the discovery listener isn't set.
activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers) if s.dv5Listener == nil {
return
fields := logrus.Fields{
"currentPeerCount": activePeerCount,
"targetPeerCount": s.cfg.MaxPeers,
} }
if missingPeerCount == 0 { func() {
log.Trace("Not looking for peers, at peer limit") ctx, cancel := context.WithTimeout(s.ctx, searchPeriod)
time.Sleep(pollingPeriod) defer cancel()
continue
}
if time.Since(lastLogTime) > minLogInterval { if err := s.findAndDialPeers(ctx); err != nil && !errors.Is(err, context.DeadlineExceeded) {
lastLogTime = time.Now() log.WithError(err).Error("Failed to find and dial peers")
log.WithFields(fields).Debug("Searching for new active peers")
}
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
maxConcurrentDials := uint(flags.Get().MaxConcurrentDials)
missingPeerCount = min(missingPeerCount, maxConcurrentDials)
}
// Search for new peers.
wantedNodes := searchForPeers(iterator, batchPeriod, missingPeerCount, s.filterPeer)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
} }
}()
if peerInfo == nil {
continue
}
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}(peerInfo)
}
wg.Wait()
} }
} }
} }
// FindAndDialPeersWithSubnets ensures that our node is connected to enough peers.
// If, the threshold is met, then this function immediately returns.
// Otherwise, it searches for new peers and dials them.
// If `ctx“ is canceled while searching for peers, search is stopped, but new found peers are still dialed.
// In this case, the function returns an error.
func (s *Service) findAndDialPeers(ctx context.Context) error {
// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
missingPeerCount := s.missingPeerCount(s.cfg.MaxPeers)
for missingPeerCount > 0 {
// Stop the search/dialing loop if the context is canceled.
if err := ctx.Err(); err != nil {
return err
}
peersToDial, err := func() ([]*enode.Node, error) {
ctx, cancel := context.WithTimeout(ctx, batchPeriod)
defer cancel()
peersToDial, err := s.findPeers(ctx, missingPeerCount)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, errors.Wrap(err, "find peers")
}
return peersToDial, nil
}()
if err != nil {
return err
}
dialedPeerCount := s.dialPeers(s.ctx, maxConcurrentDials, peersToDial)
if dialedPeerCount > missingPeerCount {
missingPeerCount = 0
continue
}
missingPeerCount -= dialedPeerCount
}
return nil
}
// findAndDialPeers finds new peers until `targetPeerCount` is reached, `batchPeriod` is over,
// the peers iterator is exhausted or the context is canceled.
func (s *Service) findPeers(ctx context.Context, missingPeerCount uint) ([]*enode.Node, error) {
// Create an discovery iterator to find new peers.
iterator := s.dv5Listener.RandomNodes()
// `iterator.Next` can block indefinitely. `iterator.Close` unblocks it.
// So it is important to close the iterator when the context is done to ensure
// that the search does not hang indefinitely.
go func() {
<-ctx.Done()
iterator.Close()
}()
// Crawl the network for peers subscribed to the defective subnets.
nodeByNodeID := make(map[enode.ID]*enode.Node)
for missingPeerCount > 0 && iterator.Next() {
if ctx.Err() != nil {
peersToDial := make([]*enode.Node, 0, len(nodeByNodeID))
for _, node := range nodeByNodeID {
peersToDial = append(peersToDial, node)
}
return peersToDial, ctx.Err()
}
// Skip peer not matching the filter.
node := iterator.Node()
if !s.filterPeer(node) {
continue
}
// Remove duplicates, keeping the node with higher seq.
existing, ok := nodeByNodeID[node.ID()]
if ok && existing.Seq() > node.Seq() {
continue
}
nodeByNodeID[node.ID()] = node
// We found a new peer. Decrease the missing peer count.
missingPeerCount--
}
// Convert the map to a slice.
peersToDial := make([]*enode.Node, 0, len(nodeByNodeID))
for _, node := range nodeByNodeID {
peersToDial = append(peersToDial, node)
}
return peersToDial, nil
}
// missingPeerCount computes how many peers we are missing to reach the target peer count.
func (s *Service) missingPeerCount(targetCount uint) uint {
// Retrieve how many active peers we have.
activePeers := s.Peers().Active()
activePeerCount := uint(len(activePeers))
// Compute how many peers we are missing to reach the threshold.
missingPeerCount := uint(0)
if targetCount > activePeerCount {
missingPeerCount = targetCount - activePeerCount
}
return missingPeerCount
}
 func (s *Service) createListener(
 ipAddr net.IP,
 privKey *ecdsa.PrivateKey,
@@ -562,8 +630,7 @@ func (s *Service) startDiscoveryV5(
 // 2. Peer hasn't been marked as 'bad'.
 // 3. Peer is not currently active or connected.
 // 4. Peer is ready to receive incoming connections.
-// 5. Peer's fork digest in their ENR matches that of
-// our localnodes.
+// 5. Peer's fork digest in their ENR matches that of our localnodes.
 func (s *Service) filterPeer(node *enode.Node) bool {
 // Ignore nil node entries passed in.
 if node == nil {
@@ -628,22 +695,24 @@ func (s *Service) filterPeer(node *enode.Node) bool {
 // This checks our set max peers in our config, and
 // determines whether our currently connected and
 // active peers are above our set max peer limit.
-func (s *Service) isPeerAtLimit(inbound bool) bool {
-numOfConns := len(s.host.Network().Peers())
+func (s *Service) isPeerAtLimit(direction connectivityDirection) bool {
 maxPeers := int(s.cfg.MaxPeers)
-// If we are measuring the limit for inbound peers
-// we apply the high watermark buffer.
-if inbound {
+// If we are measuring the limit for inbound peers we apply the high watermark buffer.
+if direction == inbound {
 maxPeers += highWatermarkBuffer
 maxInbound := s.peers.InboundLimit() + highWatermarkBuffer
-currInbound := len(s.peers.InboundConnected())
-// Exit early if we are at the inbound limit.
-if currInbound >= maxInbound {
+inboundCount := len(s.peers.InboundConnected())
+// Return early if we are at the inbound limit.
+if inboundCount >= maxInbound {
 return true
 }
 }
-activePeers := len(s.Peers().Active())
-return activePeers >= maxPeers || numOfConns >= maxPeers
+peerCount := len(s.host.Network().Peers())
+activePeerCount := len(s.Peers().Active())
+return activePeerCount >= maxPeers || peerCount >= maxPeers
 }
// isBelowOutboundPeerThreshold checks if the number of outbound peers that // isBelowOutboundPeerThreshold checks if the number of outbound peers that
@@ -901,7 +970,7 @@ func multiAddrFromString(address string) (ma.Multiaddr, error) {
return ma.NewMultiaddr(address) return ma.NewMultiaddr(address)
} }
func udpVersionFromIP(ipAddr net.IP) int { func udpVersionFromIP(ipAddr net.IP) udpVersion {
if ipAddr.To4() != nil { if ipAddr.To4() != nil {
return udp4 return udp4
} }


@@ -323,16 +323,16 @@ func TestMultiAddrConversion_OK(t *testing.T) {
 }
 func TestStaticPeering_PeersAreAdded(t *testing.T) {
+const port = uint(6000)
 cs := startup.NewClockSynchronizer()
 cfg := &Config{
 MaxPeers: 30,
 ClockWaiter: cs,
 }
-port := 6000
 var staticPeers []string
 var hosts []host.Host
 // setup other nodes
-for i := 1; i <= 5; i++ {
+for i := uint(1); i <= 5; i++ {
 h, _, ipaddr := createHost(t, port+i)
 staticPeers = append(staticPeers, fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, port+i, h.ID()))
 hosts = append(hosts, h)
@@ -406,14 +406,14 @@ func TestInboundPeerLimit(t *testing.T) {
 _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false)
 }
-require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers")
-require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers")
+require.Equal(t, true, s.isPeerAtLimit(all), "not at limit for outbound peers")
+require.Equal(t, false, s.isPeerAtLimit(inbound), "at limit for inbound peers")
 for i := 0; i < highWatermarkBuffer; i++ {
 _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false)
 }
-require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers")
+require.Equal(t, true, s.isPeerAtLimit(inbound), "not at limit for inbound peers")
 }
func TestOutboundPeerThreshold(t *testing.T) { func TestOutboundPeerThreshold(t *testing.T) {


@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata"
prysmTime "github.com/OffchainLabs/prysm/v6/time" prysmTime "github.com/OffchainLabs/prysm/v6/time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors" "github.com/pkg/errors"
@@ -17,6 +18,8 @@ import (
 )
 const (
+agentVersionKey = "AgentVersion"
 // The time to wait for a status request.
 timeForStatus = 10 * time.Second
 )
@@ -28,12 +31,15 @@ func peerMultiaddrString(conn network.Conn) string {
 }
 func (s *Service) connectToPeer(conn network.Conn) {
-s.peers.SetConnectionState(conn.RemotePeer(), peers.Connected)
+remotePeer := conn.RemotePeer()
+s.peers.SetConnectionState(remotePeer, peers.Connected)
 // Go through the handshake process.
 log.WithFields(logrus.Fields{
 "direction": conn.Stat().Direction.String(),
 "multiAddr": peerMultiaddrString(conn),
 "activePeers": len(s.peers.Active()),
+"agent": agentString(remotePeer, s.Host()),
 }).Debug("Initiate peer connection")
 }
@@ -61,6 +67,7 @@ func (s *Service) disconnectFromPeerOnError(
"multiaddr": peerMultiaddrString(conn), "multiaddr": peerMultiaddrString(conn),
"direction": conn.Stat().Direction.String(), "direction": conn.Stat().Direction.String(),
"remainingActivePeers": len(s.peers.Active()), "remainingActivePeers": len(s.peers.Active()),
"agent": agentString(remotePeerID, s.Host()),
}). }).
Debug("Initiate peer disconnection") Debug("Initiate peer disconnection")
@@ -189,9 +196,10 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
 DisconnectedF: func(net network.Network, conn network.Conn) {
 peerID := conn.RemotePeer()
-log.WithFields(logrus.Fields{
+log := log.WithFields(logrus.Fields{
 "multiAddr": peerMultiaddrString(conn),
 "direction": conn.Stat().Direction.String(),
+"agent": agentString(peerID, s.Host()),
 })
 // Must be handled in a goroutine as this callback cannot be blocking.
 go func() {
@@ -222,3 +230,14 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
 },
 })
 }
func agentString(pid peer.ID, hst host.Host) string {
rawVersion, storeErr := hst.Peerstore().Get(pid, agentVersionKey)
result, ok := rawVersion.(string)
if storeErr != nil || !ok {
result = ""
}
return result
}


@@ -49,7 +49,7 @@ type (
 BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
 BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
 BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
-BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error
+BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
 }
 // SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -98,7 +98,7 @@ type (
 NodeID() enode.ID
 DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
 RefreshPersistentSubnets()
-FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
+FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error
 AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
 }


@@ -393,7 +393,7 @@ func (p *Status) SetNextValidTime(pid peer.ID, nextTime time.Time) {
 peerData.NextValidTime = nextTime
 }
-// RandomizeBackOff adds extra backoff period during which peer will not be dialed.
+// RandomizeBackOff adds extra backoff period during which peer won't be dialed.
 func (p *Status) RandomizeBackOff(pid peer.ID) {
 p.store.Lock()
 defer p.store.Unlock()


@@ -37,24 +37,28 @@ import (
 var _ runtime.Service = (*Service)(nil)
-// In the event that we are at our peer limit, we
-// stop looking for new peers and instead poll
-// for the current peer limit status for the time period
-// defined below.
-var pollingPeriod = 6 * time.Second
-// When looking for new nodes, if not enough nodes are found,
-// we stop after this spent time.
-var batchPeriod = 2 * time.Second
-// Refresh rate of ENR set at twice per slot.
-var refreshRate = slots.DivideSlotBy(2)
-// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
-const maxBadResponses = 5
-// maxDialTimeout is the timeout for a single peer dial.
-var maxDialTimeout = params.BeaconConfig().RespTimeoutDuration()
+const (
+// When looking for new nodes, if not enough nodes are found,
+// we stop after this spent time.
+batchPeriod = 2 * time.Second
+// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
+maxBadResponses = 5
+)
+var (
+// Refresh rate of ENR set at twice per slot.
+refreshRate = slots.DivideSlotBy(2)
+// maxDialTimeout is the timeout for a single peer dial.
+maxDialTimeout = params.BeaconConfig().RespTimeoutDuration()
+// In the event that we are at our peer limit, we
+// stop looking for new peers and instead poll
+// for the current peer limit status for the time period
+// defined below.
+pollingPeriod = 6 * time.Second
+)
 // Service for managing peer to peer (p2p) networking.
 type Service struct {
@@ -251,6 +255,7 @@ func (s *Service) Start() {
"inboundTCP": inboundTCPCount, "inboundTCP": inboundTCPCount,
"outboundTCP": outboundTCPCount, "outboundTCP": outboundTCPCount,
"total": total, "total": total,
"target": s.cfg.MaxPeers,
} }
if features.Get().EnableQUIC { if features.Get().EnableQUIC {


@@ -13,6 +13,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/network/forks" "github.com/OffchainLabs/prysm/v6/network/forks"
@@ -72,7 +73,7 @@ func (mockListener) RandomNodes() enode.Iterator {
 func (mockListener) RebootListener() error { panic("implement me") }
-func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
+func createHost(t *testing.T, port uint) (host.Host, *ecdsa.PrivateKey, net.IP) {
 _, pkey := createAddrAndPrivKey(t)
 ipAddr := net.ParseIP("127.0.0.1")
 listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
@@ -185,21 +186,33 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) {
 }
 func TestListenForNewNodes(t *testing.T) {
+const (
+port = uint(2000)
+testPollingPeriod = 1 * time.Second
+peerCount = 5
+)
 params.SetupTestConfigCleanup(t)
 // Setup bootnode.
-notifier := &mock.MockStateNotifier{}
-cfg := &Config{StateNotifier: notifier, PingInterval: testPingInterval, DisableLivenessCheck: true}
-port := 2000
-cfg.UDPPort = uint(port)
+cfg := &Config{
+StateNotifier: &mock.MockStateNotifier{},
+PingInterval: testPingInterval,
+DisableLivenessCheck: true,
+UDPPort: port,
+}
 _, pkey := createAddrAndPrivKey(t)
 ipAddr := net.ParseIP("127.0.0.1")
 genesisTime := prysmTime.Now()
-var gvr [32]byte
+var gvr [fieldparams.RootLength]byte
 s := &Service{
 cfg: cfg,
 genesisTime: genesisTime,
 genesisValidatorsRoot: gvr[:],
 }
 bootListener, err := s.createListener(ipAddr, pkey)
 require.NoError(t, err)
 defer bootListener.Close()
@@ -210,35 +223,40 @@ func TestListenForNewNodes(t *testing.T) {
 // Use shorter period for testing.
 currentPeriod := pollingPeriod
-pollingPeriod = 1 * time.Second
+pollingPeriod = testPollingPeriod
 defer func() {
 pollingPeriod = currentPeriod
 }()
 bootNode := bootListener.Self()
-var listeners []*listenerWrapper
-var hosts []host.Host
-// setup other nodes.
+// Setup other nodes.
 cs := startup.NewClockSynchronizer()
-cfg = &Config{
-Discv5BootStrapAddrs: []string{bootNode.String()},
-PingInterval: testPingInterval,
-DisableLivenessCheck: true,
-MaxPeers: 30,
-ClockWaiter: cs,
-}
-for i := 1; i <= 5; i++ {
+listeners := make([]*listenerWrapper, 0, peerCount)
+hosts := make([]host.Host, 0, peerCount)
+for i := uint(1); i <= peerCount; i++ {
+cfg = &Config{
+Discv5BootStrapAddrs: []string{bootNode.String()},
+PingInterval: testPingInterval,
+DisableLivenessCheck: true,
+MaxPeers: peerCount,
+ClockWaiter: cs,
+UDPPort: port + i,
+TCPPort: port + i,
+}
 h, pkey, ipAddr := createHost(t, port+i)
-cfg.UDPPort = uint(port + i)
-cfg.TCPPort = uint(port + i)
 s := &Service{
 cfg: cfg,
 genesisTime: genesisTime,
 genesisValidatorsRoot: gvr[:],
 }
 listener, err := s.startDiscoveryV5(ipAddr, pkey)
-assert.NoError(t, err, "Could not start discovery for node")
+require.NoError(t, err, "Could not start discovery for node")
 listeners = append(listeners, listener)
 hosts = append(hosts, h)
 }
@@ -263,19 +281,26 @@ func TestListenForNewNodes(t *testing.T) {
 s, err = NewService(t.Context(), cfg)
 require.NoError(t, err)
-exitRoutine := make(chan bool)
-go func() {
-s.Start()
-<-exitRoutine
-}()
-time.Sleep(1 * time.Second)
-require.NoError(t, cs.SetClock(startup.NewClock(genesisTime, gvr)))
-time.Sleep(4 * time.Second)
-assert.Equal(t, 5, len(s.host.Network().Peers()), "Not all peers added to peerstore")
-require.NoError(t, s.Stop())
-exitRoutine <- true
+go s.Start()
+err = cs.SetClock(startup.NewClock(genesisTime, gvr))
+require.NoError(t, err, "Could not set clock in service")
+actualPeerCount := len(s.host.Network().Peers())
+for range 40 {
+if actualPeerCount == peerCount {
+break
+}
+time.Sleep(100 * time.Millisecond)
+actualPeerCount = len(s.host.Network().Peers())
+}
+assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore")
+err = s.Stop()
+require.NoError(t, err, "Failed to stop service")
 }
func TestPeer_Disconnect(t *testing.T) { func TestPeer_Disconnect(t *testing.T) {


@@ -2,6 +2,7 @@ package p2p
 import (
 "context"
+"fmt"
 "math"
 "strings"
 "sync"
@@ -11,6 +12,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
@@ -23,7 +25,6 @@ import (
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
"github.com/sirupsen/logrus"
) )
var ( var (
@@ -57,249 +58,297 @@ const blobSubnetLockerVal = 110
const dataColumnSubnetVal = 150 const dataColumnSubnetVal = 150
// nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. // nodeFilter returns a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) {
switch { switch {
case strings.Contains(topic, GossipAttestationMessage): case strings.Contains(topic, GossipAttestationMessage):
return s.filterPeerForAttSubnet(index), nil return s.filterPeerForAttSubnet(indices), nil
case strings.Contains(topic, GossipSyncCommitteeMessage): case strings.Contains(topic, GossipSyncCommitteeMessage):
return s.filterPeerForSyncSubnet(index), nil return s.filterPeerForSyncSubnet(indices), nil
case strings.Contains(topic, GossipBlobSidecarMessage): case strings.Contains(topic, GossipBlobSidecarMessage):
return s.filterPeerForBlobSubnet(), nil return s.filterPeerForBlobSubnet(indices), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage): case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil return s.filterPeerForDataColumnsSubnet(indices), nil
default: default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
} }
} }
// searchForPeers performs a network search for peers subscribed to a particular subnet. // FindAndDialPeersWithSubnets ensures that our node is connected to at least `minimumPeersPerSubnet`
// It exits as soon as one of these conditions is met: // peers for each subnet listed in `subnets`.
// - It looped through `batchSize` nodes. // If, for all subnets, the threshold is met, then this function immediately returns.
// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. // Otherwise, it searches for new peers for defective subnets, and dials them.
// - Iterator is exhausted. // If `ctx“ is canceled while searching for peers, search is stopped, but new found peers are still dialed.
func searchForPeers( // In this case, the function returns an error.
iterator enode.Iterator, func (s *Service) FindAndDialPeersWithSubnets(
batchPeriod time.Duration, ctx context.Context,
peersToFindCount uint, topicFormat string,
filter func(node *enode.Node) bool, digest [fieldparams.VersionLength]byte,
) []*enode.Node { minimumPeersPerSubnet int,
nodeFromNodeID := make(map[enode.ID]*enode.Node) subnets map[uint64]bool,
start := time.Now() ) error {
ctx, span := trace.StartSpan(ctx, "p2p.FindAndDialPeersWithSubnet")
defer span.End()
for time.Since(start) < batchPeriod && uint(len(nodeFromNodeID)) < peersToFindCount && iterator.Next() { // Return early if the discovery listener isn't set.
if s.dv5Listener == nil {
return nil
}
// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
defectiveSubnets := s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
for len(defectiveSubnets) > 0 {
// Stop the search/dialing loop if the context is canceled.
if err := ctx.Err(); err != nil {
return err
}
peersToDial, err := func() ([]*enode.Node, error) {
ctx, cancel := context.WithTimeout(ctx, batchPeriod)
defer cancel()
peersToDial, err := s.findPeersWithSubnets(ctx, topicFormat, digest, minimumPeersPerSubnet, defectiveSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, errors.Wrap(err, "find peers with subnets")
}
return peersToDial, nil
}()
if err != nil {
return err
}
// Dial new peers in batches.
s.dialPeers(s.ctx, maxConcurrentDials, peersToDial)
defectiveSubnets = s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
}
return nil
}
// findPeersWithSubnets finds peers subscribed to defective subnets in batches
// until enough peers are found or the context is canceled.
// It returns new peers found during the search.
func (s *Service) findPeersWithSubnets(
ctx context.Context,
topicFormat string,
digest [fieldparams.VersionLength]byte,
minimumPeersPerSubnet int,
defectiveSubnetsOrigin map[uint64]int,
) ([]*enode.Node, error) {
// Copy the defective subnets map to avoid modifying the original map.
defectiveSubnets := make(map[uint64]int, len(defectiveSubnetsOrigin))
for k, v := range defectiveSubnetsOrigin {
defectiveSubnets[k] = v
}
// Create an discovery iterator to find new peers.
iterator := s.dv5Listener.RandomNodes()
// `iterator.Next` can block indefinitely. `iterator.Close` unblocks it.
// So it is important to close the iterator when the context is done to ensure
// that the search does not hang indefinitely.
go func() {
<-ctx.Done()
iterator.Close()
}()
// Retrieve the filter function that will be used to filter nodes based on the defective subnets.
filter, err := s.nodeFilter(topicFormat, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
// Crawl the network for peers subscribed to the defective subnets.
nodeByNodeID := make(map[enode.ID]*enode.Node)
for len(defectiveSubnets) > 0 && iterator.Next() {
if err := ctx.Err(); err != nil {
// Convert the map to a slice.
peersToDial := make([]*enode.Node, 0, len(nodeByNodeID))
for _, node := range nodeByNodeID {
peersToDial = append(peersToDial, node)
}
return peersToDial, err
}
// Get all needed subnets that the node is subscribed to.
// Skip nodes that are not subscribed to any of the defective subnets.
node := iterator.Node() node := iterator.Node()
nodeSubnets, err := filter(node)
// Filter out nodes that do not meet the criteria. if err != nil {
if !filter(node) { return nil, errors.Wrap(err, "filter node")
}
if len(nodeSubnets) == 0 {
continue continue
} }
// Remove duplicates, keeping the node with higher seq. // Remove duplicates, keeping the node with higher seq.
prevNode, ok := nodeFromNodeID[node.ID()] existing, ok := nodeByNodeID[node.ID()]
if ok && prevNode.Seq() > node.Seq() { if ok && existing.Seq() > node.Seq() {
continue continue
} }
nodeByNodeID[node.ID()] = node
nodeFromNodeID[node.ID()] = node // We found a new peer. Modify the defective subnets map
// and the filter accordingly.
for subnet := range defectiveSubnets {
if !nodeSubnets[subnet] {
continue
}
defectiveSubnets[subnet]--
if defectiveSubnets[subnet] == 0 {
delete(defectiveSubnets, subnet)
}
filter, err = s.nodeFilter(topicFormat, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
}
} }
// Convert the map to a slice. // Convert the map to a slice.
nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) peersToDial := make([]*enode.Node, 0, len(nodeByNodeID))
for _, node := range nodeFromNodeID { for _, node := range nodeByNodeID {
nodes = append(nodes, node) peersToDial = append(peersToDial, node)
} }
return nodes return peersToDial, nil
} }
// dialPeer dials a peer in a separate goroutine. // defectiveSubnets returns a map of subnets that have fewer than the minimum peer count.
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { func (s *Service) defectiveSubnets(
info, _, err := convertToAddrInfo(node) topicFormat string,
if err != nil { digest [fieldparams.VersionLength]byte,
return minimumPeersPerSubnet int,
} subnets map[uint64]bool,
) map[uint64]int {
if info == nil { missingCountPerSubnet := make(map[uint64]int, len(subnets))
return for subnet := range subnets {
} topic := fmt.Sprintf(topicFormat, digest, subnet) + s.Encoding().ProtocolSuffix()
peers := s.pubsub.ListPeers(topic)
wg.Add(1) peerCount := len(peers)
go func() { if peerCount < minimumPeersPerSubnet {
if err := s.connectWithPeer(ctx, *info); err != nil { missingCountPerSubnet[subnet] = minimumPeersPerSubnet - peerCount
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
} }
}
wg.Done() return missingCountPerSubnet
}()
} }
// FindPeersWithSubnet performs a network search for peers // dialPeers dials multiple peers concurrently up to `maxConcurrentDials` at a time.
// subscribed to a particular subnet. Then it tries to connect // In case of a dial failure, it logs the error but continues dialing other peers.
// with those peers. This method will block until either: func (s *Service) dialPeers(ctx context.Context, maxConcurrentDials int, nodes []*enode.Node) uint {
// - the required amount of peers are found, or var mut sync.Mutex
// - the context is terminated.
// On some edge cases, this method may hang indefinitely while peers
// are actually found. In such a case, the user should cancel the context
// and re-run the method again.
func (s *Service) FindPeersWithSubnet(
ctx context.Context,
topic string,
index uint64,
threshold int,
) (bool, error) {
const minLogInterval = 1 * time.Minute
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") counter := uint(0)
defer span.End() for start := 0; start < len(nodes); start += maxConcurrentDials {
if ctx.Err() != nil {
span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. return counter
if s.dv5Listener == nil {
// Return if discovery isn't set
return false, nil
}
topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
filter, err := s.nodeFilter(topic, index)
if err != nil {
return false, errors.Wrap(err, "node filter")
}
peersSummary := func(topic string, threshold int) (int, int) {
// Retrieve how many peers we have for this topic.
peerCountForTopic := len(s.pubsub.ListPeers(topic))
// Compute how many peers we are missing to reach the threshold.
missingPeerCountForTopic := max(0, threshold-peerCountForTopic)
return peerCountForTopic, missingPeerCountForTopic
}
// Compute how many peers we are missing to reach the threshold.
peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold)
// Exit early if we have enough peers.
if missingPeerCountForTopic == 0 {
return true, nil
}
log := log.WithFields(logrus.Fields{
"topic": topic,
"targetPeerCount": threshold,
})
log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start")
lastLogTime := time.Now()
wg := new(sync.WaitGroup)
for {
// If the context is done, we can exit the loop. This is the unhappy path.
if err := ctx.Err(); err != nil {
return false, errors.Errorf(
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching",
topic, peerCountForTopic, threshold,
)
} }
// Search for new peers in the network. var wg sync.WaitGroup
nodes := searchForPeers(iterator, batchPeriod, uint(missingPeerCountForTopic), filter) stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
// Restrict dials if limit is applied. log := log.WithField("nodeID", node.ID())
maxConcurrentDials := math.MaxInt info, _, err := convertToAddrInfo(node)
if flags.MaxDialIsActive() { if err != nil {
maxConcurrentDials = flags.Get().MaxConcurrentDials log.WithError(err).Debug("Could not convert node to addr info")
} continue
// Dial the peers in batches.
for start := 0; start < len(nodes); start += maxConcurrentDials {
stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
s.dialPeer(ctx, wg, node)
} }
// Wait for all dials to be completed. if info == nil {
wg.Wait() log.Debug("Nil addr info")
continue
}
wg.Add(1)
go func() {
defer wg.Done()
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).WithField("info", info.String()).Debug("Could not connect with peer")
return
}
mut.Lock()
defer mut.Unlock()
counter++
}()
} }
peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) wg.Wait()
// If we have enough peers, we can exit the loop. This is the happy path.
if missingPeerCountForTopic == 0 {
break
}
if time.Since(lastLogTime) > minLogInterval {
lastLogTime = time.Now()
log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue")
}
} }
log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success") return counter
return true, nil
} }
// returns a method with filters peers specifically for a particular attestation subnet. // filterPeerForAttSubnet returns a method with filters peers specifically for a particular attestation subnet.
func (s *Service) filterPeerForAttSubnet(index uint64) func(node *enode.Node) bool { func (s *Service) filterPeerForAttSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) {
return func(node *enode.Node) bool { return func(node *enode.Node) (map[uint64]bool, error) {
if !s.filterPeer(node) { if !s.filterPeer(node) {
return false return map[uint64]bool{}, nil
} }
subnets, err := attSubnets(node.Record()) subnets, err := attestationSubnets(node.Record())
if err != nil { if err != nil {
return false return nil, errors.Wrap(err, "attestation subnets")
} }
return subnets[index] return intersect(indices, subnets), nil
} }
} }
// returns a method that filters peers specifically for a particular sync subnet. // returns a method that filters peers specifically for a particular sync subnet.
func (s *Service) filterPeerForSyncSubnet(index uint64) func(node *enode.Node) bool { func (s *Service) filterPeerForSyncSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) {
return func(node *enode.Node) bool { return func(node *enode.Node) (map[uint64]bool, error) {
if !s.filterPeer(node) { if !s.filterPeer(node) {
return false return map[uint64]bool{}, nil
} }
subnets, err := syncSubnets(node.Record()) subnets, err := syncSubnets(node.Record())
if err != nil { if err != nil {
return false return nil, errors.Wrap(err, "sync subnets")
} }
indExists := false
for _, comIdx := range subnets { return intersect(indices, subnets), nil
if comIdx == index {
indExists = true
break
}
}
return indExists
} }
} }
// returns a method that filters peers specifically for a particular blob subnet. // returns a method that filters peers specifically for a particular blob subnet.
// All peers are supposed to be subscribed to all blob subnets. // All peers are supposed to be subscribed to all blob subnets.
func (s *Service) filterPeerForBlobSubnet() func(_ *enode.Node) bool { func (s *Service) filterPeerForBlobSubnet(indices map[uint64]int) func(_ *enode.Node) (map[uint64]bool, error) {
return func(_ *enode.Node) bool { result := make(map[uint64]bool, len(indices))
return true for i := range indices {
result[i] = true
}
return func(_ *enode.Node) (map[uint64]bool, error) {
return result, nil
} }
} }
// returns a method that filters peers specifically for a particular data column subnet. // returns a method that filters peers specifically for a particular data column subnet.
func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.Node) bool { func (s *Service) filterPeerForDataColumnsSubnet(indices map[uint64]int) func(node *enode.Node) (map[uint64]bool, error) {
return func(node *enode.Node) bool { return func(node *enode.Node) (map[uint64]bool, error) {
if !s.filterPeer(node) { if !s.filterPeer(node) {
return false return map[uint64]bool{}, nil
} }
subnets, err := dataColumnSubnets(node.ID(), node.Record()) subnets, err := dataColumnSubnets(node.ID(), node.Record())
if err != nil { if err != nil {
return false return nil, errors.Wrap(err, "data column subnets")
} }
return subnets[index] return intersect(indices, subnets), nil
} }
} }
@@ -475,43 +524,47 @@ func initializeSyncCommSubnets(node *enode.LocalNode) *enode.LocalNode {
// Reads the attestation subnets entry from a node's ENR and determines // Reads the attestation subnets entry from a node's ENR and determines
// the committee indices of the attestation subnets the node is subscribed to. // the committee indices of the attestation subnets the node is subscribed to.
func attSubnets(record *enr.Record) (map[uint64]bool, error) { func attestationSubnets(record *enr.Record) (map[uint64]bool, error) {
bitV, err := attBitvector(record) bitV, err := attBitvector(record)
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "att bit vector")
}
committeeIdxs := make(map[uint64]bool)
// lint:ignore uintcast -- subnet count can be safely cast to int.
if len(bitV) != byteCount(int(attestationSubnetCount)) {
return committeeIdxs, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV))
} }
for i := uint64(0); i < attestationSubnetCount; i++ { // lint:ignore uintcast -- subnet count can be safely cast to int.
if len(bitV) != byteCount(int(attestationSubnetCount)) {
return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV))
}
indices := make(map[uint64]bool, attestationSubnetCount)
for i := range attestationSubnetCount {
if bitV.BitAt(i) { if bitV.BitAt(i) {
committeeIdxs[i] = true indices[i] = true
} }
} }
return committeeIdxs, nil
return indices, nil
} }
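To illustrate the decoding above, a small self-contained sketch that builds an attnets bitvector with go-bitfield (the package already used by the tests in this diff) and converts it back into the map form returned by attestationSubnets; the function name is illustrative only:

package p2psketch

import bitfield "github.com/prysmaticlabs/go-bitfield"

// attnetsExample sets subnets 3 and 7 in a 64-bit attnets bitvector,
// then decodes the bitvector into a subnet-index map.
func attnetsExample() map[uint64]bool {
	bitV := bitfield.NewBitvector64()
	bitV.SetBitAt(3, true)
	bitV.SetBitAt(7, true)

	indices := make(map[uint64]bool)
	for i := uint64(0); i < 64; i++ {
		if bitV.BitAt(i) {
			indices[i] = true
		}
	}
	return indices // map[uint64]bool{3: true, 7: true}
}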
// Reads the sync subnets entry from a node's ENR and determines // Reads the sync subnets entry from a node's ENR and determines
// the committee indices of the sync subnets the node is subscribed to. // the committee indices of the sync subnets the node is subscribed to.
func syncSubnets(record *enr.Record) ([]uint64, error) { func syncSubnets(record *enr.Record) (map[uint64]bool, error) {
bitV, err := syncBitvector(record) bitV, err := syncBitvector(record)
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "sync bit vector")
} }
// lint:ignore uintcast -- subnet count can be safely cast to int. // lint:ignore uintcast -- subnet count can be safely cast to int.
if len(bitV) != byteCount(int(syncCommsSubnetCount)) { if len(bitV) != byteCount(int(syncCommsSubnetCount)) {
return []uint64{}, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV)) return nil, errors.Errorf("invalid bitvector provided, it has a size of %d", len(bitV))
} }
var committeeIdxs []uint64
for i := uint64(0); i < syncCommsSubnetCount; i++ { indices := make(map[uint64]bool, syncCommsSubnetCount)
for i := range syncCommsSubnetCount {
if bitV.BitAt(i) { if bitV.BitAt(i) {
committeeIdxs = append(committeeIdxs, i) indices[i] = true
} }
} }
return committeeIdxs, nil return indices, nil
} }
// Retrieve the data columns subnets from a node's ENR and node ID. // Retrieve the data columns subnets from a node's ENR and node ID.
@@ -585,3 +638,16 @@ func byteCount(bitCount int) int {
} }
return numOfBytes return numOfBytes
} }
// intersect intersects two maps and returns a new map containing only the keys
// that are present in both maps.
func intersect(left map[uint64]int, right map[uint64]bool) map[uint64]bool {
result := make(map[uint64]bool, min(len(left), len(right)))
for i := range left {
if right[i] {
result[i] = true
}
}
return result
}
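A quick usage sketch of intersect, matching the one-pass constraint search described in the commit message (values are illustrative and assume the snippet lives next to the helper above):

// wanted maps each needed subnet to how many peers are still missing for it;
// advertised is what a discovered node exposes in its ENR.
wanted := map[uint64]int{3: 1, 6: 2, 7: 1}
advertised := map[uint64]bool{6: true, 9: true}

matches := intersect(wanted, advertised) // map[uint64]bool{6: true}
_ = matches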
View File
@@ -5,7 +5,6 @@ import (
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"reflect"
"testing" "testing"
"time" "time"
@@ -22,7 +21,7 @@ import (
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
) )
func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) { func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
// Topology of this test: // Topology of this test:
// //
// //
@@ -37,7 +36,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
// In our case: The node i is subscribed to subnet i, with i = 1, 2, 3 // In our case: The node i is subscribed to subnet i, with i = 1, 2, 3
// Define the genesis validators root, to ensure everybody is on the same network. // Define the genesis validators root, to ensure everybody is on the same network.
const genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe" const (
genesisValidatorRootStr = "0xdeadbeefcafecafedeadbeefcafecafedeadbeefcafecafedeadbeefcafecafe"
subnetCount = 3
minimumPeersPerSubnet = 1
)
genesisValidatorsRoot, err := hex.DecodeString(genesisValidatorRootStr[2:]) genesisValidatorsRoot, err := hex.DecodeString(genesisValidatorRootStr[2:])
require.NoError(t, err) require.NoError(t, err)
@@ -87,13 +91,12 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
// Create 3 nodes, each subscribed to a different subnet. // Create 3 nodes, each subscribed to a different subnet.
// Each node is connected to the bootstrap node. // Each node is connected to the bootstrap node.
services := make([]*Service, 0, 3) services := make([]*Service, 0, subnetCount)
for i := 1; i <= 3; i++ { for i := uint64(1); i <= subnetCount; i++ {
subnet := uint64(i)
service, err := NewService(ctx, &Config{ service, err := NewService(ctx, &Config{
Discv5BootStrapAddrs: []string{bootNodeENR}, Discv5BootStrapAddrs: []string{bootNodeENR},
MaxPeers: 30, MaxPeers: 0, // Set to 0 to ensure that peers are discovered via subnets search, and not generic peers discovery.
UDPPort: uint(2000 + i), UDPPort: uint(2000 + i),
TCPPort: uint(3000 + i), TCPPort: uint(3000 + i),
QUICPort: uint(3000 + i), QUICPort: uint(3000 + i),
@@ -115,12 +118,13 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
// Set the ENR `attnets`, used by Prysm to filter peers by subnet. // Set the ENR `attnets`, used by Prysm to filter peers by subnet.
bitV := bitfield.NewBitvector64() bitV := bitfield.NewBitvector64()
bitV.SetBitAt(subnet, true) bitV.SetBitAt(i, true)
entry := enr.WithEntry(attSubnetEnrKey, &bitV) entry := enr.WithEntry(attSubnetEnrKey, &bitV)
service.dv5Listener.LocalNode().Set(entry) service.dv5Listener.LocalNode().Set(entry)
// Join and subscribe to the subnet, needed by libp2p. // Join and subscribe to the subnet, needed by libp2p.
topic, err := service.pubsub.Join(fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + "/ssz_snappy") topicName := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, i) + "/ssz_snappy"
topic, err := service.pubsub.Join(topicName)
require.NoError(t, err) require.NoError(t, err)
_, err = topic.Subscribe() _, err = topic.Subscribe()
@@ -160,37 +164,18 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}() }()
// Look up 3 different subnets. subnets := map[uint64]bool{1: true, 2: true, 3: true}
exists := make([]bool, 0, 3) defectiveSubnets := service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
for i := 1; i <= 3; i++ { require.Equal(t, subnetCount, len(defectiveSubnets))
subnet := uint64(i)
topic := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet)
exist := false ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// This for loop is used to ensure we don't get stuck in `FindPeersWithSubnet`. err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
// Read the documentation of `FindPeersWithSubnet` for more details. require.NoError(t, err)
for j := 0; j < 3; j++ {
ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
exist, err = service.FindPeersWithSubnet(ctxWithTimeOut, topic, subnet, 1) defectiveSubnets = service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
require.NoError(t, err) require.Equal(t, 0, len(defectiveSubnets))
if exist {
break
}
}
require.NoError(t, err)
exists = append(exists, exist)
}
// Check if all peers are found.
for _, exist := range exists {
require.Equal(t, true, exist, "Peer with subnet doesn't exist")
}
} }
func Test_AttSubnets(t *testing.T) { func Test_AttSubnets(t *testing.T) {
@@ -305,37 +290,34 @@ func Test_AttSubnets(t *testing.T) {
wantErr: false, wantErr: false,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
db, err := enode.OpenDB("") db, err := enode.OpenDB("")
assert.NoError(t, err) require.NoError(t, err)
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
assert.NoError(t, err) require.NoError(t, err)
convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv) convertedKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(priv)
assert.NoError(t, err) require.NoError(t, err)
localNode := enode.NewLocalNode(db, convertedKey) localNode := enode.NewLocalNode(db, convertedKey)
record := tt.record(localNode) record := tt.record(localNode)
got, err := attSubnets(record) got, err := attestationSubnets(record)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("attestationSubnets() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
if tt.wantErr { if tt.wantErr {
assert.ErrorContains(t, tt.errContains, err) require.ErrorContains(t, tt.errContains, err)
} }
want := make(map[uint64]bool, len(tt.want)) require.Equal(t, len(tt.want), len(got))
for _, subnet := range tt.want { for _, subnet := range tt.want {
want[subnet] = true require.Equal(t, true, got[subnet])
}
if !reflect.DeepEqual(got, want) {
t.Errorf("syncSubnets() got = %v, want %v", got, want)
} }
}) })
} }
@@ -494,11 +476,14 @@ func Test_SyncSubnets(t *testing.T) {
t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("syncSubnets() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
if tt.wantErr { if tt.wantErr {
assert.ErrorContains(t, tt.errContains, err) assert.ErrorContains(t, tt.errContains, err)
} }
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("syncSubnets() got = %v, want %v", got, tt.want) require.Equal(t, len(tt.want), len(got))
for _, subnet := range tt.want {
require.Equal(t, true, got[subnet])
} }
}) })
} }
View File
@@ -68,9 +68,9 @@ func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return nil, nil return nil, nil
} }
// FindPeersWithSubnet mocks the p2p func. // FindAndDialPeersWithSubnets mocks the p2p func.
func (*FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return false, nil return nil
} }
// RefreshPersistentSubnets mocks the p2p func. // RefreshPersistentSubnets mocks the p2p func.
@@ -167,7 +167,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
} }
// BroadcastDataColumn -- fake. // BroadcastDataColumn -- fake.
func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
return nil return nil
} }
View File
@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
} }
// BroadcastDataColumn broadcasts a data column for mock. // BroadcastDataColumn broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
m.BroadcastCalled.Store(true) m.BroadcastCalled.Store(true)
return nil return nil
} }
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
@@ -56,9 +57,9 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
// RefreshPersistentSubnets . // RefreshPersistentSubnets .
func (*MockPeerManager) RefreshPersistentSubnets() {} func (*MockPeerManager) RefreshPersistentSubnets() {}
// FindPeersWithSubnet . // FindAndDialPeersWithSubnets .
func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return true, nil return nil
} }
// AddPingMethod . // AddPingMethod .
View File
@@ -224,7 +224,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
} }
// BroadcastDataColumn broadcasts a data column for mock. // BroadcastDataColumn broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
p.BroadcastCalled.Store(true) p.BroadcastCalled.Store(true)
return nil return nil
} }
@@ -408,9 +408,9 @@ func (p *TestP2P) Peers() *peers.Status {
return p.peers return p.peers
} }
// FindPeersWithSubnet mocks the p2p func. // FindAndDialPeersWithSubnets mocks the p2p func.
func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return false, nil return nil
} }
// RefreshPersistentSubnets mocks the p2p func. // RefreshPersistentSubnets mocks the p2p func.
View File
@@ -109,7 +109,12 @@ func isValidStreamError(err error) bool {
func closeStream(stream network.Stream, log *logrus.Entry) { func closeStream(stream network.Stream, log *logrus.Entry) {
if err := stream.Close(); isValidStreamError(err) { if err := stream.Close(); isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) log.WithError(err).
WithFields(logrus.Fields{
"protocol": stream.Protocol(),
"peer": stream.Conn().RemotePeer(),
}).
Debug("Could not close stream")
} }
} }
@@ -118,7 +123,12 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
_err := stream.Reset() _err := stream.Reset()
_ = _err _ = _err
if isValidStreamError(err) { if isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol()) log.WithError(err).
WithFields(logrus.Fields{
"protocol": stream.Protocol(),
"peer": stream.Conn().RemotePeer(),
}).
Debug("Could not reset stream")
} }
return return
} }
View File
@@ -236,7 +236,7 @@ func (s *Service) updateMetrics() {
if err != nil { if err != nil {
log.WithError(err).Debugf("Could not compute fork digest") log.WithError(err).Debugf("Could not compute fork digest")
} }
indices := s.aggregatorSubnetIndices(s.cfg.clock.CurrentSlot()) indices := aggregatorSubnetIndices(s.cfg.clock.CurrentSlot())
syncIndices := cache.SyncSubnetIDs.GetAllSubnets(slots.ToEpoch(s.cfg.clock.CurrentSlot())) syncIndices := cache.SyncSubnetIDs.GetAllSubnets(slots.ToEpoch(s.cfg.clock.CurrentSlot()))
attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})] syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})]
View File
@@ -42,7 +42,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg) return fmt.Errorf("wrong message type for goodbye, got %T, wanted *uint64", msg)
} }
if err := s.rateLimiter.validateRequest(stream, 1); err != nil { if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
log.WithError(err).Debug("Goodbye message from rate-limited peer.") log.WithError(err).Debug("Goodbye message from rate-limited peer")
} else { } else {
s.rateLimiter.add(stream, 1) s.rateLimiter.add(stream, 1)
} }
@@ -65,7 +65,12 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr
log.WithError(err).Debug("Error when disconnecting with bad peer") log.WithError(err).Debug("Error when disconnecting with bad peer")
} }
log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection") log.WithError(badPeerErr).
WithFields(logrus.Fields{
"peerID": id,
"agent": agentString(id, s.cfg.p2p.Host()),
}).
Debug("Sent peer disconnection")
} }
// A custom goodbye method that is used by our connection handler, in the // A custom goodbye method that is used by our connection handler, in the
View File
@@ -217,6 +217,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"peer": remotePeer, "peer": remotePeer,
"error": err, "error": err,
"agent": agentString(remotePeer, s.cfg.p2p.Host()),
}).Debug("Invalid status message from peer") }).Debug("Invalid status message from peer")
var respCode byte var respCode byte
View File
@@ -2,11 +2,9 @@ package sync
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"slices"
"strings" "strings"
"time" "time"
@@ -21,7 +19,6 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/container/slice"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing" "github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/network/forks" "github.com/OffchainLabs/prysm/v6/network/forks"
@@ -32,17 +29,50 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const pubsubMessageTimeout = 30 * time.Second const pubsubMessageTimeout = 30 * time.Second
// wrappedVal represents a gossip validator which also returns an error along with the result. type (
type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error) // wrappedVal represents a gossip validator which also returns an error along with the result.
wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error)
// subHandler represents handler for a given subscription. // subHandler represents handler for a given subscription.
type subHandler func(context.Context, proto.Message) error subHandler func(context.Context, proto.Message) error
// parameters used for the `subscribeWithParameters` function.
subscribeParameters struct {
topicFormat string
validate wrappedVal
handle subHandler
digest [4]byte
// getSubnetsToJoin is a function that returns all subnets the node should join.
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
}
// parameters used for the `subscribeToSubnets` function.
subscribeToSubnetsParameters struct {
subscriptionBySubnet map[uint64]*pubsub.Subscription
topicFormat string
digest [4]byte
genesisValidatorsRoot [fieldparams.RootLength]byte
genesisTime time.Time
currentSlot primitives.Slot
validate wrappedVal
handle subHandler
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
}
)
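A hedged sketch of how the two getters differ; the subnet values and this particular registration are illustrative only (the real registrations appear further down in this diff), and validate/handle are omitted for brevity:

sp := subscribeParameters{
	topicFormat: p2p.AttestationSubnetTopicFormat,
	digest:      digest,
	// Subnets we join locally (and therefore also need peers for).
	getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
		return map[uint64]bool{1: true, 2: true}
	},
	// Subnets we only need peers for, without joining them ourselves.
	getSubnetsRequiringPeers: func(primitives.Slot) map[uint64]bool {
		return map[uint64]bool{5: true}
	},
}
s.subscribeWithParameters(sp)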
var errInvalidDigest = errors.New("invalid digest")
// noopValidator is a no-op that only decodes the message, but does not check its contents. // noopValidator is a no-op that only decodes the message, but does not check its contents.
func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
@@ -55,28 +85,36 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
return pubsub.ValidationAccept, nil return pubsub.ValidationAccept, nil
} }
func sliceFromCount(count uint64) []uint64 { func mapFromCount(count uint64) map[uint64]bool {
result := make([]uint64, 0, count) result := make(map[uint64]bool, count)
for item := range count { for item := range count {
result = append(result, item) result[item] = true
} }
return result return result
} }
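For clarity, the expected shapes of the two map helpers (mapFromSlice is defined just below):

_ = mapFromCount(4)                              // map[uint64]bool{0: true, 1: true, 2: true, 3: true}
_ = mapFromSlice([]uint64{1, 2}, []uint64{2, 5}) // map[uint64]bool{1: true, 2: true, 5: true}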
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 { func mapFromSlice(slices ...[]uint64) map[uint64]bool {
if flags.Get().SubscribeToAllSubnets { result := make(map[uint64]bool)
return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
for _, slice := range slices {
for _, item := range slice {
result[item] = true
}
}
return result
}
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {
if flags.Get().SubscribeToAllSubnets {
return mapFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
} }
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot) currentEpoch := slots.ToEpoch(currentSlot)
subscriptions := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
// Retrieve the subnets we want to subscribe to. return mapFromSlice(subscriptions)
subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
return slice.SetUint64(subs)
} }
// Register PubSub subscribers // Register PubSub subscribers
@@ -111,14 +149,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.attesterSlashingSubscriber, s.attesterSlashingSubscriber,
digest, digest,
) )
s.subscribeWithParameters( s.subscribeWithParameters(subscribeParameters{
p2p.AttestationSubnetTopicFormat, topicFormat: p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, validate: s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber, handle: s.committeeIndexBeaconAttestationSubscriber,
digest, digest: digest,
s.persistentAndAggregatorSubnetIndices, getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices,
s.attesterSubnetIndices, getSubnetsRequiringPeers: attesterSubnetIndices,
) })
// New gossip topic in Altair // New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= epoch { if params.BeaconConfig().AltairForkEpoch <= epoch {
@@ -128,14 +166,15 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.syncContributionAndProofSubscriber, s.syncContributionAndProofSubscriber,
digest, digest,
) )
s.subscribeWithParameters(
p2p.SyncCommitteeSubnetTopicFormat, s.subscribeWithParameters(subscribeParameters{
s.validateSyncCommitteeMessage, topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
s.syncCommitteeMessageSubscriber, validate: s.validateSyncCommitteeMessage,
digest, handle: s.syncCommitteeMessageSubscriber,
s.activeSyncSubnetIndices, digest: digest,
func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, getSubnetsToJoin: s.activeSyncSubnetIndices,
) })
if features.Get().EnableLightClient { if features.Get().EnableLightClient {
s.subscribe( s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat, p2p.LightClientOptimisticUpdateTopicFormat,
@@ -164,42 +203,39 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// New gossip topic in Deneb, removed in Electra // New gossip topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch { if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch {
s.subscribeWithParameters( s.subscribeWithParameters(subscribeParameters{
p2p.BlobSubnetTopicFormat, topicFormat: p2p.BlobSubnetTopicFormat,
s.validateBlob, validate: s.validateBlob,
s.blobSubscriber, handle: s.blobSubscriber,
digest, digest: digest,
func(currentSlot primitives.Slot) []uint64 { getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
}, },
func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, })
)
} }
// New gossip topic in Electra, removed in Fulu // New gossip topic in Electra, removed in Fulu
if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch { if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch {
s.subscribeWithParameters( s.subscribeWithParameters(subscribeParameters{
p2p.BlobSubnetTopicFormat, topicFormat: p2p.BlobSubnetTopicFormat,
s.validateBlob, validate: s.validateBlob,
s.blobSubscriber, handle: s.blobSubscriber,
digest, digest: digest,
func(currentSlot primitives.Slot) []uint64 { getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
}, },
func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, })
)
} }
// New gossip topic in Fulu. // New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= epoch { if params.BeaconConfig().FuluForkEpoch <= epoch {
s.subscribeWithParameters( s.subscribeWithParameters(subscribeParameters{
p2p.DataColumnSubnetTopicFormat, topicFormat: p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, validate: s.validateDataColumn,
s.dataColumnSubscriber, handle: s.dataColumnSubscriber,
digest, digest: digest,
s.dataColumnSubnetIndices, getSubnetsToJoin: s.dataColumnSubnetIndices,
func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, })
)
} }
} }
@@ -379,197 +415,185 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
} }
} }
// pruneSubscriptions unsubscribe from topics we are currently subscribed to but that are // pruneSubscriptions unsubscribes from topics we are currently subscribed to but that are
// not in the list of wanted subnets. // not in the list of wanted subnets.
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) pruneSubscriptions( func (s *Service) pruneSubscriptions(
subscriptions map[uint64]*pubsub.Subscription, subscriptionBySubnet map[uint64]*pubsub.Subscription,
wantedSubs []uint64, wantedSubnets map[uint64]bool,
topicFormat string, topicFormat string,
digest [4]byte, digest [4]byte,
) { ) {
for k, v := range subscriptions { for subnet, subscription := range subscriptionBySubnet {
var wanted bool if subscription == nil {
for _, idx := range wantedSubs { // Should not happen, but just in case.
if k == idx { delete(subscriptionBySubnet, subnet)
wanted = true
break
}
}
if !wanted && v != nil {
v.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
delete(subscriptions, k)
}
}
}
// searchForPeers searches for peers in the given subnets.
func (s *Service) searchForPeers(
ctx context.Context,
topicFormat string,
digest [4]byte,
currentSlot primitives.Slot,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
) {
// Retrieve the subnets we want to subscribe to.
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
// Retrieve the subnets we want to find peers for.
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
// Combine the subnets to subscribe and the subnets to find peers for.
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(topicFormat, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(topic) {
continue continue
} }
// Not enough peers in the subnet, we need to search for more. if wantedSubnets[subnet] {
_, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet) // Nothing to prune.
if err != nil { continue
log.WithError(err).Debug("Could not search for peers")
} }
// We are subscribed to a subnet that is no longer wanted.
subscription.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
delete(subscriptionBySubnet, subnet)
} }
} }
// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed. // subscribeToSubnets subscribes to needed subnets and unsubscribes from unneeded ones.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. // This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) subscribeToSubnets( func (s *Service) subscribeToSubnets(p subscribeToSubnetsParameters) error {
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
) bool {
// Do not subscribe if not synced. // Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true return nil
} }
// Check the validity of the digest. // Check the validity of the digest.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) valid, err := isDigestValid(p.digest, p.genesisTime, p.genesisValidatorsRoot)
if err != nil { if err != nil {
log.Error(err) return errors.Wrap(err, "is digest valid")
return true
} }
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. // Unsubscribe from all subnets if digest is not valid. It's likely to be the case after a hard fork.
if !valid { if !valid {
description := topicFormat wantedSubnets := map[uint64]bool{}
if pos := strings.LastIndex(topicFormat, "/"); pos != -1 { s.pruneSubscriptions(p.subscriptionBySubnet, wantedSubnets, p.topicFormat, p.digest)
description = topicFormat[pos+1:] return errInvalidDigest
}
if pos := strings.LastIndex(description, "_"); pos != -1 {
description = description[:pos]
}
log.WithFields(logrus.Fields{
"digest": fmt.Sprintf("%#x", digest),
"subnets": description,
}).Debug("Subnets with this digest are no longer valid, unsubscribing from all of them")
s.pruneSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
} }
// Retrieve the subnets we want to subscribe to. // Retrieve the subnets we want to join.
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) subnetsToJoin := p.getSubnetsToJoin(p.currentSlot)
// Remove subscriptions that are no longer wanted. // Remove subscriptions that are no longer wanted.
s.pruneSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest) s.pruneSubscriptions(p.subscriptionBySubnet, subnetsToJoin, p.topicFormat, p.digest)
// Subscribe to wanted subnets. // Subscribe to wanted and not already registered subnets.
for _, subnetIndex := range subnetsToSubscribeIndex { for subnet := range subnetsToJoin {
subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex) subnetTopic := fmt.Sprintf(p.topicFormat, p.digest, subnet)
// Check if subscription exists. if _, exists := p.subscriptionBySubnet[subnet]; !exists {
if _, exists := subscriptions[subnetIndex]; exists { subscription := s.subscribeWithBase(subnetTopic, p.validate, p.handle)
continue p.subscriptionBySubnet[subnet] = subscription
} }
// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
} }
return true
return nil
} }
// subscribeWithParameters subscribes to a list of subnets. // subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters( func (s *Service) subscribeWithParameters(p subscribeParameters) {
topicFormat string, minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
validate wrappedVal, subscriptionBySubnet := make(map[uint64]*pubsub.Subscription)
handle subHandler,
digest [4]byte,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
) {
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err) // lint:nopanic -- Impossible condition.
}
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition.
}
// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime() genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) secondsPerSlotDuration := time.Duration(secondsPerSlot) * time.Second
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot() currentSlot := s.cfg.clock.CurrentSlot()
neededSubnets := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers)
// Subscribe to subnets. shortTopicFormat := p.topicFormat
s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) shortTopicFormatLen := len(shortTopicFormat)
if shortTopicFormatLen >= 3 && shortTopicFormat[shortTopicFormatLen-3:] == "_%d" {
shortTopicFormat = shortTopicFormat[:shortTopicFormatLen-3]
}
// Derive a new context and cancel function. shortTopic := fmt.Sprintf(shortTopicFormat, p.digest)
ctx, cancel := context.WithCancel(s.ctx)
parameters := subscribeToSubnetsParameters{
subscriptionBySubnet: subscriptionBySubnet,
topicFormat: p.topicFormat,
digest: p.digest,
genesisValidatorsRoot: genesisValidatorsRoot,
genesisTime: genesisTime,
currentSlot: currentSlot,
validate: p.validate,
handle: p.handle,
getSubnetsToJoin: p.getSubnetsToJoin,
}
err := s.subscribeToSubnets(parameters)
if err != nil {
log.WithError(err).Error("Could not subscribe to subnets")
}
// Subscribe to expected subnets and search for peers if needed at every slot.
go func() { go func() {
// Search for peers. func() {
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration)
defer cancel()
if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithError(err).Debug("Could not find peers with subnets")
}
}()
slotTicker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
defer slotTicker.Done()
for { for {
select { select {
case currentSlot := <-ticker.C(): case currentSlot := <-slotTicker.C():
isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe) parameters.currentSlot = currentSlot
if err := s.subscribeToSubnets(parameters); err != nil {
if errors.Is(err, errInvalidDigest) {
log.WithField("topics", shortTopic).Debug("Digest is invalid, stopping subscription")
return
}
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork. log.WithError(err).Error("Could not subscribe to subnets")
if !isDigestValid { continue
ticker.Done()
return
} }
// Search for peers. func() {
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) ctx, cancel := context.WithTimeout(s.ctx, secondsPerSlotDuration)
defer cancel()
if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, p.topicFormat, p.digest, minimumPeersPerSubnet, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithError(err).Debug("Could not find peers with subnets")
}
}()
case <-s.ctx.Done():
return
}
}
}()
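The heart of the fix is that each per-slot peer search is bounded by a timeout, so a subnet with no reachable peers can no longer block subscriptions indefinitely. A minimal sketch of that pattern, assuming it sits in the same package (for the logger and the pkg/errors import); the helper name is hypothetical:

// searchOnce runs one bounded peer search. context.DeadlineExceeded simply
// means "try again next slot", so it is not reported as an error.
func searchOnce(parent context.Context, slotDuration time.Duration, find func(context.Context) error) {
	ctx, cancel := context.WithTimeout(parent, slotDuration)
	defer cancel()
	if err := find(ctx); err != nil && !errors.Is(err, context.DeadlineExceeded) {
		log.WithError(err).Debug("Could not find peers with subnets")
	}
}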
// Warn the user if we are not connected to enough peers in the subnets.
go func() {
log := log.WithField("minimum", minimumPeersPerSubnet)
logTicker := time.NewTicker(5 * time.Minute)
defer logTicker.Stop()
for {
select {
case <-logTicker.C:
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers)
isSubnetWithMissingPeers := false
// Check whether each needed subnet still has enough connected peers.
for index := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(p.topicFormat, p.digest, index)
// Check if we have enough peers in the subnet. Skip if we do.
if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
isSubnetWithMissingPeers = true
log.WithFields(logrus.Fields{
"topic": topic,
"actual": count,
}).Warning("Not enough connected peers")
}
}
if !isSubnetWithMissingPeers {
log.WithField("topic", shortTopic).Info("All subnets have enough connected peers")
}
case <-s.ctx.Done(): case <-s.ctx.Done():
cancel()
ticker.Done()
return return
} }
} }
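The periodic warning above and the defectiveSubnets helper exercised in the test below both reduce to the same per-topic count check. A rough sketch of that check, assuming the same package; the name and exact signature are illustrative, not the committed API:

// subnetsBelowMinimum returns the subnets whose connected gossip peer count
// is still below the minimum, using the same topic formatting as above.
func (s *Service) subnetsBelowMinimum(topicFormat string, digest [4]byte, minimum int, subnets map[uint64]bool) map[uint64]bool {
	missing := make(map[uint64]bool)
	for subnet := range subnets {
		topic := fmt.Sprintf(topicFormat, digest, subnet)
		if s.connectedPeersCount(topic) < minimum {
			missing[subnet] = true
		}
	}
	return missing
}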
@@ -591,76 +615,82 @@ func (s *Service) unSubscribeFromTopic(topic string) {
} }
} }
// enoughPeersAreConnected checks if we have enough peers which are subscribed to the same subnet. // connectedPeersCount counts how many peers for a given topic are connected to the node.
func (s *Service) enoughPeersAreConnected(subnetTopic string) bool { func (s *Service) connectedPeersCount(subnetTopic string) int {
topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()
threshold := flags.Get().MinimumPeersPerSubnet
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic)
peersWithSubnetCount := len(peersWithSubnet) return len(peersWithSubnet)
return peersWithSubnetCount >= threshold
} }
func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 { func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool {
nodeID := s.cfg.p2p.NodeID() nodeID := s.cfg.p2p.NodeID()
custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target) custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target)
nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount) nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil { if err != nil {
log.WithError(err).Error("Could not retrieve peer info") log.WithError(err).Error("Could not retrieve peer info")
return []uint64{} return nil
} }
return sliceFromMap(nodeInfo.DataColumnsSubnets, true /*sorted*/) return nodeInfo.DataColumnsSubnets
} }
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {
if flags.Get().SubscribeToAllSubnets { if flags.Get().SubscribeToAllSubnets {
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount) return mapFromCount(params.BeaconConfig().AttestationSubnetCount)
} }
persistentSubnetIndices := s.persistentSubnetIndices() persistentSubnetIndices := persistentSubnetIndices()
aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot) aggregatorSubnetIndices := aggregatorSubnetIndices(currentSlot)
// Combine subscriptions to get all requested subscriptions. // Combine subscriptions to get all requested subscriptions.
return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...)) return mapFromSlice(persistentSubnetIndices, aggregatorSubnetIndices)
} }
// filterNeededPeers filters out peers required for the node to function, // filterNeededPeers filters out peers required for the node to function,
// so that peers in our attestation subnets are not pruned. // so that peers in our attestation subnets are not pruned.
func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
currentSlot := s.cfg.clock.CurrentSlot()
// Exit early if nothing to filter. // Exit early if nothing to filter.
if len(pids) == 0 { if len(pids) == 0 {
return pids return pids
} }
digest, err := s.currentForkDigest() digest, err := s.currentForkDigest()
if err != nil { if err != nil {
log.WithError(err).Error("Could not compute fork digest") log.WithError(err).Error("Could not compute fork digest")
return pids return pids
} }
currSlot := s.cfg.clock.CurrentSlot()
wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot) wantedSubnets := make(map[uint64]bool)
wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...)) for subnet := range s.persistentAndAggregatorSubnetIndices(currentSlot) {
wantedSubnets[subnet] = true
}
for subnet := range attesterSubnetIndices(currentSlot) {
wantedSubnets[subnet] = true
}
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})] topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
// Map of peers in subnets // Map of peers in subnets
peerMap := make(map[peer.ID]bool) peerMap := make(map[peer.ID]bool)
for subnet := range wantedSubnets {
for _, sub := range wantedSubs { subnetTopic := fmt.Sprintf(topic, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix()
subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.p2p.Encoding().ProtocolSuffix() peers := s.cfg.p2p.PubSub().ListPeers(subnetTopic)
ps := s.cfg.p2p.PubSub().ListPeers(subnetTopic) if len(peers) > minimumPeersPerSubnet {
if len(ps) > flags.Get().MinimumPeersPerSubnet {
// In the event we have more than the minimum, we can // In the event we have more than the minimum, we can
// mark the remaining as viable for pruning. // mark the remaining as viable for pruning.
ps = ps[:flags.Get().MinimumPeersPerSubnet] peers = peers[:minimumPeersPerSubnet]
} }
// Add peer to peer map. // Add peer to peer map.
for _, p := range ps { for _, peer := range peers {
// Even if the peer id has // Even if the peer ID has already been seen we still set it,
// already been seen we still set // as the outcome is the same.
// it, as the outcome is the same. peerMap[peer] = true
peerMap[p] = true
} }
} }
@@ -716,6 +746,34 @@ func isDigestValid(digest [4]byte, genesis time.Time, genValRoot [32]byte) (bool
return retDigest == digest, nil return retDigest == digest, nil
} }
// computeAllNeededSubnets computes the subnets we want to join
// and the subnets for which we want to find peers.
func computeAllNeededSubnets(
currentSlot primitives.Slot,
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool,
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool,
) map[uint64]bool {
// Retrieve the subnets we want to join.
subnetsToJoin := getSubnetsToJoin(currentSlot)
// Retrieve the subnets for which we want to find peers.
subnetsRequiringPeers := make(map[uint64]bool)
if getSubnetsRequiringPeers != nil {
subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot)
}
// Combine the two maps to get all needed subnets.
neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers))
for subnet := range subnetsToJoin {
neededSubnets[subnet] = true
}
for subnet := range subnetsRequiringPeers {
neededSubnets[subnet] = true
}
return neededSubnets
}
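For example, with hypothetical getters:

join := func(primitives.Slot) map[uint64]bool { return map[uint64]bool{1: true, 2: true} }
peersOnly := func(primitives.Slot) map[uint64]bool { return map[uint64]bool{7: true} }

needed := computeAllNeededSubnets(primitives.Slot(0), join, peersOnly)
// needed == map[uint64]bool{1: true, 2: true, 7: true}
_ = needed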
func agentString(pid peer.ID, hst host.Host) string { func agentString(pid peer.ID, hst host.Host) string {
rawVersion, storeErr := hst.Peerstore().Get(pid, "AgentVersion") rawVersion, storeErr := hst.Peerstore().Get(pid, "AgentVersion")
agString, ok := rawVersion.(string) agString, ok := rawVersion.(string)
@@ -742,17 +800,3 @@ func errorIsIgnored(err error) bool {
} }
return false return false
} }
// sliceFromMap returns a sorted list of keys from a map.
func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 {
result := make([]uint64, 0, len(m))
for k := range m {
result = append(result, k)
}
if len(sorted) > 0 && sorted[0] {
slices.Sort(result)
}
return result
}
View File
@@ -35,11 +35,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
} }
} }
func (*Service) persistentSubnetIndices() []uint64 { func persistentSubnetIndices() []uint64 {
return cache.SubnetIDs.GetAllSubnets() return cache.SubnetIDs.GetAllSubnets()
} }
func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { func aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
endEpoch := slots.ToEpoch(currentSlot) + 1 endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64 var commIds []uint64
@@ -49,12 +49,16 @@ func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
return slice.SetUint64(commIds) return slice.SetUint64(commIds)
} }
func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 { func attesterSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {
endEpoch := slots.ToEpoch(currentSlot) + 1 endEpoch := slots.ToEpoch(currentSlot) + 1
endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch))
var commIds []uint64
subnets := make(map[uint64]bool, int(endSlot-currentSlot+1))
for i := currentSlot; i <= endSlot; i++ { for i := currentSlot; i <= endSlot; i++ {
commIds = append(commIds, cache.SubnetIDs.GetAttesterSubnetIDs(i)...) for _, subnetId := range cache.SubnetIDs.GetAttesterSubnetIDs(i) {
subnets[subnetId] = true
}
} }
return slice.SetUint64(commIds)
return subnets
} }
View File
@@ -310,7 +310,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
subscriptions[2], err = r.cfg.p2p.SubscribeToTopic(fullTopic) subscriptions[2], err = r.cfg.p2p.SubscribeToTopic(fullTopic)
require.NoError(t, err) require.NoError(t, err)
r.pruneSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest) r.pruneSubscriptions(subscriptions, map[uint64]bool{2: true}, defaultTopic, digest)
require.LogsDoNotContain(t, hook, "Could not unregister topic validator") require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
} }
@@ -482,8 +482,7 @@ func TestFilterSubnetPeers(t *testing.T) {
p2 := createPeer(t, subnet10, subnet20) p2 := createPeer(t, subnet10, subnet20)
p3 := createPeer(t) p3 := createPeer(t)
// Connect to all // Connect to all peers.
// peers.
p.Connect(p1) p.Connect(p1)
p.Connect(p2) p.Connect(p2)
p.Connect(p3) p.Connect(p3)
@@ -540,7 +539,11 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
digest, err := r.currentForkDigest() digest, err := r.currentForkDigest()
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) r.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
digest: digest,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{} topicMap := map[string]bool{}
@@ -589,7 +592,11 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) r.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
digest: digest,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{} topicMap := map[string]bool{}
View File
@@ -0,0 +1,2 @@
### Fixed
- Subnets subscription: Avoid dynamic subscribing blocking in case not enough peers per subnets are found.
View File
@@ -427,6 +427,7 @@ func (b *BeaconChainConfig) TargetBlobsPerBlock(slot primitives.Slot) int {
return b.DeprecatedMaxBlobsPerBlock / 2 return b.DeprecatedMaxBlobsPerBlock / 2
} }
// MaxBlobsPerBlock returns the maximum number of blobs per block for the given slot.
func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int { func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int {
epoch := primitives.Epoch(slot.DivSlot(b.SlotsPerEpoch)) epoch := primitives.Epoch(slot.DivSlot(b.SlotsPerEpoch))
@@ -449,6 +450,7 @@ func (b *BeaconChainConfig) MaxBlobsPerBlock(slot primitives.Slot) int {
if epoch >= b.ElectraForkEpoch { if epoch >= b.ElectraForkEpoch {
return b.DeprecatedMaxBlobsPerBlockElectra return b.DeprecatedMaxBlobsPerBlockElectra
} }
return b.DeprecatedMaxBlobsPerBlock return b.DeprecatedMaxBlobsPerBlock
} }