mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
@@ -52,6 +52,7 @@ go_test(
|
||||
"feed_example_test.go",
|
||||
"feed_test.go",
|
||||
"message_test.go",
|
||||
"monitoring_test.go",
|
||||
"options_test.go",
|
||||
"register_topic_example_test.go",
|
||||
"service_test.go",
|
||||
|
||||
@@ -19,7 +19,9 @@ func init() {
|
||||
prometheus.MustRegister(peerCountMetric)
|
||||
}
|
||||
|
||||
func startPeerWatcher(ctx context.Context, h host.Host) {
|
||||
// starPeerWatcher updates the peer count metric and calls to reconnect any VIP
|
||||
// peers such as the bootnode peer or relay node peer.
|
||||
func startPeerWatcher(ctx context.Context, h host.Host, reconnectPeers ...string) {
|
||||
|
||||
go (func() {
|
||||
for {
|
||||
@@ -28,9 +30,10 @@ func startPeerWatcher(ctx context.Context, h host.Host) {
|
||||
return
|
||||
default:
|
||||
peerCountMetric.Set(float64(peerCount(h)))
|
||||
ensurePeerConnections(ctx, h, reconnectPeers...)
|
||||
|
||||
// Wait 1 second to update again
|
||||
time.Sleep(1 * time.Second)
|
||||
// Wait 5 second to update again
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
})()
|
||||
@@ -39,3 +42,31 @@ func startPeerWatcher(ctx context.Context, h host.Host) {
|
||||
func peerCount(h host.Host) int {
|
||||
return len(h.Network().Peers())
|
||||
}
|
||||
|
||||
// ensurePeerConnections will attempt to reestablish connection to the peers
|
||||
// if there are currently no connections to that peer.
|
||||
func ensurePeerConnections(ctx context.Context, h host.Host, peers ...string) {
|
||||
if len(peers) == 0 {
|
||||
return
|
||||
}
|
||||
for _, p := range peers {
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
peer, err := MakePeer(p)
|
||||
if err != nil {
|
||||
log.Errorf("Could not make peer: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
c := h.Network().ConnsToPeer(peer.ID)
|
||||
if len(c) == 0 {
|
||||
log.WithField("peer", peer.ID).Debug("No connections to peer, reconnecting")
|
||||
ctx, _ := context.WithTimeout(ctx, 30*time.Second)
|
||||
if err := h.Connect(ctx, *peer); err != nil {
|
||||
log.Errorf("Failed to reconnect to peer %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
31
shared/p2p/monitoring_test.go
Normal file
31
shared/p2p/monitoring_test.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
bhost "github.com/libp2p/go-libp2p-blankhost"
|
||||
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||
)
|
||||
|
||||
func TestEnsurePeerConnections_reconnectsToPeer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
vipPeer := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
|
||||
vipMAddrs, err := pstore.InfoToP2pAddrs(&pstore.PeerInfo{ID: vipPeer.ID(), Addrs: vipPeer.Addrs()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(h.Peerstore().Peers()) != 1 {
|
||||
t.Fatal("expected 1 peer")
|
||||
}
|
||||
|
||||
ensurePeerConnections(ctx, h, vipMAddrs[0].String())
|
||||
|
||||
if len(h.Peerstore().Peers()) != 2 {
|
||||
t.Fatal("expected 2 peers")
|
||||
}
|
||||
}
|
||||
@@ -146,7 +146,7 @@ func (s *Server) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
startPeerWatcher(ctx, s.host)
|
||||
startPeerWatcher(ctx, s.host, s.bootstrapNode, s.relayNodeAddr)
|
||||
}
|
||||
|
||||
// Stop the main p2p loop.
|
||||
|
||||
Reference in New Issue
Block a user