mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 23:48:06 -05:00
Reform Inbound Limit (#8465)
* clean up * name * comment * change back Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -78,7 +78,7 @@ func (s *Service) validateDial(addr multiaddr.Multiaddr) bool {
|
||||
return false
|
||||
}
|
||||
s.ipLimiter.Add(ip.String(), 1)
|
||||
return !s.peers.IsAboveInboundLimit()
|
||||
return true
|
||||
}
|
||||
|
||||
// configureFilter looks at the provided allow lists and
|
||||
|
||||
@@ -9,10 +9,12 @@ import (
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
||||
mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
)
|
||||
@@ -102,6 +104,8 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
|
||||
PeerLimit: limit,
|
||||
ScorerParams: &scorers.Config{},
|
||||
}),
|
||||
host: mockp2p.NewTestP2P(t).BHost,
|
||||
cfg: &Config{MaxPeers: uint(limit)},
|
||||
}
|
||||
var err error
|
||||
s.addrFilter, err = configureFilter(&Config{})
|
||||
@@ -110,19 +114,20 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
|
||||
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, 3000))
|
||||
require.NoError(t, err)
|
||||
|
||||
valid := s.validateDial(multiAddress)
|
||||
valid := s.InterceptAccept(&maEndpoints{raddr: multiAddress})
|
||||
if !valid {
|
||||
t.Errorf("Expected multiaddress with ip %s to be accepted as it is below the inbound limit", ip)
|
||||
}
|
||||
|
||||
inboundLimit := float64(limit) * peers.InboundRatio
|
||||
inboundLimit += highWatermarkBuffer
|
||||
// top off by 1 to trigger it above the limit.
|
||||
inboundLimit += 1
|
||||
// Add in up to inbound peer limit.
|
||||
for i := 0; i < int(inboundLimit); i++ {
|
||||
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
valid = s.validateDial(multiAddress)
|
||||
valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress})
|
||||
if valid {
|
||||
t.Errorf("Expected multiaddress with ip %s to be rejected as it exceeds the inbound limit", ip)
|
||||
}
|
||||
@@ -285,3 +290,21 @@ func TestService_InterceptAddrDial_Allow(t *testing.T) {
|
||||
t.Errorf("Expected multiaddress with ip %s to not be rejected with an allow cidr mask of %s", ip, cidr)
|
||||
}
|
||||
}
|
||||
|
||||
// Mock type for testing.
|
||||
type maEndpoints struct {
|
||||
laddr ma.Multiaddr
|
||||
raddr ma.Multiaddr
|
||||
}
|
||||
|
||||
// LocalMultiaddr returns the local address associated with
|
||||
// this connection
|
||||
func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr {
|
||||
return c.laddr
|
||||
}
|
||||
|
||||
// RemoteMultiaddr returns the remote address associated with
|
||||
// this connection
|
||||
func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr {
|
||||
return c.raddr
|
||||
}
|
||||
|
||||
@@ -288,9 +288,14 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
|
||||
// we apply the high watermark buffer.
|
||||
if inbound {
|
||||
maxPeers += highWatermarkBuffer
|
||||
maxInbound := s.peers.InboundLimit() + highWatermarkBuffer
|
||||
currInbound := len(s.peers.Inbound())
|
||||
// Exit early if we are at the inbound limit.
|
||||
if currInbound >= maxInbound {
|
||||
return true
|
||||
}
|
||||
}
|
||||
activePeers := len(s.Peers().Active())
|
||||
|
||||
return activePeers >= maxPeers || numOfConns >= maxPeers
|
||||
}
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ const (
|
||||
maxLimitBuffer = 150
|
||||
|
||||
// InboundRatio is the proportion of our connected peer limit at which we will allow inbound peers.
|
||||
InboundRatio = float64(1)
|
||||
InboundRatio = float64(0.8)
|
||||
)
|
||||
|
||||
// Status is the structure holding the peer status information.
|
||||
@@ -208,6 +208,14 @@ func (p *Status) IsAboveInboundLimit() bool {
|
||||
return totalInbound > inboundLimit
|
||||
}
|
||||
|
||||
// InboundLimit returns the current inbound
|
||||
// peer limit.
|
||||
func (p *Status) InboundLimit() int {
|
||||
p.store.RLock()
|
||||
defer p.store.RUnlock()
|
||||
return int(float64(p.ConnectedPeerLimit()) * InboundRatio)
|
||||
}
|
||||
|
||||
// SetMetadata sets the metadata of the given remote peer.
|
||||
func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
|
||||
p.store.Lock()
|
||||
@@ -662,7 +670,7 @@ func (p *Status) PeersToPrune() []peer.ID {
|
||||
badResp int
|
||||
}
|
||||
peersToPrune := make([]*peerResp, 0)
|
||||
// Select disconnected peers with a smaller bad response count.
|
||||
// Select connected and inbound peers to prune.
|
||||
for pid, peerData := range p.store.Peers() {
|
||||
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound {
|
||||
peersToPrune = append(peersToPrune, &peerResp{
|
||||
|
||||
Reference in New Issue
Block a user