mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Add Goodbye Message When Disconnecting With Peers (#5589)
* add goodbye message * Merge branch 'master' into addGoodbye * fix interface * Merge refs/heads/master into addGoodbye * use wrapper func * Merge branch 'addGoodbye' of https://github.com/prysmaticlabs/geth-sharding into addGoodbye * Merge refs/heads/master into addGoodbye * Merge refs/heads/master into addGoodbye * Merge refs/heads/master into addGoodbye * Merge refs/heads/master into addGoodbye
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@@ -15,7 +14,8 @@ import (
|
||||
// AddConnectionHandler adds a callback function which handles the connection with a
|
||||
// newly added peer. It performs a handshake with that peer by sending a hello request
|
||||
// and validating the response from the peer.
|
||||
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
|
||||
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error,
|
||||
goodbyeFunc func(ctx context.Context, id peer.ID) error) {
|
||||
s.host.Network().Notify(&network.NotifyBundle{
|
||||
ConnectedF: func(net network.Network, conn network.Conn) {
|
||||
log := log.WithField("peer", conn.RemotePeer().Pretty())
|
||||
@@ -28,10 +28,15 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
|
||||
}
|
||||
s.peers.Add(nil /* ENR */, conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
|
||||
if len(s.peers.Active()) >= int(s.cfg.MaxPeers) {
|
||||
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
|
||||
if err := s.Disconnect(conn.RemotePeer()); err != nil {
|
||||
log.WithError(err).Error("Unable to disconnect from peer")
|
||||
}
|
||||
go func() {
|
||||
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
|
||||
if err := goodbyeFunc(context.Background(), conn.RemotePeer()); err != nil {
|
||||
log.WithError(err).Error("Unable to send goodbye message to peer")
|
||||
}
|
||||
if err := s.Disconnect(conn.RemotePeer()); err != nil {
|
||||
log.WithError(err).Error("Unable to disconnect from peer")
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
if s.peers.IsBad(conn.RemotePeer()) {
|
||||
@@ -52,9 +57,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
|
||||
"activePeers": len(s.peers.Active()),
|
||||
})
|
||||
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF {
|
||||
if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF {
|
||||
log.WithError(err).Trace("Handshake failed")
|
||||
if err.Error() == "protocol not supported" {
|
||||
// This is only to ensure the smooth running of our testnets. This will not be
|
||||
|
||||
@@ -37,7 +37,7 @@ type SetStreamHandler interface {
|
||||
|
||||
// ConnectionHandler configures p2p to handle connections with a peer.
|
||||
type ConnectionHandler interface {
|
||||
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error)
|
||||
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, g func(context.Context, peer.ID) error)
|
||||
AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error)
|
||||
}
|
||||
|
||||
|
||||
@@ -168,7 +168,8 @@ func (p *TestP2P) PeerID() peer.ID {
|
||||
}
|
||||
|
||||
// AddConnectionHandler handles the connection with a newly connected peer.
|
||||
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) {
|
||||
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error,
|
||||
g func(context.Context, peer.ID) error) {
|
||||
p.Host.Network().Notify(&network.NotifyBundle{
|
||||
ConnectedF: func(net network.Network, conn network.Conn) {
|
||||
// Must be handled in a goroutine as this callback cannot be blocking.
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"time"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -41,6 +43,24 @@ func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream
|
||||
return r.p2p.Disconnect(stream.Conn().RemotePeer())
|
||||
}
|
||||
|
||||
func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
stream, err := r.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log := log.WithField("Reason", goodbyeMessage(code))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")
|
||||
return nil
|
||||
}
|
||||
|
||||
// sends a goodbye message for a generic error
|
||||
func (r *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error {
|
||||
return r.sendGoodByeMessage(ctx, codeGenericError, id)
|
||||
}
|
||||
|
||||
func goodbyeMessage(num uint64) string {
|
||||
reason, ok := goodByes[num]
|
||||
if ok {
|
||||
|
||||
@@ -58,3 +58,52 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) {
|
||||
t.Error("Peer is still not disconnected despite sending a goodbye message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendGoodbye_SendsMessage(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
if len(p1.Host.Network().Peers()) != 1 {
|
||||
t.Error("Expected peers to be connected")
|
||||
}
|
||||
|
||||
// Set up a head state in the database with data we expect.
|
||||
d := db.SetupDB(t)
|
||||
defer db.TeardownDB(t, d)
|
||||
|
||||
r := &Service{
|
||||
db: d,
|
||||
p2p: p1,
|
||||
}
|
||||
failureCode := codeClientShutdown
|
||||
|
||||
// Setup streams
|
||||
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
out := new(uint64)
|
||||
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if *out != failureCode {
|
||||
t.Fatalf("Wanted goodbye code of %d but got %d", failureCode, *out)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
err := r.sendGoodByeMessage(context.Background(), failureCode, p2.Host.ID())
|
||||
if err != nil {
|
||||
t.Errorf("Unxpected error: %v", err)
|
||||
}
|
||||
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
}
|
||||
|
||||
conns := p1.Host.Network().ConnsToPeer(p1.Host.ID())
|
||||
if len(conns) > 0 {
|
||||
t.Error("Peer is still not disconnected despite sending a goodbye message")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
setRPCStreamDeadlines(stream)
|
||||
|
||||
|
||||
@@ -384,7 +384,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
p1.AddConnectionHandler(r.sendRPCStatusRequest)
|
||||
p1.AddConnectionHandler(r.sendRPCStatusRequest, r.sendGenericGoodbyeMessage)
|
||||
p1.Connect(p2)
|
||||
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
|
||||
@@ -135,7 +135,7 @@ func (r *Service) Start() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
r.p2p.AddConnectionHandler(r.reValidatePeer)
|
||||
r.p2p.AddConnectionHandler(r.reValidatePeer, r.sendGenericGoodbyeMessage)
|
||||
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
|
||||
r.p2p.AddPingMethod(r.sendPingRequest)
|
||||
r.processPendingBlocksQueue()
|
||||
|
||||
Reference in New Issue
Block a user