Advanced peer tracking (#4233)

* Advanced peer status

* Rework errors; add tests

* Gazelle

* time->roughtime

* Update beacon-chain/p2p/handshake.go

Co-Authored-By: terence tsao <terence@prysmaticlabs.com>

* Update beacon-chain/p2p/interfaces.go

Co-Authored-By: terence tsao <terence@prysmaticlabs.com>

* Downgrade log

* Tidy up handshaking logic and commentary

* Downgrade log message

* Protect connected peers from disconnection; increase high water level to avoid bad interactions at maxPeers
This commit is contained in:
Jim McDonald
2019-12-11 10:31:36 +00:00
committed by Nishant Das
parent 5757ce8894
commit 813233373e
29 changed files with 884 additions and 407 deletions

View File

@@ -31,7 +31,6 @@ go_library(
"//beacon-chain/p2p/connmgr:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/hashutil:go_default_library",

View File

@@ -8,7 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/sirupsen/logrus"
)
@@ -18,73 +18,94 @@ import (
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
if peerCount(s.host) > int(s.cfg.MaxPeers) {
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
return
}
log := log.WithField("peer", conn.RemotePeer().Pretty())
// Handle the various pre-existing conditions that will result in us not handshaking.
peerConnectionState, err := s.peers.ConnectionState(conn.RemotePeer())
if err == nil && peerConnectionState == peers.PeerConnected {
log.Debug("Peer already connected; not handshaking again")
return
}
multiAddr := fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"multiAddr": multiAddr,
"peerCount": peerCount(s.host),
}).Debug("Connection")
if peerstatus.IsBadPeer(conn.RemotePeer()) {
// Add Peer to gossipsub blacklist
s.pubsub.BlacklistPeer(conn.RemotePeer())
log.WithField("peerID", conn.RemotePeer().Pretty()).Trace("Disconnecting with bad peer")
s.peers.Add(conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
if len(s.peers.Active()) >= int(s.cfg.MaxPeers) {
log.Debug("We have enough peers; disconnecting")
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
return
log.WithError(err).Error("Unable to disconnect from peer")
}
return
}
if s.peers.IsBad(conn.RemotePeer()) {
// Add peer to gossipsub blacklist.
s.pubsub.BlacklistPeer(conn.RemotePeer())
log.Trace("Disconnecting from bad peer")
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
return
}
// ConnectedF must be non-blocking as part of libp2p design.
// Connection handler must be non-blocking as part of libp2p design.
go func() {
// Go through the handshake process.
multiAddr := fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"multiAddr": multiAddr,
"activePeers": len(s.peers.Active()),
}).Debug("Peer handshaking")
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
log := log.WithField("peer", conn.RemotePeer().Pretty())
log.Debug("Performing handshake with peer")
if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF {
log.WithError(err).Debug("Could not send successful hello rpc request")
log.WithError(err).Debug("Handshake failed")
if err.Error() == "protocol not supported" {
// This is only to ensure the smooth running of our testnets. This will not be
// used in production.
log.Debug("Not disconnecting peer with unsupported protocol. This may be the DHT node or relay.")
s.host.ConnManager().Protect(conn.RemotePeer(), "relay/bootnode")
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
return
}
s.peers.IncrementBadResponses(conn.RemotePeer())
badResponses, err := s.peers.BadResponses(conn.RemotePeer())
if err == nil && badResponses == s.peers.MaxBadResponses() {
log.Debug("Peer has given too many bad responses; will ignore in future")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
return
log.WithError(err).Error("Unable to disconnect from peer")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
return
}
log.WithField("peer", conn.RemotePeer().Pretty()).Info("New peer connected")
s.host.ConnManager().Protect(conn.RemotePeer(), "protocol")
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
log.WithField("active", len(s.peers.Active())).Info("Peer connected")
}()
},
})
}
// AddDisconnectionHandler ensures that previously disconnected peers aren't dialed again. Due
// to either their ports being closed, nodes are no longer active,etc. This also calls the handler
// responsible for maintaining other parts of the sync or p2p system.
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// This also calls the handler responsible for maintaining other parts of the sync or p2p system.
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
log := log.WithField("peer", conn.RemotePeer().Pretty())
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
s.exclusionList.Set(conn.RemotePeer().String(), true, 1)
log := log.WithField("peer", conn.RemotePeer())
log.Debug("Peer is added to exclusion list")
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
ctx := context.Background()
if err := handler(ctx, conn.RemotePeer()); err != nil {
log.WithError(err).Error("Failed to handle disconnecting peer")
log.WithError(err).Error("Disconnect handler failed")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
s.host.ConnManager().Unprotect(conn.RemotePeer(), "protocol")
// Log good peer disconnects; bad peers can visit here frequently so do not log them.
if !s.peers.IsBad(conn.RemotePeer()) {
log.WithField("active", len(s.peers.Active())).Debug("Peer disconnected")
}
}()
},
})
}

View File

@@ -60,7 +60,7 @@ type Sender interface {
Send(context.Context, interface{}, peer.ID) (network.Stream, error)
}
// PeersProvider abstracts obtaining our current list of known peers.
// PeersProvider abstracts obtaining our current list of known peers status.
type PeersProvider interface {
Peers() []*peers.Info
Peers() *peers.Status
}

View File

@@ -24,7 +24,9 @@ func buildOptions(cfg *Config, ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt
libp2p.EnableRelay(),
libp2p.ListenAddrs(listen),
whitelistSubnet(cfg.WhitelistCIDR),
libp2p.ConnectionManager(connmgr.NewConnManager(int(cfg.MaxPeers), int(cfg.MaxPeers), 1*time.Second)),
// Add one for the boot node and another for the relay, otherwise when we are close to maxPeers we will be above the high
// water mark and continually trigger pruning.
libp2p.ConnectionManager(connmgr.NewConnManager(int(cfg.MaxPeers+2), int(cfg.MaxPeers+2), 1*time.Second)),
}
if cfg.EnableUPnP {
options = append(options, libp2p.NATPortMap()) //Allow to use UPnP

View File

@@ -1,12 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["info.go"],
srcs = ["status.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["status_test.go"],
embed = [":go_default_library"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
],
)

View File

@@ -1,12 +0,0 @@
package peers
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
// Info provides information about a peer connection.
type Info struct {
AddrInfo *peer.AddrInfo
Direction network.Direction
}

View File

@@ -0,0 +1,312 @@
// Package peers provides information about peers at the Ethereum protocol level.
// "Protocol level" is the level above the network level, so this layer never sees or interacts with (for example) hosts that are
// uncontactable due to being down, firewalled, etc. Instead, this works with peers that are contactable but may or may not be of
// the correct fork version, not currently required due to the number of current connections, etc.
//
// A peer can have one of a number of states:
//
// - connected if we are able to talk to the remote peer
// - connecting if we are attempting to be able to talk to the remote peer
// - disconnecting if we are attempting to stop being able to talk to the remote peer
// - disconnected if we are not able to talk to the remote peer
//
// For convenience, there are two aggregate states expressed in functions:
//
// - active if we are connecting or connected
// - inactive if we are disconnecting or disconnected
//
// Peer information is persistent for the run of the service. This allows for collection of useful long-term statistics such as
// number of bad responses obtained from the peer, giving the basis for decisions to not talk to known-bad peers.
package peers
import (
"errors"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
// PeerConnectionState is the state of the connection.
type PeerConnectionState int
const (
// PeerDisconnected means there is no connection to the peer.
PeerDisconnected PeerConnectionState = iota
// PeerConnecting means there is an on-going attempt to connect to the peer.
PeerConnecting
// PeerConnected means the peer has an active connection.
PeerConnected
// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
PeerDisconnecting
)
var (
// ErrPeerUnknown is returned when there is an attempt to obtain data from a peer that is not known.
ErrPeerUnknown = errors.New("peer unknown")
)
// Status is the structure holding the peer status information.
type Status struct {
lock sync.RWMutex
maxBadResponses int
status map[peer.ID]*peerStatus
}
// peerStatus is the status of an individual peer at the protocol level.
type peerStatus struct {
address ma.Multiaddr
direction network.Direction
peerState PeerConnectionState
chainState *pb.Status
chainStateLastUpdated time.Time
badResponses int
}
// NewStatus creates a new status entity.
func NewStatus(maxBadResponses int) *Status {
return &Status{
maxBadResponses: maxBadResponses,
status: make(map[peer.ID]*peerStatus),
}
}
// MaxBadResponses returns the maximum number of bad responses a peer can provide before it is considered bad.
func (p *Status) MaxBadResponses() int {
return p.maxBadResponses
}
// Add adds a peer.
// If a peer already exists with this ID its address and direction are updated with the supplied data.
func (p *Status) Add(pid peer.ID, address ma.Multiaddr, direction network.Direction) {
p.lock.Lock()
defer p.lock.Unlock()
if status, ok := p.status[pid]; ok {
// Peer already exists, just update its address info.
status.address = address
status.direction = direction
return
}
p.status[pid] = &peerStatus{
address: address,
direction: direction,
// Peers start disconnected; state will be updated when the handshake process begins.
peerState: PeerDisconnected,
}
}
// Address returns the multiaddress of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.address, nil
}
return nil, ErrPeerUnknown
}
// Direction returns the direction of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.direction, nil
}
return network.DirUnknown, ErrPeerUnknown
}
// SetChainState sets the chain state of the given remote peer.
func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
p.lock.Lock()
defer p.lock.Unlock()
status := p.fetch(pid)
status.chainState = chainState
status.chainStateLastUpdated = roughtime.Now()
}
// ChainState gets the chain state of the given remote peer.
// This can return nil if there is no known chain state for the peer.
// This will error if the peer does not exist.
func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.chainState, nil
}
return nil, ErrPeerUnknown
}
// SetConnectionState sets the connection state of the given remote peer.
func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) {
p.lock.Lock()
defer p.lock.Unlock()
status := p.fetch(pid)
status.peerState = state
}
// ConnectionState gets the connection state of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.peerState, nil
}
return PeerDisconnected, ErrPeerUnknown
}
// ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated.
// This will error if the peer does not exist.
func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.chainStateLastUpdated, nil
}
return roughtime.Now(), ErrPeerUnknown
}
// IncrementBadResponses increments the number of bad responses we have received from the given remote peer.
func (p *Status) IncrementBadResponses(pid peer.ID) {
p.lock.Lock()
defer p.lock.Unlock()
status := p.fetch(pid)
status.badResponses++
}
// BadResponses obtains the number of bad responses we have received from the given remote peer.
// This will error if the peer does not exist.
func (p *Status) BadResponses(pid peer.ID) (int, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.badResponses, nil
}
return -1, ErrPeerUnknown
}
// IsBad states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (p *Status) IsBad(pid peer.ID) bool {
p.lock.RLock()
defer p.lock.RUnlock()
if status, ok := p.status[pid]; ok {
return status.badResponses >= p.maxBadResponses
}
return false
}
// Connecting returns the peers that are connecting.
func (p *Status) Connecting() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerConnecting {
peers = append(peers, pid)
}
}
return peers
}
// Connected returns the peers that are connected.
func (p *Status) Connected() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerConnected {
peers = append(peers, pid)
}
}
return peers
}
// Active returns the peers that are connecting or connected.
func (p *Status) Active() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerConnecting || status.peerState == PeerConnected {
peers = append(peers, pid)
}
}
return peers
}
// Disconnecting returns the peers that are disconnecting.
func (p *Status) Disconnecting() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerDisconnecting {
peers = append(peers, pid)
}
}
return peers
}
// Disconnected returns the peers that are disconnected.
func (p *Status) Disconnected() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerDisconnected {
peers = append(peers, pid)
}
}
return peers
}
// Inactive returns the peers that are disconnecting or disconnected.
func (p *Status) Inactive() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
peers := make([]peer.ID, 0)
for pid, status := range p.status {
if status.peerState == PeerDisconnecting || status.peerState == PeerDisconnected {
peers = append(peers, pid)
}
}
return peers
}
// All returns all the peers regardless of state.
func (p *Status) All() []peer.ID {
p.lock.RLock()
defer p.lock.RUnlock()
pids := make([]peer.ID, 0, len(p.status))
for pid := range p.status {
pids = append(pids, pid)
}
return pids
}
// fetch is a helper function that fetches a peer status, possibly creating it.
func (p *Status) fetch(pid peer.ID) *peerStatus {
if _, ok := p.status[pid]; !ok {
p.status[pid] = &peerStatus{}
}
return p.status[pid]
}

View File

@@ -0,0 +1,318 @@
package peers_test
import (
"crypto/rand"
"fmt"
"testing"
"github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
func TestStatus(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
if p == nil {
t.Fatalf("p not created")
}
if p.MaxBadResponses() != maxBadResponses {
t.Errorf("maxBadResponses incorrect value: expected %v, received %v", maxBadResponses, p.MaxBadResponses())
}
}
func TestPeerExplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
id, err := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
t.Fatalf("Failed to create ID: %v", err)
}
address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
if err != nil {
t.Fatalf("Failed to create address: %v", err)
}
direction := network.DirInbound
p.Add(id, address, direction)
resAddress, err := p.Address(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resAddress != address {
t.Errorf("Unexpected address: expected %v, received %v", address, resAddress)
}
resDirection, err := p.Direction(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resDirection != direction {
t.Errorf("Unexpected direction: expected %v, received %v", direction, resDirection)
}
// Update with another explicit add
address2, err := ma.NewMultiaddr("/ip4/52.23.23.253/tcp/30000/ipfs/QmfAgkmjiZNZhr2wFN9TwaRgHouMTBT6HELyzE5A3BT2wK/p2p-circuit")
if err != nil {
t.Fatalf("Failed to create address: %v", err)
}
direction2 := network.DirOutbound
p.Add(id, address2, direction2)
resAddress2, err := p.Address(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resAddress2 != address2 {
t.Errorf("Unexpected address: expected %v, received %v", address2, resAddress2)
}
resDirection2, err := p.Direction(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resDirection2 != direction2 {
t.Errorf("Unexpected direction: expected %v, received %v", direction2, resDirection2)
}
}
func TestErrUnknownPeer(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
id, err := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
t.Fatal(err)
}
_, err = p.Address(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
_, err = p.Direction(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
_, err = p.ChainState(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
_, err = p.ConnectionState(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
_, err = p.ChainStateLastUpdated(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
_, err = p.BadResponses(id)
if err != peers.ErrPeerUnknown {
t.Errorf("Unexpected error: expected %v, received %v", peers.ErrPeerUnknown, err)
}
}
func TestPeerImplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
id, err := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
t.Fatal(err)
}
connectionState := peers.PeerConnecting
p.SetConnectionState(id, connectionState)
resConnectionState, err := p.ConnectionState(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resConnectionState != connectionState {
t.Errorf("Unexpected connection state: expected %v, received %v", connectionState, resConnectionState)
}
}
func TestPeerChainState(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
id, err := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
t.Fatal(err)
}
address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
if err != nil {
t.Fatalf("Failed to create address: %v", err)
}
direction := network.DirInbound
p.Add(id, address, direction)
oldChainStartLastUpdated, err := p.ChainStateLastUpdated(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
finalizedEpoch := uint64(123)
p.SetChainState(id, &pb.Status{FinalizedEpoch: finalizedEpoch})
resChainState, err := p.ChainState(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resChainState.FinalizedEpoch != finalizedEpoch {
t.Errorf("Unexpected finalized epoch: expected %v, received %v", finalizedEpoch, resChainState.FinalizedEpoch)
}
newChainStartLastUpdated, err := p.ChainStateLastUpdated(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !newChainStartLastUpdated.After(oldChainStartLastUpdated) {
t.Errorf("Last updated did not increase: old %v new %v", oldChainStartLastUpdated, newChainStartLastUpdated)
}
}
func TestPeerBadResponses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
id, err := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
t.Fatal(err)
}
{
bytes, _ := id.MarshalBinary()
fmt.Printf("%x\n", bytes)
}
if p.IsBad(id) {
t.Error("Peer marked as bad when should be good")
}
address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
if err != nil {
t.Fatalf("Failed to create address: %v", err)
}
direction := network.DirInbound
p.Add(id, address, direction)
resBadResponses, err := p.BadResponses(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resBadResponses != 0 {
t.Errorf("Unexpected bad responses: expected 0, received %v", resBadResponses)
}
if p.IsBad(id) {
t.Error("Peer marked as bad when should be good")
}
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resBadResponses != 1 {
t.Errorf("Unexpected bad responses: expected 1, received %v", resBadResponses)
}
if p.IsBad(id) {
t.Error("Peer marked as bad when should be good")
}
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resBadResponses != 2 {
t.Errorf("Unexpected bad responses: expected 2, received %v", resBadResponses)
}
if !p.IsBad(id) {
t.Error("Peer not marked as bad when it should be")
}
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resBadResponses != 3 {
t.Errorf("Unexpected bad responses: expected 3, received %v", resBadResponses)
}
if !p.IsBad(id) {
t.Error("Peer not marked as bad when it should be")
}
}
func TestPeerConnectionStatuses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
// Add some peers with different states
numPeersDisconnected := 11
for i := 0; i < numPeersDisconnected; i++ {
addPeer(t, p, peers.PeerDisconnected)
}
numPeersConnecting := 7
for i := 0; i < numPeersConnecting; i++ {
addPeer(t, p, peers.PeerConnecting)
}
numPeersConnected := 43
for i := 0; i < numPeersConnected; i++ {
addPeer(t, p, peers.PeerConnected)
}
numPeersDisconnecting := 4
for i := 0; i < numPeersDisconnecting; i++ {
addPeer(t, p, peers.PeerDisconnecting)
}
// Now confirm the states
if len(p.Disconnected()) != numPeersDisconnected {
t.Errorf("Unexpected number of disconnected peers: expected %v, received %v", numPeersDisconnected, len(p.Disconnected()))
}
if len(p.Connecting()) != numPeersConnecting {
t.Errorf("Unexpected number of connecting peers: expected %v, received %v", numPeersConnecting, len(p.Connecting()))
}
if len(p.Connected()) != numPeersConnected {
t.Errorf("Unexpected number of connected peers: expected %v, received %v", numPeersConnected, len(p.Connected()))
}
if len(p.Disconnecting()) != numPeersDisconnecting {
t.Errorf("Unexpected number of disconnecting peers: expected %v, received %v", numPeersDisconnecting, len(p.Disconnecting()))
}
numPeersActive := numPeersConnecting + numPeersConnected
if len(p.Active()) != numPeersActive {
t.Errorf("Unexpected number of active peers: expected %v, received %v", numPeersActive, len(p.Active()))
}
numPeersInactive := numPeersDisconnecting + numPeersDisconnected
if len(p.Inactive()) != numPeersInactive {
t.Errorf("Unexpected number of inactive peers: expected %v, received %v", numPeersInactive, len(p.Inactive()))
}
numPeersAll := numPeersActive + numPeersInactive
if len(p.All()) != numPeersAll {
t.Errorf("Unexpected number of peers: expected %v, received %v", numPeersAll, len(p.All()))
}
}
// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peers.PeerConnectionState) {
// Set up some peers with different states
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)
rand.Read(idBytes)
mhBytes = append(mhBytes, idBytes...)
id, err := peer.IDFromBytes(mhBytes)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
p.Add(id, nil, network.DirUnknown)
p.SetConnectionState(id, state)
}

View File

@@ -33,6 +33,9 @@ var ttl = 1 * time.Hour
const prysmProtocolPrefix = "/prysm/0.0.0"
// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
const maxBadResponses = 3
// Service for managing peer to peer (p2p) networking.
type Service struct {
ctx context.Context
@@ -46,6 +49,7 @@ type Service struct {
exclusionList *ristretto.Cache
privKey *ecdsa.PrivateKey
dht *kaddht.IpfsDHT
peers *peers.Status
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
@@ -119,6 +123,8 @@ func NewService(cfg *Config) (*Service, error) {
}
s.pubsub = gs
s.peers = peers.NewStatus(maxBadResponses)
return s, nil
}
@@ -260,23 +266,9 @@ func (s *Service) Disconnect(pid peer.ID) error {
return s.host.Network().ClosePeer(pid)
}
// Peers provides a list of peers that are known to this node.
// Note this includes peers to which we are not currently actively connected; to find active
// peers the info returned here should be filtered against that in `peerstatus`, which contains
// information about active peers.
func (s *Service) Peers() []*peers.Info {
res := make([]*peers.Info, 0)
for _, conn := range s.host.Network().Conns() {
addrInfo := &peer.AddrInfo{
ID: conn.RemotePeer(),
Addrs: []ma.Multiaddr{conn.RemoteMultiaddr()},
}
res = append(res, &peers.Info{
AddrInfo: addrInfo,
Direction: conn.Stat().Direction,
})
}
return res
// Peers returns the peer status interface.
func (s *Service) Peers() *peers.Status {
return s.peers
}
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
@@ -312,6 +304,9 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
if _, ok := s.exclusionList.Get(info.ID.String()); ok {
continue
}
if s.Peers().IsBad(info.ID) {
continue
}
if err := s.host.Connect(s.ctx, info); err != nil {
log.Errorf("Could not connect with peer %s: %v", info.String(), err)
s.exclusionList.Set(info.ID.String(), true, 1)

View File

@@ -21,6 +21,7 @@ go_library(
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",

View File

@@ -1,36 +1,38 @@
package testing
import (
"sync"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// MockPeersProvider implements PeersProvider for testing.
type MockPeersProvider struct {
lock sync.Mutex
peers *peers.Status
}
// Peers records a broadcast occurred.
func (m *MockPeersProvider) Peers() []*peers.Info {
res := make([]*peers.Info, 2)
id0, _ := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
ma0, _ := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
res[0] = &peers.Info{
AddrInfo: &peer.AddrInfo{
ID: id0,
Addrs: []ma.Multiaddr{ma0},
},
Direction: network.DirInbound,
// Peers provides access the peer status.
func (m *MockPeersProvider) Peers() *peers.Status {
m.lock.Lock()
defer m.lock.Unlock()
if m.peers == nil {
m.peers = peers.NewStatus(5 /* maxBadResponses */)
// Pretend we are connected to two peers
id0, _ := peer.IDB58Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
ma0, _ := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000")
m.peers.Add(id0, ma0, network.DirInbound)
m.peers.SetConnectionState(id0, peers.PeerConnected)
m.peers.SetChainState(id0, &pb.Status{FinalizedEpoch: uint64(10)})
id1, _ := peer.IDB58Decode("16Uiu2HAm4HgJ9N1o222xK61o7LSgToYWoAy1wNTJRkh9gLZapVAy")
ma1, _ := ma.NewMultiaddr("/ip4/52.23.23.253/tcp/30000/ipfs/QmfAgkmjiZNZhr2wFN9TwaRgHouMTBT6HELyzE5A3BT2wK/p2p-circuit")
m.peers.Add(id1, ma1, network.DirOutbound)
m.peers.SetConnectionState(id1, peers.PeerConnected)
m.peers.SetChainState(id1, &pb.Status{FinalizedEpoch: uint64(11)})
}
id1, _ := peer.IDB58Decode("16Uiu2HAm4HgJ9N1o222xK61o7LSgToYWoAy1wNTJRkh9gLZapVAy")
ma1, _ := ma.NewMultiaddr("/ip4/52.23.23.253/tcp/30000/ipfs/QmfAgkmjiZNZhr2wFN9TwaRgHouMTBT6HELyzE5A3BT2wK/p2p-circuit")
res[1] = &peers.Info{
AddrInfo: &peer.AddrInfo{
ID: id1,
Addrs: []ma.Multiaddr{ma1},
},
Direction: network.DirOutbound,
}
return res
return m.peers
}

View File

@@ -38,6 +38,7 @@ type TestP2P struct {
pubsub *pubsub.PubSub
BroadcastCalled bool
DelaySend bool
peers *peers.Status
}
// NewTestP2P initializes a new p2p test service.
@@ -56,6 +57,7 @@ func NewTestP2P(t *testing.T) *TestP2P {
t: t,
Host: h,
pubsub: ps,
peers: peers.NewStatus(5 /* maxBadResponses */),
}
}
@@ -157,15 +159,19 @@ func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) e
ConnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
p.peers.Add(conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
ctx := context.Background()
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
if err := f(ctx, conn.RemotePeer()); err != nil {
logrus.WithError(err).Error("Could not send succesful hello rpc request")
if err := p.Disconnect(conn.RemotePeer()); err != nil {
logrus.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
}
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
return
}
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
}()
},
})
@@ -177,7 +183,9 @@ func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID
DisconnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
f(context.Background(), conn.RemotePeer())
p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
}()
},
})
@@ -215,8 +223,7 @@ func (p *TestP2P) Started() bool {
return true
}
// Peers returns a mocked list of peers.
func (p *TestP2P) Peers() []*peers.Info {
provider := &MockPeersProvider{}
return provider.Peers()
// Peers returns the peer status.
func (p *TestP2P) Peers() *peers.Status {
return p.peers
}

View File

@@ -10,7 +10,6 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//shared/version:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
@@ -30,8 +29,6 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/version:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",

View File

@@ -12,7 +12,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
"github.com/prysmaticlabs/prysm/shared/version"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -80,26 +79,32 @@ func (ns *Server) ListImplementedServices(ctx context.Context, _ *ptypes.Empty)
// ListPeers lists the peers connected to this node.
func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.Peers, error) {
peers := make([]*ethpb.Peer, 0)
for _, peer := range ns.PeersFetcher.Peers() {
peerStatus := peerstatus.Get(peer.AddrInfo.ID)
if peerStatus != nil {
address := fmt.Sprintf("%s/p2p/%s", peer.AddrInfo.Addrs[0].String(), peer.AddrInfo.ID.Pretty())
direction := ethpb.PeerDirection_UNKNOWN
switch peer.Direction {
case network.DirInbound:
direction = ethpb.PeerDirection_INBOUND
case network.DirOutbound:
direction = ethpb.PeerDirection_OUTBOUND
}
peers = append(peers, &ethpb.Peer{
Address: address,
Direction: direction,
})
res := make([]*ethpb.Peer, 0)
for _, pid := range ns.PeersFetcher.Peers().Connected() {
multiaddr, err := ns.PeersFetcher.Peers().Address(pid)
if err != nil {
continue
}
direction, err := ns.PeersFetcher.Peers().Direction(pid)
if err != nil {
continue
}
address := fmt.Sprintf("%s/p2p/%s", multiaddr.String(), pid.Pretty())
pbDirection := ethpb.PeerDirection_UNKNOWN
switch direction {
case network.DirInbound:
pbDirection = ethpb.PeerDirection_INBOUND
case network.DirOutbound:
pbDirection = ethpb.PeerDirection_OUTBOUND
}
res = append(res, &ethpb.Peer{
Address: address,
Direction: pbDirection,
})
}
return &ethpb.Peers{
Peers: peers,
Peers: res,
}, nil
}

View File

@@ -13,8 +13,6 @@ import (
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
mockP2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/version"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
@@ -109,17 +107,12 @@ func TestNodeServer_ListPeers(t *testing.T) {
ethpb.RegisterNodeServer(server, ns)
reflection.Register(server)
// We register the peers with peerstatus to pretend they are active.
for i, peer := range peersProvider.Peers() {
peerstatus.Set(peer.AddrInfo.ID, &pb.Status{FinalizedEpoch: uint64(i + 10)})
}
res, err := ns.ListPeers(context.Background(), &ptypes.Empty{})
if err != nil {
t.Fatal(err)
}
if len(res.Peers) != 2 {
t.Errorf("Expected 2 peers, received %d: %v", len(res.Peers), res.Peers)
t.Fatalf("Expected 2 peers, received %d: %v", len(res.Peers), res.Peers)
}
if int(res.Peers[0].Direction) != int(ethpb.PeerDirection_INBOUND) {

View File

@@ -41,7 +41,6 @@ go_library(
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/bls:go_default_library",
@@ -95,9 +94,9 @@ go_test(
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",
"//shared/bls:go_default_library",

View File

@@ -17,7 +17,6 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/bytesutil:go_default_library",
@@ -42,9 +41,9 @@ go_test(
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",

View File

@@ -15,7 +15,6 @@ import (
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
@@ -46,8 +45,8 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
var lastEmptyRequests int
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := bestFinalized()
for s.chain.HeadSlot() < helpers.StartSlot(s.highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := s.bestFinalized()
if len(peers) == 0 {
log.Warn("No peers; waiting for reconnect")
time.Sleep(refreshTime)
@@ -83,8 +82,8 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
}
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
if start > helpers.StartSlot(highestFinalizedEpoch()+1) {
return nil, errors.Errorf("attempted to ask for a start slot of %d which is greater than the next highest epoch of %d", start, highestFinalizedEpoch()+1)
if start > helpers.StartSlot(s.highestFinalizedEpoch()+1) {
return nil, errors.Errorf("attempted to ask for a start slot of %d which is greater than the next highest epoch of %d", start, s.highestFinalizedEpoch()+1)
}
atomic.AddInt32(&p2pRequestCount, int32(len(peers)))
@@ -189,7 +188,7 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
})
for _, blk := range blocks {
logSyncStatus(genesis, blk, peers, counter)
s.logSyncStatus(genesis, blk, peers, counter)
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.ParentRoot)) {
log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.ParentRoot)
continue
@@ -225,14 +224,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
// mitigation. We are already convinced that we are on the correct finalized chain. Any blocks
// we receive there after must build on the finalized chain or be considered invalid during
// fork choice resolution / block processing.
best := bestPeer()
root, _, _ := bestFinalized()
best := s.bestPeer()
root, _, _ := s.bestFinalized()
// if no best peer exists, retry until a new best peer is found.
for len(best) == 0 {
time.Sleep(refreshTime)
best = bestPeer()
root, _, _ = bestFinalized()
best = s.bestPeer()
root, _, _ = s.bestFinalized()
}
for head := slotsSinceGenesis(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
@@ -252,7 +251,7 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
}
for _, blk := range resp {
logSyncStatus(genesis, blk, []peer.ID{best}, counter)
s.logSyncStatus(genesis, blk, []peer.ID{best}, counter)
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil {
return err
}
@@ -291,8 +290,8 @@ func (s *InitialSync) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocks
// highestFinalizedEpoch as reported by peers. This is the absolute highest finalized epoch as
// reported by peers.
func highestFinalizedEpoch() uint64 {
_, epoch, _ := bestFinalized()
func (s *InitialSync) highestFinalizedEpoch() uint64 {
_, epoch, _ := s.bestFinalized()
return epoch
}
@@ -301,17 +300,16 @@ func highestFinalizedEpoch() uint64 {
// which most peers can serve blocks. Ideally, all peers would be reporting the same finalized
// epoch.
// Returns the best finalized root, epoch number, and peers that agree.
func bestFinalized() ([]byte, uint64, []peer.ID) {
func (s *InitialSync) bestFinalized() ([]byte, uint64, []peer.ID) {
finalized := make(map[[32]byte]uint64)
rootToEpoch := make(map[[32]byte]uint64)
for _, k := range peerstatus.Keys() {
s := peerstatus.Get(k)
if s == nil {
continue
for _, pid := range s.p2p.Peers().Connected() {
peerChainState, err := s.p2p.Peers().ChainState(pid)
if err == nil && peerChainState != nil {
r := bytesutil.ToBytes32(peerChainState.FinalizedRoot)
finalized[r]++
rootToEpoch[r] = peerChainState.FinalizedEpoch
}
r := bytesutil.ToBytes32(s.FinalizedRoot)
finalized[r]++
rootToEpoch[r] = s.FinalizedEpoch
}
var mostVotedFinalizedRoot [32]byte
@@ -324,13 +322,10 @@ func bestFinalized() ([]byte, uint64, []peer.ID) {
}
var pids []peer.ID
for _, k := range peerstatus.Keys() {
s := peerstatus.Get(k)
if s == nil {
continue
}
if s.FinalizedEpoch >= rootToEpoch[mostVotedFinalizedRoot] {
pids = append(pids, k)
for _, pid := range s.p2p.Peers().Connected() {
peerChainState, err := s.p2p.Peers().ChainState(pid)
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= rootToEpoch[mostVotedFinalizedRoot] {
pids = append(pids, pid)
if len(pids) >= maxPeersToSync {
break
}
@@ -341,13 +336,13 @@ func bestFinalized() ([]byte, uint64, []peer.ID) {
}
// bestPeer returns the peer ID of the peer reporting the highest head slot.
func bestPeer() peer.ID {
func (s *InitialSync) bestPeer() peer.ID {
var best peer.ID
var bestSlot uint64
for _, k := range peerstatus.Keys() {
s := peerstatus.Get(k)
if s.HeadSlot >= bestSlot {
bestSlot = s.HeadSlot
for _, k := range s.p2p.Peers().Connected() {
peerChainState, err := s.p2p.Peers().ChainState(k)
if err == nil && peerChainState != nil && peerChainState.HeadSlot >= bestSlot {
bestSlot = peerChainState.HeadSlot
best = k
}
}
@@ -355,7 +350,7 @@ func bestPeer() peer.ID {
}
// logSyncStatus and increment block processing counter.
func logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, peers []peer.ID, counter *ratecounter.RateCounter) {
func (s *InitialSync) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncingPeers []peer.ID, counter *ratecounter.RateCounter) {
counter.Incr(1)
rate := float64(counter.Rate()) / counterSeconds
if rate == 0 {
@@ -364,7 +359,7 @@ func logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, peers []peer.ID, cou
timeRemaining := time.Duration(float64(slotsSinceGenesis(genesis)-blk.Slot)/rate) * time.Second
log.WithField(
"peers",
fmt.Sprintf("%d/%d", len(peers), len(peerstatus.Keys())),
fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())),
).WithField(
"blocksPerSecond",
fmt.Sprintf("%.1f", rate),

View File

@@ -14,9 +14,9 @@ import (
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
@@ -236,12 +236,11 @@ func TestRoundRobinSync(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
initializeRootCache(tt.expectedBlockSlots, t)
peerstatus.Clear()
p := p2pt.NewTestP2P(t)
beaconDB := dbtest.SetupDB(t)
connectPeers(t, p, tt.peers)
connectPeers(t, p, tt.peers, p.Peers())
genesisRoot := rootCache[0]
err := beaconDB.SaveBlock(context.Background(), &eth.BeaconBlock{
@@ -286,7 +285,7 @@ func TestRoundRobinSync(t *testing.T) {
// Connect peers with local host. This method sets up peer statuses and the appropriate handlers
// for each test peer.
func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData) {
func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus *peers.Status) {
const topic = "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz"
for _, d := range data {
@@ -352,7 +351,9 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData) {
peer.Connect(host)
peerstatus.Set(peer.PeerID(), &p2ppb.Status{
peerStatus.Add(peer.PeerID(), nil, network.DirOutbound)
peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected)
peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
@@ -426,15 +427,20 @@ func TestMakeSequence(t *testing.T) {
}
func TestBestFinalized_returnsMaxValue(t *testing.T) {
defer peerstatus.Clear()
p := p2pt.NewTestP2P(t)
s := &InitialSync{
p2p: p,
}
for i := 0; i <= maxPeersToSync+100; i++ {
peerstatus.Set(peer.ID(i), &pb.Status{
s.p2p.Peers().Add(peer.ID(i), nil, network.DirOutbound)
s.p2p.Peers().SetConnectionState(peer.ID(i), peers.PeerConnected)
s.p2p.Peers().SetChainState(peer.ID(i), &pb.Status{
FinalizedEpoch: 10,
})
}
_, _, pids := bestFinalized()
_, _, pids := s.bestFinalized()
if len(pids) != maxPeersToSync {
t.Fatalf("returned wrong number of peers, wanted %d, got %d", maxPeersToSync, len(pids))
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
@@ -118,7 +117,7 @@ func (s *InitialSync) Start() {
// Every 5 sec, report handshake count.
for {
count := peerstatus.Count()
count := len(s.p2p.Peers().Connected())
if count >= minStatusCount {
break
}

View File

@@ -1,27 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["peer_statuses.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus",
visibility = [
"//beacon-chain/p2p:__subpackages__",
"//beacon-chain/rpc:__subpackages__",
"//beacon-chain/sync:__subpackages__",
],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["peer_statuses_test.go"],
embed = [":go_default_library"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
],
)

View File

@@ -1,119 +0,0 @@
// Package peerstatus is a threadsafe global cache to store recent peer status messages for access
// across multiple services.
package peerstatus
import (
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)
var lock sync.RWMutex
var peerStatuses = make(map[peer.ID]*peerStatus)
var failureCount = make(map[peer.ID]int)
var maxFailureThreshold = 3
type peerStatus struct {
status *pb.Status
lastUpdated time.Time
}
// Get most recent status from peer in cache. Threadsafe.
func Get(pid peer.ID) *pb.Status {
lock.RLock()
defer lock.RUnlock()
if pStatus, ok := peerStatuses[pid]; ok {
return pStatus.status
}
return nil
}
// Set most recent status from peer in cache. Threadsafe.
func Set(pid peer.ID, status *pb.Status) {
lock.Lock()
defer lock.Unlock()
if _, ok := peerStatuses[pid]; !ok {
peerStatuses[pid] = &peerStatus{}
}
peerStatuses[pid].status = status
peerStatuses[pid].lastUpdated = roughtime.Now()
}
// Delete peer status from cache. Threadsafe.
func Delete(pid peer.ID) {
lock.Lock()
defer lock.Unlock()
delete(peerStatuses, pid)
}
// Count of peer statuses in cache. Threadsafe.
func Count() int {
lock.RLock()
defer lock.RUnlock()
return len(peerStatuses)
}
// Keys is the list of peer IDs which status exists. Threadsafe.
func Keys() []peer.ID {
lock.RLock()
defer lock.RUnlock()
keys := make([]peer.ID, 0, len(peerStatuses))
for k := range peerStatuses {
keys = append(keys, k)
}
return keys
}
// LastUpdated time which the status was set for the given peer. Threadsafe.
func LastUpdated(pid peer.ID) time.Time {
lock.RLock()
defer lock.RUnlock()
if pStatus, ok := peerStatuses[pid]; ok {
return pStatus.lastUpdated
}
return time.Unix(0, 0)
}
// IncreaseFailureCount increases the failure count for the particular peer.
func IncreaseFailureCount(pid peer.ID) {
lock.Lock()
defer lock.Unlock()
count, ok := failureCount[pid]
if !ok {
failureCount[pid] = 1
return
}
failureCount[pid] = count + 1
}
// FailureCount returns the failure count for the particular peer.
func FailureCount(pid peer.ID) int {
lock.RLock()
defer lock.RUnlock()
count, ok := failureCount[pid]
if !ok {
return 0
}
return count
}
// IsBadPeer checks whether the given peer has
// exceeded the number of bad handshakes threshold.
func IsBadPeer(pid peer.ID) bool {
lock.RLock()
defer lock.RUnlock()
count, ok := failureCount[pid]
if !ok {
return false
}
return count > maxFailureThreshold
}
// Clear the cache. This method should only be used for tests.
func Clear() {
peerStatuses = make(map[peer.ID]*peerStatus)
failureCount = make(map[peer.ID]int)
}

View File

@@ -1,49 +0,0 @@
package peerstatus
import (
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
func TestIncrementFailureCount(t *testing.T) {
testID := peer.ID("test")
IncreaseFailureCount(testID)
if FailureCount(testID) != 1 {
t.Errorf("Wanted failure count of %d but got %d", 1, FailureCount(testID))
}
}
func TestAboveFailureThreshold(t *testing.T) {
testID := peer.ID("test")
for i := 0; i <= maxFailureThreshold; i++ {
IncreaseFailureCount(testID)
}
if !IsBadPeer(testID) {
t.Errorf("Peer isnt considered as a bad peer despite crossing the failure threshold "+
"with a failure count of %d", FailureCount(testID))
}
}
func TestLastUpdated(t *testing.T) {
testID := peer.ID("test")
status := &pb.Status{
FinalizedEpoch: 1,
}
Set(testID, status)
firstUpdated := LastUpdated(testID)
time.Sleep(100 * time.Millisecond)
status = &pb.Status{
FinalizedEpoch: 2,
}
Set(testID, status)
secondUpdated := LastUpdated(testID)
if !secondUpdated.After(firstUpdated) {
t.Error("lastupdated did not increment on subsequent set")
}
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
@@ -40,7 +39,7 @@ func (r *RegularSync) processPendingBlocks(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
defer span.End()
pids := peerstatus.Keys()
pids := r.p2p.Peers().Connected()
if err := r.validatePendingSlots(); err != nil {
return errors.Wrap(err, "could not validate pending slots")
}

View File

@@ -11,8 +11,8 @@ import (
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/sirupsen/logrus"
)
@@ -29,8 +29,10 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) {
db := dbtest.SetupDB(t)
defer dbtest.TeardownDB(t, db)
p1 := p2ptest.NewTestP2P(t)
r := &RegularSync{
db: db,
p2p: p1,
db: db,
chain: &mock.ChainService{
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
@@ -127,7 +129,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) {
}, slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}
peerstatus.Set(p2.PeerID(), &pb.Status{})
p1.Peers().Add(p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetChainState(p2.PeerID(), &pb.Status{})
b0 := &ethpb.BeaconBlock{}
if err := r.db.SaveBlock(context.Background(), b0); err != nil {
@@ -220,7 +224,9 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) {
}, slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}
peerstatus.Set(p2.PeerID(), &pb.Status{})
p1.Peers().Add(p1.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p1.PeerID(), peers.PeerConnected)
p1.Peers().SetChainState(p1.PeerID(), &pb.Status{})
b0 := &ethpb.BeaconBlock{}
if err := r.db.SaveBlock(context.Background(), b0); err != nil {

View File

@@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
@@ -24,9 +23,14 @@ func (r *RegularSync) maintainPeerStatuses() {
ctx := context.Background()
select {
case <-ticker.C:
for _, pid := range peerstatus.Keys() {
for _, pid := range r.p2p.Peers().Connected() {
// If the status hasn't been updated in the recent interval time.
if roughtime.Now().After(peerstatus.LastUpdated(pid).Add(statusInterval)) {
lastUpdated, err := r.p2p.Peers().ChainStateLastUpdated(pid)
if err != nil {
// Peer has vanished; nothing to do
continue
}
if roughtime.Now().After(lastUpdated.Add(statusInterval)) {
if err := r.sendRPCStatusRequest(ctx, pid); err != nil {
log.WithError(err).Error("Failed to request peer status")
}
@@ -62,7 +66,7 @@ func (r *RegularSync) sendRPCStatusRequest(ctx context.Context, id peer.ID) erro
}
if code != 0 {
peerstatus.IncreaseFailureCount(stream.Conn().RemotePeer())
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
return errors.New(errMsg)
}
@@ -70,17 +74,16 @@ func (r *RegularSync) sendRPCStatusRequest(ctx context.Context, id peer.ID) erro
if err := r.p2p.Encoding().DecodeWithLength(stream, msg); err != nil {
return err
}
peerstatus.Set(stream.Conn().RemotePeer(), msg)
r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), msg)
err = r.validateStatusMessage(msg, stream)
if err != nil {
peerstatus.IncreaseFailureCount(stream.Conn().RemotePeer())
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
}
return err
}
func (r *RegularSync) removeDisconnectedPeerStatus(ctx context.Context, pid peer.ID) error {
peerstatus.Delete(pid)
return nil
}
@@ -94,10 +97,9 @@ func (r *RegularSync) statusRPCHandler(ctx context.Context, msg interface{}, str
log := log.WithField("handler", "status")
m := msg.(*pb.Status)
peerstatus.Set(stream.Conn().RemotePeer(), m)
if err := r.validateStatusMessage(m, stream); err != nil {
peerstatus.IncreaseFailureCount(stream.Conn().RemotePeer())
log.WithField("peer", stream.Conn().RemotePeer()).Warn("Invalid fork version from peer")
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
originalErr := err
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, err.Error())
if err != nil {
@@ -116,6 +118,7 @@ func (r *RegularSync) statusRPCHandler(ctx context.Context, msg interface{}, str
}
return originalErr
}
r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m)
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,

View File

@@ -13,8 +13,8 @@ import (
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -26,8 +26,6 @@ func init() {
}
func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
peerstatus.Clear()
// TODO(3441): Fix ssz string length issue.
t.Skip("3441: SSZ is decoding a string with an unexpected length")
p1 := p2ptest.NewTestP2P(t)
@@ -77,8 +75,6 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
}
func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
peerstatus.Clear()
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
@@ -157,8 +153,6 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
}
func TestHandshakeHandlers_Roundtrip(t *testing.T) {
peerstatus.Clear()
// Scenario is that p1 and p2 connect, exchange handshakes.
// p2 disconnects and p1 should forget the handshake status.
p1 := p2ptest.NewTestP2P(t)
@@ -189,6 +183,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
t.Fatal(err)
}
log.WithField("status", out).Warn("received status")
resp := &pb.Status{HeadSlot: 100, HeadForkVersion: params.BeaconConfig().GenesisForkVersion}
@@ -199,9 +194,13 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
if err != nil {
t.Fatal(err)
}
log.WithField("status", out).Warn("sending status")
stream.Close()
})
numInactive1 := len(p1.Peers().Inactive())
numActive1 := len(p1.Peers().Active())
p1.Connect(p2)
if testutil.WaitTimeout(&wg, 1*time.Second) {
@@ -211,21 +210,32 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
// Wait for stream buffer to be read.
time.Sleep(200 * time.Millisecond)
if peerstatus.Count() != 1 {
t.Errorf("Expected 1 status in the cache, got %d", peerstatus.Count())
numInactive2 := len(p1.Peers().Inactive())
numActive2 := len(p1.Peers().Active())
if numInactive2 != numInactive1 {
t.Errorf("Number of inactive peers changed unexpectedly: was %d, now %d", numInactive1, numInactive2)
}
if numActive2 != numActive1+1 {
t.Errorf("Number of active peers unexpected: wanted %d, found %d", numActive1+1, numActive2)
}
if err := p2.Disconnect(p1.PeerID()); err != nil {
t.Fatal(err)
}
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerDisconnected)
// Wait for disconnect event to trigger.
time.Sleep(200 * time.Millisecond)
if peerstatus.Count() != 0 {
t.Errorf("Expected 0 status in the tracker, got %d", peerstatus.Count())
numInactive3 := len(p1.Peers().Inactive())
numActive3 := len(p1.Peers().Active())
if numInactive3 != numInactive2+1 {
t.Errorf("Number of inactive peers unexpected: wanted %d, found %d", numInactive2+1, numInactive3)
}
if numActive3 != numActive2-1 {
t.Errorf("Number of active peers unexpected: wanted %d, found %d", numActive2-1, numActive3)
}
}
func TestStatusRPCRequest_RequestSent(t *testing.T) {
@@ -338,6 +348,8 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
ctx: context.Background(),
}
r.Start()
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/status/1/ssz")
var wg sync.WaitGroup
@@ -364,7 +376,6 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
}
})
p1.AddConnectionHandler(r.sendRPCStatusRequest)
p1.Connect(p2)
if testutil.WaitTimeout(&wg, time.Second) {
@@ -372,11 +383,19 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)
if len(p1.Host.Network().Peers()) != 0 {
connectionState, err := p1.Peers().ConnectionState(p2.PeerID())
if err != nil {
t.Fatal("Failed to obtain peer connection state")
}
if connectionState != peers.PeerDisconnected {
t.Error("Expected peer to be disconnected")
}
if peerstatus.FailureCount(p2.PeerID()) != 1 {
t.Errorf("Failure count was not bumped to one, instead it is %d", peerstatus.FailureCount(p2.PeerID()))
badResponses, err := p1.Peers().BadResponses(p2.PeerID())
if err != nil {
t.Fatal("Failed to obtain peer connection state")
}
if badResponses != 1 {
t.Errorf("Bad response was not bumped to one, instead it is %d", badResponses)
}
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"sync"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@@ -12,7 +11,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared"
)
@@ -102,8 +100,3 @@ type Checker interface {
Syncing() bool
Status() error
}
// StatusTracker interface for accessing the status / handshake messages received so far.
type StatusTracker interface {
PeerStatuses() map[peer.ID]*pb.Status
}

View File

@@ -10,7 +10,6 @@ func ExternalIPv4() (string, error) {
if err != nil {
return "", err
}
for _, iface := range ifaces {
if iface.Flags&net.FlagUp == 0 {
continue // interface down