Add Basic Support for IP Tracker (#7844)

* add basic support for ip tracker

* clean up

* check for it

* fix

* Update beacon-chain/p2p/peers/status.go

* fix
This commit is contained in:
Nishant Das
2020-11-19 20:54:19 +08:00
committed by GitHub
parent 8a256de2dd
commit c4a1fe4d0d
3 changed files with 158 additions and 9 deletions

View File

@@ -18,6 +18,7 @@ go_library(
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)

View File

@@ -32,6 +32,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
@@ -52,14 +53,20 @@ const (
PeerConnecting
)
// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses.
const maxLimitBuffer = 150
const (
// ColocationLimit restricts how many peer identities we can see from a single ip or ipv6 subnet.
ColocationLimit = 5
// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses.
maxLimitBuffer = 150
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
}
// StatusConfig represents peer status service params.
@@ -76,9 +83,10 @@ func NewStatus(ctx context.Context, config *StatusConfig) *Status {
MaxPeers: maxLimitBuffer + config.PeerLimit,
})
return &Status{
ctx: ctx,
store: store,
scorers: scorers.NewService(ctx, store, config.ScorerParams),
ctx: ctx,
store: store,
scorers: scorers.NewService(ctx, store, config.ScorerParams),
ipTracker: map[string]uint64{},
}
}
@@ -100,11 +108,15 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire
if peerData, ok := p.store.PeerData(pid); ok {
// Peer already exists, just update its address info.
prevAddress := peerData.Address
peerData.Address = address
peerData.Direction = direction
if record != nil {
peerData.Enr = record
}
if !sameIP(prevAddress, address) {
p.addIpToTracker(pid)
}
return
}
peerData := &peerdata.PeerData{
@@ -117,6 +129,7 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire
peerData.Enr = record
}
p.store.SetPeerData(pid, peerData)
p.addIpToTracker(pid)
}
// Address returns the multiaddress of the given remote peer.
@@ -269,7 +282,7 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
// IsBad states if the peer is to be considered bad (by *any* of the registered scorers).
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (p *Status) IsBad(pid peer.ID) bool {
return p.scorers.IsBadPeer(pid)
return p.isfromBadIP(pid) || p.scorers.IsBadPeer(pid)
}
// NextValidTime gets the earliest possible time it is to contact/dial
@@ -452,6 +465,7 @@ func (p *Status) Prune() {
for _, peerData := range peersToPrune {
p.store.DeletePeerData(peerData.pid)
}
p.tallyIPTracker()
}
// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed
@@ -568,6 +582,88 @@ func (p *Status) HighestEpoch() uint64 {
return helpers.SlotToEpoch(highestSlot)
}
func (p *Status) isfromBadIP(pid peer.ID) bool {
p.store.RLock()
defer p.store.RUnlock()
peerData, ok := p.store.PeerData(pid)
if !ok {
return false
}
if peerData.Address == nil {
return false
}
ip, err := manet.ToIP(peerData.Address)
if err != nil {
return true
}
if val, ok := p.ipTracker[ip.String()]; ok {
if val > ColocationLimit {
return true
}
}
return false
}
func (p *Status) addIpToTracker(pid peer.ID) {
data, ok := p.store.PeerData(pid)
if !ok {
return
}
if data.Address == nil {
return
}
ip, err := manet.ToIP(data.Address)
if err != nil {
// Should never happen, it is
// assumed every IP coming in
// is a valid ip.
return
}
// Ignore loopback addresses.
if ip.IsLoopback() {
return
}
stringIP := ip.String()
p.ipTracker[stringIP] += 1
}
func (p *Status) tallyIPTracker() {
tracker := map[string]uint64{}
// Iterate through all peers.
for _, peerData := range p.store.Peers() {
if peerData.Address == nil {
continue
}
ip, err := manet.ToIP(peerData.Address)
if err != nil {
// Should never happen, it is
// assumed every IP coming in
// is a valid ip.
continue
}
stringIP := ip.String()
tracker[stringIP] += 1
}
p.ipTracker = tracker
}
func sameIP(firstAddr, secondAddr ma.Multiaddr) bool {
// Exit early if we do get nil multiaddresses
if firstAddr == nil || secondAddr == nil {
return false
}
firstIP, err := manet.ToIP(firstAddr)
if err != nil {
return false
}
secondIP, err := manet.ToIP(secondAddr)
if err != nil {
return false
}
return firstIP.Equal(secondIP)
}
func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
committeeIdxs := make([]uint64, 0, bitV.Count())
for i := uint64(0); i < 64; i++ {

View File

@@ -3,6 +3,7 @@ package peers_test
import (
"context"
"crypto/rand"
"strconv"
"testing"
"time"
@@ -517,6 +518,45 @@ func TestPrune(t *testing.T) {
assert.ErrorContains(t, "peer unknown", err)
}
func TestPeerIPTracker(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
})
badIP := "211.227.218.116"
badPeers := []peer.ID{}
for i := 0; i < peers.ColocationLimit+10; i++ {
port := strconv.Itoa(3000 + i)
addr, err := ma.NewMultiaddr("/ip4/" + badIP + "/tcp/" + port)
if err != nil {
t.Fatal(err)
}
badPeers = append(badPeers, createPeer(t, p, addr))
}
for _, pr := range badPeers {
assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad")
}
// Add in bad peers, so that our records are trimmed out
// from the peer store.
for i := 0; i < p.MaxPeerLimit()+100; i++ {
// Peer added to peer handler.
pid := addPeer(t, p, peers.PeerConnected)
p.Scorers().BadResponsesScorer().Increment(pid)
}
p.Prune()
for _, pr := range badPeers {
assert.Equal(t, false, p.IsBad(pr), "peer with good ip is regarded as bad")
}
}
func TestTrimmedOrderedPeers(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
@@ -833,3 +873,15 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
})
return id
}
func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr) peer.ID {
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)
_, err := rand.Read(idBytes)
require.NoError(t, err)
mhBytes = append(mhBytes, idBytes...)
id, err := peer.IDFromBytes(mhBytes)
require.NoError(t, err)
p.Add(new(enr.Record), id, addr, network.DirUnknown)
return id
}