Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-30 07:38:09 -05:00)

Compare commits: debug-stat...patchRelea (5 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 6a8ac948a2 | |
| | 7a9a026cef | |
| | f52c29489d | |
| | 3c6ea738f4 | |
| | 4ce1261197 | |
```diff
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"crypto/ecdsa"
 	"net"
+	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/p2p/discover"
@@ -15,9 +16,11 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/go-bitfield"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
+	"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/v5/config/features"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
+	"github.com/prysmaticlabs/prysm/v5/math"
 	"github.com/prysmaticlabs/prysm/v5/runtime/version"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
 )
@@ -106,7 +109,7 @@ func (s *Service) RefreshENR() {
 
 // listen for new nodes watches for new nodes in the network and adds them to the peerstore.
 func (s *Service) listenForNewNodes() {
-	iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer)
+	iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
 	defer iterator.Close()
 
 	for {
```
```diff
@@ -122,29 +125,47 @@ func (s *Service) listenForNewNodes() {
 			time.Sleep(pollingPeriod)
 			continue
 		}
-		if exists := iterator.Next(); !exists {
-			break
-		}
-		node := iterator.Node()
-		peerInfo, _, err := convertToAddrInfo(node)
-		if err != nil {
-			log.WithError(err).Error("Could not convert to peer info")
+		wantedCount := s.wantedPeerDials()
+		if wantedCount == 0 {
+			log.Trace("Not looking for peers, at peer limit")
+			time.Sleep(pollingPeriod)
 			continue
 		}
-		if peerInfo == nil {
-			continue
-		}
-		// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
-		s.Peers().RandomizeBackOff(peerInfo.ID)
-		go func(info *peer.AddrInfo) {
-			if err := s.connectWithPeer(s.ctx, *info); err != nil {
-				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+		// Restrict dials if limit is applied.
+		if flags.MaxDialIsActive() {
+			var err error
+			wantedICount := math.Min(uint64(wantedCount), uint64(flags.Get().MaxConcurrentDials))
+			wantedCount, err = math.Int(wantedICount)
+			if err != nil {
+				log.WithError(err).Error("Could not get wanted count")
+				continue
 			}
-		}(peerInfo)
+		}
+		wantedNodes := enode.ReadNodes(iterator, wantedCount)
+		wg := new(sync.WaitGroup)
+		for i := 0; i < len(wantedNodes); i++ {
+			node := wantedNodes[i]
+			peerInfo, _, err := convertToAddrInfo(node)
+			if err != nil {
+				log.WithError(err).Error("Could not convert to peer info")
+				continue
+			}
+
+			if peerInfo == nil {
+				continue
+			}
+
+			// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
+			s.Peers().RandomizeBackOff(peerInfo.ID)
+			wg.Add(1)
+			go func(info *peer.AddrInfo) {
+				if err := s.connectWithPeer(s.ctx, *info); err != nil {
+					log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+				}
+				wg.Done()
+			}(peerInfo)
+		}
+		wg.Wait()
 	}
 }
 
```
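The rewritten loop above no longer spawns one unbounded goroutine per discovered node: it reads up to `wantedCount` candidates from the iterator in one go, dials them concurrently, and blocks on a `sync.WaitGroup` until the whole batch settles before polling discovery again. Below is a minimal, self-contained sketch of that fan-out-and-wait pattern; `dial` and the string peer IDs are illustrative stand-ins for `connectWithPeer` and `peer.AddrInfo`, not Prysm APIs:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// dial is a hypothetical stand-in for Service.connectWithPeer.
func dial(ctx context.Context, peer string) error {
	fmt.Println("dialing", peer)
	return nil
}

// dialBatch mirrors the batched fan-out in listenForNewNodes: one
// goroutine per candidate, then a WaitGroup barrier so the next
// discovery round only starts after every dial attempt has returned.
func dialBatch(ctx context.Context, peers []string) {
	wg := new(sync.WaitGroup)
	for _, p := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			if err := dial(ctx, p); err != nil {
				fmt.Println("could not connect:", err)
			}
		}(p)
	}
	wg.Wait() // barrier: the caller resumes only when the batch is done
}

func main() {
	dialBatch(context.Background(), []string{"peerA", "peerB", "peerC"})
}
```

The barrier is the point of the change: because the loop waits for the batch, the number of in-flight dials can never exceed `wantedCount` (further capped by `--max-concurrent-dials`), whereas the old code could keep spawning a goroutine per discovered node.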
```diff
@@ -384,6 +405,17 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
 	return activePeers >= maxPeers || numOfConns >= maxPeers
 }
 
+func (s *Service) wantedPeerDials() int {
+	maxPeers := int(s.cfg.MaxPeers)
+
+	activePeers := len(s.Peers().Active())
+	wantedCount := 0
+	if maxPeers > activePeers {
+		wantedCount = maxPeers - activePeers
+	}
+	return wantedCount
+}
+
 // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
 func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
 	var allAddrs []ma.Multiaddr
```
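`wantedPeerDials` is the node's remaining peer budget: the configured peer cap minus currently active peers, floored at zero. Restated as a pure function for illustration (the signature is simplified, not the Prysm method):

```go
package main

import "fmt"

// wantedPeerDials restated as a pure function: the remaining peer
// budget, never negative.
func wantedPeerDials(maxPeers, activePeers int) int {
	if maxPeers > activePeers {
		return maxPeers - activePeers
	}
	return 0
}

func main() {
	fmt.Println(wantedPeerDials(70, 64)) // 6: room for six more dials
	fmt.Println(wantedPeerDials(70, 70)) // 0: at the limit, skip this round
}
```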
```diff
@@ -2,10 +2,14 @@ package p2p
 
 import (
 	"context"
+	"runtime"
+	"time"
 
 	"github.com/ethereum/go-ethereum/p2p/enode"
 )
 
+const backOffCounter = 50
+
 // filterNodes wraps an iterator such that Next only returns nodes for which
 // the 'check' function returns true. This custom implementation also
 // checks for context deadlines so that in the event the parent context has
```
```diff
@@ -24,13 +28,21 @@ type filterIter struct {
 // Next looks up for the next valid node according to our
 // filter criteria.
 func (f *filterIter) Next() bool {
+	lookupCounter := 0
 	for f.Iterator.Next() {
+		// Do not excessively perform lookups if we constantly receive non-viable peers.
+		if lookupCounter > backOffCounter {
+			lookupCounter = 0
+			runtime.Gosched()
+			time.Sleep(30 * time.Second)
+		}
 		if f.Context.Err() != nil {
 			return false
 		}
 		if f.check(f.Node()) {
 			return true
 		}
+		lookupCounter++
 	}
 	return false
 }
```
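The counter guard added to `Next` keeps a discovery table full of non-viable candidates from spinning this loop hot: after `backOffCounter` (50) consecutive rejections it resets the counter, yields the scheduler, and sleeps 30 seconds before resuming. A stripped-down sketch of the same guard over a plain channel-backed source; `filteredNext` and the `check` predicate are illustrative, not the Prysm types:

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

const backOffCounter = 50 // consecutive rejects before backing off

// filteredNext drains src until check accepts a value, backing off
// when too many candidates in a row are rejected (the same guard the
// patch adds to filterIter.Next).
func filteredNext(ctx context.Context, src <-chan int, check func(int) bool) (int, bool) {
	rejected := 0
	for v := range src {
		if rejected > backOffCounter {
			rejected = 0
			runtime.Gosched()            // let other goroutines run
			time.Sleep(30 * time.Second) // pause a hot, unproductive loop
		}
		if ctx.Err() != nil {
			return 0, false // parent context canceled
		}
		if check(v) {
			return v, true
		}
		rejected++
	}
	return 0, false
}

func main() {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	close(src)
	even := func(v int) bool { return v%2 == 0 }
	if v, ok := filteredNext(context.Background(), src, even); ok {
		fmt.Println("accepted:", v)
	}
}
```

As in the patch, the counter is a local variable, so each successful lookup starts the next call to `Next` with a fresh budget of 50 attempts.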
```diff
@@ -87,7 +87,18 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
 			return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
 				"only %d out of %d peers were able to be found", topic, currNum, threshold)
 		}
-		nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch))
+		nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)
+		// Restrict dials if limit is applied.
+		if flags.MaxDialIsActive() {
+			var err error
+			wantedICount := mathutil.Min(uint64(nodeCount), uint64(flags.Get().MaxConcurrentDials))
+			nodeCount, err = mathutil.Int(wantedICount)
+			if err != nil {
+				log.WithError(err).Error("Could not get wanted count")
+				continue
+			}
+		}
+		nodes := enode.ReadNodes(iterator, nodeCount)
 		for _, node := range nodes {
 			info, _, err := convertToAddrInfo(node)
 			if err != nil {
```
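This is the same clamp as in `listenForNewNodes`, just under a different import alias (`mathutil` here, `math` in the discovery file): take the minimum of the desired count and `--max-concurrent-dials` in uint64 space, then convert back to `int` with an overflow check. A plain-Go equivalent with the check inlined (`clampDials` is an illustrative name, and it assumes a non-negative `wanted`):

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// clampDials mirrors the two-step pattern in the patch:
// min(wanted, limit) in uint64 space, then a checked int conversion.
func clampDials(wanted int, limit uint64) (int, error) {
	w := uint64(wanted)
	if limit < w {
		w = limit
	}
	if w > uint64(math.MaxInt) {
		return 0, errors.New("value overflows int")
	}
	return int(w), nil
}

func main() {
	n, _ := clampDials(25, 10) // limit active: dial at most 10
	fmt.Println(n)             // 10
	n, _ = clampDials(5, 10) // under the limit: keep 5
	fmt.Println(n)           // 5
}
```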
```diff
@@ -227,6 +227,12 @@ var (
 		Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.",
 		Value: 6,
 	}
+	// MaxConcurrentDials defines a flag to set the maximum number of peers that a node will attempt to dial with from discovery.
+	MaxConcurrentDials = &cli.Uint64Flag{
+		Name: "max-concurrent-dials",
+		Usage: "Sets the maximum number of peers that a node will attempt to dial with from discovery. By default we will dial as " +
+			"many peers as possible.",
+	}
 	// SuggestedFeeRecipient specifies the fee recipient for the transaction fees.
 	SuggestedFeeRecipient = &cli.StringFlag{
 		Name: "suggested-fee-recipient",
```
```diff
@@ -11,6 +11,7 @@ type GlobalFlags struct {
 	SubscribeToAllSubnets      bool
 	MinimumSyncPeers           int
 	MinimumPeersPerSubnet      int
+	MaxConcurrentDials         int
 	BlockBatchLimit            int
 	BlockBatchLimitBurstFactor int
 	BlobBatchLimit             int
@@ -45,11 +46,17 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
 	cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
 	cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
 	cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
+	cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
 	configureMinimumPeers(ctx, cfg)
 
 	Init(cfg)
 }
 
+// MaxDialIsActive checks if the user has enabled the max dial flag.
+func MaxDialIsActive() bool {
+	return Get().MaxConcurrentDials > 0
+}
+
 func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) {
 	cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name)
 	maxPeers := ctx.Int(cmd.P2PMaxPeers.Name)
```
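The flag is wired through with a zero-means-unlimited convention: `MaxConcurrentDials` declares no default `Value`, so unless the operator sets it, `Get().MaxConcurrentDials` stays 0 and `MaxDialIsActive()` reports false, leaving discovery free to dial as many peers as it can. A minimal `urfave/cli` sketch of that wiring; the app, global variable, and helper are illustrative, not Prysm's actual config plumbing:

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

// maxConcurrentDials is a hypothetical global, standing in for
// GlobalFlags.MaxConcurrentDials.
var maxConcurrentDials int

// maxDialIsActive mirrors flags.MaxDialIsActive: unset (0) means no cap.
func maxDialIsActive() bool { return maxConcurrentDials > 0 }

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.Uint64Flag{
				Name:  "max-concurrent-dials",
				Usage: "Maximum number of peers to dial from discovery (0 = unlimited).",
			},
		},
		Action: func(ctx *cli.Context) error {
			maxConcurrentDials = ctx.Int("max-concurrent-dials")
			fmt.Println("dial cap active:", maxDialIsActive())
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```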
```diff
@@ -72,6 +72,7 @@ var appFlags = []cli.Flag{
 	flags.WeakSubjectivityCheckpoint,
 	flags.Eth1HeaderReqLimit,
 	flags.MinPeersPerSubnet,
+	flags.MaxConcurrentDials,
 	flags.SuggestedFeeRecipient,
 	flags.TerminalTotalDifficultyOverride,
 	flags.TerminalBlockHashOverride,
@@ -124,6 +124,7 @@ var appHelpFlagGroups = []flagGroup{
 	flags.WeakSubjectivityCheckpoint,
 	flags.Eth1HeaderReqLimit,
 	flags.MinPeersPerSubnet,
+	flags.MaxConcurrentDials,
 	flags.MevRelayEndpoint,
 	flags.MaxBuilderEpochMissedSlots,
 	flags.MaxBuilderConsecutiveMissedSlots,
```
testing/validator-mock/node_client_mock.go (generated, 4 lines changed)

```diff
@@ -21,8 +21,8 @@ import (
 
 // MockNodeClient is a mock of NodeClient interface.
 type MockNodeClient struct {
 	ctrl          *gomock.Controller
 	recorder      *MockNodeClientMockRecorder
 	healthTracker *beacon.NodeHealthTracker
 }
 
```

```diff
@@ -76,4 +76,3 @@ func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, re
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
 }
-
```