Fix Discv5 in Runtime (#3373)

* fix bug

* remove logs

* fix test

* add locks

* add ttl

* Apply suggestions from code review

Co-Authored-By: terence tsao <terence@prysmaticlabs.com>

* change to ccache
Nishant Das
2019-09-02 03:59:58 +05:30
committed by Preston Van Loon
parent 25dbc5ea85
commit 876e0ea84d
7 changed files with 107 additions and 59 deletions
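
The "add ttl" and "change to ccache" bullets boil down to a TTL-backed exclusion list for recently disconnected peers, which the handshake.go and service.go diffs below wire in. As a rough, self-contained sketch of that pattern — assuming only the github.com/karlseguin/ccache API shown in the diff (Configure, New, Set, Get), and noting that ccache's Get also returns expired items, so a caller that cares about expiry can additionally check Item.Expired():

```go
package main

import (
	"fmt"
	"time"

	"github.com/karlseguin/ccache"
)

func main() {
	// Exclusion list keyed by peer ID string; entries expire after a TTL.
	exclusionList := ccache.New(ccache.Configure())
	ttl := 1 * time.Hour

	// A peer disconnected: exclude it from redial attempts for one TTL.
	exclusionList.Set("peer-id-1", true, ttl)

	// Before dialing, skip peers that are still excluded. ccache's Get also
	// returns expired items, so a stricter check consults Expired() too.
	if item := exclusionList.Get("peer-id-1"); item != nil && !item.Expired() {
		fmt.Println("peer is excluded, skipping dial")
	}
}
```

In the commit itself, entries are keyed by peer ID string with a one-hour ttl for disconnected peers and an effectively permanent entry for the bootnode.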

View File

@@ -34,6 +34,7 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_karlseguin_ccache//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p_core//crypto:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",

View File

@@ -44,7 +44,7 @@ func createListener(ipAddr net.IP, port int, privKey *ecdsa.PrivateKey) *discv5.
}
func startDiscoveryV5(addr net.IP, privKey *ecdsa.PrivateKey, cfg *Config) (*discv5.Network, error) {
listener := createListener(addr, int(cfg.UDPPort), privKey)
listener := createListener(addr, int(cfg.Port), privKey)
bootNode, err := discv5.ParseNode(cfg.BootstrapNodeAddr)
if err != nil {
return nil, err
@@ -60,25 +60,9 @@ func startDiscoveryV5(addr net.IP, privKey *ecdsa.PrivateKey, cfg *Config) (*dis
func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr {
var multiAddrs []ma.Multiaddr
for _, node := range nodes {
ip4 := node.IP.To4()
if ip4 == nil {
log.Error("Node doesn't have an ip4 address")
continue
}
pubkey, err := node.ID.Pubkey()
multiAddr, err := convertToSingleMultiAddr(node)
if err != nil {
log.Errorf("Could not get pubkey from node ID: %v", err)
continue
}
assertedKey := convertToInterfacePubkey(pubkey)
id, err := peer.IDFromPublicKey(assertedKey)
if err != nil {
log.Errorf("Could not get peer id: %v", err)
}
multiAddrString := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip4.String(), node.TCP, id)
multiAddr, err := ma.NewMultiaddr(multiAddrString)
if err != nil {
log.Errorf("Could not get multiaddr:%v", err)
log.WithError(err).Error("Could not convert to multiAddr")
continue
}
multiAddrs = append(multiAddrs, multiAddr)
@@ -86,6 +70,28 @@ func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr {
return multiAddrs
}
func convertToSingleMultiAddr(node *discv5.Node) (ma.Multiaddr, error) {
ip4 := node.IP.To4()
if ip4 == nil {
return nil, errors.New("node doesn't have an ip4 address")
}
pubkey, err := node.ID.Pubkey()
if err != nil {
return nil, errors.Wrap(err, "could not get pubkey from node ID")
}
assertedKey := convertToInterfacePubkey(pubkey)
id, err := peer.IDFromPublicKey(assertedKey)
if err != nil {
return nil, errors.Wrap(err, "could not get peer id")
}
multiAddrString := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip4.String(), node.TCP, id)
multiAddr, err := ma.NewMultiaddr(multiAddrString)
if err != nil {
return nil, errors.Wrap(err, "could not get multiaddr")
}
return multiAddr, nil
}
func manyMultiAddrsFromString(addrs []string) ([]ma.Multiaddr, error) {
var allAddrs []ma.Multiaddr
for _, stringAddr := range addrs {
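
The refactor above pulls the node-to-multiaddr conversion into convertToSingleMultiAddr so that callers can either log and continue (convertToMultiAddr) or propagate the wrapped error (addBootNodeToExclusionList in service.go below). A minimal sketch of how such a multiaddr string is consumed downstream, assuming go-multiaddr and go-libp2p-core; the peer ID is illustrative only:

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// Format produced by convertToSingleMultiAddr: /ip4/<ip>/tcp/<port>/p2p/<peer id>.
	// The peer ID below is illustrative only.
	addrStr := "/ip4/127.0.0.1/tcp/13000/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"

	maddr, err := ma.NewMultiaddr(addrStr)
	if err != nil {
		panic(err)
	}

	// AddrInfoFromP2pAddr splits the transport address from the peer ID,
	// yielding something host.Connect (or an exclusion-list key) can use.
	info, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		panic(err)
	}
	fmt.Println(info.ID, info.Addrs)
}
```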

View File

@@ -86,7 +86,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
nodes := lastListener.Lookup(bootNode.ID)
if len(nodes) != 6 {
t.Errorf("The node's local table doesn't have the expected number of nodes. "+
"Expected %d but got %d", 11, len(nodes))
"Expected %d but got %d", 6, len(nodes))
}
// Close all ports
@@ -106,7 +106,7 @@ func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) {
node := discv5.NewNode(nodeID, ipAddr, 0, 0)
_ = convertToMultiAddr([]*discv5.Node{node})
testutil.AssertLogsContain(t, hook, "Node doesn't have an ip4 address")
testutil.AssertLogsContain(t, hook, "node doesn't have an ip4 address")
}
func TestMultiAddrConversion_OK(t *testing.T) {

View File

@@ -13,32 +13,32 @@ var handshakes = make(map[peer.ID]*pb.Hello)
var handshakeLock sync.Mutex
// AddHandshake to the local records for initial sync.
func (p *Service) AddHandshake(pid peer.ID, hello *pb.Hello) {
func (s *Service) AddHandshake(pid peer.ID, hello *pb.Hello) {
handshakeLock.Lock()
defer handshakeLock.Unlock()
handshakes[pid] = hello
}
// Handshakes has not been implemented yet and it may be moved to regular sync...
func (p *Service) Handshakes() map[peer.ID]*pb.Hello {
func (s *Service) Handshakes() map[peer.ID]*pb.Hello {
return nil
}
// AddConnectionHandler adds a callback function which handles the connection with a
// newly added peer. It performs a handshake with that peer by sending a hello request
// and validating the response from the peer.
func (p *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
p.host.Network().Notify(&network.NotifyBundle{
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
ctx := context.Background()
log.WithField("peer", conn.RemotePeer()).Debug(
"Performing handshake with to peer",
"Performing handshake with peer",
)
if err := reqFunc(ctx, conn.RemotePeer()); err != nil {
log.WithError(err).Error("Could not send successful hello rpc request")
if err := p.Disconnect(conn.RemotePeer()); err != nil {
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Errorf("Unable to close peer %s", conn.RemotePeer())
return
}
@@ -48,3 +48,19 @@ func (p *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
},
})
}
// addDisconnectionHandler ensures that previously disconnected peers aren't dialed again, e.g.
// because their ports are closed or the nodes are no longer active.
func (s *Service) addDisconnectionHandler() {
s.host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
s.exclusionList.Set(conn.RemotePeer().String(), true, ttl)
log.WithField("peer", conn.RemotePeer()).Debug(
"Peer is added to exclusion list",
)
}()
},
})
}
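
For context on the NotifyBundle used above: libp2p invokes notifiee callbacks inline on its network event path, which is why both handlers hand their work off to goroutines. A stripped-down, non-Prysm sketch of registering the same kind of handlers on a bare libp2p host, assuming the go-libp2p version vendored here (where New still takes a context):

```go
package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/network"
)

func main() {
	ctx := context.Background()
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}
	defer h.Close()

	h.Network().Notify(&network.NotifyBundle{
		ConnectedF: func(_ network.Network, conn network.Conn) {
			// Notifiee callbacks run inline on the network's event path,
			// so slow work (handshakes, RPCs) is pushed to a goroutine.
			go func() {
				_ = conn.RemotePeer() // e.g. send the hello request here
			}()
		},
		DisconnectedF: func(_ network.Network, conn network.Conn) {
			go func() {
				_ = conn.RemotePeer() // e.g. add the peer to an exclusion list
			}()
		},
	})
}
```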

View File

@@ -2,14 +2,16 @@ package p2p
import (
"context"
"crypto/ecdsa"
"strings"
"time"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/gogo/protobuf/proto"
libp2p "github.com/libp2p/go-libp2p"
"github.com/karlseguin/ccache"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
network "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -23,39 +25,44 @@ import (
var _ = shared.Service(&Service{})
var pollingPeriod = 1 * time.Second
var ttl = 1 * time.Hour
// Service for managing peer to peer (p2p) networking.
type Service struct {
ctx context.Context
cancel context.CancelFunc
started bool
cfg *Config
startupErr error
dv5Listener Listener
host host.Host
pubsub *pubsub.PubSub
ctx context.Context
cancel context.CancelFunc
started bool
cfg *Config
startupErr error
dv5Listener Listener
host host.Host
pubsub *pubsub.PubSub
exclusionList *ccache.Cache
privKey *ecdsa.PrivateKey
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
func NewService(cfg *Config) (*Service, error) {
var err error
ctx, cancel := context.WithCancel(context.Background())
s := &Service{
ctx: ctx,
cancel: cancel,
cfg: cfg,
ctx: ctx,
cancel: cancel,
cfg: cfg,
exclusionList: ccache.New(ccache.Configure()),
}
ipAddr := ipAddr(s.cfg)
privKey, err := privKey(s.cfg)
s.privKey, err = privKey(s.cfg)
if err != nil {
log.WithError(err).Error("Failed to generate p2p private key")
return nil, err
}
// TODO(3147): Add host options
opts := buildOptions(s.cfg, ipAddr, privKey)
opts := buildOptions(s.cfg, ipAddr, s.privKey)
h, err := libp2p.New(s.ctx, opts...)
if err != nil {
log.WithError(err).Error("Failed to create p2p host")
@@ -84,22 +91,23 @@ func (s *Service) Start() {
log.Error("Attempted to start p2p service when it was already started")
return
}
s.addDisconnectionHandler()
if s.cfg.BootstrapNodeAddr != "" && !s.cfg.NoDiscovery {
ipAddr := ipAddr(s.cfg)
privKey, err := privKey(s.cfg)
if err != nil {
s.startupErr = err
log.WithError(err).Error("Failed to generate p2p private key")
return
}
listener, err := startDiscoveryV5(ipAddr, privKey, s.cfg)
listener, err := startDiscoveryV5(ipAddr, s.privKey, s.cfg)
if err != nil {
log.WithError(err).Error("Failed to start discovery")
s.startupErr = err
return
}
err = s.addBootNodeToExclusionList()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")
s.startupErr = err
return
}
s.dv5Listener = listener
go s.listenForNewNodes()
@@ -179,20 +187,14 @@ func (s *Service) Disconnect(pid peer.ID) error {
// listenForNewNodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
node, err := discv5.ParseNode(s.cfg.BootstrapNodeAddr)
if err != nil {
log.Fatalf("could not parse bootstrap address: %v", err)
}
nodeID := node.ID
nodes := make([]*discv5.Node, 10)
ticker := time.NewTicker(pollingPeriod)
for {
select {
case <-ticker.C:
nodes := s.dv5Listener.Lookup(nodeID)
multiAddresses := convertToMultiAddr(nodes)
num := s.dv5Listener.ReadRandomNodes(nodes)
multiAddresses := convertToMultiAddr(nodes[:num])
s.connectWithAllPeers(multiAddresses)
// store furthest node as the next to lookup
nodeID = nodes[len(nodes)-1].ID
case <-s.ctx.Done():
log.Debug("p2p context is closed, exiting routine")
break
@@ -211,12 +213,34 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
if info.ID == s.host.ID() {
continue
}
if s.exclusionList.Get(info.ID.String()) != nil {
continue
}
if err := s.host.Connect(s.ctx, info); err != nil {
log.Errorf("Could not connect with peer: %v", err)
}
}
}
func (s *Service) addBootNodeToExclusionList() error {
bootNode, err := discv5.ParseNode(s.cfg.BootstrapNodeAddr)
if err != nil {
return err
}
multAddr, err := convertToSingleMultiAddr(bootNode)
if err != nil {
return err
}
addrInfo, err := peer.AddrInfoFromP2pAddr(multAddr)
if err != nil {
return err
}
// bootnode is never dialled, so ttl is tentatively 1 year
s.exclusionList.Set(addrInfo.ID.String(), true, 365*24*time.Hour)
return nil
}
func logIP4Addr(id peer.ID, addrs ...ma.Multiaddr) {
var correctAddr ma.Multiaddr
for _, addr := range addrs {
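
The listenForNewNodes change swaps the recursive Lookup on the bootnode ID for discv5's ReadRandomNodes, which copies up to len(buf) random entries from the local table into a caller-provided slice and returns how many it wrote; only buf[:n] is meaningful on each tick. A small sketch of that polling pattern — pollRandomNodes is a hypothetical helper, not part of this commit:

```go
package discovery

import (
	"time"

	"github.com/ethereum/go-ethereum/p2p/discv5"
)

// pollRandomNodes illustrates the loop above: instead of a recursive Lookup
// keyed on the bootnode ID, ReadRandomNodes copies up to len(buf) random
// entries from the local discovery table and reports how many it wrote, so
// only buf[:n] holds valid nodes on each tick.
func pollRandomNodes(listener *discv5.Network, period time.Duration, handle func([]*discv5.Node)) {
	buf := make([]*discv5.Node, 10)
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for range ticker.C {
		n := listener.ReadRandomNodes(buf)
		handle(buf[:n])
	}
}
```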

View File

@@ -165,6 +165,7 @@ func TestListenForNewNodes(t *testing.T) {
if len(peers) != 5 {
t.Errorf("Not all peers added to peerstore, wanted %d but got %d", 5, len(peers))
}
// close down all peers
for _, listener := range listeners {
listener.Close()

View File

@@ -20,8 +20,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/prysmaticlabs/prysm/shared/version"
_ "go.uber.org/automaxprocs"
"github.com/sirupsen/logrus"
_ "go.uber.org/automaxprocs"
)
var (