From 0475631543f86caccf9a22bd814dfbea0936ae80 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Tue, 26 Nov 2024 18:53:27 +0100
Subject: [PATCH] Improve connection/disconnection logging. (#14665)

* Improve disconnection logs.

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Address Sammy's comment.

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Fix Sammy's comment.

* Fix Sammy's comment.

* `MockPeerManager`: Stop mixing value and pointer receivers (deepsource).

* Remove unused parameters (deepsource)

* Fix receiver names (deepsource)

* Change not after into before (deepsource)

* Update beacon-chain/p2p/handshake.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/p2p/peers/status.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
---
 CHANGELOG.md                                  |   1 +
 beacon-chain/p2p/connection_gater.go          |   2 +-
 beacon-chain/p2p/connection_gater_test.go     |   4 +-
 beacon-chain/p2p/discovery.go                 |   4 +-
 beacon-chain/p2p/discovery_test.go            |  10 +-
 beacon-chain/p2p/handshake.go                 | 131 ++++++++++------
 beacon-chain/p2p/peers/peerdata/store.go      |   6 +-
 beacon-chain/p2p/peers/scorers/BUILD.bazel    |   1 +
 .../p2p/peers/scorers/bad_responses.go        |  19 ++-
 .../p2p/peers/scorers/bad_responses_test.go   |  26 ++--
 .../p2p/peers/scorers/block_providers.go      |   4 +-
 .../p2p/peers/scorers/block_providers_test.go |  10 +-
 .../p2p/peers/scorers/gossip_scorer.go        |  16 +-
 .../p2p/peers/scorers/gossip_scorer_test.go   |   8 +-
 beacon-chain/p2p/peers/scorers/peer_status.go |  18 ++-
 .../p2p/peers/scorers/peer_status_test.go     |  34 ++--
 beacon-chain/p2p/peers/scorers/service.go     |  26 ++--
 .../p2p/peers/scorers/service_test.go         |  24 +--
 beacon-chain/p2p/peers/status.go              |  95 +++++++-----
 beacon-chain/p2p/peers/status_test.go         | 145 ++++++++++--------
 beacon-chain/p2p/service.go                   |  18 ++-
 beacon-chain/p2p/testing/mock_peermanager.go  |  12 +-
 .../p2p/testing/mock_peersprovider.go         |   4 +-
 beacon-chain/p2p/testing/p2p.go               |  34 ++--
 .../rpc/eth/node/handlers_peers_test.go       |  16 +-
 beacon-chain/rpc/prysm/node/handlers_test.go  |  12 +-
 .../sync/initial-sync/blocks_queue_test.go    |   4 +-
 .../sync/initial-sync/initial_sync_test.go    |   4 +-
 .../sync/pending_attestations_queue_test.go   |   2 +-
 .../sync/pending_blocks_queue_test.go         |   6 +-
 beacon-chain/sync/rate_limiter.go             |  12 +-
 beacon-chain/sync/rate_limiter_test.go        |   2 +-
 beacon-chain/sync/rpc.go                      |  10 +-
 .../sync/rpc_beacon_blocks_by_root_test.go    |   2 +-
 beacon-chain/sync/rpc_goodbye.go              |   7 +-
 beacon-chain/sync/rpc_status.go               |  25 ++-
 beacon-chain/sync/rpc_status_test.go          |   8 +-
 37 files changed, 436 insertions(+), 326 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0f1250aa10..2208521465 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
 - Added Validator REST mode use of Attestation V2 endpoints and Electra attestations.
 - PeerDAS: Added proto for `DataColumnIdentifier`, `DataColumnSidecar`, `DataColumnSidecarsByRangeRequest` and `MetadataV2`.
 - Better attestation packing for Electra. [PR](https://github.com/prysmaticlabs/prysm/pull/14534)
+- P2P: Add logs when a peer is (dis)connected. Add the reason for the disconnection when we initiate it.
### Changed diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index a573bea81c..8147d07b6c 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -33,7 +33,7 @@ func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) { // multiaddr for the given peer. func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) { // Disallow bad peers from dialing in. - if s.peers.IsBad(pid) { + if s.peers.IsBad(pid) != nil { return false } return filterConnections(s.addrFilter, m) diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index 4b056a47f5..a2e34417c3 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -50,7 +50,7 @@ func TestPeer_AtMaxLimit(t *testing.T) { }() for i := 0; i < highWatermarkBuffer; i++ { - addPeer(t, s.peers, peers.PeerConnected, false) + addPeer(t, s.peers, peers.Connected, false) } // create alternate host @@ -159,7 +159,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) { inboundLimit += 1 // Add in up to inbound peer limit. for i := 0; i < int(inboundLimit); i++ { - addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false) + addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress}) if valid { diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 0bc8f70828..1eb5ae3f4f 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -189,7 +189,7 @@ func (s *Service) RefreshENR() { s.updateSubnetRecordWithMetadataV2(bitV, bitS) } // ping all peers to inform them of new metadata - s.pingPeers() + s.pingPeersAndLogEnr() } // listen for new nodes watches for new nodes in the network and adds them to the peerstore. @@ -452,7 +452,7 @@ func (s *Service) filterPeer(node *enode.Node) bool { } // Ignore bad nodes. 
- if s.peers.IsBad(peerData.ID) { + if s.peers.IsBad(peerData.ID) != nil { return false } diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 734533fd25..8dd87333ee 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -378,14 +378,14 @@ func TestInboundPeerLimit(t *testing.T) { } for i := 0; i < 30; i++ { - _ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false) + _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers") require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers") for i := 0; i < highWatermarkBuffer; i++ { - _ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false) + _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), false) } require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers") @@ -404,13 +404,13 @@ func TestOutboundPeerThreshold(t *testing.T) { } for i := 0; i < 2; i++ { - _ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true) + _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), true) } require.Equal(t, true, s.isBelowOutboundPeerThreshold(), "not at outbound peer threshold") for i := 0; i < 3; i++ { - _ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true) + _ = addPeer(t, s.peers, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED), true) } require.Equal(t, false, s.isBelowOutboundPeerThreshold(), "still at outbound peer threshold") @@ -477,7 +477,7 @@ func TestCorrectUDPVersion(t *testing.T) { } // addPeer is a helper to add a peer with a given connection state) -func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState, outbound bool) peer.ID { +func addPeer(t *testing.T, p *peers.Status, state peerdata.ConnectionState, outbound bool) peer.ID { // Set up some peers with different states mhBytes := []byte{0x11, 0x04} idBytes := make([]byte, 4) diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 97d2af8eed..df19f861ee 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "errors" "fmt" "io" "sync" @@ -10,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" prysmTime "github.com/prysmaticlabs/prysm/v5/time" @@ -25,6 +25,46 @@ func peerMultiaddrString(conn network.Conn) string { return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String()) } +func (s *Service) connectToPeer(conn network.Conn) { + s.peers.SetConnectionState(conn.RemotePeer(), peers.Connected) + // Go through the handshake process. + log.WithFields(logrus.Fields{ + "direction": conn.Stat().Direction.String(), + "multiAddr": peerMultiaddrString(conn), + "activePeers": len(s.peers.Active()), + }).Debug("Initiate peer connection") +} + +func (s *Service) disconnectFromPeerOnError( + conn network.Conn, + goodByeFunc func(ctx context.Context, id peer.ID) error, + badPeerErr error, +) { + // Get the remote peer ID. + remotePeerID := conn.RemotePeer() + + // Set the peer to disconnecting state. 
+	s.peers.SetConnectionState(remotePeerID, peers.Disconnecting)
+
+	// Only attempt a goodbye if we are still connected to the peer.
+	if s.host.Network().Connectedness(remotePeerID) == network.Connected {
+		if err := goodByeFunc(context.TODO(), remotePeerID); err != nil {
+			log.WithError(err).Error("Unable to disconnect from peer")
+		}
+	}
+
+	log.
+		WithError(badPeerErr).
+		WithFields(logrus.Fields{
+			"multiaddr":            peerMultiaddrString(conn),
+			"direction":            conn.Stat().Direction.String(),
+			"remainingActivePeers": len(s.peers.Active()),
+		}).
+		Debug("Initiate peer disconnection")
+
+	s.peers.SetConnectionState(remotePeerID, peers.Disconnected)
+}
+
 // AddConnectionHandler adds a callback function which handles the connection with a
 // newly added peer. It performs a handshake with that peer by sending a hello request
 // and validating the response from the peer.
@@ -57,18 +97,9 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
 	}
 
 	s.host.Network().Notify(&network.NotifyBundle{
-		ConnectedF: func(net network.Network, conn network.Conn) {
+		ConnectedF: func(_ network.Network, conn network.Conn) {
 			remotePeer := conn.RemotePeer()
-			disconnectFromPeer := func() {
-				s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
-				// Only attempt a goodbye if we are still connected to the peer.
-				if s.host.Network().Connectedness(remotePeer) == network.Connected {
-					if err := goodByeFunc(context.TODO(), remotePeer); err != nil {
-						log.WithError(err).Error("Unable to disconnect from peer")
-					}
-				}
-				s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected)
-			}
+
 			// Connection handler must be non-blocking as part of libp2p design.
 			go func() {
 				if peerHandshaking(remotePeer) {
@@ -77,28 +108,21 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
 					return
 				}
 				defer peerFinished(remotePeer)
+
 				// Handle the various pre-existing conditions that will result in us not handshaking.
 				peerConnectionState, err := s.peers.ConnectionState(remotePeer)
-				if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) {
+				if err == nil && (peerConnectionState == peers.Connected || peerConnectionState == peers.Connecting) {
 					log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
 					return
 				}
+
 				s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction)
+
 				// Defensive check in the event we still get a bad peer.
-				if s.peers.IsBad(remotePeer) {
-					log.WithField("reason", "bad peer").Trace("Ignoring connection request")
-					disconnectFromPeer()
+				if err := s.peers.IsBad(remotePeer); err != nil {
+					s.disconnectFromPeerOnError(conn, goodByeFunc, err)
 					return
 				}
-				validPeerConnection := func() {
-					s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
-					// Go through the handshake process.
-					log.WithFields(logrus.Fields{
-						"direction":   conn.Stat().Direction,
-						"multiAddr":   peerMultiaddrString(conn),
-						"activePeers": len(s.peers.Active()),
-					}).Debug("Peer connected")
-				}

 				// Do not perform handshake on inbound dials.
 				if conn.Stat().Direction == network.DirInbound {
@@ -117,63 +141,80 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
 					// If peer hasn't sent a status request, we disconnect with them
 					if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
 						statusMessageMissing.Inc()
-						disconnectFromPeer()
+						s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state"))
 						return
 					}
+
 					if peerExists {
 						updated, err := s.peers.ChainStateLastUpdated(remotePeer)
 						if err != nil {
-							disconnectFromPeer()
+							s.disconnectFromPeerOnError(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
 							return
 						}
-						// exit if we don't receive any current status messages from
-						// peer.
-						if updated.IsZero() || !updated.After(currentTime) {
-							disconnectFromPeer()
+
+						// Exit if we don't receive any current status messages from peer.
+						if updated.IsZero() {
+							s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("is zero"))
+							return
+						}
+
+						if updated.Before(currentTime) {
+							s.disconnectFromPeerOnError(conn, goodByeFunc, errors.New("did not update"))
 							return
 						}
 					}
-					validPeerConnection()
+
+					s.connectToPeer(conn)
 					return
 				}
 
-				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
+				s.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting)
 				if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
-					log.WithError(err).Trace("Handshake failed")
-					disconnectFromPeer()
+					s.disconnectFromPeerOnError(conn, goodByeFunc, err)
 					return
 				}
-				validPeerConnection()
+
+				s.connectToPeer(conn)
 			}()
 		},
 	})
 }
 
-// AddDisconnectionHandler disconnects from peers.  It handles updating the peer status.
+// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
 // This also calls the handler responsible for maintaining other parts of the sync or p2p system.
 func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
 	s.host.Network().Notify(&network.NotifyBundle{
 		DisconnectedF: func(net network.Network, conn network.Conn) {
-			log := log.WithField("multiAddr", peerMultiaddrString(conn))
+			peerID := conn.RemotePeer()
+
+			log := log.WithFields(logrus.Fields{
+				"multiAddr": peerMultiaddrString(conn),
+				"direction": conn.Stat().Direction.String(),
+			})
 			// Must be handled in a goroutine as this callback cannot be blocking.
 			go func() {
 				// Exit early if we are still connected to the peer.
-				if net.Connectedness(conn.RemotePeer()) == network.Connected {
+				if net.Connectedness(peerID) == network.Connected {
 					return
 				}
-				priorState, err := s.peers.ConnectionState(conn.RemotePeer())
+
+				priorState, err := s.peers.ConnectionState(peerID)
 				if err != nil {
 					// Can happen if the peer has already disconnected, so...
-					priorState = peers.PeerDisconnected
+					priorState = peers.Disconnected
 				}
-				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
+
+				s.peers.SetConnectionState(peerID, peers.Disconnecting)
 				if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
 					log.WithError(err).Error("Disconnect handler failed")
 				}
-				s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
+
+				s.peers.SetConnectionState(peerID, peers.Disconnected)
+
 				// Only log disconnections if we were fully connected.
- if priorState == peers.PeerConnected { - log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected") + if priorState == peers.Connected { + activePeersCount := len(s.peers.Active()) + log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected") } }() }, diff --git a/beacon-chain/p2p/peers/peerdata/store.go b/beacon-chain/p2p/peers/peerdata/store.go index d22d888826..1e3f49a384 100644 --- a/beacon-chain/p2p/peers/peerdata/store.go +++ b/beacon-chain/p2p/peers/peerdata/store.go @@ -23,8 +23,8 @@ var ( ErrNoPeerStatus = errors.New("no chain status for peer") ) -// PeerConnectionState is the state of the connection. -type PeerConnectionState ethpb.ConnectionState +// ConnectionState is the state of the connection. +type ConnectionState ethpb.ConnectionState // StoreConfig holds peer store parameters. type StoreConfig struct { @@ -49,7 +49,7 @@ type PeerData struct { // Network related data. Address ma.Multiaddr Direction network.Direction - ConnState PeerConnectionState + ConnState ConnectionState Enr *enr.Record NextValidTime time.Time // Chain related data. diff --git a/beacon-chain/p2p/peers/scorers/BUILD.bazel b/beacon-chain/p2p/peers/scorers/BUILD.bazel index e6eb6a277c..463ade4fa2 100644 --- a/beacon-chain/p2p/peers/scorers/BUILD.bazel +++ b/beacon-chain/p2p/peers/scorers/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//crypto/rand:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", + "@com_github_pkg_errors//:go_default_library", ], ) diff --git a/beacon-chain/p2p/peers/scorers/bad_responses.go b/beacon-chain/p2p/peers/scorers/bad_responses.go index 73d74ecfc0..9e834e2578 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses.go @@ -4,6 +4,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" ) @@ -61,7 +62,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 { // scoreNoLock is a lock-free version of Score. func (s *BadResponsesScorer) scoreNoLock(pid peer.ID) float64 { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { return BadPeerScore } score := float64(0) @@ -116,18 +117,24 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) { // IsBadPeer states if the peer is to be considered bad. // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. -func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool { +func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() + return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) error { if peerData, ok := s.store.PeerData(pid); ok { - return peerData.BadResponses >= s.config.Threshold + if peerData.BadResponses >= s.config.Threshold { + return errors.Errorf("peer exceeded bad responses threshold: got %d, threshold %d", peerData.BadResponses, s.config.Threshold) + } + + return nil } - return false + + return nil } // BadPeers returns the peers that are considered bad. 
@@ -137,7 +144,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/bad_responses_test.go b/beacon-chain/p2p/peers/scorers/bad_responses_test.go index 186a50f55d..094be28d5f 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses_test.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses_test.go @@ -33,19 +33,19 @@ func TestScorers_BadResponses_Score(t *testing.T) { assert.Equal(t, 0., scorer.Score(pid), "Unexpected score for unregistered peer") scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, -2.5, scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, float64(-5), scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, float64(-7.5), scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, true, scorer.IsBadPeer(pid)) + assert.NotNil(t, scorer.IsBadPeer(pid)) assert.Equal(t, -100.0, scorer.Score(pid)) } @@ -152,17 +152,17 @@ func TestScorers_BadResponses_IsBadPeer(t *testing.T) { }) scorer := peerStatuses.Scorers().BadResponsesScorer() pid := peer.ID("peer1") - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) peerStatuses.Add(nil, pid, nil, network.DirUnknown) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) for i := 0; i < scorers.DefaultBadResponsesThreshold; i++ { scorer.Increment(pid) if i == scorers.DefaultBadResponsesThreshold-1 { - assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status") + assert.NotNil(t, scorer.IsBadPeer(pid), "Unexpected peer status") } else { - assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status") + assert.NoError(t, scorer.IsBadPeer(pid), "Unexpected peer status") } } } @@ -185,11 +185,11 @@ func TestScorers_BadResponses_BadPeers(t *testing.T) { scorer.Increment(pids[2]) scorer.Increment(pids[4]) } - assert.Equal(t, false, scorer.IsBadPeer(pids[0]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[1]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[2]), "Invalid peer status") - assert.Equal(t, false, scorer.IsBadPeer(pids[3]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[4]), "Invalid peer status") + assert.NoError(t, scorer.IsBadPeer(pids[0]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[1]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[2]), "Invalid peer status") + assert.NoError(t, scorer.IsBadPeer(pids[3]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[4]), "Invalid peer status") want := []peer.ID{pids[1], pids[2], pids[4]} badPeers := scorer.BadPeers() sort.Slice(badPeers, func(i, j int) bool { diff --git a/beacon-chain/p2p/peers/scorers/block_providers.go b/beacon-chain/p2p/peers/scorers/block_providers.go index 649ff57009..9840b9c081 100644 --- a/beacon-chain/p2p/peers/scorers/block_providers.go +++ b/beacon-chain/p2p/peers/scorers/block_providers.go @@ -177,8 +177,8 @@ func (s *BlockProviderScorer) processedBlocksNoLock(pid peer.ID) uint64 { // Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad peer. 
// Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort // out low-scorers (see WeightSorted method). -func (*BlockProviderScorer) IsBadPeer(_ peer.ID) bool { - return false +func (*BlockProviderScorer) IsBadPeer(_ peer.ID) error { + return nil } // BadPeers returns the peers that are considered bad. diff --git a/beacon-chain/p2p/peers/scorers/block_providers_test.go b/beacon-chain/p2p/peers/scorers/block_providers_test.go index bcb2c8d45e..70b29c05a3 100644 --- a/beacon-chain/p2p/peers/scorers/block_providers_test.go +++ b/beacon-chain/p2p/peers/scorers/block_providers_test.go @@ -119,7 +119,7 @@ func TestScorers_BlockProvider_Score(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(*testing.T) { peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ PeerLimit: 30, ScorerParams: &scorers.Config{ @@ -224,7 +224,7 @@ func TestScorers_BlockProvider_Sorted(t *testing.T) { }{ { name: "no peers", - update: func(s *scorers.BlockProviderScorer) {}, + update: func(*scorers.BlockProviderScorer) {}, have: []peer.ID{}, want: []peer.ID{}, }, @@ -451,7 +451,7 @@ func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) { }) } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(*testing.T) { peerStatuses := peerStatusGen() scorer := peerStatuses.Scorers().BlockProviderScorer() if tt.update != nil { @@ -481,8 +481,8 @@ func TestScorers_BlockProvider_BadPeerMarking(t *testing.T) { }) scorer := peerStatuses.Scorers().BlockProviderScorer() - assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer") + assert.NoError(t, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer") scorer.IncrementProcessedBlocks("peer1", 64) - assert.Equal(t, false, scorer.IsBadPeer("peer1")) + assert.NoError(t, scorer.IsBadPeer("peer1")) assert.Equal(t, 0, len(scorer.BadPeers())) } diff --git a/beacon-chain/p2p/peers/scorers/gossip_scorer.go b/beacon-chain/p2p/peers/scorers/gossip_scorer.go index 5482ebde74..1adec7b9eb 100644 --- a/beacon-chain/p2p/peers/scorers/gossip_scorer.go +++ b/beacon-chain/p2p/peers/scorers/gossip_scorer.go @@ -2,6 +2,7 @@ package scorers import ( "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" pbrpc "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) @@ -51,19 +52,24 @@ func (s *GossipScorer) scoreNoLock(pid peer.ID) float64 { } // IsBadPeer states if the peer is to be considered bad. -func (s *GossipScorer) IsBadPeer(pid peer.ID) bool { +func (s *GossipScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) error { peerData, ok := s.store.PeerData(pid) if !ok { - return false + return nil } - return peerData.GossipScore < gossipThreshold + + if peerData.GossipScore < gossipThreshold { + return errors.Errorf("gossip score below threshold: got %f - threshold %f", peerData.GossipScore, gossipThreshold) + } + + return nil } // BadPeers returns the peers that are considered bad. 
@@ -73,7 +79,7 @@ func (s *GossipScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go b/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go index 98fccf38d1..f8cbb21e07 100644 --- a/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go +++ b/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go @@ -21,7 +21,7 @@ func TestScorers_Gossip_Score(t *testing.T) { }{ { name: "nonexistent peer", - update: func(scorer *scorers.GossipScorer) { + update: func(*scorers.GossipScorer) { }, check: func(scorer *scorers.GossipScorer) { assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") @@ -34,7 +34,7 @@ func TestScorers_Gossip_Score(t *testing.T) { }, check: func(scorer *scorers.GossipScorer) { assert.Equal(t, -101.0, scorer.Score("peer1"), "Unexpected score") - assert.Equal(t, true, scorer.IsBadPeer("peer1"), "Unexpected good peer") + assert.NotNil(t, scorer.IsBadPeer("peer1"), "Unexpected good peer") }, }, { @@ -44,7 +44,7 @@ func TestScorers_Gossip_Score(t *testing.T) { }, check: func(scorer *scorers.GossipScorer) { assert.Equal(t, 10.0, scorer.Score("peer1"), "Unexpected score") - assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected bad peer") + assert.Equal(t, nil, scorer.IsBadPeer("peer1"), "Unexpected bad peer") _, _, topicMap, err := scorer.GossipData("peer1") assert.NoError(t, err) assert.Equal(t, uint64(100), topicMap["a"].TimeInMesh, "incorrect time in mesh") @@ -53,7 +53,7 @@ func TestScorers_Gossip_Score(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(*testing.T) { peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }) diff --git a/beacon-chain/p2p/peers/scorers/peer_status.go b/beacon-chain/p2p/peers/scorers/peer_status.go index 5153c0c784..6003bb4b71 100644 --- a/beacon-chain/p2p/peers/scorers/peer_status.go +++ b/beacon-chain/p2p/peers/scorers/peer_status.go @@ -46,7 +46,7 @@ func (s *PeerStatusScorer) Score(pid peer.ID) float64 { // scoreNoLock is a lock-free version of Score. func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { return BadPeerScore } score := float64(0) @@ -67,30 +67,34 @@ func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 { } // IsBadPeer states if the peer is to be considered bad. -func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) bool { +func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() + return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) error { peerData, ok := s.store.PeerData(pid) if !ok { - return false + return nil } + // Mark peer as bad, if the latest error is one of the terminal ones. terminalErrs := []error{ p2ptypes.ErrWrongForkDigestVersion, p2ptypes.ErrInvalidFinalizedRoot, p2ptypes.ErrInvalidRequest, } + for _, err := range terminalErrs { if errors.Is(peerData.ChainStateValidationError, err) { - return true + return err } } - return false + + return nil } // BadPeers returns the peers that are considered bad. 
@@ -100,7 +104,7 @@ func (s *PeerStatusScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/peer_status_test.go b/beacon-chain/p2p/peers/scorers/peer_status_test.go index 241749068d..8fad8f93a2 100644 --- a/beacon-chain/p2p/peers/scorers/peer_status_test.go +++ b/beacon-chain/p2p/peers/scorers/peer_status_test.go @@ -122,7 +122,7 @@ func TestScorers_PeerStatus_Score(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(*testing.T) { peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ ScorerParams: &scorers.Config{}, }) @@ -140,12 +140,12 @@ func TestScorers_PeerStatus_IsBadPeer(t *testing.T) { ScorerParams: &scorers.Config{}, }) pid := peer.ID("peer1") - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) } func TestScorers_PeerStatus_BadPeers(t *testing.T) { @@ -155,22 +155,22 @@ func TestScorers_PeerStatus_BadPeers(t *testing.T) { pid1 := peer.ID("peer1") pid2 := peer.ID("peer2") pid3 := peer.ID("peer3") - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid3)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid1)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid3)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid1, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid2, &pb.Status{}, nil) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid3, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid1)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid3)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid1)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) + 
assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid3)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) assert.Equal(t, 2, len(peerStatuses.Scorers().PeerStatusScorer().BadPeers())) assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers())) } diff --git a/beacon-chain/p2p/peers/scorers/service.go b/beacon-chain/p2p/peers/scorers/service.go index 4ae91fc499..108315882c 100644 --- a/beacon-chain/p2p/peers/scorers/service.go +++ b/beacon-chain/p2p/peers/scorers/service.go @@ -6,6 +6,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v5/config/features" ) @@ -24,7 +25,7 @@ const BadPeerScore = gossipThreshold // Scorer defines minimum set of methods every peer scorer must expose. type Scorer interface { Score(pid peer.ID) float64 - IsBadPeer(pid peer.ID) bool + IsBadPeer(pid peer.ID) error BadPeers() []peer.ID } @@ -124,26 +125,29 @@ func (s *Service) ScoreNoLock(pid peer.ID) float64 { } // IsBadPeer traverses all the scorers to see if any of them classifies peer as bad. -func (s *Service) IsBadPeer(pid peer.ID) bool { +func (s *Service) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() return s.IsBadPeerNoLock(pid) } // IsBadPeerNoLock is a lock-free version of IsBadPeer. -func (s *Service) IsBadPeerNoLock(pid peer.ID) bool { - if s.scorers.badResponsesScorer.isBadPeerNoLock(pid) { - return true +func (s *Service) IsBadPeerNoLock(pid peer.ID) error { + if err := s.scorers.badResponsesScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "bad responses scorer") } - if s.scorers.peerStatusScorer.isBadPeerNoLock(pid) { - return true + + if err := s.scorers.peerStatusScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "peer status scorer") } + if features.Get().EnablePeerScorer { - if s.scorers.gossipScorer.isBadPeerNoLock(pid) { - return true + if err := s.scorers.gossipScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "gossip scorer") } } - return false + + return nil } // BadPeers returns the peers that are considered bad by any of registered scorers. 
@@ -153,7 +157,7 @@ func (s *Service) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.IsBadPeerNoLock(pid) { + if s.IsBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/service_test.go b/beacon-chain/p2p/peers/scorers/service_test.go index f7f7aa9c50..2e28838d30 100644 --- a/beacon-chain/p2p/peers/scorers/service_test.go +++ b/beacon-chain/p2p/peers/scorers/service_test.go @@ -100,7 +100,7 @@ func TestScorers_Service_Score(t *testing.T) { return scores } - pack := func(scorer *scorers.Service, s1, s2, s3 float64) map[string]float64 { + pack := func(_ *scorers.Service, s1, s2, s3 float64) map[string]float64 { return map[string]float64{ "peer1": roundScore(s1), "peer2": roundScore(s2), @@ -237,7 +237,7 @@ func TestScorers_Service_loop(t *testing.T) { for i := 0; i < s1.Params().Threshold+5; i++ { s1.Increment(pid1) } - assert.Equal(t, true, s1.IsBadPeer(pid1), "Peer should be marked as bad") + assert.NotNil(t, s1.IsBadPeer(pid1), "Peer should be marked as bad") s2.IncrementProcessedBlocks("peer1", 221) assert.Equal(t, uint64(221), s2.ProcessedBlocks("peer1")) @@ -252,7 +252,7 @@ func TestScorers_Service_loop(t *testing.T) { for { select { case <-ticker.C: - if s1.IsBadPeer(pid1) == false && s2.ProcessedBlocks("peer1") == 0 { + if s1.IsBadPeer(pid1) == nil && s2.ProcessedBlocks("peer1") == 0 { return } case <-ctx.Done(): @@ -263,7 +263,7 @@ func TestScorers_Service_loop(t *testing.T) { }() <-done - assert.Equal(t, false, s1.IsBadPeer(pid1), "Peer should not be marked as bad") + assert.NoError(t, s1.IsBadPeer(pid1), "Peer should not be marked as bad") assert.Equal(t, uint64(0), s2.ProcessedBlocks("peer1"), "No blocks are expected") } @@ -278,10 +278,10 @@ func TestScorers_Service_IsBadPeer(t *testing.T) { }, }) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1")) peerStatuses.Scorers().BadResponsesScorer().Increment("peer1") peerStatuses.Scorers().BadResponsesScorer().Increment("peer1") - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1")) } func TestScorers_Service_BadPeers(t *testing.T) { @@ -295,16 +295,16 @@ func TestScorers_Service_BadPeers(t *testing.T) { }, }) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer3")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer3")) assert.Equal(t, 0, len(peerStatuses.Scorers().BadPeers())) for _, pid := range []peer.ID{"peer1", "peer3"} { peerStatuses.Scorers().BadResponsesScorer().Increment(pid) peerStatuses.Scorers().BadResponsesScorer().Increment(pid) } - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2")) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer3")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer3")) assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers())) } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 3dda2df288..6b8c32657e 100644 --- 
a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -34,6 +34,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers" @@ -49,14 +50,14 @@ import ( ) const ( - // PeerDisconnected means there is no connection to the peer. - PeerDisconnected peerdata.PeerConnectionState = iota - // PeerDisconnecting means there is an on-going attempt to disconnect from the peer. - PeerDisconnecting - // PeerConnected means the peer has an active connection. - PeerConnected - // PeerConnecting means there is an on-going attempt to connect to the peer. - PeerConnecting + // Disconnected means there is no connection to the peer. + Disconnected peerdata.ConnectionState = iota + // Disconnecting means there is an on-going attempt to disconnect from the peer. + Disconnecting + // Connected means the peer has an active connection. + Connected + // Connecting means there is an on-going attempt to connect to the peer. + Connecting ) const ( @@ -150,7 +151,7 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire Address: address, Direction: direction, // Peers start disconnected; state will be updated when the handshake process begins. - ConnState: PeerDisconnected, + ConnState: Disconnected, } if record != nil { peerData.Enr = record @@ -212,7 +213,7 @@ func (p *Status) IsActive(pid peer.ID) bool { defer p.store.RUnlock() peerData, ok := p.store.PeerData(pid) - return ok && (peerData.ConnState == PeerConnected || peerData.ConnState == PeerConnecting) + return ok && (peerData.ConnState == Connected || peerData.ConnState == Connecting) } // IsAboveInboundLimit checks if we are above our current inbound @@ -222,7 +223,7 @@ func (p *Status) IsAboveInboundLimit() bool { defer p.store.RUnlock() totalInbound := 0 for _, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && + if peerData.ConnState == Connected && peerData.Direction == network.DirInbound { totalInbound += 1 } @@ -286,7 +287,7 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID { peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { // look at active peers - connectedStatus := peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected + connectedStatus := peerData.ConnState == Connecting || peerData.ConnState == Connected if connectedStatus && peerData.MetaData != nil && !peerData.MetaData.IsNil() && peerData.MetaData.AttnetsBitfield() != nil { indices := indicesFromBitfield(peerData.MetaData.AttnetsBitfield()) for _, idx := range indices { @@ -301,7 +302,7 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID { } // SetConnectionState sets the connection state of the given remote peer. -func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionState) { +func (p *Status) SetConnectionState(pid peer.ID, state peerdata.ConnectionState) { p.store.Lock() defer p.store.Unlock() @@ -311,14 +312,14 @@ func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionSt // ConnectionState gets the connection state of the given remote peer. // This will error if the peer does not exist. 
-func (p *Status) ConnectionState(pid peer.ID) (peerdata.PeerConnectionState, error) { +func (p *Status) ConnectionState(pid peer.ID) (peerdata.ConnectionState, error) { p.store.RLock() defer p.store.RUnlock() if peerData, ok := p.store.PeerData(pid); ok { return peerData.ConnState, nil } - return PeerDisconnected, peerdata.ErrPeerUnknown + return Disconnected, peerdata.ErrPeerUnknown } // ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated. @@ -335,19 +336,29 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) { // IsBad states if the peer is to be considered bad (by *any* of the registered scorers). // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. -func (p *Status) IsBad(pid peer.ID) bool { +func (p *Status) IsBad(pid peer.ID) error { p.store.RLock() defer p.store.RUnlock() + return p.isBad(pid) } // isBad is the lock-free version of IsBad. -func (p *Status) isBad(pid peer.ID) bool { +func (p *Status) isBad(pid peer.ID) error { // Do not disconnect from trusted peers. if p.store.IsTrustedPeer(pid) { - return false + return nil } - return p.isfromBadIP(pid) || p.scorers.IsBadPeerNoLock(pid) + + if err := p.isfromBadIP(pid); err != nil { + return errors.Wrap(err, "peer is from a bad IP") + } + + if err := p.scorers.IsBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "is bad peer no lock") + } + + return nil } // NextValidTime gets the earliest possible time it is to contact/dial @@ -411,7 +422,7 @@ func (p *Status) Connecting() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnecting { + if peerData.ConnState == Connecting { peers = append(peers, pid) } } @@ -424,7 +435,7 @@ func (p *Status) Connected() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected { + if peerData.ConnState == Connected { peers = append(peers, pid) } } @@ -450,7 +461,7 @@ func (p *Status) InboundConnected() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound { + if peerData.ConnState == Connected && peerData.Direction == network.DirInbound { peers = append(peers, pid) } } @@ -463,7 +474,7 @@ func (p *Status) InboundConnectedWithProtocol(protocol InternetProtocol) []peer. 
defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) { + if peerData.ConnState == Connected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) { peers = append(peers, pid) } } @@ -489,7 +500,7 @@ func (p *Status) OutboundConnected() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound { + if peerData.ConnState == Connected && peerData.Direction == network.DirOutbound { peers = append(peers, pid) } } @@ -502,7 +513,7 @@ func (p *Status) OutboundConnectedWithProtocol(protocol InternetProtocol) []peer defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) { + if peerData.ConnState == Connected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) { peers = append(peers, pid) } } @@ -515,7 +526,7 @@ func (p *Status) Active() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected { + if peerData.ConnState == Connecting || peerData.ConnState == Connected { peers = append(peers, pid) } } @@ -528,7 +539,7 @@ func (p *Status) Disconnecting() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerDisconnecting { + if peerData.ConnState == Disconnecting { peers = append(peers, pid) } } @@ -541,7 +552,7 @@ func (p *Status) Disconnected() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerDisconnected { + if peerData.ConnState == Disconnected { peers = append(peers, pid) } } @@ -554,7 +565,7 @@ func (p *Status) Inactive() []peer.ID { defer p.store.RUnlock() peers := make([]peer.ID, 0) for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerDisconnecting || peerData.ConnState == PeerDisconnected { + if peerData.ConnState == Disconnecting || peerData.ConnState == Disconnected { peers = append(peers, pid) } } @@ -592,7 +603,7 @@ func (p *Status) Prune() { return } notBadPeer := func(pid peer.ID) bool { - return !p.isBad(pid) + return p.isBad(pid) == nil } notTrustedPeer := func(pid peer.ID) bool { return !p.isTrustedPeers(pid) @@ -605,7 +616,7 @@ func (p *Status) Prune() { // Select disconnected peers with a smaller bad response count. for pid, peerData := range p.store.Peers() { // Should not prune trusted peer or prune the peer dara and unset trusted peer. - if peerData.ConnState == PeerDisconnected && notBadPeer(pid) && notTrustedPeer(pid) { + if peerData.ConnState == Disconnected && notBadPeer(pid) && notTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, score: p.Scorers().ScoreNoLock(pid), @@ -657,7 +668,7 @@ func (p *Status) deprecatedPrune() { // Select disconnected peers with a smaller bad response count. 
for pid, peerData := range p.store.Peers() { // Should not prune trusted peer or prune the peer dara and unset trusted peer. - if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) && notTrustedPeer(pid) { + if peerData.ConnState == Disconnected && notBadPeer(peerData) && notTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, badResp: peerData.BadResponses, @@ -814,7 +825,7 @@ func (p *Status) PeersToPrune() []peer.ID { peersToPrune := make([]*peerResp, 0) // Select connected and inbound peers to prune. for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && + if peerData.ConnState == Connected && peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, @@ -880,7 +891,7 @@ func (p *Status) deprecatedPeersToPrune() []peer.ID { peersToPrune := make([]*peerResp, 0) // Select connected and inbound peers to prune. for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerConnected && + if peerData.ConnState == Connected && peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, @@ -982,24 +993,28 @@ func (p *Status) isTrustedPeers(pid peer.ID) bool { // this method assumes the store lock is acquired before // executing the method. -func (p *Status) isfromBadIP(pid peer.ID) bool { +func (p *Status) isfromBadIP(pid peer.ID) error { peerData, ok := p.store.PeerData(pid) if !ok { - return false + return nil } + if peerData.Address == nil { - return false + return nil } + ip, err := manet.ToIP(peerData.Address) if err != nil { - return true + return errors.Wrap(err, "to ip") } + if val, ok := p.ipTracker[ip.String()]; ok { if val > CollocationLimit { - return true + return errors.Errorf("collocation limit exceeded: got %d - limit %d", val, CollocationLimit) } } - return false + + return nil } func (p *Status) addIpToTracker(pid peer.ID) { diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index ae57af71f1..db9b17f569 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -215,7 +215,7 @@ func TestPeerSubscribedToSubnet(t *testing.T) { // Add some peers with different states numPeers := 2 for i := 0; i < numPeers; i++ { - addPeer(t, p, peers.PeerConnected) + addPeer(t, p, peers.Connected) } expectedPeer := p.All()[1] bitV := bitfield.NewBitvector64() @@ -230,7 +230,7 @@ func TestPeerSubscribedToSubnet(t *testing.T) { })) numPeers = 3 for i := 0; i < numPeers; i++ { - addPeer(t, p, peers.PeerDisconnected) + addPeer(t, p, peers.Disconnected) } ps := p.SubscribedToSubnet(2) assert.Equal(t, 1, len(ps), "Unexpected num of peers") @@ -259,7 +259,7 @@ func TestPeerImplicitAdd(t *testing.T) { id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err) - connectionState := peers.PeerConnecting + connectionState := peers.Connecting p.SetConnectionState(id, connectionState) resConnectionState, err := p.ConnectionState(id) @@ -347,7 +347,7 @@ func TestPeerBadResponses(t *testing.T) { require.NoError(t, err) } - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000") require.NoError(t, err, "Failed to create address") @@ -358,25 +358,25 @@ func TestPeerBadResponses(t *testing.T) { resBadResponses, 
err := scorer.Count(id) require.NoError(t, err) assert.Equal(t, 0, resBadResponses, "Unexpected bad responses") - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 1, resBadResponses, "Unexpected bad responses") - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 2, resBadResponses, "Unexpected bad responses") - assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") + assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 3, resBadResponses, "Unexpected bad responses") - assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") + assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be") } func TestAddMetaData(t *testing.T) { @@ -393,7 +393,7 @@ func TestAddMetaData(t *testing.T) { // Add some peers with different states numPeers := 5 for i := 0; i < numPeers; i++ { - addPeer(t, p, peers.PeerConnected) + addPeer(t, p, peers.Connected) } newPeer := p.All()[2] @@ -422,19 +422,19 @@ func TestPeerConnectionStatuses(t *testing.T) { // Add some peers with different states numPeersDisconnected := 11 for i := 0; i < numPeersDisconnected; i++ { - addPeer(t, p, peers.PeerDisconnected) + addPeer(t, p, peers.Disconnected) } numPeersConnecting := 7 for i := 0; i < numPeersConnecting; i++ { - addPeer(t, p, peers.PeerConnecting) + addPeer(t, p, peers.Connecting) } numPeersConnected := 43 for i := 0; i < numPeersConnected; i++ { - addPeer(t, p, peers.PeerConnected) + addPeer(t, p, peers.Connected) } numPeersDisconnecting := 4 for i := 0; i < numPeersDisconnecting; i++ { - addPeer(t, p, peers.PeerDisconnecting) + addPeer(t, p, peers.Disconnecting) } // Now confirm the states @@ -463,7 +463,7 @@ func TestPeerValidTime(t *testing.T) { numPeersConnected := 6 for i := 0; i < numPeersConnected; i++ { - addPeer(t, p, peers.PeerConnected) + addPeer(t, p, peers.Connected) } allPeers := p.All() @@ -510,10 +510,10 @@ func TestPrune(t *testing.T) { for i := 0; i < p.MaxPeerLimit()+100; i++ { if i%7 == 0 { // Peer added as disconnected. - _ = addPeer(t, p, peers.PeerDisconnected) + _ = addPeer(t, p, peers.Disconnected) } // Peer added to peer handler. - _ = addPeer(t, p, peers.PeerConnected) + _ = addPeer(t, p, peers.Connected) } disPeers := p.Disconnected() @@ -571,23 +571,23 @@ func TestPeerIPTracker(t *testing.T) { if err != nil { t.Fatal(err) } - badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.PeerConnectionState(ethpb.ConnectionState_DISCONNECTED))) + badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.ConnectionState(ethpb.ConnectionState_DISCONNECTED))) } for _, pr := range badPeers { - assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad") + assert.NotNil(t, p.IsBad(pr), "peer with bad ip is not bad") } // Add in bad peers, so that our records are trimmed out // from the peer store. for i := 0; i < p.MaxPeerLimit()+100; i++ { // Peer added to peer handler. 
- pid := addPeer(t, p, peers.PeerDisconnected) + pid := addPeer(t, p, peers.Disconnected) p.Scorers().BadResponsesScorer().Increment(pid) } p.Prune() for _, pr := range badPeers { - assert.Equal(t, false, p.IsBad(pr), "peer with good ip is regarded as bad") + assert.NoError(t, p.IsBad(pr), "peer with good ip is regarded as bad") } } @@ -601,8 +601,11 @@ func TestTrimmedOrderedPeers(t *testing.T) { }, }) - expectedTarget := primitives.Epoch(2) - maxPeers := 3 + const ( + expectedTarget = primitives.Epoch(2) + maxPeers = 3 + ) + var mockroot2 [32]byte var mockroot3 [32]byte var mockroot4 [32]byte @@ -611,36 +614,41 @@ func TestTrimmedOrderedPeers(t *testing.T) { copy(mockroot3[:], "three") copy(mockroot4[:], "four") copy(mockroot5[:], "five") + // Peer 1 - pid1 := addPeer(t, p, peers.PeerConnected) + pid1 := addPeer(t, p, peers.Connected) p.SetChainState(pid1, &pb.Status{ HeadSlot: 3 * params.BeaconConfig().SlotsPerEpoch, FinalizedEpoch: 3, FinalizedRoot: mockroot3[:], }) + // Peer 2 - pid2 := addPeer(t, p, peers.PeerConnected) + pid2 := addPeer(t, p, peers.Connected) p.SetChainState(pid2, &pb.Status{ HeadSlot: 4 * params.BeaconConfig().SlotsPerEpoch, FinalizedEpoch: 4, FinalizedRoot: mockroot4[:], }) + // Peer 3 - pid3 := addPeer(t, p, peers.PeerConnected) + pid3 := addPeer(t, p, peers.Connected) p.SetChainState(pid3, &pb.Status{ HeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch, FinalizedEpoch: 5, FinalizedRoot: mockroot5[:], }) + // Peer 4 - pid4 := addPeer(t, p, peers.PeerConnected) + pid4 := addPeer(t, p, peers.Connected) p.SetChainState(pid4, &pb.Status{ HeadSlot: 2 * params.BeaconConfig().SlotsPerEpoch, FinalizedEpoch: 2, FinalizedRoot: mockroot2[:], }) + // Peer 5 - pid5 := addPeer(t, p, peers.PeerConnected) + pid5 := addPeer(t, p, peers.Connected) p.SetChainState(pid5, &pb.Status{ HeadSlot: 2 * params.BeaconConfig().SlotsPerEpoch, FinalizedEpoch: 2, @@ -680,12 +688,12 @@ func TestAtInboundPeerLimit(t *testing.T) { }) for i := 0; i < 15; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } assert.Equal(t, false, p.IsAboveInboundLimit(), "Inbound limit exceeded") for i := 0; i < 31; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } assert.Equal(t, true, p.IsAboveInboundLimit(), "Inbound limit not exceeded") } @@ -705,7 +713,7 @@ func TestPrunePeers(t *testing.T) { }) for i := 0; i < 15; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Assert there are no prunable peers. peersToPrune := p.PeersToPrune() @@ -713,7 +721,7 @@ func TestPrunePeers(t *testing.T) { for i := 0; i < 18; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Assert there are the correct prunable peers. @@ -723,7 +731,7 @@ func TestPrunePeers(t *testing.T) { // Add in more peers. 
for i := 0; i < 13; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Set up bad scores for inbound peers. @@ -767,7 +775,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { for i := 0; i < 15; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirOutbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Assert there are no prunable peers. peersToPrune := p.PeersToPrune() @@ -775,7 +783,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { for i := 0; i < 18; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Assert there are the correct prunable peers. @@ -785,7 +793,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { // Add in more peers. for i := 0; i < 13; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } var trustedPeers []peer.ID @@ -821,7 +829,7 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { // Add more peers to check if trusted peers can be pruned after they are deleted from trusted peer set. for i := 0; i < 9; i++ { // Peer added to peer handler. - createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + createPeer(t, p, nil, network.DirInbound, peerdata.ConnectionState(ethpb.ConnectionState_CONNECTED)) } // Delete trusted peers. @@ -865,14 +873,14 @@ func TestStatus_BestPeer(t *testing.T) { headSlot primitives.Slot finalizedEpoch primitives.Epoch } + tests := []struct { - name string - peers []*peerConfig - limitPeers int - ourFinalizedEpoch primitives.Epoch - targetEpoch primitives.Epoch - // targetEpochSupport denotes how many peers support returned epoch. - targetEpochSupport int + name string + peers []*peerConfig + limitPeers int + ourFinalizedEpoch primitives.Epoch + targetEpoch primitives.Epoch + targetEpochSupport int // Denotes how many peers support returned epoch. 
}{ { name: "head slot matches finalized epoch", @@ -885,6 +893,7 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 3, headSlot: 3 * params.BeaconConfig().SlotsPerEpoch}, }, limitPeers: 15, + ourFinalizedEpoch: 0, targetEpoch: 4, targetEpochSupport: 4, }, @@ -902,6 +911,7 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 3, headSlot: 4 * params.BeaconConfig().SlotsPerEpoch}, }, limitPeers: 15, + ourFinalizedEpoch: 0, targetEpoch: 4, targetEpochSupport: 4, }, @@ -916,6 +926,7 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 3, headSlot: 42 * params.BeaconConfig().SlotsPerEpoch}, }, limitPeers: 15, + ourFinalizedEpoch: 0, targetEpoch: 4, targetEpochSupport: 4, }, @@ -930,8 +941,8 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 3, headSlot: 46 * params.BeaconConfig().SlotsPerEpoch}, {finalizedEpoch: 6, headSlot: 6 * params.BeaconConfig().SlotsPerEpoch}, }, - ourFinalizedEpoch: 5, limitPeers: 15, + ourFinalizedEpoch: 5, targetEpoch: 6, targetEpochSupport: 1, }, @@ -950,8 +961,8 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 7, headSlot: 7 * params.BeaconConfig().SlotsPerEpoch}, {finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch}, }, - ourFinalizedEpoch: 5, limitPeers: 15, + ourFinalizedEpoch: 5, targetEpoch: 6, targetEpochSupport: 5, }, @@ -970,8 +981,8 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 7, headSlot: 7 * params.BeaconConfig().SlotsPerEpoch}, {finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch}, }, - ourFinalizedEpoch: 5, limitPeers: 4, + ourFinalizedEpoch: 5, targetEpoch: 6, targetEpochSupport: 4, }, @@ -986,8 +997,8 @@ func TestStatus_BestPeer(t *testing.T) { {finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch}, {finalizedEpoch: 8, headSlot: 8 * params.BeaconConfig().SlotsPerEpoch}, }, - ourFinalizedEpoch: 5, limitPeers: 15, + ourFinalizedEpoch: 5, targetEpoch: 8, targetEpochSupport: 3, }, @@ -1002,7 +1013,7 @@ func TestStatus_BestPeer(t *testing.T) { }, }) for _, peerConfig := range tt.peers { - p.SetChainState(addPeer(t, p, peers.PeerConnected), &pb.Status{ + p.SetChainState(addPeer(t, p, peers.Connected), &pb.Status{ FinalizedEpoch: peerConfig.finalizedEpoch, HeadSlot: peerConfig.headSlot, }) @@ -1028,7 +1039,7 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) { for i := 0; i <= maxPeers+100; i++ { p.Add(new(enr.Record), peer.ID(rune(i)), nil, network.DirOutbound) - p.SetConnectionState(peer.ID(rune(i)), peers.PeerConnected) + p.SetConnectionState(peer.ID(rune(i)), peers.Connected) p.SetChainState(peer.ID(rune(i)), &pb.Status{ FinalizedEpoch: 10, }) @@ -1051,7 +1062,7 @@ func TestStatus_BestNonFinalized(t *testing.T) { peerSlots := []primitives.Slot{32, 32, 32, 32, 235, 233, 258, 268, 270} for i, headSlot := range peerSlots { p.Add(new(enr.Record), peer.ID(rune(i)), nil, network.DirOutbound) - p.SetConnectionState(peer.ID(rune(i)), peers.PeerConnected) + p.SetConnectionState(peer.ID(rune(i)), peers.Connected) p.SetChainState(peer.ID(rune(i)), &pb.Status{ HeadSlot: headSlot, }) @@ -1074,17 +1085,17 @@ func TestStatus_CurrentEpoch(t *testing.T) { }, }) // Peer 1 - pid1 := addPeer(t, p, peers.PeerConnected) + pid1 := addPeer(t, p, peers.Connected) p.SetChainState(pid1, &pb.Status{ HeadSlot: params.BeaconConfig().SlotsPerEpoch * 4, }) // Peer 2 - pid2 := addPeer(t, p, peers.PeerConnected) + pid2 := addPeer(t, p, peers.Connected) p.SetChainState(pid2, &pb.Status{ HeadSlot: params.BeaconConfig().SlotsPerEpoch * 5, }) // Peer 3 - pid3 := addPeer(t, p, 
peers.PeerConnected) + pid3 := addPeer(t, p, peers.Connected) p.SetChainState(pid3, &pb.Status{ HeadSlot: params.BeaconConfig().SlotsPerEpoch * 4, }) @@ -1103,8 +1114,8 @@ func TestInbound(t *testing.T) { }) addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") require.NoError(t, err) - inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected) - createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected) + inbound := createPeer(t, p, addr, network.DirInbound, peers.Connected) + createPeer(t, p, addr, network.DirOutbound, peers.Connected) result := p.Inbound() require.Equal(t, 1, len(result)) @@ -1123,8 +1134,8 @@ func TestInboundConnected(t *testing.T) { addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") require.NoError(t, err) - inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected) - createPeer(t, p, addr, network.DirInbound, peers.PeerConnecting) + inbound := createPeer(t, p, addr, network.DirInbound, peers.Connected) + createPeer(t, p, addr, network.DirInbound, peers.Connecting) result := p.InboundConnected() require.Equal(t, 1, len(result)) @@ -1157,7 +1168,7 @@ func TestInboundConnectedWithProtocol(t *testing.T) { multiaddr, err := ma.NewMultiaddr(addr) require.NoError(t, err) - peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected) + peer := createPeer(t, p, multiaddr, network.DirInbound, peers.Connected) expectedTCP[peer.String()] = true } @@ -1166,7 +1177,7 @@ func TestInboundConnectedWithProtocol(t *testing.T) { multiaddr, err := ma.NewMultiaddr(addr) require.NoError(t, err) - peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected) + peer := createPeer(t, p, multiaddr, network.DirInbound, peers.Connected) expectedQUIC[peer.String()] = true } @@ -1203,8 +1214,8 @@ func TestOutbound(t *testing.T) { }) addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") require.NoError(t, err) - createPeer(t, p, addr, network.DirInbound, peers.PeerConnected) - outbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected) + createPeer(t, p, addr, network.DirInbound, peers.Connected) + outbound := createPeer(t, p, addr, network.DirOutbound, peers.Connected) result := p.Outbound() require.Equal(t, 1, len(result)) @@ -1223,8 +1234,8 @@ func TestOutboundConnected(t *testing.T) { addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333") require.NoError(t, err) - inbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected) - createPeer(t, p, addr, network.DirOutbound, peers.PeerConnecting) + inbound := createPeer(t, p, addr, network.DirOutbound, peers.Connected) + createPeer(t, p, addr, network.DirOutbound, peers.Connecting) result := p.OutboundConnected() require.Equal(t, 1, len(result)) @@ -1257,7 +1268,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) { multiaddr, err := ma.NewMultiaddr(addr) require.NoError(t, err) - peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected) + peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.Connected) expectedTCP[peer.String()] = true } @@ -1266,7 +1277,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) { multiaddr, err := ma.NewMultiaddr(addr) require.NoError(t, err) - peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected) + peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.Connected) expectedQUIC[peer.String()] = true } @@ -1293,7 +1304,7 @@ func TestOutboundConnectedWithProtocol(t *testing.T) { } // addPeer is a helper to add a peer with a given connection 
state) -func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID { +func addPeer(t *testing.T, p *peers.Status, state peerdata.ConnectionState) peer.ID { // Set up some peers with different states mhBytes := []byte{0x11, 0x04} idBytes := make([]byte, 4) @@ -1312,7 +1323,7 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) } func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr, - dir network.Direction, state peerdata.PeerConnectionState) peer.ID { + dir network.Direction, state peerdata.ConnectionState) peer.ID { mhBytes := []byte{0x11, 0x04} idBytes := make([]byte, 4) _, err := rand.Read(idBytes) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index f0784a0a34..2cdac68a3e 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -202,12 +202,13 @@ func (s *Service) Start() { s.startupErr = err return } - err = s.connectToBootnodes() - if err != nil { - log.WithError(err).Error("Could not add bootnode to the exclusion list") + + if err := s.connectToBootnodes(); err != nil { + log.WithError(err).Error("Could not connect to boot nodes") s.startupErr = err return } + s.dv5Listener = listener go s.listenForNewNodes() } @@ -384,12 +385,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er s.pingMethodLock.Unlock() } -func (s *Service) pingPeers() { +func (s *Service) pingPeersAndLogEnr() { s.pingMethodLock.RLock() defer s.pingMethodLock.RUnlock() + + localENR := s.dv5Listener.Self() + log.WithField("ENR", localENR).Info("New node record") + if s.pingMethod == nil { return } + for _, pid := range s.peers.Connected() { go func(id peer.ID) { if err := s.pingMethod(s.ctx, id); err != nil { @@ -462,8 +468,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error if info.ID == s.host.ID() { return nil } - if s.Peers().IsBad(info.ID) { - return errors.New("refused to connect to bad peer") + if err := s.Peers().IsBad(info.ID); err != nil { + return errors.Wrap(err, "refused to connect to bad peer") } ctx, cancel := context.WithTimeout(ctx, maxDialTimeout) defer cancel() diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index ffe7d04808..67ad98ac7a 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -20,7 +20,7 @@ type MockPeerManager struct { } // Disconnect . -func (_ *MockPeerManager) Disconnect(peer.ID) error { +func (*MockPeerManager) Disconnect(peer.ID) error { return nil } @@ -35,12 +35,12 @@ func (m *MockPeerManager) Host() host.Host { } // ENR . -func (m MockPeerManager) ENR() *enr.Record { +func (m *MockPeerManager) ENR() *enr.Record { return m.Enr } // DiscoveryAddresses . -func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { +func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { if m.FailDiscoveryAddr { return nil, errors.New("fail") } @@ -48,12 +48,12 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { } // RefreshENR . -func (_ MockPeerManager) RefreshENR() {} +func (*MockPeerManager) RefreshENR() {} // FindPeersWithSubnet . -func (_ MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { +func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return true, nil } // AddPingMethod . 
-func (_ MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {} +func (*MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {} diff --git a/beacon-chain/p2p/testing/mock_peersprovider.go b/beacon-chain/p2p/testing/mock_peersprovider.go index 1e2eca27bb..db36deab8b 100644 --- a/beacon-chain/p2p/testing/mock_peersprovider.go +++ b/beacon-chain/p2p/testing/mock_peersprovider.go @@ -64,7 +64,7 @@ func (m *MockPeersProvider) Peers() *peers.Status { log.WithError(err).Debug("Cannot decode") } m.peers.Add(createENR(), id0, ma0, network.DirInbound) - m.peers.SetConnectionState(id0, peers.PeerConnected) + m.peers.SetConnectionState(id0, peers.Connected) m.peers.SetChainState(id0, &pb.Status{FinalizedEpoch: 10}) id1, err := peer.Decode(MockRawPeerId1) if err != nil { @@ -75,7 +75,7 @@ func (m *MockPeersProvider) Peers() *peers.Status { log.WithError(err).Debug("Cannot decode") } m.peers.Add(createENR(), id1, ma1, network.DirOutbound) - m.peers.SetConnectionState(id1, peers.PeerConnected) + m.peers.SetConnectionState(id1, peers.Connected) m.peers.SetChainState(id1, &pb.Status{FinalizedEpoch: 11}) } return m.peers diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index cf31efb843..1a44200427 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -239,7 +239,7 @@ func (p *TestP2P) LeaveTopic(topic string) error { } // Encoding returns ssz encoding. -func (_ *TestP2P) Encoding() encoder.NetworkEncoding { +func (*TestP2P) Encoding() encoder.NetworkEncoding { return &encoder.SszNetworkEncoder{} } @@ -266,12 +266,12 @@ func (p *TestP2P) Host() host.Host { } // ENR returns the enr of the local peer. -func (_ *TestP2P) ENR() *enr.Record { +func (*TestP2P) ENR() *enr.Record { return new(enr.Record) } // DiscoveryAddresses -- -func (_ *TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { +func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil } @@ -284,16 +284,16 @@ func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID p.peers.Add(new(enr.Record), conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction) ctx := context.Background() - p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting) + p.peers.SetConnectionState(conn.RemotePeer(), peers.Connecting) if err := f(ctx, conn.RemotePeer()); err != nil { logrus.WithError(err).Error("Could not send successful hello rpc request") if err := p.Disconnect(conn.RemotePeer()); err != nil { logrus.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer()) } - p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) + p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnected) return } - p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) + p.peers.SetConnectionState(conn.RemotePeer(), peers.Connected) }() }, }) @@ -305,11 +305,11 @@ func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID DisconnectedF: func(net network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. 
go func() { - p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) + p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnecting) if err := f(context.Background(), conn.RemotePeer()); err != nil { logrus.WithError(err).Debug("Unable to invoke callback") } - p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) + p.peers.SetConnectionState(conn.RemotePeer(), peers.Disconnected) }() }, }) @@ -353,7 +353,7 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p } // Started always returns true. -func (_ *TestP2P) Started() bool { +func (*TestP2P) Started() bool { return true } @@ -363,12 +363,12 @@ func (p *TestP2P) Peers() *peers.Status { } // FindPeersWithSubnet mocks the p2p func. -func (_ *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { +func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return false, nil } // RefreshENR mocks the p2p func. -func (_ *TestP2P) RefreshENR() {} +func (*TestP2P) RefreshENR() {} // ForkDigest mocks the p2p func. func (p *TestP2P) ForkDigest() ([4]byte, error) { @@ -386,31 +386,31 @@ func (p *TestP2P) MetadataSeq() uint64 { } // AddPingMethod mocks the p2p func. -func (_ *TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { +func (*TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { // no-op } // InterceptPeerDial . -func (_ *TestP2P) InterceptPeerDial(peer.ID) (allow bool) { +func (*TestP2P) InterceptPeerDial(peer.ID) (allow bool) { return true } // InterceptAddrDial . -func (_ *TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { +func (*TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { return true } // InterceptAccept . -func (_ *TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { +func (*TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { return true } // InterceptSecured . -func (_ *TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { +func (*TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { return true } // InterceptUpgraded . 
-func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { +func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } diff --git a/beacon-chain/rpc/eth/node/handlers_peers_test.go b/beacon-chain/rpc/eth/node/handlers_peers_test.go index 8aa04c0166..859d7efef2 100644 --- a/beacon-chain/rpc/eth/node/handlers_peers_test.go +++ b/beacon-chain/rpc/eth/node/handlers_peers_test.go @@ -117,13 +117,13 @@ func TestGetPeers(t *testing.T) { switch i { case 0, 1: - peerStatus.SetConnectionState(id, peers.PeerConnecting) + peerStatus.SetConnectionState(id, peers.Connecting) case 2, 3: - peerStatus.SetConnectionState(id, peers.PeerConnected) + peerStatus.SetConnectionState(id, peers.Connected) case 4, 5: - peerStatus.SetConnectionState(id, peers.PeerDisconnecting) + peerStatus.SetConnectionState(id, peers.Disconnecting) case 6, 7: - peerStatus.SetConnectionState(id, peers.PeerDisconnected) + peerStatus.SetConnectionState(id, peers.Disconnected) default: t.Fatalf("Failed to set connection state for peer") } @@ -289,13 +289,13 @@ func TestGetPeerCount(t *testing.T) { switch i { case 0: - peerStatus.SetConnectionState(id, peers.PeerConnecting) + peerStatus.SetConnectionState(id, peers.Connecting) case 1, 2: - peerStatus.SetConnectionState(id, peers.PeerConnected) + peerStatus.SetConnectionState(id, peers.Connected) case 3, 4, 5: - peerStatus.SetConnectionState(id, peers.PeerDisconnecting) + peerStatus.SetConnectionState(id, peers.Disconnecting) case 6, 7, 8, 9: - peerStatus.SetConnectionState(id, peers.PeerDisconnected) + peerStatus.SetConnectionState(id, peers.Disconnected) default: t.Fatalf("Failed to set connection state for peer") } diff --git a/beacon-chain/rpc/prysm/node/handlers_test.go b/beacon-chain/rpc/prysm/node/handlers_test.go index 00532e9b77..8b07eb4d05 100644 --- a/beacon-chain/rpc/prysm/node/handlers_test.go +++ b/beacon-chain/rpc/prysm/node/handlers_test.go @@ -25,8 +25,8 @@ import ( type testIdentity enode.ID -func (_ testIdentity) Verify(_ *enr.Record, _ []byte) error { return nil } -func (id testIdentity) NodeAddr(_ *enr.Record) []byte { return id[:] } +func (testIdentity) Verify(*enr.Record, []byte) error { return nil } +func (id testIdentity) NodeAddr(*enr.Record) []byte { return id[:] } func TestListTrustedPeer(t *testing.T) { ids := libp2ptest.GeneratePeerIDs(9) @@ -62,13 +62,13 @@ func TestListTrustedPeer(t *testing.T) { switch i { case 0, 1: - peerStatus.SetConnectionState(id, peers.PeerConnecting) + peerStatus.SetConnectionState(id, peers.Connecting) case 2, 3: - peerStatus.SetConnectionState(id, peers.PeerConnected) + peerStatus.SetConnectionState(id, peers.Connected) case 4, 5: - peerStatus.SetConnectionState(id, peers.PeerDisconnecting) + peerStatus.SetConnectionState(id, peers.Disconnecting) case 6, 7: - peerStatus.SetConnectionState(id, peers.PeerDisconnected) + peerStatus.SetConnectionState(id, peers.Disconnected) default: t.Fatalf("Failed to set connection state for peer") } diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 8a2ba65f51..0d96cef82a 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -1094,7 +1094,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) { // its claims with actual blocks. 
emptyPeer := connectPeerHavingBlocks(t, p2p, chain1, finalizedSlot, p2p.Peers()) defer func() { - p2p.Peers().SetConnectionState(emptyPeer, peers.PeerDisconnected) + p2p.Peers().SetConnectionState(emptyPeer, peers.Disconnected) }() chainState, err := p2p.Peers().ChainState(emptyPeer) require.NoError(t, err) @@ -1291,7 +1291,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) { // Connect peer that has all the blocks available. allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers()) defer func() { - p2p.Peers().SetConnectionState(allBlocksPeer, peers.PeerDisconnected) + p2p.Peers().SetConnectionState(allBlocksPeer, peers.Disconnected) }() // Queue should be able to fetch whole chain (including slot which comes before the currently set head). diff --git a/beacon-chain/sync/initial-sync/initial_sync_test.go b/beacon-chain/sync/initial-sync/initial_sync_test.go index 306370f279..7a038fd682 100644 --- a/beacon-chain/sync/initial-sync/initial_sync_test.go +++ b/beacon-chain/sync/initial-sync/initial_sync_test.go @@ -227,7 +227,7 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus * p.Connect(host) peerStatus.Add(new(enr.Record), p.PeerID(), nil, network.DirOutbound) - peerStatus.SetConnectionState(p.PeerID(), peers.PeerConnected) + peerStatus.SetConnectionState(p.PeerID(), peers.Connected) peerStatus.SetChainState(p.PeerID(), ðpb.Status{ ForkDigest: params.BeaconConfig().GenesisForkVersion, FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), @@ -326,7 +326,7 @@ func connectPeerHavingBlocks( require.NoError(t, err) peerStatus.Add(new(enr.Record), p.PeerID(), nil, network.DirOutbound) - peerStatus.SetConnectionState(p.PeerID(), peers.PeerConnected) + peerStatus.SetConnectionState(p.PeerID(), peers.Connected) peerStatus.SetChainState(p.PeerID(), ðpb.Status{ ForkDigest: params.BeaconConfig().GenesisForkVersion, FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)), diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index 6a7e7077a1..cccdefa1fb 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -40,7 +40,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) { p1.Connect(p2) assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) - p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected) p1.Peers().SetChainState(p2.PeerID(), ðpb.Status{}) chain := &mock.ChainService{Genesis: prysmTime.Now(), FinalizedCheckPoint: ðpb.Checkpoint{}} diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 3e367b2146..6eef5bcac8 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -406,7 +406,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin r.initCaches() p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) - p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected) p1.Peers().SetChainState(p2.PeerID(), ðpb.Status{}) b0 := util.NewBeaconBlock() @@ -505,7 +505,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t 
*testing.T) { r.initCaches() p1.Peers().Add(new(enr.Record), p1.PeerID(), nil, network.DirOutbound) - p1.Peers().SetConnectionState(p1.PeerID(), peers.PeerConnected) + p1.Peers().SetConnectionState(p1.PeerID(), peers.Connected) p1.Peers().SetChainState(p1.PeerID(), ðpb.Status{}) b0 := util.NewBeaconBlock() @@ -611,7 +611,7 @@ func TestService_BatchRootRequest(t *testing.T) { r.initCaches() p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) - p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected) p1.Peers().SetChainState(p2.PeerID(), ðpb.Status{FinalizedEpoch: 2}) b0 := util.NewBeaconBlock() diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index 636fe54c68..5d088f5002 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -7,12 +7,13 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/trailofbits/go-mutexasserts" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" - "github.com/sirupsen/logrus" - "github.com/trailofbits/go-mutexasserts" ) const defaultBurstLimit = 5 @@ -98,19 +99,20 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error { defer l.RUnlock() topic := string(stream.Protocol()) + remotePeer := stream.Conn().RemotePeer() collector, err := l.retrieveCollector(topic) if err != nil { return err } - key := stream.Conn().RemotePeer().String() - remaining := collector.Remaining(key) + + remaining := collector.Remaining(remotePeer.String()) // Treat each request as a minimum of 1. 
if amt == 0 { amt = 1 } if amt > uint64(remaining) { - l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer) writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p) return p2ptypes.ErrRateLimited } diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index a45d769bb6..6535811031 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -97,7 +97,7 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) { for i := 0; i < defaultBurstLimit; i++ { assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream)) } - assert.Equal(t, true, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer") + assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer") require.NoError(t, stream.Close(), "could not close stream") if util.WaitTimeout(&wg, 1*time.Second) { diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 1c067036f5..34d0c661ac 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -51,6 +51,7 @@ func (s *Service) registerRPCHandlers() { s.pingHandler, ) s.registerRPCHandlersAltair() + if currEpoch >= params.BeaconConfig().DenebForkEpoch { s.registerRPCHandlersDeneb() } @@ -138,6 +139,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout) defer cancel() + conn := stream.Conn() + remotePeer := conn.RemotePeer() + // Resetting after closing is a no-op so defer a reset in case something goes wrong. // It's up to the handler to Close the stream (send an EOF) if // it successfully writes a response. We don't blindly call @@ -157,12 +161,12 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { ctx, span := trace.StartSpan(ctx, "sync.rpc") defer span.End() span.SetAttributes(trace.StringAttribute("topic", topic)) - span.SetAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().String())) + span.SetAttributes(trace.StringAttribute("peer", remotePeer.String())) log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol())) // Check before hand that peer is valid. 
- if s.cfg.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { - if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil { + if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil { + if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, remotePeer); err != nil { log.WithError(err).Debug("Could not disconnect from peer") } return diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index b84d15b412..74070a16e7 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -395,7 +395,7 @@ func TestRequestPendingBlobs(t *testing.T) { Genesis: time.Now(), } p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound) - p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected) + p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected) p1.Peers().SetChainState(p2.PeerID(), ðpb.Status{FinalizedEpoch: 1}) s := &Service{ cfg: &config{ diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 4c27e0b55c..03f0789c87 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -55,10 +55,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l // disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect // the peer, if that is the case. Additionally, disconnection reason is obtained from scorer. -func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) { - if !s.cfg.p2p.Peers().IsBad(id) { - return - } +func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr error) { err := s.cfg.p2p.Peers().Scorers().ValidationError(id) goodbyeCode := p2ptypes.ErrToGoodbyeCode(err) if err == nil { @@ -67,6 +64,8 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) { if err := s.sendGoodByeAndDisconnect(ctx, goodbyeCode, id); err != nil { log.WithError(err).Debug("Error when disconnecting with bad peer") } + + log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection") } // A custom goodbye method that is used by our connection handler, in the diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index e688301d66..34be4c4020 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -25,7 +25,7 @@ import ( "github.com/sirupsen/logrus" ) -// maintainPeerStatuses by infrequently polling peers for their latest status. +// maintainPeerStatuses maintains peer statuses by polling peers for their latest status twice per epoch. func (s *Service) maintainPeerStatuses() { // Run twice per epoch. interval := time.Duration(params.BeaconConfig().SlotsPerEpoch.Div(2).Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second @@ -38,16 +38,20 @@ func (s *Service) maintainPeerStatuses() { // If our peer status has not been updated correctly we disconnect over here // and set the connection state over here instead. 
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected { - s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnecting) + s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnecting) if err := s.cfg.p2p.Disconnect(id); err != nil { log.WithError(err).Debug("Error when disconnecting with peer") } - s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnected) + s.cfg.p2p.Peers().SetConnectionState(id, peers.Disconnected) + log.WithFields(logrus.Fields{ + "peer": id, + "reason": "maintain peer statuses - peer is not connected", + }).Debug("Initiate peer disconnection") return } // Disconnect from peers that are considered bad by any of the registered scorers. - if s.cfg.p2p.Peers().IsBad(id) { - s.disconnectBadPeer(s.ctx, id) + if err := s.cfg.p2p.Peers().IsBad(id); err != nil { + s.disconnectBadPeer(s.ctx, id, err) return } // If the status hasn't been updated in the recent interval time. @@ -73,6 +77,11 @@ func (s *Service) maintainPeerStatuses() { if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil { log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer") } + + log.WithFields(logrus.Fields{ + "peer": id, + "reason": "to be pruned", + }).Debug("Initiate peer disconnection") } }) } @@ -169,8 +178,8 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { // If validation fails, validation error is logged, and peer status scorer will mark peer as bad. err = s.validateStatusMessage(ctx, msg) s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err) - if s.cfg.p2p.Peers().IsBad(id) { - s.disconnectBadPeer(s.ctx, id) + if err := s.cfg.p2p.Peers().IsBad(id); err != nil { + s.disconnectBadPeer(s.ctx, id, err) } return err } @@ -182,7 +191,7 @@ func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error { } // Do not return an error for ping requests. if err := s.sendPingRequest(ctx, id); err != nil && !isUnwantedError(err) { - log.WithError(err).Debug("Could not ping peer") + log.WithError(err).WithField("pid", id).Debug("Could not ping peer") } return nil } diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 67323129d6..2aece63d6a 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -413,7 +413,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { assert.Equal(t, numActive1+1, numActive2, "Number of active peers unexpected") require.NoError(t, p2.Disconnect(p1.PeerID())) - p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerDisconnected) + p1.Peers().SetConnectionState(p2.PeerID(), peers.Disconnected) // Wait for disconnect event to trigger. 
time.Sleep(200 * time.Millisecond) @@ -877,7 +877,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { require.NoError(t, cw.SetClock(startup.NewClock(chain.Genesis, chain.ValidatorsRoot))) - assert.Equal(t, false, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad") + assert.NoError(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad") p1.Connect(p2) if util.WaitTimeout(&wg, time.Second) { @@ -887,9 +887,9 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { connectionState, err := p1.Peers().ConnectionState(p2.PeerID()) require.NoError(t, err, "Could not obtain peer connection state") - assert.Equal(t, peers.PeerDisconnected, connectionState, "Expected peer to be disconnected") + assert.Equal(t, peers.Disconnected, connectionState, "Expected peer to be disconnected") - assert.Equal(t, true, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad") + assert.NotNil(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad") } func TestStatusRPC_ValidGenesisMessage(t *testing.T) {