diff --git a/WORKSPACE b/WORKSPACE index cc74241474..c85fce4c07 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1381,3 +1381,10 @@ go_repository( sum = "h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU=", version = "v2.20.1+incompatible", ) + +go_repository( + name = "com_github_ipfs_go_detect_race", + importpath = "github.com/ipfs/go-detect-race", + sum = "h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=", + version = "v0.0.1", +) diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 10c4c75736..75cf3f013a 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//tools:__subpackages__", ], deps = [ + "//beacon-chain/p2p/connmgr:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/sync/peerstatus:go_default_library", @@ -48,7 +49,6 @@ go_library( "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//config:go_default_library", "@com_github_libp2p_go_libp2p//p2p/host/routed:go_default_library", - "@com_github_libp2p_go_libp2p_connmgr//:go_default_library", "@com_github_libp2p_go_libp2p_core//:go_default_library", "@com_github_libp2p_go_libp2p_core//crypto:go_default_library", "@com_github_libp2p_go_libp2p_core//host:go_default_library", diff --git a/beacon-chain/p2p/connmgr/BUILD.bazel b/beacon-chain/p2p/connmgr/BUILD.bazel new file mode 100644 index 0000000000..03261e4705 --- /dev/null +++ b/beacon-chain/p2p/connmgr/BUILD.bazel @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["connmgr.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + "@com_github_ipfs_go_log//:go_default_library", + "@com_github_libp2p_go_libp2p_core//connmgr:go_default_library", + "@com_github_libp2p_go_libp2p_core//network:go_default_library", + "@com_github_libp2p_go_libp2p_core//peer:go_default_library", + "@com_github_multiformats_go_multiaddr//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["connmgr_test.go"], + embed = [":go_default_library"], + deps = [ + "@com_github_ipfs_go_detect_race//:go_default_library", + "@com_github_libp2p_go_libp2p_core//network:go_default_library", + "@com_github_libp2p_go_libp2p_core//peer:go_default_library", + "@com_github_libp2p_go_libp2p_core//test:go_default_library", + "@com_github_multiformats_go_multiaddr//:go_default_library", + ], +) diff --git a/beacon-chain/p2p/connmgr/connmgr.go b/beacon-chain/p2p/connmgr/connmgr.go new file mode 100644 index 0000000000..1e3742deb4 --- /dev/null +++ b/beacon-chain/p2p/connmgr/connmgr.go @@ -0,0 +1,521 @@ +/* +Copyright 2019. Protocol Labs, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Package connmgr : This file is forked from github.com/libp2p/go-libp2p-core/connmgr/connmgr.go +package connmgr + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "time" + + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/connmgr" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +// SilencePeriod refers to the period in which a connection is given leeway by the connection +// manager before it is handled normally. +var SilencePeriod = 5 * time.Second + +// TickerPeriod represents the frequency in which we check the number of +// open connections. +var TickerPeriod = 5 * time.Second + +var log = logging.Logger("connmgr") + +// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the +// high watermark. New connections are given a grace period before they're subject +// to trimming. Trims are automatically run on demand, only if the time from the +// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly +// requested through the public interface of this struct (see TrimOpenConns). +// +// See configuration parameters in NewConnManager. +type BasicConnMgr struct { + highWater int + lowWater int + connCount int32 + gracePeriod time.Duration + segments segments + + plk sync.RWMutex + protected map[peer.ID]map[string]struct{} + + // channel-based semaphore that enforces only a single trim is in progress + trimRunningCh chan struct{} + lastTrim time.Time + silencePeriod time.Duration + + ctx context.Context + cancel func() +} + +var _ connmgr.ConnManager = (*BasicConnMgr)(nil) + +type segment struct { + sync.Mutex + peers map[peer.ID]*peerInfo +} + +type segments [256]*segment + +func (ss *segments) get(p peer.ID) *segment { + return ss[byte(p[len(p)-1])] +} + +func (ss *segments) countPeers() (count int) { + for _, seg := range ss { + seg.Lock() + count += len(seg.peers) + seg.Unlock() + } + return count +} + +func (s *segment) tagInfoFor(p peer.ID) *peerInfo { + pi, ok := s.peers[p] + if ok { + return pi + } + // create a temporary peer to buffer early tags before the Connected notification arrives. + pi = &peerInfo{ + id: p, + firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives. + temp: true, + tags: make(map[string]int), + conns: make(map[network.Conn]time.Time), + } + s.peers[p] = pi + return pi +} + +// NewConnManager creates a new BasicConnMgr with the provided params: +// * lo and hi are watermarks governing the number of connections that'll be maintained. +// When the peer count exceeds the 'high watermark', as many peers will be pruned (and +// their connections terminated) until 'low watermark' peers remain. +// * grace is the amount of time a newly opened connection is given before it becomes +// subject to pruning. 
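+// A minimal usage sketch (illustrative assumptions only; the watermark and grace
+// values below are hypothetical, not the ones Prysm configures in buildOptions):
+//
+//	cm := NewConnManager(20, 30, time.Minute) // keep roughly 20-30 conns, 1m grace
+//	defer cm.Close()
+//	// hand cm to libp2p (e.g. via the libp2p.ConnectionManager option) so it receives
+//	// Connected/Disconnected notifications and can trim above the high watermark.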
+func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { + ctx, cancel := context.WithCancel(context.Background()) + cm := &BasicConnMgr{ + highWater: hi, + lowWater: low, + gracePeriod: grace, + trimRunningCh: make(chan struct{}, 1), + protected: make(map[peer.ID]map[string]struct{}, 16), + silencePeriod: SilencePeriod, + ctx: ctx, + cancel: cancel, + segments: func() (ret segments) { + for i := range ret { + ret[i] = &segment{ + peers: make(map[peer.ID]*peerInfo), + } + } + return ret + }(), + } + + go cm.background() + return cm +} + +// Close shuts down the connection manager. +func (cm *BasicConnMgr) Close() error { + cm.cancel() + return nil +} + +// Protect is used to protect a peer from being pruned by the +// connection manager. +func (cm *BasicConnMgr) Protect(id peer.ID, tag string) { + cm.plk.Lock() + defer cm.plk.Unlock() + + tags, ok := cm.protected[id] + if !ok { + tags = make(map[string]struct{}, 2) + cm.protected[id] = tags + } + tags[tag] = struct{}{} +} + +// Unprotect is used to remove the protection of a previously protected peer +// so that it can be normally pruned by the connection manager. +func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) { + cm.plk.Lock() + defer cm.plk.Unlock() + + tags, ok := cm.protected[id] + if !ok { + return false + } + if delete(tags, tag); len(tags) == 0 { + delete(cm.protected, id) + return false + } + return true +} + +// peerInfo stores metadata for a given peer. +type peerInfo struct { + id peer.ID + tags map[string]int // value for each tag + value int // cached sum of all tag values + temp bool // this is a temporary entry holding early tags, and awaiting connections + + conns map[network.Conn]time.Time // start time of each connection + + firstSeen time.Time // timestamp when we began tracking this peer. +} + +// TrimOpenConns closes the connections of as many peers as needed to make the peer count +// equal the low watermark. Peers are sorted in ascending order based on their total value, +// pruning those peers with the lowest scores first, as long as they are not within their +// grace period. +// +// Trims are skipped if (a) there's another trim in progress, or (b) the silence period is in effect. +func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { + select { + case cm.trimRunningCh <- struct{}{}: + default: + return + } + defer func() { <-cm.trimRunningCh }() + if time.Since(cm.lastTrim) < cm.silencePeriod { + // skip this attempt to trim as the last one just took place. + return + } + + defer log.EventBegin(ctx, "connCleanup").Done() + for _, c := range cm.getConnsToClose(ctx) { + log.Info("closing conn: ", c.RemotePeer()) + log.Event(ctx, "closeConn", c.RemotePeer()) + c.Close() + } + + cm.lastTrim = time.Now() +} + +func (cm *BasicConnMgr) background() { + ticker := time.NewTicker(TickerPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) { + cm.TrimOpenConns(cm.ctx) + } + + case <-cm.ctx.Done(): + return + } + } +} + +// getConnsToClose runs the heuristics described in TrimOpenConns and returns the +// connections to close.
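+// Worked example (hypothetical numbers): with lowWater=2 and five out-of-grace candidate
+// peers holding one connection each, valued {-5, 1, 3, 7, 10}, the target is 5 - 2 = 3,
+// so the connections of the three lowest-valued peers (-5, 1, 3) are selected for closing.
+// Temporary entries (early tags that never got a connection) sort first and are deleted.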
+func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn { + if cm.lowWater == 0 || cm.highWater == 0 { + // disabled + return nil + } + + nconns := int(atomic.LoadInt32(&cm.connCount)) + if nconns <= cm.lowWater { + log.Info("open connection count below limit") + return nil + } + + npeers := cm.segments.countPeers() + candidates := make([]*peerInfo, 0, npeers) + ncandidates := 0 + gracePeriodStart := time.Now().Add(-cm.gracePeriod) + + cm.plk.RLock() + for _, s := range cm.segments { + s.Lock() + for id, inf := range s.peers { + if _, ok := cm.protected[id]; ok { + // skip over protected peer. + continue + } + if inf.firstSeen.After(gracePeriodStart) { + // skip peers in the grace period. + continue + } + candidates = append(candidates, inf) + ncandidates += len(inf.conns) + } + s.Unlock() + } + cm.plk.RUnlock() + + if ncandidates < cm.lowWater { + log.Info("open connection count above limit but too many are in the grace period") + // We have too many connections but fewer than lowWater + // connections out of the grace period. + // + // If we trimmed now, we'd kill potentially useful connections. + return nil + } + + // Sort peers according to their value. + sort.Slice(candidates, func(i, j int) bool { + left, right := candidates[i], candidates[j] + // temporary peers are preferred for pruning. + if left.temp != right.temp { + return left.temp + } + // otherwise, compare by value. + return left.value < right.value + }) + + target := ncandidates - cm.lowWater + + // slightly overallocate because we may have more than one conns per peer + selected := make([]network.Conn, 0, target+10) + + for _, inf := range candidates { + if target <= 0 { + break + } + + // lock this to protect from concurrent modifications from connect/disconnect events + s := cm.segments.get(inf.id) + s.Lock() + + if len(inf.conns) == 0 && inf.temp { + // handle temporary entries for early tags -- this entry has gone past the grace period + // and still holds no connections, so prune it. + delete(s.peers, inf.id) + } else { + for c := range inf.conns { + selected = append(selected, c) + } + } + target -= len(inf.conns) + s.Unlock() + } + + return selected +} + +// GetTagInfo is called to fetch the tag information associated with a given +// peer, nil is returned if p refers to an unknown peer. +func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo { + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + pi, ok := s.peers[p] + if !ok { + return nil + } + + out := &connmgr.TagInfo{ + FirstSeen: pi.firstSeen, + Value: pi.value, + Tags: make(map[string]int), + Conns: make(map[string]time.Time), + } + + for t, v := range pi.tags { + out.Tags[t] = v + } + for c, t := range pi.conns { + out.Conns[c.RemoteMultiaddr().String()] = t + } + + return out +} + +// TagPeer is called to associate a string and integer with a given peer. +func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) { + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + pi := s.tagInfoFor(p) + + // Update the total value of the peer. + pi.value += val - pi.tags[tag] + pi.tags[tag] = val +} + +// UntagPeer is called to disassociate a string and integer from a given peer. +func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) { + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + pi, ok := s.peers[p] + if !ok { + log.Info("tried to remove tag from untracked peer: ", p) + return + } + + // Update the total value of the peer. 
+ pi.value -= pi.tags[tag] + delete(pi.tags, tag) +} + +// UpsertTag is called to insert/update a peer tag +func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) { + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + pi := s.tagInfoFor(p) + + oldval := pi.tags[tag] + newval := upsert(oldval) + pi.value += newval - oldval + pi.tags[tag] = newval +} + +// CMInfo holds the configuration for BasicConnMgr, as well as status data. +type CMInfo struct { + // The low watermark, as described in NewConnManager. + LowWater int + + // The high watermark, as described in NewConnManager. + HighWater int + + // The timestamp when the last trim was triggered. + LastTrim time.Time + + // The configured grace period, as described in NewConnManager. + GracePeriod time.Duration + + // The current connection count. + ConnCount int +} + +// GetInfo returns the configuration and status data for this connection manager. +func (cm *BasicConnMgr) GetInfo() CMInfo { + return CMInfo{ + HighWater: cm.highWater, + LowWater: cm.lowWater, + LastTrim: cm.lastTrim, + GracePeriod: cm.gracePeriod, + ConnCount: int(atomic.LoadInt32(&cm.connCount)), + } +} + +// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when +// events occur. Currently, the notifee only reacts upon connection events +// {Connected, Disconnected}. +func (cm *BasicConnMgr) Notifee() network.Notifiee { + return (*cmNotifee)(cm) +} + +type cmNotifee BasicConnMgr + +func (nn *cmNotifee) cm() *BasicConnMgr { + return (*BasicConnMgr)(nn) +} + +// Connected is called by notifiers to inform that a new connection has been established. +// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection +// count exceeds the high watermark, a trim may be triggered. +func (nn *cmNotifee) Connected(n network.Network, c network.Conn) { + cm := nn.cm() + + p := c.RemotePeer() + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + id := c.RemotePeer() + pinfo, ok := s.peers[id] + if !ok { + pinfo = &peerInfo{ + id: id, + firstSeen: time.Now(), + tags: make(map[string]int), + conns: make(map[network.Conn]time.Time), + } + s.peers[id] = pinfo + } else if pinfo.temp { + // we had created a temporary entry for this peer to buffer early tags before the + // Connected notification arrived: flip the temporary flag, and update the firstSeen + // timestamp to the real one. + pinfo.temp = false + pinfo.firstSeen = time.Now() + } + + _, ok = pinfo.conns[c] + if ok { + log.Error("received connected notification for conn we are already tracking: ", p) + return + } + + pinfo.conns[c] = time.Now() + atomic.AddInt32(&cm.connCount, 1) +} + +// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated. +// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping. +func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) { + cm := nn.cm() + + p := c.RemotePeer() + s := cm.segments.get(p) + s.Lock() + defer s.Unlock() + + cinf, ok := s.peers[p] + if !ok { + log.Error("received disconnected notification for peer we are not tracking: ", p) + return + } + + _, ok = cinf.conns[c] + if !ok { + log.Error("received disconnected notification for conn we are not tracking: ", p) + return + } + + delete(cinf.conns, c) + if len(cinf.conns) == 0 { + delete(s.peers, p) + } + atomic.AddInt32(&cm.connCount, -1) +} + +// Listen is no-op in this implementation. 
+func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {} + +// ListenClose is no-op in this implementation. +func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {} + +// OpenedStream is no-op in this implementation. +func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {} + +// ClosedStream is no-op in this implementation. +func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {} diff --git a/beacon-chain/p2p/connmgr/connmgr_test.go b/beacon-chain/p2p/connmgr/connmgr_test.go new file mode 100644 index 0000000000..ad787864c7 --- /dev/null +++ b/beacon-chain/p2p/connmgr/connmgr_test.go @@ -0,0 +1,681 @@ +package connmgr + +import ( + "context" + "testing" + "time" + + detectrace "github.com/ipfs/go-detect-race" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + tu "github.com/libp2p/go-libp2p-core/test" + ma "github.com/multiformats/go-multiaddr" +) + +type tconn struct { + network.Conn + + peer peer.ID + closed bool + disconnectNotify func(net network.Network, conn network.Conn) +} + +func (c *tconn) Close() error { + c.closed = true + if c.disconnectNotify != nil { + c.disconnectNotify(nil, c) + } + return nil +} + +func (c *tconn) RemotePeer() peer.ID { + return c.peer +} + +func (c *tconn) RemoteMultiaddr() ma.Multiaddr { + addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") + if err != nil { + panic("cannot create multiaddr") + } + return addr +} + +func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) network.Conn { + pid := tu.RandPeerIDFatal(t) + return &tconn{peer: pid, disconnectNotify: discNotify} +} + +func TestConnTrimming(t *testing.T) { + cm := NewConnManager(200, 300, 0) + not := cm.Notifee() + + var conns []network.Conn + for i := 0; i < 300; i++ { + rc := randConn(t, nil) + conns = append(conns, rc) + not.Connected(nil, rc) + } + + for _, c := range conns { + if c.(*tconn).closed { + t.Fatal("nothing should be closed yet") + } + } + + for i := 0; i < 100; i++ { + cm.TagPeer(conns[i].RemotePeer(), "foo", 10) + } + + cm.TagPeer(conns[299].RemotePeer(), "badfoo", -5) + + cm.TrimOpenConns(context.Background()) + + for i := 0; i < 100; i++ { + c := conns[i] + if c.(*tconn).closed { + t.Fatal("these shouldnt be closed") + } + } + + if !conns[299].(*tconn).closed { + t.Fatal("conn with bad tag should have gotten closed") + } +} + +func TestConnsToClose(t *testing.T) { + cm := NewConnManager(0, 10, 0) + conns := cm.getConnsToClose(context.Background()) + if conns != nil { + t.Fatal("expected no connections") + } + + cm = NewConnManager(10, 0, 0) + conns = cm.getConnsToClose(context.Background()) + if conns != nil { + t.Fatal("expected no connections") + } + + cm = NewConnManager(1, 1, 0) + conns = cm.getConnsToClose(context.Background()) + if conns != nil { + t.Fatal("expected no connections") + } + + cm = NewConnManager(1, 1, time.Duration(10*time.Minute)) + not := cm.Notifee() + for i := 0; i < 5; i++ { + conn := randConn(t, nil) + not.Connected(nil, conn) + } + conns = cm.getConnsToClose(context.Background()) + if len(conns) != 0 { + t.Fatal("expected no connections") + } +} + +func TestGetTagInfo(t *testing.T) { + start := time.Now() + cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) + not := cm.Notifee() + conn := randConn(t, nil) + not.Connected(nil, conn) + end := time.Now() + + other := tu.RandPeerIDFatal(t) + tag := cm.GetTagInfo(other) + if tag != nil { + t.Fatal("expected no tag") + } + + tag = cm.GetTagInfo(conn.RemotePeer()) + if tag 
== nil { + t.Fatal("expected tag") + } + if tag.FirstSeen.Before(start) || tag.FirstSeen.After(end) { + t.Fatal("expected first seen time") + } + if tag.Value != 0 { + t.Fatal("expected zero value") + } + if len(tag.Tags) != 0 { + t.Fatal("expected no tags") + } + if len(tag.Conns) != 1 { + t.Fatal("expected one connection") + } + for s, tm := range tag.Conns { + if s != conn.RemoteMultiaddr().String() { + t.Fatal("unexpected multiaddr") + } + if tm.Before(start) || tm.After(end) { + t.Fatal("unexpected connection time") + } + } + + cm.TagPeer(conn.RemotePeer(), "tag", 5) + tag = cm.GetTagInfo(conn.RemotePeer()) + if tag == nil { + t.Fatal("expected tag") + } + if tag.FirstSeen.Before(start) || tag.FirstSeen.After(end) { + t.Fatal("expected first seen time") + } + if tag.Value != 5 { + t.Fatal("expected a value of 5") + } + if len(tag.Tags) != 1 { + t.Fatal("expected one tag") + } + for tString, v := range tag.Tags { + if tString != "tag" || v != 5 { + t.Fatal("expected tag value") + } + } + if len(tag.Conns) != 1 { + t.Fatal("expected one connection") + } + for s, tm := range tag.Conns { + if s != conn.RemoteMultiaddr().String() { + t.Fatal("unexpected multiaddr") + } + if tm.Before(start) || tm.After(end) { + t.Fatal("unexpected connection time") + } + } +} + +func TestTagPeerNonExistent(t *testing.T) { + cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) + + id := tu.RandPeerIDFatal(t) + cm.TagPeer(id, "test", 1) + + if !cm.segments.get(id).peers[id].temp { + t.Fatal("expected 1 temporary entry") + } +} + +func TestUntagPeer(t *testing.T) { + cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) + not := cm.Notifee() + conn := randConn(t, nil) + not.Connected(nil, conn) + rp := conn.RemotePeer() + cm.TagPeer(rp, "tag", 5) + cm.TagPeer(rp, "tag two", 5) + + id := tu.RandPeerIDFatal(t) + cm.UntagPeer(id, "test") + if len(cm.segments.get(rp).peers[rp].tags) != 2 { + t.Fatal("expected tags to be unaffected") + } + + cm.UntagPeer(conn.RemotePeer(), "test") + if len(cm.segments.get(rp).peers[rp].tags) != 2 { + t.Fatal("expected tags to be unaffected") + } + + cm.UntagPeer(conn.RemotePeer(), "tag") + if len(cm.segments.get(rp).peers[rp].tags) != 1 { + t.Fatal("expected tag to be removed") + } + if cm.segments.get(rp).peers[rp].value != 5 { + t.Fatal("expected aggregate tag value to be 5") + } +} + +func TestGetInfo(t *testing.T) { + start := time.Now() + gp := time.Duration(10 * time.Minute) + cm := NewConnManager(1, 5, gp) + not := cm.Notifee() + conn := randConn(t, nil) + not.Connected(nil, conn) + cm.TrimOpenConns(context.Background()) + end := time.Now() + + info := cm.GetInfo() + if info.HighWater != 5 { + t.Fatal("expected highwater to be 5") + } + if info.LowWater != 1 { + t.Fatal("expected lowwater to be 1") + } + if info.LastTrim.Before(start) || info.LastTrim.After(end) { + t.Fatal("unexpected last trim time") + } + if info.GracePeriod != gp { + t.Fatal("unexpected grace period") + } + if info.ConnCount != 1 { + t.Fatal("unexpected number of connections") + } +} + +func TestDoubleConnection(t *testing.T) { + gp := time.Duration(10 * time.Minute) + cm := NewConnManager(1, 5, gp) + not := cm.Notifee() + conn := randConn(t, nil) + not.Connected(nil, conn) + cm.TagPeer(conn.RemotePeer(), "foo", 10) + not.Connected(nil, conn) + if cm.connCount != 1 { + t.Fatal("unexpected number of connections") + } + if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 { + t.Fatal("unexpected peer value") + } +} + +func TestDisconnected(t *testing.T) { + gp := 
time.Duration(10 * time.Minute) + cm := NewConnManager(1, 5, gp) + not := cm.Notifee() + conn := randConn(t, nil) + not.Connected(nil, conn) + cm.TagPeer(conn.RemotePeer(), "foo", 10) + + not.Disconnected(nil, randConn(t, nil)) + if cm.connCount != 1 { + t.Fatal("unexpected number of connections") + } + if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 { + t.Fatal("unexpected peer value") + } + + not.Disconnected(nil, &tconn{peer: conn.RemotePeer()}) + if cm.connCount != 1 { + t.Fatal("unexpected number of connections") + } + if cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()].value != 10 { + t.Fatal("unexpected peer value") + } + + not.Disconnected(nil, conn) + if cm.connCount != 0 { + t.Fatal("unexpected number of connections") + } + if cm.segments.countPeers() != 0 { + t.Fatal("unexpected number of peers") + } +} + +func TestGracePeriod(t *testing.T) { + if detectrace.WithRace() { + t.Skip("race detector is unhappy with this test") + } + + SilencePeriod = 0 + cm := NewConnManager(10, 20, 100*time.Millisecond) + SilencePeriod = 10 * time.Second + + not := cm.Notifee() + + var conns []network.Conn + + // Add a connection and wait the grace period. + { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + + time.Sleep(200 * time.Millisecond) + + if rc.(*tconn).closed { + t.Fatal("expected conn to remain open") + } + } + + // quickly add 30 connections (sending us above the high watermark) + for i := 0; i < 30; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + } + + cm.TrimOpenConns(context.Background()) + + for _, c := range conns { + if c.(*tconn).closed { + t.Fatal("expected no conns to be closed") + } + } + + time.Sleep(200 * time.Millisecond) + + cm.TrimOpenConns(context.Background()) + + closed := 0 + for _, c := range conns { + if c.(*tconn).closed { + closed++ + } + } + + if closed != 21 { + t.Fatal("expected to have closed 21 connections") + } +} + +// see https://github.com/libp2p/go-libp2p-connmgr/issues/23 +func TestQuickBurstRespectsSilencePeriod(t *testing.T) { + if detectrace.WithRace() { + t.Skip("race detector is unhappy with this test") + } + + cm := NewConnManager(10, 20, 0) + not := cm.Notifee() + + var conns []network.Conn + + // quickly produce 30 connections (sending us above the high watermark) + for i := 0; i < 30; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + } + + // wait for a few seconds + time.Sleep(time.Second * 3) + + // only the first trim is allowed in; make sure we close at most 20 connections, not all of them. + var closed int + for _, c := range conns { + if c.(*tconn).closed { + closed++ + } + } + if closed > 20 { + t.Fatalf("should have closed at most 20 connections, closed: %d", closed) + } + if total := closed + int(cm.connCount); total != 30 { + t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total) + } +} + +func TestPeerProtectionSingleTag(t *testing.T) { + if detectrace.WithRace() { + t.Skip("race detector is unhappy with this test") + } + + SilencePeriod = 0 + cm := NewConnManager(19, 20, 0) + SilencePeriod = 10 * time.Second + + not := cm.Notifee() + + var conns []network.Conn + addConn := func(value int) { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", value) + } + + // produce 20 connections with unique peers. 
+ for i := 0; i < 20; i++ { + addConn(20) + } + + // protect the first 5 peers. + var protected []network.Conn + for _, c := range conns[0:5] { + cm.Protect(c.RemotePeer(), "global") + protected = append(protected, c) + // tag them negatively to make them preferred for pruning. + cm.TagPeer(c.RemotePeer(), "test", -100) + } + + // add 1 more conn, this shouldn't send us over the limit as protected conns don't count + addConn(20) + + cm.TrimOpenConns(context.Background()) + + for _, c := range conns { + if c.(*tconn).closed { + t.Error("connection was closed by connection manager") + } + } + + // add 5 more connection, sending the connection manager overboard. + for i := 0; i < 5; i++ { + addConn(20) + } + + cm.TrimOpenConns(context.Background()) + + for _, c := range protected { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + closed := 0 + for _, c := range conns { + if c.(*tconn).closed { + closed++ + } + } + if closed != 2 { + t.Errorf("expected 2 connection to be closed, found %d", closed) + } + + // unprotect the first peer. + cm.Unprotect(protected[0].RemotePeer(), "global") + + // add 2 more connections, sending the connection manager overboard again. + for i := 0; i < 2; i++ { + addConn(20) + } + + cm.TrimOpenConns(context.Background()) + + if !protected[0].(*tconn).closed { + t.Error("unprotected connection was kept open by connection manager") + } + for _, c := range protected[1:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } +} + +func TestPeerProtectionMultipleTags(t *testing.T) { + if detectrace.WithRace() { + t.Skip("race detector is unhappy with this test") + } + + SilencePeriod = 0 + cm := NewConnManager(19, 20, 0) + SilencePeriod = 10 * time.Second + + not := cm.Notifee() + + // produce 20 connections with unique peers. + var conns []network.Conn + for i := 0; i < 20; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // protect the first 5 peers under two tags. + var protected []network.Conn + for _, c := range conns[0:5] { + cm.Protect(c.RemotePeer(), "tag1") + cm.Protect(c.RemotePeer(), "tag2") + protected = append(protected, c) + // tag them negatively to make them preferred for pruning. + cm.TagPeer(c.RemotePeer(), "test", -100) + } + + // add one more connection, sending the connection manager overboard. + not.Connected(nil, randConn(t, not.Disconnected)) + + cm.TrimOpenConns(context.Background()) + + for _, c := range protected { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + // remove the protection from one tag. + for _, c := range protected { + if !cm.Unprotect(c.RemotePeer(), "tag1") { + t.Error("peer should still be protected") + } + } + + // add 2 more connections, sending the connection manager overboard again. + for i := 0; i < 2; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + cm.TrimOpenConns(context.Background()) + + // connections should still remain open, as they were protected. + for _, c := range protected[0:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + // unprotect the first peer entirely. + cm.Unprotect(protected[0].RemotePeer(), "tag2") + + // add 2 more connections, sending the connection manager overboard again. 
+ for i := 0; i < 2; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + cm.TrimOpenConns(context.Background()) + + if !protected[0].(*tconn).closed { + t.Error("unprotected connection was kept open by connection manager") + } + for _, c := range protected[1:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + +} + +func TestPeerProtectionIdempotent(t *testing.T) { + SilencePeriod = 0 + cm := NewConnManager(10, 20, 0) + SilencePeriod = 10 * time.Second + + id, _ := tu.RandPeerID() + cm.Protect(id, "global") + cm.Protect(id, "global") + cm.Protect(id, "global") + cm.Protect(id, "global") + + if len(cm.protected[id]) > 1 { + t.Error("expected peer to be protected only once") + } + + if !cm.Unprotect(id, "unused") { + t.Error("expected peer to continue to be protected") + } + + if !cm.Unprotect(id, "unused2") { + t.Error("expected peer to continue to be protected") + } + + if cm.Unprotect(id, "global") { + t.Error("expected peer to be unprotected") + } + + if len(cm.protected) > 0 { + t.Error("expected no protections") + } +} + +func TestUpsertTag(t *testing.T) { + cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) + not := cm.Notifee() + conn := randConn(t, nil) + rp := conn.RemotePeer() + + // this is an early tag, before the Connected notification arrived. + cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 }) + if len(cm.segments.get(rp).peers[rp].tags) != 1 { + t.Fatal("expected a tag") + } + if cm.segments.get(rp).peers[rp].value != 1 { + t.Fatal("expected a tag value of 1") + } + + // now let's notify the connection. + not.Connected(nil, conn) + + cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 }) + if len(cm.segments.get(rp).peers[rp].tags) != 1 { + t.Fatal("expected a tag") + } + if cm.segments.get(rp).peers[rp].value != 2 { + t.Fatal("expected a tag value of 2") + } + + cm.UpsertTag(rp, "tag", func(v int) int { return v - 1 }) + if len(cm.segments.get(rp).peers[rp].tags) != 1 { + t.Fatal("expected a tag") + } + if cm.segments.get(rp).peers[rp].value != 1 { + t.Fatal("expected a tag value of 1") + } +} + +func TestTemporaryEntriesClearedFirst(t *testing.T) { + cm := NewConnManager(1, 1, 0) + + id := tu.RandPeerIDFatal(t) + cm.TagPeer(id, "test", 20) + + if cm.GetTagInfo(id).Value != 20 { + t.Fatal("expected an early tag with value 20") + } + + not := cm.Notifee() + conn1, conn2 := randConn(t, nil), randConn(t, nil) + not.Connected(nil, conn1) + not.Connected(nil, conn2) + + cm.TrimOpenConns(context.Background()) + if cm.GetTagInfo(id) != nil { + t.Fatal("expected no temporary tags after trimming") + } +} + +func TestTemporaryEntryConvertedOnConnection(t *testing.T) { + cm := NewConnManager(1, 1, 0) + + conn := randConn(t, nil) + cm.TagPeer(conn.RemotePeer(), "test", 20) + + ti := cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()] + + if ti.value != 20 || !ti.temp { + t.Fatal("expected a temporary tag with value 20") + } + + not := cm.Notifee() + not.Connected(nil, conn) + + if ti.value != 20 || ti.temp { + t.Fatal("expected a non-temporary tag with value 20") + } +} diff --git a/beacon-chain/p2p/dial_relay_node.go b/beacon-chain/p2p/dial_relay_node.go index c63855a3ff..58b6a21292 100644 --- a/beacon-chain/p2p/dial_relay_node.go +++ b/beacon-chain/p2p/dial_relay_node.go @@ -5,13 +5,12 @@ import ( "github.com/libp2p/go-libp2p-core/host" peerstore "github.com/libp2p/go-libp2p-peerstore" - 
"github.com/multiformats/go-multiaddr" "go.opencensus.io/trace" ) // MakePeer from multiaddress string. func MakePeer(addr string) (*peerstore.PeerInfo, error) { - maddr, err := multiaddr.NewMultiaddr(addr) + maddr, err := multiAddrFromString(addr) if err != nil { return nil, err } diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 5e586d613d..5d4479c86a 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -41,7 +41,10 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF { log.WithError(err).Debug("Could not send successful hello rpc request") if err.Error() == "protocol not supported" { + // This is only to ensure the smooth running of our testnets. This will not be + // used in production. log.Debug("Not disconnecting peer with unsupported protocol. This may be the DHT node or relay.") + s.host.ConnManager().Protect(conn.RemotePeer(), "relay/bootnode") return } if err := s.Disconnect(conn.RemotePeer()); err != nil { diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index 0bf017ea63..1d7481fe67 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -7,10 +7,10 @@ import ( "time" "github.com/libp2p/go-libp2p" - connmgr "github.com/libp2p/go-libp2p-connmgr" filter "github.com/libp2p/go-maddr-filter" "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/connmgr" ) // buildOptions for the libp2p host. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index b96ddee7d2..95daa3f3d0 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -130,6 +130,11 @@ func (s *Service) Start() { if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil { log.WithError(err).Errorf("Could not dial relay node") } + peer, err := MakePeer(s.cfg.RelayNodeAddr) + if err != nil { + log.WithError(err).Errorf("Could not create peer") + } + s.host.ConnManager().Protect(peer.ID, "relay") } if len(s.cfg.Discv5BootStrapAddr) != 0 && !s.cfg.NoDiscovery { @@ -164,6 +169,11 @@ func (s *Service) Start() { s.startupErr = err return } + peer, err := MakePeer(addr) + if err != nil { + log.WithError(err).Errorf("Could not create peer") + } + s.host.ConnManager().Protect(peer.ID, "bootnode") } bcfg := kaddht.DefaultBootstrapConfig bcfg.Period = time.Duration(30 * time.Second)