Improve connection/disconnection logging. (#14665)

* Improve disconnection logs.

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Address Sammy's comment.

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Fix Sammy's comment.

* Fix Sammy's comment.

* `MockPeerManager`: Stop mixing value and pointer receivers (deepsource).

* Remove unused parameters (deepsource)

* Fix receiver names (deepsource)

* Change not after into before (deepsource)

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/p2p/peers/status.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
This commit is contained in:
Manu NALEPA
2024-11-26 18:53:27 +01:00
committed by GitHub
parent f27092fa91
commit 0475631543
37 changed files with 436 additions and 326 deletions

View File

@@ -28,6 +28,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Added Validator REST mode use of Attestation V2 endpoints and Electra attestations.
- PeerDAS: Added proto for `DataColumnIdentifier`, `DataColumnSidecar`, `DataColumnSidecarsByRangeRequest` and `MetadataV2`.
- Better attestation packing for Electra. [PR](https://github.com/prysmaticlabs/prysm/pull/14534)
- P2P: Add logs when a peer is (dis)connected. Add the reason of the disconnection when we initiate it.
### Changed

View File

@@ -33,7 +33,7 @@ func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) {
// multiaddr for the given peer.
func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) {
// Disallow bad peers from dialing in.
if s.peers.IsBad(pid) {
if s.peers.IsBad(pid) != nil {
return false
}
return filterConnections(s.addrFilter, m)

View File

@@ -50,7 +50,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
}()
for i := 0; i < highWatermarkBuffer; i++ {
addPeer(t, s.peers, peers.PeerConnected, false)
addPeer(t, s.peers, peers.Connected, false)
}
// create alternate host
@@ -159,7 +159,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
inboundLimit += 1
// Add in up to inbound peer limit.
for i := 0; i < int(inboundLimit); i++ {
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false)
}
valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress})
if valid {

View File

@@ -189,7 +189,7 @@ func (s *Service) RefreshENR() {
s.updateSubnetRecordWithMetadataV2(bitV, bitS)
}
// ping all peers to inform them of new metadata
s.pingPeers()
s.pingPeersAndLogEnr()
}
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
@@ -452,7 +452,7 @@ func (s *Service) filterPeer(node *enode.Node) bool {
}
// Ignore bad nodes.
if s.peers.IsBad(peerData.ID) {
if s.peers.IsBad(peerData.ID) != nil {
return false
}

View File

@@ -378,14 +378,14 @@ func TestInboundPeerLimit(t *testing.T) {
}
for i := 0; i < 30; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
_ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false)
}
require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers")
require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers")
for i := 0; i < highWatermarkBuffer; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
_ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false)
}
require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers")
@@ -404,13 +404,13 @@ func TestOutboundPeerThreshold(t *testing.T) {
}
for i := 0; i < 2; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true)
_ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), true)
}
require.Equal(t, true, s.isBelowOutboundPeerThreshold(), "not at outbound peer threshold")
for i := 0; i < 3; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true)
_ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), true)
}
require.Equal(t, false, s.isBelowOutboundPeerThreshold(), "still at outbound peer threshold")
@@ -477,7 +477,7 @@ func TestCorrectUDPVersion(t *testing.T) {
}
// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState, outbound bool) peer.ID {
func addPeer(t *testing.T, p *peers.Status, state peerdata.ConnectionState, outbound bool) peer.ID {
// Set up some peers with different states
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)

View File

@@ -2,7 +2,6 @@ package p2p
import (
"context"
"errors"
"fmt"
"io"
"sync"
@@ -10,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
@@ -25,6 +25,46 @@ func peerMultiaddrString(conn network.Conn) string {
return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
}
func (s *Service) connectToPeer(conn network.Conn) {
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Initiate peer connection")
}
func (s *Service) disconnectFromPeerOnError(
conn network.Conn,
goodByeFunc func(ctx context.Context, id peer.ID) error,
badPeerErr error,
) {
// Get the remote peer ID.
remotePeerID := conn.RemotePeer()
// Set the peer to disconnecting state.
s.peers.SetConnectionState(remotePeerID, peers.Disconnecting)
// Only attempt a goodbye if we are still connected to the peer.
if s.host.Network().Connectedness(remotePeerID) == network.Connected {
if err := goodByeFunc(context.TODO(), remotePeerID); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}
log.
WithError(badPeerErr).
WithFields(logrus.Fields{
"multiaddr": peerMultiaddrString(conn),
"direction": conn.Stat().Direction.String(),
"remainingActivePeers": len(s.peers.Active()),
}).
Debug("Initiate peer disconnection")
s.peers.SetConnectionState(remotePeerID, peers.Disconnected)
}
// AddConnectionHandler adds a callback function which handles the connection with a
// newly added peer. It performs a handshake with that peer by sending a hello request
// and validating the response from the peer.
@@ -57,18 +97,9 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
}
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
ConnectedF: func(_ network.Network, conn network.Conn) {
remotePeer := conn.RemotePeer()
disconnectFromPeer := func() {
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
// Only attempt a goodbye if we are still connected to the peer.
if s.host.Network().Connectedness(remotePeer) == network.Connected {
if err := goodByeFunc(context.TODO(), remotePeer); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected)
}
// Connection handler must be non-blocking as part of libp2p design.
go func() {
if peerHandshaking(remotePeer) {
@@ -77,28 +108,21 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
return
}
defer peerFinished(remotePeer)
// Handle the various pre-existing conditions that will result in us not handshaking.
peerConnectionState, err := s.peers.ConnectionState(remotePeer)
if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) {
if err == nil && (peerConnectionState == peers.Connected || peerConnectionState == peers.Connecting) {
log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
return
}
s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction)
// Defensive check in the event we still get a bad peer.
if s.peers.IsBad(remotePeer) {
log.WithField("reason", "bad peer").Trace("Ignoring connection request")
disconnectFromPeer()
if err := s.peers.IsBad(remotePeer); err != nil {
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
return
}
validPeerConnection := func() {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}
// Do not perform handshake on inbound dials.
if conn.Stat().Direction == network.DirInbound {
@@ -117,63 +141,80 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
disconnectFromPeer()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}
if peerExists {
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
disconnectFromPeer()
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// exit if we don't receive any current status messages from
// peer.
if updated.IsZero() || !updated.After(currentTime) {
disconnectFromPeer()
// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
return
}
if updated.Before(currentTime) {
s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
return
}
}
validPeerConnection()
s.connectToPeer(conn)
return
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
log.WithError(err).Trace("Handshake failed")
disconnectFromPeer()
s.disconnectFromPeerOnError(conn, goodByeFunc, err)
return
}
validPeerConnection()
s.connectToPeer(conn)
}()
},
})
}
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// This also calls the handler responsible for maintaining other parts of the sync or p2p system.
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
log := log.WithField("multiAddr", peerMultiaddrString(conn))
peerID := conn.RemotePeer()
log.WithFields(logrus.Fields{
"multiAddr": peerMultiaddrString(conn),
"direction": conn.Stat().Direction.String(),
})
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
// Exit early if we are still connected to the peer.
if net.Connectedness(conn.RemotePeer()) == network.Connected {
if net.Connectedness(peerID) == network.Connected {
return
}
priorState, err := s.peers.ConnectionState(conn.RemotePeer())
priorState, err := s.peers.ConnectionState(peerID)
if err != nil {
// Can happen if the peer has already disconnected, so...
priorState = peers.PeerDisconnected
priorState = peers.Disconnected
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
s.peers.SetConnectionState(peerID, peers.Disconnecting)
if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
log.WithError(err).Error("Disconnect handler failed")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
s.peers.SetConnectionState(peerID, peers.Disconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
if priorState == peers.Connected {
activePeersCount := len(s.peers.Active())
log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected")
}
}()
},

View File

@@ -23,8 +23,8 @@ var (
ErrNoPeerStatus = errors.New("no chain status for peer")
)
// PeerConnectionState is the state of the connection.
type PeerConnectionState ethpb.ConnectionState
// ConnectionState is the state of the connection.
type ConnectionState ethpb.ConnectionState
// StoreConfig holds peer store parameters.
type StoreConfig struct {
@@ -49,7 +49,7 @@ type PeerData struct {
// Network related data.
Address ma.Multiaddr
Direction network.Direction
ConnState PeerConnectionState
ConnState ConnectionState
Enr *enr.Record
NextValidTime time.Time
// Chain related data.

View File

@@ -20,6 +20,7 @@ go_library(
"//crypto/rand:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)

View File

@@ -4,6 +4,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
)
@@ -61,7 +62,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 {
// scoreNoLock is a lock-free version of Score.
func (s *BadResponsesScorer) scoreNoLock(pid peer.ID) float64 {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
return BadPeerScore
}
score := float64(0)
@@ -116,18 +117,24 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) {
// IsBadPeer states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool {
func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) error {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeerNoLock(pid)
}
// isBadPeerNoLock is lock-free version of IsBadPeer.
func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) bool {
func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) error {
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.BadResponses >= s.config.Threshold
if peerData.BadResponses >= s.config.Threshold {
return errors.Errorf("peer exceeded bad responses threshold: got %d, threshold %d", peerData.BadResponses, s.config.Threshold)
}
return nil
}
return false
return nil
}
// BadPeers returns the peers that are considered bad.
@@ -137,7 +144,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID {
badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
badPeers = append(badPeers, pid)
}
}

View File

@@ -33,19 +33,19 @@ func TestScorers_BadResponses_Score(t *testing.T) {
assert.Equal(t, 0., scorer.Score(pid), "Unexpected score for unregistered peer")
scorer.Increment(pid)
assert.Equal(t, false, scorer.IsBadPeer(pid))
assert.NoError(t, scorer.IsBadPeer(pid))
assert.Equal(t, -2.5, scorer.Score(pid))
scorer.Increment(pid)
assert.Equal(t, false, scorer.IsBadPeer(pid))
assert.NoError(t, scorer.IsBadPeer(pid))
assert.Equal(t, float64(-5), scorer.Score(pid))
scorer.Increment(pid)
assert.Equal(t, false, scorer.IsBadPeer(pid))
assert.NoError(t, scorer.IsBadPeer(pid))
assert.Equal(t, float64(-7.5), scorer.Score(pid))
scorer.Increment(pid)
assert.Equal(t, true, scorer.IsBadPeer(pid))
assert.NotNil(t, scorer.IsBadPeer(pid))
assert.Equal(t, -100.0, scorer.Score(pid))
}
@@ -152,17 +152,17 @@ func TestScorers_BadResponses_IsBadPeer(t *testing.T) {
})
scorer := peerStatuses.Scorers().BadResponsesScorer()
pid := peer.ID("peer1")
assert.Equal(t, false, scorer.IsBadPeer(pid))
assert.NoError(t, scorer.IsBadPeer(pid))
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
assert.Equal(t, false, scorer.IsBadPeer(pid))
assert.NoError(t, scorer.IsBadPeer(pid))
for i := 0; i < scorers.DefaultBadResponsesThreshold; i++ {
scorer.Increment(pid)
if i == scorers.DefaultBadResponsesThreshold-1 {
assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status")
assert.NotNil(t, scorer.IsBadPeer(pid), "Unexpected peer status")
} else {
assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status")
assert.NoError(t, scorer.IsBadPeer(pid), "Unexpected peer status")
}
}
}
@@ -185,11 +185,11 @@ func TestScorers_BadResponses_BadPeers(t *testing.T) {
scorer.Increment(pids[2])
scorer.Increment(pids[4])
}
assert.Equal(t, false, scorer.IsBadPeer(pids[0]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[1]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[2]), "Invalid peer status")
assert.Equal(t, false, scorer.IsBadPeer(pids[3]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[4]), "Invalid peer status")
assert.NoError(t, scorer.IsBadPeer(pids[0]), "Invalid peer status")
assert.NotNil(t, scorer.IsBadPeer(pids[1]), "Invalid peer status")
assert.NotNil(t, scorer.IsBadPeer(pids[2]), "Invalid peer status")
assert.NoError(t, scorer.IsBadPeer(pids[3]), "Invalid peer status")
assert.NotNil(t, scorer.IsBadPeer(pids[4]), "Invalid peer status")
want := []peer.ID{pids[1], pids[2], pids[4]}
badPeers := scorer.BadPeers()
sort.Slice(badPeers, func(i, j int) bool {

View File

@@ -177,8 +177,8 @@ func (s *BlockProviderScorer) processedBlocksNoLock(pid peer.ID) uint64 {
// Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad peer.
// Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort
// out low-scorers (see WeightSorted method).
func (*BlockProviderScorer) IsBadPeer(_ peer.ID) bool {
return false
func (*BlockProviderScorer) IsBadPeer(_ peer.ID) error {
return nil
}
// BadPeers returns the peers that are considered bad.

View File

@@ -119,7 +119,7 @@ func TestScorers_BlockProvider_Score(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run(tt.name, func(*testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
@@ -224,7 +224,7 @@ func TestScorers_BlockProvider_Sorted(t *testing.T) {
}{
{
name: "no peers",
update: func(s *scorers.BlockProviderScorer) {},
update: func(*scorers.BlockProviderScorer) {},
have: []peer.ID{},
want: []peer.ID{},
},
@@ -451,7 +451,7 @@ func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
})
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run(tt.name, func(*testing.T) {
peerStatuses := peerStatusGen()
scorer := peerStatuses.Scorers().BlockProviderScorer()
if tt.update != nil {
@@ -481,8 +481,8 @@ func TestScorers_BlockProvider_BadPeerMarking(t *testing.T) {
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer")
assert.NoError(t, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer")
scorer.IncrementProcessedBlocks("peer1", 64)
assert.Equal(t, false, scorer.IsBadPeer("peer1"))
assert.NoError(t, scorer.IsBadPeer("peer1"))
assert.Equal(t, 0, len(scorer.BadPeers()))
}

View File

@@ -2,6 +2,7 @@ package scorers
import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
pbrpc "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)
@@ -51,19 +52,24 @@ func (s *GossipScorer) scoreNoLock(pid peer.ID) float64 {
}
// IsBadPeer states if the peer is to be considered bad.
func (s *GossipScorer) IsBadPeer(pid peer.ID) bool {
func (s *GossipScorer) IsBadPeer(pid peer.ID) error {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeerNoLock(pid)
}
// isBadPeerNoLock is lock-free version of IsBadPeer.
func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) bool {
func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) error {
peerData, ok := s.store.PeerData(pid)
if !ok {
return false
return nil
}
return peerData.GossipScore < gossipThreshold
if peerData.GossipScore < gossipThreshold {
return errors.Errorf("gossip score below threshold: got %f - threshold %f", peerData.GossipScore, gossipThreshold)
}
return nil
}
// BadPeers returns the peers that are considered bad.
@@ -73,7 +79,7 @@ func (s *GossipScorer) BadPeers() []peer.ID {
badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
badPeers = append(badPeers, pid)
}
}

View File

@@ -21,7 +21,7 @@ func TestScorers_Gossip_Score(t *testing.T) {
}{
{
name: "nonexistent peer",
update: func(scorer *scorers.GossipScorer) {
update: func(*scorers.GossipScorer) {
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
@@ -34,7 +34,7 @@ func TestScorers_Gossip_Score(t *testing.T) {
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, -101.0, scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, true, scorer.IsBadPeer("peer1"), "Unexpected good peer")
assert.NotNil(t, scorer.IsBadPeer("peer1"), "Unexpected good peer")
},
},
{
@@ -44,7 +44,7 @@ func TestScorers_Gossip_Score(t *testing.T) {
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, 10.0, scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected bad peer")
assert.Equal(t, nil, scorer.IsBadPeer("peer1"), "Unexpected bad peer")
_, _, topicMap, err := scorer.GossipData("peer1")
assert.NoError(t, err)
assert.Equal(t, uint64(100), topicMap["a"].TimeInMesh, "incorrect time in mesh")
@@ -53,7 +53,7 @@ func TestScorers_Gossip_Score(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run(tt.name, func(*testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{},
})

View File

@@ -46,7 +46,7 @@ func (s *PeerStatusScorer) Score(pid peer.ID) float64 {
// scoreNoLock is a lock-free version of Score.
func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
return BadPeerScore
}
score := float64(0)
@@ -67,30 +67,34 @@ func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 {
}
// IsBadPeer states if the peer is to be considered bad.
func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) bool {
func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) error {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeerNoLock(pid)
}
// isBadPeerNoLock is lock-free version of IsBadPeer.
func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) bool {
func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) error {
peerData, ok := s.store.PeerData(pid)
if !ok {
return false
return nil
}
// Mark peer as bad, if the latest error is one of the terminal ones.
terminalErrs := []error{
p2ptypes.ErrWrongForkDigestVersion,
p2ptypes.ErrInvalidFinalizedRoot,
p2ptypes.ErrInvalidRequest,
}
for _, err := range terminalErrs {
if errors.Is(peerData.ChainStateValidationError, err) {
return true
return err
}
}
return false
return nil
}
// BadPeers returns the peers that are considered bad.
@@ -100,7 +104,7 @@ func (s *PeerStatusScorer) BadPeers() []peer.ID {
badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
badPeers = append(badPeers, pid)
}
}

View File

@@ -122,7 +122,7 @@ func TestScorers_PeerStatus_Score(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run(tt.name, func(*testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{},
})
@@ -140,12 +140,12 @@ func TestScorers_PeerStatus_IsBadPeer(t *testing.T) {
ScorerParams: &scorers.Config{},
})
pid := peer.ID("peer1")
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid))
assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid))
assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid))
peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion)
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid))
assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid))
assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid))
}
func TestScorers_PeerStatus_BadPeers(t *testing.T) {
@@ -155,22 +155,22 @@ func TestScorers_PeerStatus_BadPeers(t *testing.T) {
pid1 := peer.ID("peer1")
pid2 := peer.ID("peer2")
pid3 := peer.ID("peer3")
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid1))
assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2))
assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid3))
assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid1))
assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2))
assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid3))
assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3))
peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid1, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion)
peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid2, &pb.Status{}, nil)
peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid3, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion)
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid1))
assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2))
assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2))
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid3))
assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid1))
assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2))
assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid3))
assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3))
assert.Equal(t, 2, len(peerStatuses.Scorers().PeerStatusScorer().BadPeers()))
assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers()))
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/v5/config/features"
)
@@ -24,7 +25,7 @@ const BadPeerScore = gossipThreshold
// Scorer defines minimum set of methods every peer scorer must expose.
type Scorer interface {
Score(pid peer.ID) float64
IsBadPeer(pid peer.ID) bool
IsBadPeer(pid peer.ID) error
BadPeers() []peer.ID
}
@@ -124,26 +125,29 @@ func (s *Service) ScoreNoLock(pid peer.ID) float64 {
}
// IsBadPeer traverses all the scorers to see if any of them classifies peer as bad.
func (s *Service) IsBadPeer(pid peer.ID) bool {
func (s *Service) IsBadPeer(pid peer.ID) error {
s.store.RLock()
defer s.store.RUnlock()
return s.IsBadPeerNoLock(pid)
}
// IsBadPeerNoLock is a lock-free version of IsBadPeer.
func (s *Service) IsBadPeerNoLock(pid peer.ID) bool {
if s.scorers.badResponsesScorer.isBadPeerNoLock(pid) {
return true
func (s *Service) IsBadPeerNoLock(pid peer.ID) error {
if err := s.scorers.badResponsesScorer.isBadPeerNoLock(pid); err != nil {
return errors.Wrap(err, "bad responses scorer")
}
if s.scorers.peerStatusScorer.isBadPeerNoLock(pid) {
return true
if err := s.scorers.peerStatusScorer.isBadPeerNoLock(pid); err != nil {
return errors.Wrap(err, "peer status scorer")
}
if features.Get().EnablePeerScorer {
if s.scorers.gossipScorer.isBadPeerNoLock(pid) {
return true
if err := s.scorers.gossipScorer.isBadPeerNoLock(pid); err != nil {
return errors.Wrap(err, "gossip scorer")
}
}
return false
return nil
}
// BadPeers returns the peers that are considered bad by any of registered scorers.
@@ -153,7 +157,7 @@ func (s *Service) BadPeers() []peer.ID {
badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.IsBadPeerNoLock(pid) {
if s.IsBadPeerNoLock(pid) != nil {
badPeers = append(badPeers, pid)
}
}

View File

@@ -100,7 +100,7 @@ func TestScorers_Service_Score(t *testing.T) {
return scores
}
pack := func(scorer *scorers.Service, s1, s2, s3 float64) map[string]float64 {
pack := func(_ *scorers.Service, s1, s2, s3 float64) map[string]float64 {
return map[string]float64{
"peer1": roundScore(s1),
"peer2": roundScore(s2),
@@ -237,7 +237,7 @@ func TestScorers_Service_loop(t *testing.T) {
for i := 0; i < s1.Params().Threshold+5; i++ {
s1.Increment(pid1)
}
assert.Equal(t, true, s1.IsBadPeer(pid1), "Peer should be marked as bad")
assert.NotNil(t, s1.IsBadPeer(pid1), "Peer should be marked as bad")
s2.IncrementProcessedBlocks("peer1", 221)
assert.Equal(t, uint64(221), s2.ProcessedBlocks("peer1"))
@@ -252,7 +252,7 @@ func TestScorers_Service_loop(t *testing.T) {
for {
select {
case <-ticker.C:
if s1.IsBadPeer(pid1) == false && s2.ProcessedBlocks("peer1") == 0 {
if s1.IsBadPeer(pid1) == nil && s2.ProcessedBlocks("peer1") == 0 {
return
}
case <-ctx.Done():
@@ -263,7 +263,7 @@ func TestScorers_Service_loop(t *testing.T) {
}()
<-done
assert.Equal(t, false, s1.IsBadPeer(pid1), "Peer should not be marked as bad")
assert.NoError(t, s1.IsBadPeer(pid1), "Peer should not be marked as bad")
assert.Equal(t, uint64(0), s2.ProcessedBlocks("peer1"), "No blocks are expected")
}
@@ -278,10 +278,10 @@ func TestScorers_Service_IsBadPeer(t *testing.T) {
},
})
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1"))
peerStatuses.Scorers().BadResponsesScorer().Increment("peer1")
peerStatuses.Scorers().BadResponsesScorer().Increment("peer1")
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1"))
}
func TestScorers_Service_BadPeers(t *testing.T) {
@@ -295,16 +295,16 @@ func TestScorers_Service_BadPeers(t *testing.T) {
},
})
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2"))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer3"))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2"))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer3"))
assert.Equal(t, 0, len(peerStatuses.Scorers().BadPeers()))
for _, pid := range []peer.ID{"peer1", "peer3"} {
peerStatuses.Scorers().BadResponsesScorer().Increment(pid)
peerStatuses.Scorers().BadResponsesScorer().Increment(pid)
}
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2"))
assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer3"))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1"))
assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2"))
assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer3"))
assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers()))
}

View File

@@ -34,6 +34,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
@@ -49,14 +50,14 @@ import (
)
const (
// PeerDisconnected means there is no connection to the peer.
PeerDisconnected peerdata.PeerConnectionState = iota
// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
PeerDisconnecting
// PeerConnected means the peer has an active connection.
PeerConnected
// PeerConnecting means there is an on-going attempt to connect to the peer.
PeerConnecting
// Disconnected means there is no connection to the peer.
Disconnected peerdata.ConnectionState = iota
// Disconnecting means there is an on-going attempt to disconnect from the peer.
Disconnecting
// Connected means the peer has an active connection.
Connected
// Connecting means there is an on-going attempt to connect to the peer.
Connecting
)
const (
@@ -150,7 +151,7 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire
Address: address,
Direction: direction,
// Peers start disconnected; state will be updated when the handshake process begins.
ConnState: PeerDisconnected,
ConnState: Disconnected,
}
if record != nil {
peerData.Enr = record
@@ -212,7 +213,7 @@ func (p *Status) IsActive(pid peer.ID) bool {
defer p.store.RUnlock()
peerData, ok := p.store.PeerData(pid)
return ok && (peerData.ConnState == PeerConnected || peerData.ConnState == PeerConnecting)
return ok && (peerData.ConnState == Connected || peerData.ConnState == Connecting)
}
// IsAboveInboundLimit checks if we are above our current inbound
@@ -222,7 +223,7 @@ func (p *Status) IsAboveInboundLimit() bool {
defer p.store.RUnlock()
totalInbound := 0
for _, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected &&
if peerData.ConnState == Connected &&
peerData.Direction == network.DirInbound {
totalInbound += 1
}
@@ -286,7 +287,7 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
// look at active peers
connectedStatus := peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected
connectedStatus := peerData.ConnState == Connecting || peerData.ConnState == Connected
if connectedStatus && peerData.MetaData != nil && !peerData.MetaData.IsNil() && peerData.MetaData.AttnetsBitfield() != nil {
indices := indicesFromBitfield(peerData.MetaData.AttnetsBitfield())
for _, idx := range indices {
@@ -301,7 +302,7 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
}
// SetConnectionState sets the connection state of the given remote peer.
func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionState) {
func (p *Status) SetConnectionState(pid peer.ID, state peerdata.ConnectionState) {
p.store.Lock()
defer p.store.Unlock()
@@ -311,14 +312,14 @@ func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionSt
// ConnectionState gets the connection state of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) ConnectionState(pid peer.ID) (peerdata.PeerConnectionState, error) {
func (p *Status) ConnectionState(pid peer.ID) (peerdata.ConnectionState, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.ConnState, nil
}
return PeerDisconnected, peerdata.ErrPeerUnknown
return Disconnected, peerdata.ErrPeerUnknown
}
// ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated.
@@ -335,19 +336,29 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
// IsBad states if the peer is to be considered bad (by *any* of the registered scorers).
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (p *Status) IsBad(pid peer.ID) bool {
func (p *Status) IsBad(pid peer.ID) error {
p.store.RLock()
defer p.store.RUnlock()
return p.isBad(pid)
}
// isBad is the lock-free version of IsBad.
func (p *Status) isBad(pid peer.ID) bool {
func (p *Status) isBad(pid peer.ID) error {
// Do not disconnect from trusted peers.
if p.store.IsTrustedPeer(pid) {
return false
return nil
}
return p.isfromBadIP(pid) || p.scorers.IsBadPeerNoLock(pid)
if err := p.isfromBadIP(pid); err != nil {
return errors.Wrap(err, "peer is from a bad IP")
}
if err := p.scorers.IsBadPeerNoLock(pid); err != nil {
return errors.Wrap(err, "is bad peer no lock")
}
return nil
}
// NextValidTime gets the earliest possible time it is to contact/dial
@@ -411,7 +422,7 @@ func (p *Status) Connecting() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnecting {
if peerData.ConnState == Connecting {
peers = append(peers, pid)
}
}
@@ -424,7 +435,7 @@ func (p *Status) Connected() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected {
if peerData.ConnState == Connected {
peers = append(peers, pid)
}
}
@@ -450,7 +461,7 @@ func (p *Status) InboundConnected() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound {
if peerData.ConnState == Connected && peerData.Direction == network.DirInbound {
peers = append(peers, pid)
}
}
@@ -463,7 +474,7 @@ func (p *Status) InboundConnectedWithProtocol(protocol InternetProtocol) []peer.
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) {
if peerData.ConnState == Connected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) {
peers = append(peers, pid)
}
}
@@ -489,7 +500,7 @@ func (p *Status) OutboundConnected() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound {
if peerData.ConnState == Connected && peerData.Direction == network.DirOutbound {
peers = append(peers, pid)
}
}
@@ -502,7 +513,7 @@ func (p *Status) OutboundConnectedWithProtocol(protocol InternetProtocol) []peer
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) {
if peerData.ConnState == Connected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) {
peers = append(peers, pid)
}
}
@@ -515,7 +526,7 @@ func (p *Status) Active() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected {
if peerData.ConnState == Connecting || peerData.ConnState == Connected {
peers = append(peers, pid)
}
}
@@ -528,7 +539,7 @@ func (p *Status) Disconnecting() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnecting {
if peerData.ConnState == Disconnecting {
peers = append(peers, pid)
}
}
@@ -541,7 +552,7 @@ func (p *Status) Disconnected() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnected {
if peerData.ConnState == Disconnected {
peers = append(peers, pid)
}
}
@@ -554,7 +565,7 @@ func (p *Status) Inactive() []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnecting || peerData.ConnState == PeerDisconnected {
if peerData.ConnState == Disconnecting || peerData.ConnState == Disconnected {
peers = append(peers, pid)
}
}
@@ -592,7 +603,7 @@ func (p *Status) Prune() {
return
}
notBadPeer := func(pid peer.ID) bool {
return !p.isBad(pid)
return p.isBad(pid) == nil
}
notTrustedPeer := func(pid peer.ID) bool {
return !p.isTrustedPeers(pid)
@@ -605,7 +616,7 @@ func (p *Status) Prune() {
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.Peers() {
// Should not prune trusted peer or prune the peer dara and unset trusted peer.
if peerData.ConnState == PeerDisconnected && notBadPeer(pid) && notTrustedPeer(pid) {
if peerData.ConnState == Disconnected && notBadPeer(pid) && notTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
score: p.Scorers().ScoreNoLock(pid),
@@ -657,7 +668,7 @@ func (p *Status) deprecatedPrune() {
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.Peers() {
// Should not prune trusted peer or prune the peer dara and unset trusted peer.
if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) && notTrustedPeer(pid) {
if peerData.ConnState == Disconnected && notBadPeer(peerData) && notTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: peerData.BadResponses,
@@ -814,7 +825,7 @@ func (p *Status) PeersToPrune() []peer.ID {
peersToPrune := make([]*peerResp, 0)
// Select connected and inbound peers to prune.
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected &&
if peerData.ConnState == Connected &&
peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
@@ -880,7 +891,7 @@ func (p *Status) deprecatedPeersToPrune() []peer.ID {
peersToPrune := make([]*peerResp, 0)
// Select connected and inbound peers to prune.
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected &&
if peerData.ConnState == Connected &&
peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
@@ -982,24 +993,28 @@ func (p *Status) isTrustedPeers(pid peer.ID) bool {
// this method assumes the store lock is acquired before
// executing the method.
func (p *Status) isfromBadIP(pid peer.ID) bool {
func (p *Status) isfromBadIP(pid peer.ID) error {
peerData, ok := p.store.PeerData(pid)
if !ok {
return false
return nil
}
if peerData.Address == nil {
return false
return nil
}
ip, err := manet.ToIP(peerData.Address)
if err != nil {
return true
return errors.Wrap(err, "to ip")
}
if val, ok := p.ipTracker[ip.String()]; ok {
if val > CollocationLimit {
return true
return errors.Errorf("collocation limit exceeded: got %d - limit %d", val, CollocationLimit)
}
}
return false
return nil
}
func (p *Status) addIpToTracker(pid peer.ID) {

View File

@@ -215,7 +215,7 @@ func TestPeerSubscribedToSubnet(t *testing.T) {
// Add some peers with different states
numPeers := 2
for i := 0; i < numPeers; i++ {
addPeer(t, p, peers.PeerConnected)
addPeer(t, p, peers.Connected)
}
expectedPeer := p.All()[1]
bitV := bitfield.NewBitvector64()
@@ -230,7 +230,7 @@ func TestPeerSubscribedToSubnet(t *testing.T) {
}))
numPeers = 3
for i := 0; i < numPeers; i++ {
addPeer(t, p, peers.PeerDisconnected)
addPeer(t, p, peers.Disconnected)
}
ps := p.SubscribedToSubnet(2)
assert.Equal(t, 1, len(ps), "Unexpected num of peers")
@@ -259,7 +259,7 @@ func TestPeerImplicitAdd(t *testing.T) {
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err)
connectionState := peers.PeerConnecting
connectionState := peers.Connecting
p.SetConnectionState(id, connectionState)
resConnectionState, err := p.ConnectionState(id)
@@ -347,7 +347,7 @@ func TestPeerBadResponses(t *testing.T) {
require.NoError(t, err)
}
assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good")
assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good")
address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
require.NoError(t, err, "Failed to create address")
@@ -358,25 +358,25 @@ func TestPeerBadResponses(t *testing.T) {
resBadResponses, err := scorer.Count(id)
require.NoError(t, err)
assert.Equal(t, 0, resBadResponses, "Unexpected bad responses")
assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good")
assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good")
scorer.Increment(id)
resBadResponses, err = scorer.Count(id)
require.NoError(t, err)
assert.Equal(t, 1, resBadResponses, "Unexpected bad responses")
assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good")
assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good")
scorer.Increment(id)
resBadResponses, err = scorer.Count(id)
require.NoError(t, err)
assert.Equal(t, 2, resBadResponses, "Unexpected bad responses")
assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be")
assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be")
scorer.Increment(id)
resBadResponses, err = scorer.Count(id)
require.NoError(t, err)
assert.Equal(t, 3, resBadResponses, "Unexpected bad responses")
assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be")
assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be")
}
func TestAddMetaData(t *testing.T) {
@@ -393,7 +393,7 @@ func TestAddMetaData(t *testing.T) {
// Add some peers with different states
numPeers := 5
for i := 0; i < numPeers; i++ {
addPeer(t, p, peers.PeerConnected)
addPeer(t, p, peers.Connected)
}
newPeer := p.All()[2]
@@ -422,19 +422,19 @@ func TestPeerConnectionStatuses(t *testing.T) {
// Add some peers with different states
numPeersDisconnected := 11
for i := 0; i < numPeersDisconnected; i++ {
addPeer(t, p, peers.PeerDisconnected)
addPeer(t, p, peers.Disconnected)
}
numPeersConnecting := 7
for i := 0; i < numPeersConnecting; i++ {
addPeer(t, p, peers.PeerConnecting)
addPeer(t, p, peers.Connecting)
}
numPeersConnected := 43
for i := 0; i < numPeersConnected; i++ {
addPeer(t, p, peers.PeerConnected)
addPeer(t, p, peers.Connected)
}
numPeersDisconnecting := 4
for i := 0; i < numPeersDisconnecting; i++ {
addPeer(t, p, peers.PeerDisconnecting)
addPeer(t, p, peers.Disconnecting)
}
// Now confirm the states
@@ -463,7 +463,7 @@ func TestPeerValidTime(t *testing.T) {
numPeersConnected := 6
for i := 0; i < numPeersConnected; i++ {
addPeer(t, p, peers.PeerConnected)
addPeer(t, p, peers.Connected)
}
allPeers := p.All()
@@ -510,10 +510,10 @@ func TestPrune(t *testing.T) {
for i := 0; i < p.MaxPeerLimit()+100; i++ {
if i%7 == 0 {
// Peer added as disconnected.
_ = addPeer(t, p, peers.PeerDisconnected)
_ = addPeer(t, p, peers.Disconnected)
}
// Peer added to peer handler.
_ = addPeer(t, p, peers.PeerConnected)
_ = addPeer(t, p, peers.Connected)
}
disPeers := p.Disconnected()
@@ -571,23 +571,23 @@ func TestPeerIPTracker(t *testing.T) {
if err != nil {
t.Fatal(err)
}
badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.PeerConnectionState(ethpb.ConnectionState_DISCONNECTED)))
badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.ConnectionState(ethpb.ConnectionState_DISCONNECTED)))
}
for _, pr := range badPeers {
assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad")
assert.NotNil(t, p.IsBad(pr), "peer with bad ip is not bad")
}
// Add in bad peers, so that our records are trimmed out
// from the peer store.
for i := 0; i < p.MaxPeerLimit()+100; i++ {
// Peer added to peer handler.
pid := addPeer(t, p, peers.PeerDisconnected)
pid := addPeer(t, p, peers.Disconnected)
p.Scorers().BadResponsesScorer().Increment(pid)
}
p.Prune()
for _, pr := range badPeers {
assert.Equal(t, false, p.IsBad(pr), "peer with good ip is regarded as bad")
assert.NoError(t, p.IsBad(pr), "peer with good ip is regarded as bad")
}
}
@@ -601,8 +601,11 @@ func TestTrimmedOrderedPeers(t *testing.T) {
},
})
expectedTarget := primitives.Epoch(2)
maxPeers := 3
const (
expectedTarget = primitives.Epoch(2)
maxPeers = 3
)
var mockroot2 [32]byte
var mockroot3 [32]byte
var mockroot4 [32]byte
@@ -611,36 +614,41 @@ func TestTrimmedOrderedPeers(t *testing.T) {
copy(mockroot3[:], "three")
copy(mockroot4[:], "four")
copy(mockroot5[:], "five")
// Peer 1
pid1 := addPeer(t, p, peers.PeerConnected)
pid1 := addPeer(t, p, peers.Connected)
p.SetChainState(pid1, &pb.Status{
HeadSlot: 3 * params.BeaconConfig().SlotsPerEpoch,
FinalizedEpoch: 3,
FinalizedRoot: mockroot3[:],
})
// Peer 2
pid2 := addPeer(t, p, peers.PeerConnected)
pid2 := addPeer(t, p, peers.Connected)
p.SetChainState(pid2, &pb.Status{
HeadSlot: 4 * params.BeaconConfig().SlotsPerEpoch,
FinalizedEpoch: 4,
FinalizedRoot: mockroot4[:],
})
// Peer 3
pid3 := addPeer(t, p, peers.PeerConnected)
pid3 := addPeer(t, p, peers.Connected)
p.SetChainState(pid3, &pb.Status{
HeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch,
FinalizedEpoch: 5,
FinalizedRoot: mockroot5[:],
})
// Peer 4
pid4 := addPeer(t, p, peers.PeerConnected)
pid4 := addPeer(t, p, peers.Connected)
p.SetChainState(pid4, &pb.Status{
HeadSlot: 2 * params.BeaconConfig().SlotsPerEpoch,
FinalizedEpoch: 2,
FinalizedRoot: mockroot2[:],
})
// Peer 5
pid5 := addPeer(t, p, peers.PeerConnected)
pid5 := addPeer(t, p, peers.Connected)
p.SetChainState(pid5, &pb.Status{
HeadSlot: 2 * params.BeaconConfig().SlotsPerEpoch,
FinalizedEpoch: 2,
@@ -680,12 +688,12 @@ func TestAtInboundPeerLimit(t *testing.T) {
})
for i := 0; i < 15; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
assert.Equal(t, false, p.IsAboveInboundLimit(), "Inbound limit exceeded")
for i := 0; i < 31; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
assert.Equal(t, true, p.IsAboveInboundLimit(), "Inbound limit not exceeded")
}
@@ -705,7 +713,7 @@ func TestPrunePeers(t *testing.T) {
})
for i := 0; i < 15; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Assert there are no prunable peers.
peersToPrune := p.PeersToPrune()
@@ -713,7 +721,7 @@ func TestPrunePeers(t *testing.T) {
for i := 0; i < 18; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Assert there are the correct prunable peers.
@@ -723,7 +731,7 @@ func TestPrunePeers(t *testing.T) {
// Add in more peers.
for i := 0; i < 13; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Set up bad scores for inbound peers.
@@ -767,7 +775,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
for i := 0; i < 15; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Assert there are no prunable peers.
peersToPrune := p.PeersToPrune()
@@ -775,7 +783,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
for i := 0; i < 18; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Assert there are the correct prunable peers.
@@ -785,7 +793,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
// Add in more peers.
for i := 0; i < 13; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
var trustedPeers []peer.ID
@@ -821,7 +829,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
// Add more peers to check if trusted peers can be pruned after they are deleted from trusted peer set.
for i := 0; i < 9; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Delete trusted peers.
@@ -865,14 +873,14 @@ func TestStatus_BestPeer(t *testing.T) {
headSlot primitives.Slot
finalizedEpoch primitives.Epoch
}
tests := []struct {
name string
peers []*peerConfig
limitPeers int
ourFinalizedEpoch primitives.Epoch
targetEpoch primitives.Epoch
// targetEpochSupport denotes how many peers support returned epoch.
targetEpochSupport int
name string
peers []*peerConfig
limitPeers int
ourFinalizedEpoch primitives.Epoch
targetEpoch primitives.Epoch
targetEpochSupport int // Denotes how many peers support returned epoch.
}{
{
name: "head slot matches finalized epoch",
@@ -885,6 +893,7 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 3, headSlot: 3 * params.BeaconConfig().SlotsPerEpoch},
},
limitPeers: 15,
ourFinalizedEpoch: 0,
targetEpoch: 4,
targetEpochSupport: 4,
},
@@ -902,6 +911,7 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 3, headSlot: 4 * params.BeaconConfig().SlotsPerEpoch},
},
limitPeers: 15,
ourFinalizedEpoch: 0,
targetEpoch: 4,
targetEpochSupport: 4,
},
@@ -916,6 +926,7 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 3, headSlot: 42 * params.BeaconConfig().SlotsPerEpoch},
},
limitPeers: 15,
ourFinalizedEpoch: 0,
targetEpoch: 4,
targetEpochSupport: 4,
},
@@ -930,8 +941,8 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 3, headSlot: 46 * params.BeaconConfig().SlotsPerEpoch},
{finalizedEpoch: 6, headSlot: 6 * params.BeaconConfig().SlotsPerEpoch},
},
ourFinalizedEpoch: 5,
limitPeers: 15,
ourFinalizedEpoch: 5,
targetEpoch: 6,
targetEpochSupport: 1,
},
@@ -950,8 +961,8 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 7, headSlot: 7 * params.BeaconConfig().SlotsPerEpoch},
{finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch},
},
ourFinalizedEpoch: 5,
limitPeers: 15,
ourFinalizedEpoch: 5,
targetEpoch: 6,
targetEpochSupport: 5,
},
@@ -970,8 +981,8 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 7, headSlot: 7 * params.BeaconConfig().SlotsPerEpoch},
{finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch},
},
ourFinalizedEpoch: 5,
limitPeers: 4,
ourFinalizedEpoch: 5,
targetEpoch: 6,
targetEpochSupport: 4,
},
@@ -986,8 +997,8 @@ func TestStatus_BestPeer(t *testing.T) {
{finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch},
{finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch},
},
ourFinalizedEpoch: 5,
limitPeers: 15,
ourFinalizedEpoch: 5,
targetEpoch: 8,
targetEpochSupport: 3,
},
@@ -1002,7 +1013,7 @@ func TestStatus_BestPeer(t *testing.T) {
},
})
for _, peerConfig := range tt.peers {
p.SetChainState(addPeer(t, p, peers.PeerConnected), &pb.Status{
p.SetChainState(addPeer(t, p, peers.Connected), &pb.Status{
FinalizedEpoch: peerConfig.finalizedEpoch,
HeadSlot: peerConfig.headSlot,
})
@@ -1028,7 +1039,7 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
for i := 0; i <= maxPeers+100; i++ {
p.Add(new(enr.Record), peer.ID(rune(i)), nil, network.DirOutbound)
p.SetConnectionState(peer.ID(rune(i)), peers.PeerConnected)
p.SetConnectionState(peer.ID(rune(i)), peers.Connected)
p.SetChainState(peer.ID(rune(i)), &pb.Status{
FinalizedEpoch: 10,
})
@@ -1051,7 +1062,7 @@ func TestStatus_BestNonFinalized(t *testing.T) {
peerSlots := []primitives.Slot{32, 32, 32, 32, 235, 233, 258, 268, 270}
for i, headSlot := range peerSlots {
p.Add(new(enr.Record), peer.ID(rune(i)), nil, network.DirOutbound)
p.SetConnectionState(peer.ID(rune(i)), peers.PeerConnected)
p.SetConnectionState(peer.ID(rune(i)), peers.Connected)
p.SetChainState(peer.ID(rune(i)), &pb.Status{
HeadSlot: headSlot,
})
@@ -1074,17 +1085,17 @@ func TestStatus_CurrentEpoch(t *testing.T) {
},
})
// Peer 1
pid1 := addPeer(t, p, peers.PeerConnected)
pid1 := addPeer(t, p, peers.Connected)
p.SetChainState(pid1, &pb.Status{
HeadSlot: params.BeaconConfig().SlotsPerEpoch * 4,
})
// Peer 2
pid2 := addPeer(t, p, peers.PeerConnected)
pid2 := addPeer(t, p, peers.Connected)
p.SetChainState(pid2, &pb.Status{
HeadSlot: params.BeaconConfig().SlotsPerEpoch * 5,
})
// Peer 3
pid3 := addPeer(t, p, peers.PeerConnected)
pid3 := addPeer(t, p, peers.Connected)
p.SetChainState(pid3, &pb.Status{
HeadSlot: params.BeaconConfig().SlotsPerEpoch * 4,
})
@@ -1103,8 +1114,8 @@ func TestInbound(t *testing.T) {
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)
inbound := createPeer(t, p, addr, network.DirInbound, peers.Connected)
createPeer(t, p, addr, network.DirOutbound, peers.Connected)
result := p.Inbound()
require.Equal(t, 1, len(result))
@@ -1123,8 +1134,8 @@ func TestInboundConnected(t *testing.T) {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirInbound, peers.PeerConnecting)
inbound := createPeer(t, p, addr, network.DirInbound, peers.Connected)
createPeer(t, p, addr, network.DirInbound, peers.Connecting)
result := p.InboundConnected()
require.Equal(t, 1, len(result))
@@ -1157,7 +1168,7 @@ func TestInboundConnectedWithProtocol(t *testing.T) {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected)
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.Connected)
expectedTCP[peer.String()] = true
}
@@ -1166,7 +1177,7 @@ func TestInboundConnectedWithProtocol(t *testing.T) {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected)
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.Connected)
expectedQUIC[peer.String()] = true
}
@@ -1203,8 +1214,8 @@ func TestOutbound(t *testing.T) {
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
outbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirInbound, peers.Connected)
outbound := createPeer(t, p, addr, network.DirOutbound, peers.Connected)
result := p.Outbound()
require.Equal(t, 1, len(result))
@@ -1223,8 +1234,8 @@ func TestOutboundConnected(t *testing.T) {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
inbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirOutbound, peers.PeerConnecting)
inbound := createPeer(t, p, addr, network.DirOutbound, peers.Connected)
createPeer(t, p, addr, network.DirOutbound, peers.Connecting)
result := p.OutboundConnected()
require.Equal(t, 1, len(result))
@@ -1257,7 +1268,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected)
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.Connected)
expectedTCP[peer.String()] = true
}
@@ -1266,7 +1277,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected)
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.Connected)
expectedQUIC[peer.String()] = true
}
@@ -1293,7 +1304,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) {
}
// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
func addPeer(t *testing.T, p *peers.Status, state peerdata.ConnectionState) peer.ID {
// Set up some peers with different states
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)
@@ -1312,7 +1323,7 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
}
func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr,
dir network.Direction, state peerdata.PeerConnectionState) peer.ID {
dir network.Direction, state peerdata.ConnectionState) peer.ID {
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)
_, err := rand.Read(idBytes)

View File

@@ -202,12 +202,13 @@ func (s *Service) Start() {
s.startupErr = err
return
}
err = s.connectToBootnodes()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")
if err := s.connectToBootnodes(); err != nil {
log.WithError(err).Error("Could not connect to boot nodes")
s.startupErr = err
return
}
s.dv5Listener = listener
go s.listenForNewNodes()
}
@@ -384,12 +385,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er
s.pingMethodLock.Unlock()
}
func (s *Service) pingPeers() {
func (s *Service) pingPeersAndLogEnr() {
s.pingMethodLock.RLock()
defer s.pingMethodLock.RUnlock()
localENR := s.dv5Listener.Self()
log.WithField("ENR", localENR).Info("New node record")
if s.pingMethod == nil {
return
}
for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
@@ -462,8 +468,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
if info.ID == s.host.ID() {
return nil
}
if s.Peers().IsBad(info.ID) {
return errors.New("refused to connect to bad peer")
if err := s.Peers().IsBad(info.ID); err != nil {
return errors.Wrap(err, "refused to connect to bad peer")
}
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()

View File

@@ -20,7 +20,7 @@ type MockPeerManager struct {
}
// Disconnect .
func (_ *MockPeerManager) Disconnect(peer.ID) error {
func (*MockPeerManager) Disconnect(peer.ID) error {
return nil
}
@@ -35,12 +35,12 @@ func (m *MockPeerManager) Host() host.Host {
}
// ENR .
func (m MockPeerManager) ENR() *enr.Record {
func (m *MockPeerManager) ENR() *enr.Record {
return m.Enr
}
// DiscoveryAddresses .
func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
if m.FailDiscoveryAddr {
return nil, errors.New("fail")
}
@@ -48,12 +48,12 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}
// RefreshENR .
func (_ MockPeerManager) RefreshENR() {}
func (*MockPeerManager) RefreshENR() {}
// FindPeersWithSubnet .
func (_ MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return true, nil
}
// AddPingMethod .
func (_ MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {}
func (*MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {}

View File

@@ -64,7 +64,7 @@ func (m *MockPeersProvider) Peers() *peers.Status {
log.WithError(err).Debug("Cannot decode")
}
m.peers.Add(createENR(), id0, ma0, network.DirInbound)
m.peers.SetConnectionState(id0, peers.PeerConnected)
m.peers.SetConnectionState(id0, peers.Connected)
m.peers.SetChainState(id0, &pb.Status{FinalizedEpoch: 10})
id1, err := peer.Decode(MockRawPeerId1)
if err != nil {
@@ -75,7 +75,7 @@ func (m *MockPeersProvider) Peers() *peers.Status {
log.WithError(err).Debug("Cannot decode")
}
m.peers.Add(createENR(), id1, ma1, network.DirOutbound)
m.peers.SetConnectionState(id1, peers.PeerConnected)
m.peers.SetConnectionState(id1, peers.Connected)
m.peers.SetChainState(id1, &pb.Status{FinalizedEpoch: 11})
}
return m.peers

View File

@@ -239,7 +239,7 @@ func (p *TestP2P) LeaveTopic(topic string) error {
}
// Encoding returns ssz encoding.
func (_ *TestP2P) Encoding() encoder.NetworkEncoding {
func (*TestP2P) Encoding() encoder.NetworkEncoding {
return &encoder.SszNetworkEncoder{}
}
@@ -266,12 +266,12 @@ func (p *TestP2P) Host() host.Host {
}
// ENR returns the enr of the local peer.
func (_ *TestP2P) ENR() *enr.Record {
func (*TestP2P) ENR() *enr.Record {
return new(enr.Record)
}
// DiscoveryAddresses --
func (_ *TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return nil, nil
}
@@ -284,16 +284,16 @@ func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID
p.peers.Add(new(enr.Record), conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
ctx := context.Background()
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
p.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
if err := f(ctx, conn.RemotePeer()); err != nil {
logrus.WithError(err).Error("Could not send successful hello rpc request")
if err := p.Disconnect(conn.RemotePeer()); err != nil {
logrus.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
}
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnected)
return
}
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
p.peers.SetConnectionState(conn.RemotePeer(), peers.Connected)
}()
},
})
@@ -305,11 +305,11 @@ func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID
DisconnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnecting)
if err := f(context.Background(), conn.RemotePeer()); err != nil {
logrus.WithError(err).Debug("Unable to invoke callback")
}
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnected)
}()
},
})
@@ -353,7 +353,7 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p
}
// Started always returns true.
func (_ *TestP2P) Started() bool {
func (*TestP2P) Started() bool {
return true
}
@@ -363,12 +363,12 @@ func (p *TestP2P) Peers() *peers.Status {
}
// FindPeersWithSubnet mocks the p2p func.
func (_ *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return false, nil
}
// RefreshENR mocks the p2p func.
func (_ *TestP2P) RefreshENR() {}
func (*TestP2P) RefreshENR() {}
// ForkDigest mocks the p2p func.
func (p *TestP2P) ForkDigest() ([4]byte, error) {
@@ -386,31 +386,31 @@ func (p *TestP2P) MetadataSeq() uint64 {
}
// AddPingMethod mocks the p2p func.
func (_ *TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {
func (*TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {
// no-op
}
// InterceptPeerDial .
func (_ *TestP2P) InterceptPeerDial(peer.ID) (allow bool) {
func (*TestP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true
}
// InterceptAddrDial .
func (_ *TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) {
func (*TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) {
return true
}
// InterceptAccept .
func (_ *TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) {
func (*TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) {
return true
}
// InterceptSecured .
func (_ *TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) {
func (*TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) {
return true
}
// InterceptUpgraded .
func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

View File

@@ -117,13 +117,13 @@ func TestGetPeers(t *testing.T) {
switch i {
case 0, 1:
peerStatus.SetConnectionState(id, peers.PeerConnecting)
peerStatus.SetConnectionState(id, peers.Connecting)
case 2, 3:
peerStatus.SetConnectionState(id, peers.PeerConnected)
peerStatus.SetConnectionState(id, peers.Connected)
case 4, 5:
peerStatus.SetConnectionState(id, peers.PeerDisconnecting)
peerStatus.SetConnectionState(id, peers.Disconnecting)
case 6, 7:
peerStatus.SetConnectionState(id, peers.PeerDisconnected)
peerStatus.SetConnectionState(id, peers.Disconnected)
default:
t.Fatalf("Failed to set connection state for peer")
}
@@ -289,13 +289,13 @@ func TestGetPeerCount(t *testing.T) {
switch i {
case 0:
peerStatus.SetConnectionState(id, peers.PeerConnecting)
peerStatus.SetConnectionState(id, peers.Connecting)
case 1, 2:
peerStatus.SetConnectionState(id, peers.PeerConnected)
peerStatus.SetConnectionState(id, peers.Connected)
case 3, 4, 5:
peerStatus.SetConnectionState(id, peers.PeerDisconnecting)
peerStatus.SetConnectionState(id, peers.Disconnecting)
case 6, 7, 8, 9:
peerStatus.SetConnectionState(id, peers.PeerDisconnected)
peerStatus.SetConnectionState(id, peers.Disconnected)
default:
t.Fatalf("Failed to set connection state for peer")
}

View File

@@ -25,8 +25,8 @@ import (
type testIdentity enode.ID
func (_ testIdentity) Verify(_ *enr.Record, _ []byte) error { return nil }
func (id testIdentity) NodeAddr(_ *enr.Record) []byte { return id[:] }
func (testIdentity) Verify(*enr.Record, []byte) error { return nil }
func (id testIdentity) NodeAddr(*enr.Record) []byte { return id[:] }
func TestListTrustedPeer(t *testing.T) {
ids := libp2ptest.GeneratePeerIDs(9)
@@ -62,13 +62,13 @@ func TestListTrustedPeer(t *testing.T) {
switch i {
case 0, 1:
peerStatus.SetConnectionState(id, peers.PeerConnecting)
peerStatus.SetConnectionState(id, peers.Connecting)
case 2, 3:
peerStatus.SetConnectionState(id, peers.PeerConnected)
peerStatus.SetConnectionState(id, peers.Connected)
case 4, 5:
peerStatus.SetConnectionState(id, peers.PeerDisconnecting)
peerStatus.SetConnectionState(id, peers.Disconnecting)
case 6, 7:
peerStatus.SetConnectionState(id, peers.PeerDisconnected)
peerStatus.SetConnectionState(id, peers.Disconnected)
default:
t.Fatalf("Failed to set connection state for peer")
}

View File

@@ -1094,7 +1094,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
// its claims with actual blocks.
emptyPeer := connectPeerHavingBlocks(t, p2p, chain1, finalizedSlot, p2p.Peers())
defer func() {
p2p.Peers().SetConnectionState(emptyPeer, peers.PeerDisconnected)
p2p.Peers().SetConnectionState(emptyPeer, peers.Disconnected)
}()
chainState, err := p2p.Peers().ChainState(emptyPeer)
require.NoError(t, err)
@@ -1291,7 +1291,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
// Connect peer that has all the blocks available.
allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers())
defer func() {
p2p.Peers().SetConnectionState(allBlocksPeer, peers.PeerDisconnected)
p2p.Peers().SetConnectionState(allBlocksPeer, peers.Disconnected)
}()
// Queue should be able to fetch whole chain (including slot which comes before the currently set head).

View File

@@ -227,7 +227,7 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus *
p.Connect(host)
peerStatus.Add(new(enr.Record), p.PeerID(), nil, network.DirOutbound)
peerStatus.SetConnectionState(p.PeerID(), peers.PeerConnected)
peerStatus.SetConnectionState(p.PeerID(), peers.Connected)
peerStatus.SetChainState(p.PeerID(), &ethpb.Status{
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
@@ -326,7 +326,7 @@ func connectPeerHavingBlocks(
require.NoError(t, err)
peerStatus.Add(new(enr.Record), p.PeerID(), nil, network.DirOutbound)
peerStatus.SetConnectionState(p.PeerID(), peers.PeerConnected)
peerStatus.SetConnectionState(p.PeerID(), peers.Connected)
peerStatus.SetChainState(p.PeerID(), &ethpb.Status{
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),

View File

@@ -40,7 +40,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected)
p1.Peers().SetChainState(p2.PeerID(), &ethpb.Status{})
chain := &mock.ChainService{Genesis: prysmTime.Now(), FinalizedCheckPoint: &ethpb.Checkpoint{}}

View File

@@ -406,7 +406,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
r.initCaches()
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected)
p1.Peers().SetChainState(p2.PeerID(), &ethpb.Status{})
b0 := util.NewBeaconBlock()
@@ -505,7 +505,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
r.initCaches()
p1.Peers().Add(new(enr.Record), p1.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p1.PeerID(), peers.PeerConnected)
p1.Peers().SetConnectionState(p1.PeerID(), peers.Connected)
p1.Peers().SetChainState(p1.PeerID(), &ethpb.Status{})
b0 := util.NewBeaconBlock()
@@ -611,7 +611,7 @@ func TestService_BatchRootRequest(t *testing.T) {
r.initCaches()
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected)
p1.Peers().SetChainState(p2.PeerID(), &ethpb.Status{FinalizedEpoch: 2})
b0 := util.NewBeaconBlock()

View File

@@ -7,12 +7,13 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)
const defaultBurstLimit = 5
@@ -98,19 +99,20 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
defer l.RUnlock()
topic := string(stream.Protocol())
remotePeer := stream.Conn().RemotePeer()
collector, err := l.retrieveCollector(topic)
if err != nil {
return err
}
key := stream.Conn().RemotePeer().String()
remaining := collector.Remaining(key)
remaining := collector.Remaining(remotePeer.String())
// Treat each request as a minimum of 1.
if amt == 0 {
amt = 1
}
if amt > uint64(remaining) {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}

View File

@@ -97,7 +97,7 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) {
for i := 0; i < defaultBurstLimit; i++ {
assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream))
}
assert.Equal(t, true, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer")
require.NoError(t, stream.Close(), "could not close stream")
if util.WaitTimeout(&wg, 1*time.Second) {

View File

@@ -51,6 +51,7 @@ func (s *Service) registerRPCHandlers() {
s.pingHandler,
)
s.registerRPCHandlersAltair()
if currEpoch >= params.BeaconConfig().DenebForkEpoch {
s.registerRPCHandlersDeneb()
}
@@ -138,6 +139,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()
conn := stream.Conn()
remotePeer := conn.RemotePeer()
// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
@@ -157,12 +161,12 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
ctx, span := trace.StartSpan(ctx, "sync.rpc")
defer span.End()
span.SetAttributes(trace.StringAttribute("topic", topic))
span.SetAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().String()))
span.SetAttributes(trace.StringAttribute("peer", remotePeer.String()))
log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol()))
// Check before hand that peer is valid.
if s.cfg.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil {
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, remotePeer); err != nil {
log.WithError(err).Debug("Could not disconnect from peer")
}
return

View File

@@ -395,7 +395,7 @@ func TestRequestPendingBlobs(t *testing.T) {
Genesis: time.Now(),
}
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected)
p1.Peers().SetChainState(p2.PeerID(), &ethpb.Status{FinalizedEpoch: 1})
s := &Service{
cfg: &config{

View File

@@ -55,10 +55,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
// disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect
// the peer, if that is the case. Additionally, disconnection reason is obtained from scorer.
func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) {
if !s.cfg.p2p.Peers().IsBad(id) {
return
}
func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr error) {
err := s.cfg.p2p.Peers().Scorers().ValidationError(id)
goodbyeCode := p2ptypes.ErrToGoodbyeCode(err)
if err == nil {
@@ -67,6 +64,8 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) {
if err := s.sendGoodByeAndDisconnect(ctx, goodbyeCode, id); err != nil {
log.WithError(err).Debug("Error when disconnecting with bad peer")
}
log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection")
}
// A custom goodbye method that is used by our connection handler, in the

View File

@@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus"
)
// maintainPeerStatuses by infrequently polling peers for their latest status.
// maintainPeerStatuses maintains peer statuses by polling peers for their latest status twice per epoch.
func (s *Service) maintainPeerStatuses() {
// Run twice per epoch.
interval := time.Duration(params.BeaconConfig().SlotsPerEpoch.Div(2).Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
@@ -38,16 +38,20 @@ func (s *Service) maintainPeerStatuses() {
// If our peer status has not been updated correctly we disconnect over here
// and set the connection state over here instead.
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnecting)
s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnecting)
if err := s.cfg.p2p.Disconnect(id); err != nil {
log.WithError(err).Debug("Error when disconnecting with peer")
}
s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnected)
s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnected)
log.WithFields(logrus.Fields{
"peer": id,
"reason": "maintain peer statuses - peer is not connected",
}).Debug("Initiate peer disconnection")
return
}
// Disconnect from peers that are considered bad by any of the registered scorers.
if s.cfg.p2p.Peers().IsBad(id) {
s.disconnectBadPeer(s.ctx, id)
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
return
}
// If the status hasn't been updated in the recent interval time.
@@ -73,6 +77,11 @@ func (s *Service) maintainPeerStatuses() {
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil {
log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer")
}
log.WithFields(logrus.Fields{
"peer": id,
"reason": "to be pruned",
}).Debug("Initiate peer disconnection")
}
})
}
@@ -169,8 +178,8 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
// If validation fails, validation error is logged, and peer status scorer will mark peer as bad.
err = s.validateStatusMessage(ctx, msg)
s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err)
if s.cfg.p2p.Peers().IsBad(id) {
s.disconnectBadPeer(s.ctx, id)
if err := s.cfg.p2p.Peers().IsBad(id); err != nil {
s.disconnectBadPeer(s.ctx, id, err)
}
return err
}
@@ -182,7 +191,7 @@ func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error {
}
// Do not return an error for ping requests.
if err := s.sendPingRequest(ctx, id); err != nil && !isUnwantedError(err) {
log.WithError(err).Debug("Could not ping peer")
log.WithError(err).WithField("pid", id).Debug("Could not ping peer")
}
return nil
}

View File

@@ -413,7 +413,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
assert.Equal(t, numActive1+1, numActive2, "Number of active peers unexpected")
require.NoError(t, p2.Disconnect(p1.PeerID()))
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerDisconnected)
p1.Peers().SetConnectionState(p2.PeerID(), peers.Disconnected)
// Wait for disconnect event to trigger.
time.Sleep(200 * time.Millisecond)
@@ -877,7 +877,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
require.NoError(t, cw.SetClock(startup.NewClock(chain.Genesis, chain.ValidatorsRoot)))
assert.Equal(t, false, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad")
assert.NoError(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad")
p1.Connect(p2)
if util.WaitTimeout(&wg, time.Second) {
@@ -887,9 +887,9 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
connectionState, err := p1.Peers().ConnectionState(p2.PeerID())
require.NoError(t, err, "Could not obtain peer connection state")
assert.Equal(t, peers.PeerDisconnected, connectionState, "Expected peer to be disconnected")
assert.Equal(t, peers.Disconnected, connectionState, "Expected peer to be disconnected")
assert.Equal(t, true, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad")
assert.NotNil(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad")
}
func TestStatusRPC_ValidGenesisMessage(t *testing.T) {