Add Strict Connection Manager (#4110)

* add forked connMgr
* gaz
* add license header
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into connMgr
* add conn manager test
* gaz
* fix connManager
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into connMgr
* gaz
* remove todo
* add new dep
* lint
* lint
* lint
* space
* visibility
* Merge branch 'master' into connMgr
* Merge branch 'master' into connMgr
* Merge refs/heads/master into connMgr
This commit is contained in:
Nishant Das
2019-12-04 02:18:57 +08:00
committed by prylabs-bulldozer[bot]
parent a2d4701f6e
commit 28c4f28d32
9 changed files with 1253 additions and 4 deletions

View File

@@ -1381,3 +1381,10 @@ go_repository(
sum = "h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU=",
version = "v2.20.1+incompatible",
)
go_repository(
name = "com_github_ipfs_go_detect_race",
importpath = "github.com/ipfs/go-detect-race",
sum = "h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=",
version = "v0.0.1",
)

View File

@@ -28,6 +28,7 @@ go_library(
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/p2p/connmgr:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/sync/peerstatus:go_default_library",
@@ -48,7 +49,6 @@ go_library(
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//config:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/host/routed:go_default_library",
"@com_github_libp2p_go_libp2p_connmgr//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//crypto:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",

View File

@@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["connmgr.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"@com_github_ipfs_go_log//:go_default_library",
"@com_github_libp2p_go_libp2p_core//connmgr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["connmgr_test.go"],
embed = [":go_default_library"],
deps = [
"@com_github_ipfs_go_detect_race//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_core//test:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
],
)

View File

@@ -0,0 +1,521 @@
/*
Copyright 2019. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Package connmgr is forked from github.com/libp2p/go-libp2p-connmgr, the reference libp2p
// connection manager implementation, and adjusted to enforce connection limits strictly.
package connmgr
import (
"context"
"sort"
"sync"
"sync/atomic"
"time"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// SilencePeriod is the minimum interval between trims; a trim requested within this window
// after the previous trim is skipped.
var SilencePeriod = 5 * time.Second
// TickerPeriod is the interval at which the background loop checks the number of
// open connections.
var TickerPeriod = 5 * time.Second
var log = logging.Logger("connmgr")
// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are run automatically from a background loop whenever the connection
// count exceeds the high watermark, but no more often than once per silence period.
// Trims can also be requested explicitly through the public interface of this struct
// (see TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int
connCount int32
gracePeriod time.Duration
segments segments
plk sync.RWMutex
protected map[peer.ID]map[string]struct{}
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
silencePeriod time.Duration
ctx context.Context
cancel func()
}
var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
type segment struct {
sync.Mutex
peers map[peer.ID]*peerInfo
}
type segments [256]*segment
func (ss *segments) get(p peer.ID) *segment {
return ss[byte(p[len(p)-1])]
}
func (ss *segments) countPeers() (count int) {
for _, seg := range ss {
seg.Lock()
count += len(seg.peers)
seg.Unlock()
}
return count
}
func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
pi, ok := s.peers[p]
if ok {
return pi
}
// create a temporary peer to buffer early tags before the Connected notification arrives.
pi = &peerInfo{
id: p,
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
conns: make(map[network.Conn]time.Time),
}
s.peers[p] = pi
return pi
}
// NewConnManager creates a new BasicConnMgr with the provided params:
// * low and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', peers are pruned (and their
// connections terminated) until only 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
ctx, cancel := context.WithCancel(context.Background())
cm := &BasicConnMgr{
highWater: hi,
lowWater: low,
gracePeriod: grace,
trimRunningCh: make(chan struct{}, 1),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
ctx: ctx,
cancel: cancel,
segments: func() (ret segments) {
for i := range ret {
ret[i] = &segment{
peers: make(map[peer.ID]*peerInfo),
}
}
return ret
}(),
}
go cm.background()
return cm
}
// Close shuts down the connection manager.
func (cm *BasicConnMgr) Close() error {
cm.cancel()
return nil
}
// Protect is used to protect a peer from being pruned by the
// connection manager.
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
cm.plk.Lock()
defer cm.plk.Unlock()
tags, ok := cm.protected[id]
if !ok {
tags = make(map[string]struct{}, 2)
cm.protected[id] = tags
}
tags[tag] = struct{}{}
}
// Unprotect removes the protection from a previously protected peer so that it can be
// pruned normally by the connection manager. It returns true if the peer remains
// protected under other tags.
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
cm.plk.Lock()
defer cm.plk.Unlock()
tags, ok := cm.protected[id]
if !ok {
return false
}
if delete(tags, tag); len(tags) == 0 {
delete(cm.protected, id)
return false
}
return true
}
// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections
conns map[network.Conn]time.Time // start time of each connection
firstSeen time.Time // timestamp when we began tracking this peer.
}
// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// The trim is skipped if (a) another trim is already in progress, or (b) the silence period
// since the last trim has not yet elapsed.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
select {
case cm.trimRunningCh <- struct{}{}:
default:
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < cm.silencePeriod {
// skip this attempt to trim as the last one just took place.
return
}
defer log.EventBegin(ctx, "connCleanup").Done()
for _, c := range cm.getConnsToClose(ctx) {
log.Info("closing conn: ", c.RemotePeer())
log.Event(ctx, "closeConn", c.RemotePeer())
c.Close()
}
cm.lastTrim = time.Now()
}
func (cm *BasicConnMgr) background() {
ticker := time.NewTicker(TickerPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) {
cm.TrimOpenConns(cm.ctx)
}
case <-cm.ctx.Done():
return
}
}
}
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn {
if cm.lowWater == 0 || cm.highWater == 0 {
// disabled
return nil
}
nconns := int(atomic.LoadInt32(&cm.connCount))
if nconns <= cm.lowWater {
log.Info("open connection count below limit")
return nil
}
npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
gracePeriodStart := time.Now().Add(-cm.gracePeriod)
cm.plk.RLock()
for _, s := range cm.segments {
s.Lock()
for id, inf := range s.peers {
if _, ok := cm.protected[id]; ok {
// skip over protected peer.
continue
}
if inf.firstSeen.After(gracePeriodStart) {
// skip peers in the grace period.
continue
}
candidates = append(candidates, inf)
ncandidates += len(inf.conns)
}
s.Unlock()
}
cm.plk.RUnlock()
if ncandidates < cm.lowWater {
log.Info("open connection count above limit but too many are in the grace period")
// We have too many connections but fewer than lowWater
// connections out of the grace period.
//
// If we trimmed now, we'd kill potentially useful connections.
return nil
}
// Sort peers according to their value.
sort.Slice(candidates, func(i, j int) bool {
left, right := candidates[i], candidates[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
target := ncandidates - cm.lowWater
// slightly overallocate because we may have more than one conn per peer
selected := make([]network.Conn, 0, target+10)
for _, inf := range candidates {
if target <= 0 {
break
}
// lock this to protect from concurrent modifications from connect/disconnect events
s := cm.segments.get(inf.id)
s.Lock()
if len(inf.conns) == 0 && inf.temp {
// handle temporary entries for early tags -- this entry has gone past the grace period
// and still holds no connections, so prune it.
delete(s.peers, inf.id)
} else {
for c := range inf.conns {
selected = append(selected, c)
}
}
target -= len(inf.conns)
s.Unlock()
}
return selected
}
// GetTagInfo is called to fetch the tag information associated with a given
// peer. Nil is returned if p refers to an unknown peer.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
pi, ok := s.peers[p]
if !ok {
return nil
}
out := &connmgr.TagInfo{
FirstSeen: pi.firstSeen,
Value: pi.value,
Tags: make(map[string]int),
Conns: make(map[string]time.Time),
}
for t, v := range pi.tags {
out.Tags[t] = v
}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
}
return out
}
// TagPeer is called to associate a string and integer with a given peer.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
pi := s.tagInfoFor(p)
// Update the total value of the peer.
pi.value += val - pi.tags[tag]
pi.tags[tag] = val
}
// UntagPeer is called to disassociate a string and integer from a given peer.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
pi, ok := s.peers[p]
if !ok {
log.Info("tried to remove tag from untracked peer: ", p)
return
}
// Update the total value of the peer.
pi.value -= pi.tags[tag]
delete(pi.tags, tag)
}
// UpsertTag is called to insert or update a peer tag.
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
pi := s.tagInfoFor(p)
oldval := pi.tags[tag]
newval := upsert(oldval)
pi.value += newval - oldval
pi.tags[tag] = newval
}
// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
// The low watermark, as described in NewConnManager.
LowWater int
// The high watermark, as described in NewConnManager.
HighWater int
// The timestamp when the last trim was triggered.
LastTrim time.Time
// The configured grace period, as described in NewConnManager.
GracePeriod time.Duration
// The current connection count.
ConnCount int
}
// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
return CMInfo{
HighWater: cm.highWater,
LowWater: cm.lowWater,
LastTrim: cm.lastTrim,
GracePeriod: cm.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
}
// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() network.Notifiee {
return (*cmNotifee)(cm)
}
type cmNotifee BasicConnMgr
func (nn *cmNotifee) cm() *BasicConnMgr {
return (*BasicConnMgr)(nn)
}
// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
cm := nn.cm()
p := c.RemotePeer()
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
id := c.RemotePeer()
pinfo, ok := s.peers[id]
if !ok {
pinfo = &peerInfo{
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
conns: make(map[network.Conn]time.Time),
}
s.peers[id] = pinfo
} else if pinfo.temp {
// we had created a temporary entry for this peer to buffer early tags before the
// Connected notification arrived: flip the temporary flag, and update the firstSeen
// timestamp to the real one.
pinfo.temp = false
pinfo.firstSeen = time.Now()
}
_, ok = pinfo.conns[c]
if ok {
log.Error("received connected notification for conn we are already tracking: ", p)
return
}
pinfo.conns[c] = time.Now()
atomic.AddInt32(&cm.connCount, 1)
}
// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
cm := nn.cm()
p := c.RemotePeer()
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()
cinf, ok := s.peers[p]
if !ok {
log.Error("received disconnected notification for peer we are not tracking: ", p)
return
}
_, ok = cinf.conns[c]
if !ok {
log.Error("received disconnected notification for conn we are not tracking: ", p)
return
}
delete(cinf.conns, c)
if len(cinf.conns) == 0 {
delete(s.peers, p)
}
atomic.AddInt32(&cm.connCount, -1)
}
// Listen is a no-op in this implementation.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}
// ListenClose is a no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}
// OpenedStream is a no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}
// ClosedStream is a no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}
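
A minimal usage sketch, not part of this diff: it shows how the forked manager can be handed to a libp2p host. The libp2p.ConnectionManager option is the standard go-libp2p wiring of that era; the watermarks and grace period below are illustrative values, not taken from this PR.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr"
)

func main() {
	// Maintain roughly 25-30 peers; new connections get a one-minute grace
	// period before they become candidates for trimming.
	cm := connmgr.NewConnManager(25, 30, 1*time.Minute)
	defer cm.Close()

	// Passing the manager as an option lets the host report Connected and
	// Disconnected events to cm.Notifee(), keeping the connection count current.
	h, err := libp2p.New(context.Background(), libp2p.ConnectionManager(cm))
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Peers protected with a tag are never pruned, regardless of tag values:
	// h.ConnManager().Protect(relayPeerID, "relay") // relayPeerID is a placeholder
}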

View File

@@ -0,0 +1,681 @@
package connmgr
import (
"context"
"testing"
"time"
detectrace "github.com/ipfs/go-detect-race"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tu "github.com/libp2p/go-libp2p-core/test"
ma "github.com/multiformats/go-multiaddr"
)
type tconn struct {
network.Conn
peer peer.ID
closed bool
disconnectNotify func(net network.Network, conn network.Conn)
}
func (c *tconn) Close() error {
c.closed = true
if c.disconnectNotify != nil {
c.disconnectNotify(nil, c)
}
return nil
}
func (c *tconn) RemotePeer() peer.ID {
return c.peer
}
func (c *tconn) RemoteMultiaddr() ma.Multiaddr {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
if err != nil {
panic("cannot create multiaddr")
}
return addr
}
func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) network.Conn {
pid := tu.RandPeerIDFatal(t)
return &tconn{peer: pid, disconnectNotify: discNotify}
}
func TestConnTrimming(t *testing.T) {
cm := NewConnManager(200, 300, 0)
not := cm.Notifee()
var conns []network.Conn
for i := 0; i < 300; i++ {
rc := randConn(t, nil)
conns = append(conns, rc)
not.Connected(nil, rc)
}
for _, c := range conns {
if c.(*tconn).closed {
t.Fatal("nothing should be closed yet")
}
}
for i := 0; i < 100; i++ {
cm.TagPeer(conns[i].RemotePeer(), "foo", 10)
}
cm.TagPeer(conns[299].RemotePeer(), "badfoo", -5)
cm.TrimOpenConns(context.Background())
for i := 0; i < 100; i++ {
c := conns[i]
if c.(*tconn).closed {
t.Fatal("these shouldnt be closed")
}
}
if !conns[299].(*tconn).closed {
t.Fatal("conn with bad tag should have gotten closed")
}
}
func TestConnsToClose(t *testing.T) {
cm := NewConnManager(0, 10, 0)
conns := cm.getConnsToClose(context.Background())
if conns != nil {
t.Fatal("expected no connections")
}
cm = NewConnManager(10, 0, 0)
conns = cm.getConnsToClose(context.Background())
if conns != nil {
t.Fatal("expected no connections")
}
cm = NewConnManager(1, 1, 0)
conns = cm.getConnsToClose(context.Background())
if conns != nil {
t.Fatal("expected no connections")
}
cm = NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
for i := 0; i < 5; i++ {
conn := randConn(t, nil)
not.Connected(nil, conn)
}
conns = cm.getConnsToClose(context.Background())
if len(conns) != 0 {
t.Fatal("expected no connections")
}
}
func TestGetTagInfo(t *testing.T) {
start := time.Now()
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
end := time.Now()
other := tu.RandPeerIDFatal(t)
tag := cm.GetTagInfo(other)
if tag != nil {
t.Fatal("expected no tag")
}
tag = cm.GetTagInfo(conn.RemotePeer())
if tag == nil {
t.Fatal("expected tag")
}
if tag.FirstSeen.Before(start) || tag.FirstSeen.After(end) {
t.Fatal("expected first seen time")
}
if tag.Value != 0 {
t.Fatal("expected zero value")
}
if len(tag.Tags) != 0 {
t.Fatal("expected no tags")
}
if len(tag.Conns) != 1 {
t.Fatal("expected one connection")
}
for s, tm := range tag.Conns {
if s != conn.RemoteMultiaddr().String() {
t.Fatal("unexpected multiaddr")
}
if tm.Before(start) || tm.After(end) {
t.Fatal("unexpected connection time")
}
}
cm.TagPeer(conn.RemotePeer(), "tag", 5)
tag = cm.GetTagInfo(conn.RemotePeer())
if tag == nil {
t.Fatal("expected tag")
}
if tag.FirstSeen.Before(start) || tag.FirstSeen.After(end) {
t.Fatal("expected first seen time")
}
if tag.Value != 5 {
t.Fatal("expected five value")
}
if len(tag.Tags) != 1 {
t.Fatal("expected no tags")
}
for tString, v := range tag.Tags {
if tString != "tag" || v != 5 {
t.Fatal("expected tag value")
}
}
if len(tag.Conns) != 1 {
t.Fatal("expected one connection")
}
for s, tm := range tag.Conns {
if s != conn.RemoteMultiaddr().String() {
t.Fatal("unexpected multiaddr")
}
if tm.Before(start) || tm.After(end) {
t.Fatal("unexpected connection time")
}
}
}
func TestTagPeerNonExistant(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 1)
if !cm.segments.get(id).peers[id].temp {
t.Fatal("expected 1 temporary entry")
}
}
func TestUntagPeer(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
rp := conn.RemotePeer()
cm.TagPeer(rp, "tag", 5)
cm.TagPeer(rp, "tag two", 5)
id := tu.RandPeerIDFatal(t)
cm.UntagPeer(id, "test")
if len(cm.segments.get(rp).peers[rp].tags) != 2 {
t.Fatal("expected tags to be uneffected")
}
cm.UntagPeer(conn.RemotePeer(), "test")
if len(cm.segments.get(rp).peers[rp].tags) != 2 {
t.Fatal("expected tags to be uneffected")
}
cm.UntagPeer(conn.RemotePeer(), "tag")
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected tag to be removed")
}
if cm.segments.get(rp).peers[rp].value != 5 {
t.Fatal("expected aggreagte tag value to be 5")
}
}
func TestGetInfo(t *testing.T) {
start := time.Now()
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TrimOpenConns(context.Background())
end := time.Now()
info := cm.GetInfo()
if info.HighWater != 5 {
t.Fatal("expected highwater to be 5")
}
if info.LowWater != 1 {
t.Fatal("expected highwater to be 1")
}
if info.LastTrim.Before(start) || info.LastTrim.After(end) {
t.Fatal("unexpected last trim time")
}
if info.GracePeriod != gp {
t.Fatal("unexpected grace period")
}
if info.ConnCount != 1 {
t.Fatal("unexpected number of connections")
}
}
func TestDoubleConnection(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)
not.Connected(nil, conn)
if cm.connCount != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
t.Fatal("unexpected peer value")
}
}
func TestDisconnected(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)
not.Disconnected(nil, randConn(t, nil))
if cm.connCount != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
t.Fatal("unexpected peer value")
}
not.Disconnected(nil, &tconn{peer: conn.RemotePeer()})
if cm.connCount != 1 {
t.Fatal("unexpected number of connections")
}
if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 {
t.Fatal("unexpected peer value")
}
not.Disconnected(nil, conn)
if cm.connCount != 0 {
t.Fatal("unexpected number of connections")
}
if cm.segments.countPeers() != 0 {
t.Fatal("unexpected number of peers")
}
}
func TestGracePeriod(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm := NewConnManager(10, 20, 100*time.Millisecond)
SilencePeriod = 10 * time.Second
not := cm.Notifee()
var conns []network.Conn
// Add a connection and wait the grace period.
{
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
time.Sleep(200 * time.Millisecond)
if rc.(*tconn).closed {
t.Fatal("expected conn to remain open")
}
}
// quickly add 30 connections (sending us above the high watermark)
for i := 0; i < 30; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
}
cm.TrimOpenConns(context.Background())
for _, c := range conns {
if c.(*tconn).closed {
t.Fatal("expected no conns to be closed")
}
}
time.Sleep(200 * time.Millisecond)
cm.TrimOpenConns(context.Background())
closed := 0
for _, c := range conns {
if c.(*tconn).closed {
closed++
}
}
if closed != 21 {
t.Fatal("expected to have closed 21 connections")
}
}
// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
cm := NewConnManager(10, 20, 0)
not := cm.Notifee()
var conns []network.Conn
// quickly produce 30 connections (sending us above the high watermark)
for i := 0; i < 30; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
}
// wait for a few seconds
time.Sleep(time.Second * 3)
// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
for _, c := range conns {
if c.(*tconn).closed {
closed++
}
}
if closed > 20 {
t.Fatalf("should have closed at most 20 connections, closed: %d", closed)
}
if total := closed + int(cm.connCount); total != 30 {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}
func TestPeerProtectionSingleTag(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
SilencePeriod = 10 * time.Second
not := cm.Notifee()
var conns []network.Conn
addConn := func(value int) {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", value)
}
// produce 20 connections with unique peers.
for i := 0; i < 20; i++ {
addConn(20)
}
// protect the first 5 peers.
var protected []network.Conn
for _, c := range conns[0:5] {
cm.Protect(c.RemotePeer(), "global")
protected = append(protected, c)
// tag them negatively to make them preferred for pruning.
cm.TagPeer(c.RemotePeer(), "test", -100)
}
// add 1 more conn; this shouldn't send us over the limit, as protected conns don't count
addConn(20)
cm.TrimOpenConns(context.Background())
for _, c := range conns {
if c.(*tconn).closed {
t.Error("connection was closed by connection manager")
}
}
// add 5 more connections, sending the connection manager overboard.
for i := 0; i < 5; i++ {
addConn(20)
}
cm.TrimOpenConns(context.Background())
for _, c := range protected {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
closed := 0
for _, c := range conns {
if c.(*tconn).closed {
closed++
}
}
if closed != 2 {
t.Errorf("expected 2 connection to be closed, found %d", closed)
}
// unprotect the first peer.
cm.Unprotect(protected[0].RemotePeer(), "global")
// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
addConn(20)
}
cm.TrimOpenConns(context.Background())
if !protected[0].(*tconn).closed {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
}
func TestPeerProtectionMultipleTags(t *testing.T) {
if detectrace.WithRace() {
t.Skip("race detector is unhappy with this test")
}
SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
SilencePeriod = 10 * time.Second
not := cm.Notifee()
// produce 20 connections with unique peers.
var conns []network.Conn
for i := 0; i < 20; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}
// protect the first 5 peers under two tags.
var protected []network.Conn
for _, c := range conns[0:5] {
cm.Protect(c.RemotePeer(), "tag1")
cm.Protect(c.RemotePeer(), "tag2")
protected = append(protected, c)
// tag them negatively to make them preferred for pruning.
cm.TagPeer(c.RemotePeer(), "test", -100)
}
// add one more connection, sending the connection manager overboard.
not.Connected(nil, randConn(t, not.Disconnected))
cm.TrimOpenConns(context.Background())
for _, c := range protected {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
// remove the protection from one tag.
for _, c := range protected {
if !cm.Unprotect(c.RemotePeer(), "tag1") {
t.Error("peer should still be protected")
}
}
// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}
cm.TrimOpenConns(context.Background())
// connections should still remain open, as they were protected.
for _, c := range protected[0:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
// unprotect the first peer entirely.
cm.Unprotect(protected[0].RemotePeer(), "tag2")
// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}
cm.TrimOpenConns(context.Background())
if !protected[0].(*tconn).closed {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
}
func TestPeerProtectionIdempotent(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(10, 20, 0)
SilencePeriod = 10 * time.Second
id, _ := tu.RandPeerID()
cm.Protect(id, "global")
cm.Protect(id, "global")
cm.Protect(id, "global")
cm.Protect(id, "global")
if len(cm.protected[id]) > 1 {
t.Error("expected peer to be protected only once")
}
if !cm.Unprotect(id, "unused") {
t.Error("expected peer to continue to be protected")
}
if !cm.Unprotect(id, "unused2") {
t.Error("expected peer to continue to be protected")
}
if cm.Unprotect(id, "global") {
t.Error("expected peer to be unprotected")
}
if len(cm.protected) > 0 {
t.Error("expected no protections")
}
}
func TestUpsertTag(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t, nil)
rp := conn.RemotePeer()
// this is an early tag, before the Connected notification arrived.
cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
}
if cm.segments.get(rp).peers[rp].value != 1 {
t.Fatal("expected a tag value of 1")
}
// now let's notify the connection.
not.Connected(nil, conn)
cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
}
if cm.segments.get(rp).peers[rp].value != 2 {
t.Fatal("expected a tag value of 2")
}
cm.UpsertTag(rp, "tag", func(v int) int { return v - 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
}
if cm.segments.get(rp).peers[rp].value != 1 {
t.Fatal("expected a tag value of 1")
}
}
func TestTemporaryEntriesClearedFirst(t *testing.T) {
cm := NewConnManager(1, 1, 0)
id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 20)
if cm.GetTagInfo(id).Value != 20 {
t.Fatal("expected an early tag with value 20")
}
not := cm.Notifee()
conn1, conn2 := randConn(t, nil), randConn(t, nil)
not.Connected(nil, conn1)
not.Connected(nil, conn2)
cm.TrimOpenConns(context.Background())
if cm.GetTagInfo(id) != nil {
t.Fatal("expected no temporary tags after trimming")
}
}
func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
cm := NewConnManager(1, 1, 0)
conn := randConn(t, nil)
cm.TagPeer(conn.RemotePeer(), "test", 20)
ti := cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()]
if ti.value != 20 || !ti.temp {
t.Fatal("expected a temporary tag with value 20")
}
not := cm.Notifee()
not.Connected(nil, conn)
if ti.value != 20 || ti.temp {
t.Fatal("expected a non-temporary tag with value 20")
}
}

View File

@@ -5,13 +5,12 @@ import (
"github.com/libp2p/go-libp2p-core/host"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/trace"
)
// MakePeer from multiaddress string.
func MakePeer(addr string) (*peerstore.PeerInfo, error) {
maddr, err := multiaddr.NewMultiaddr(addr)
maddr, err := multiAddrFromString(addr)
if err != nil {
return nil, err
}

View File

@@ -41,7 +41,10 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF {
log.WithError(err).Debug("Could not send successful hello rpc request")
if err.Error() == "protocol not supported" {
// This is only to ensure the smooth running of our testnets. This will not be
// used in production.
log.Debug("Not disconnecting peer with unsupported protocol. This may be the DHT node or relay.")
s.host.ConnManager().Protect(conn.RemotePeer(), "relay/bootnode")
return
}
if err := s.Disconnect(conn.RemotePeer()); err != nil {

View File

@@ -7,10 +7,10 @@ import (
"time"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
filter "github.com/libp2p/go-maddr-filter"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr"
)
// buildOptions for the libp2p host.

View File

@@ -130,6 +130,11 @@ func (s *Service) Start() {
if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil {
log.WithError(err).Errorf("Could not dial relay node")
}
peer, err := MakePeer(s.cfg.RelayNodeAddr)
if err != nil {
log.WithError(err).Errorf("Could not create peer")
}
s.host.ConnManager().Protect(peer.ID, "relay")
}
if len(s.cfg.Discv5BootStrapAddr) != 0 && !s.cfg.NoDiscovery {
@@ -164,6 +169,11 @@ func (s *Service) Start() {
s.startupErr = err
return
}
peer, err := MakePeer(addr)
if err != nil {
log.WithError(err).Errorf("Could not create peer")
}
s.host.ConnManager().Protect(peer.ID, "bootnode")
}
bcfg := kaddht.DefaultBootstrapConfig
bcfg.Period = time.Duration(30 * time.Second)
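
A hedged sketch of the relay-protection step above, shown in isolation. The helper name protectRelay, the early return when MakePeer fails, and the parent import path github.com/prysmaticlabs/prysm/beacon-chain/p2p are assumptions for illustration; only MakePeer and ConnManager().Protect come from this diff.

package example

import (
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)

// protectRelay resolves the relay's peer ID from its multiaddr string and shields
// it from pruning by the connection manager under the "relay" tag.
func protectRelay(h host.Host, relayAddr string) error {
	pi, err := p2p.MakePeer(relayAddr)
	if err != nil {
		// Bail out instead of dereferencing a nil PeerInfo.
		return err
	}
	h.ConnManager().Protect(pi.ID, "relay")
	return nil
}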