mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Add Inbound Peer Limit (#7942)
* add changes * fix up * fix * add test * fix test * fix again Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
@@ -158,6 +158,7 @@ go_test(
|
||||
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
|
||||
@@ -79,7 +79,7 @@ func (s *Service) validateDial(addr multiaddr.Multiaddr) bool {
|
||||
return false
|
||||
}
|
||||
s.ipLimiter.Add(ip.String(), 1)
|
||||
return true
|
||||
return !s.peers.IsAboveInboundLimit()
|
||||
}
|
||||
|
||||
// configureFilter looks at the provided allow lists and
|
||||
|
||||
@@ -9,7 +9,9 @@ import (
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
@@ -69,6 +71,7 @@ func TestService_InterceptBannedIP(t *testing.T) {
|
||||
s := &Service{
|
||||
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 20,
|
||||
ScorerParams: &scorers.Config{},
|
||||
}),
|
||||
}
|
||||
@@ -91,6 +94,40 @@ func TestService_InterceptBannedIP(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
|
||||
limit := 20
|
||||
s := &Service{
|
||||
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: limit,
|
||||
ScorerParams: &scorers.Config{},
|
||||
}),
|
||||
}
|
||||
var err error
|
||||
s.addrFilter, err = configureFilter(&Config{})
|
||||
require.NoError(t, err)
|
||||
ip := "212.67.10.122"
|
||||
multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, 3000))
|
||||
require.NoError(t, err)
|
||||
|
||||
valid := s.validateDial(multiAddress)
|
||||
if !valid {
|
||||
t.Errorf("Expected multiaddress with ip %s to be accepted as it is below the inbound limit", ip)
|
||||
}
|
||||
|
||||
inboundLimit := float64(limit) * peers.InboundRatio
|
||||
// top off by 1 to trigger it above the limit.
|
||||
inboundLimit += 1
|
||||
// Add in up to inbound peer limit.
|
||||
for i := 0; i < int(inboundLimit); i++ {
|
||||
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
valid = s.validateDial(multiAddress)
|
||||
if valid {
|
||||
t.Errorf("Expected multiaddress with ip %s to be rejected as it exceeds the inbound limit", ip)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeer_BelowMaxLimit(t *testing.T) {
|
||||
// create host and remote peer
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
|
||||
@@ -274,7 +274,7 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
|
||||
mhBytes = append(mhBytes, idBytes...)
|
||||
id, err := peer.IDFromBytes(mhBytes)
|
||||
require.NoError(t, err)
|
||||
p.Add(new(enr.Record), id, nil, network.DirUnknown)
|
||||
p.Add(new(enr.Record), id, nil, network.DirInbound)
|
||||
p.SetConnectionState(id, state)
|
||||
p.SetMetadata(id, &pb.MetaData{
|
||||
SeqNumber: 0,
|
||||
|
||||
@@ -44,6 +44,7 @@ go_test(
|
||||
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
|
||||
@@ -55,10 +55,13 @@ const (
|
||||
|
||||
const (
|
||||
// ColocationLimit restricts how many peer identities we can see from a single ip or ipv6 subnet.
|
||||
ColocationLimit = 5
|
||||
ColocationLimit = 7
|
||||
|
||||
// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses.
|
||||
maxLimitBuffer = 150
|
||||
|
||||
// InboundRatio is the proportion of our connected peer limit at which we will allow inbound peers.
|
||||
InboundRatio = float64(2) / 5
|
||||
)
|
||||
|
||||
// Status is the structure holding the peer status information.
|
||||
@@ -188,6 +191,22 @@ func (p *Status) IsActive(pid peer.ID) bool {
|
||||
return ok && (peerData.ConnState == PeerConnected || peerData.ConnState == PeerConnecting)
|
||||
}
|
||||
|
||||
// IsAboveInboundLimit checks if we are above our current inbound
|
||||
// peer limit.
|
||||
func (p *Status) IsAboveInboundLimit() bool {
|
||||
p.store.RLock()
|
||||
defer p.store.RUnlock()
|
||||
totalInbound := 0
|
||||
for _, peerData := range p.store.Peers() {
|
||||
if peerData.ConnState == PeerConnected &&
|
||||
peerData.Direction == network.DirInbound {
|
||||
totalInbound += 1
|
||||
}
|
||||
}
|
||||
inboundLimit := int(float64(p.ConnectedPeerLimit()) * InboundRatio)
|
||||
return totalInbound > inboundLimit
|
||||
}
|
||||
|
||||
// SetMetadata sets the metadata of the given remote peer.
|
||||
func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
|
||||
p.store.Lock()
|
||||
@@ -582,6 +601,16 @@ func (p *Status) HighestEpoch() uint64 {
|
||||
return helpers.SlotToEpoch(highestSlot)
|
||||
}
|
||||
|
||||
// ConnectedPeerLimit returns the peer limit of
|
||||
// concurrent peers connected to the beacon-node.
|
||||
func (p *Status) ConnectedPeerLimit() uint64 {
|
||||
maxLim := p.MaxPeerLimit()
|
||||
if maxLim <= maxLimitBuffer {
|
||||
return 0
|
||||
}
|
||||
return uint64(maxLim) - maxLimitBuffer
|
||||
}
|
||||
|
||||
func (p *Status) isfromBadIP(pid peer.ID) bool {
|
||||
p.store.RLock()
|
||||
defer p.store.RUnlock()
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
@@ -537,7 +538,7 @@ func TestPeerIPTracker(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
badPeers = append(badPeers, createPeer(t, p, addr))
|
||||
badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.PeerConnectionState(ethpb.ConnectionState_DISCONNECTED)))
|
||||
}
|
||||
for _, pr := range badPeers {
|
||||
assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad")
|
||||
@@ -547,7 +548,7 @@ func TestPeerIPTracker(t *testing.T) {
|
||||
// from the peer store.
|
||||
for i := 0; i < p.MaxPeerLimit()+100; i++ {
|
||||
// Peer added to peer handler.
|
||||
pid := addPeer(t, p, peers.PeerConnected)
|
||||
pid := addPeer(t, p, peers.PeerDisconnected)
|
||||
p.Scorers().BadResponsesScorer().Increment(pid)
|
||||
}
|
||||
p.Prune()
|
||||
@@ -623,6 +624,39 @@ func TestTrimmedOrderedPeers(t *testing.T) {
|
||||
assert.Equal(t, pid1, pids[2], "Incorrect third peer")
|
||||
}
|
||||
|
||||
func TestConcurrentPeerLimitHolds(t *testing.T) {
|
||||
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 30,
|
||||
ScorerParams: &scorers.Config{
|
||||
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
|
||||
Threshold: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.Equal(t, true, uint64(p.MaxPeerLimit()) > p.ConnectedPeerLimit(), "max peer limit doesnt exceed connected peer limit")
|
||||
}
|
||||
|
||||
func TestAtInboundPeerLimit(t *testing.T) {
|
||||
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 30,
|
||||
ScorerParams: &scorers.Config{
|
||||
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
|
||||
Threshold: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
for i := 0; i < 15; i++ {
|
||||
// Peer added to peer handler.
|
||||
createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
assert.Equal(t, false, p.IsAboveInboundLimit(), "Inbound limit exceeded")
|
||||
for i := 0; i < 15; i++ {
|
||||
// Peer added to peer handler.
|
||||
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
assert.Equal(t, true, p.IsAboveInboundLimit(), "Inbound limit not exceeded")
|
||||
}
|
||||
|
||||
func TestStatus_BestPeer(t *testing.T) {
|
||||
type peerConfig struct {
|
||||
headSlot uint64
|
||||
@@ -874,7 +908,8 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
|
||||
return id
|
||||
}
|
||||
|
||||
func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr) peer.ID {
|
||||
func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr,
|
||||
dir network.Direction, state peerdata.PeerConnectionState) peer.ID {
|
||||
mhBytes := []byte{0x11, 0x04}
|
||||
idBytes := make([]byte, 4)
|
||||
_, err := rand.Read(idBytes)
|
||||
@@ -882,6 +917,7 @@ func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr) peer.ID {
|
||||
mhBytes = append(mhBytes, idBytes...)
|
||||
id, err := peer.IDFromBytes(mhBytes)
|
||||
require.NoError(t, err)
|
||||
p.Add(new(enr.Record), id, addr, network.DirUnknown)
|
||||
p.Add(new(enr.Record), id, addr, dir)
|
||||
p.SetConnectionState(id, state)
|
||||
return id
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user