From cec32cb99688963ee713a3a8023b1edbc036aa7d Mon Sep 17 00:00:00 2001 From: Simon <39431803+SimonSMH1015@users.noreply.github.com> Date: Tue, 11 Jul 2023 17:26:08 +0800 Subject: [PATCH] Append Dynamic Addinng Trusted Peer Apis (#12531) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Append Dynamic Addinng Trusted Peer Apis * Append unit tests for Dynamic Addinng Trusted Peer Apis * Update beacon-chain/p2p/peers/peerdata/store.go * Update beacon-chain/p2p/peers/peerdata/store_test.go * Update beacon-chain/p2p/peers/peerdata/store_test.go * Update beacon-chain/p2p/peers/peerdata/store_test.go * Update beacon-chain/p2p/peers/status.go * Update beacon-chain/p2p/peers/status_test.go * Update beacon-chain/p2p/peers/status_test.go * Update beacon-chain/rpc/eth/node/handlers.go * Update beacon-chain/rpc/eth/node/handlers.go * Update beacon-chain/rpc/eth/node/handlers.go * Update beacon-chain/rpc/eth/node/handlers.go * Move trusted peer apis from rpc/eth/v1/node to rpc/prysm/node; move peersToWatch into ensurePeerConnections function; * Update beacon-chain/rpc/prysm/node/server.go * Update beacon-chain/rpc/prysm/node/server.go * fix go lint problem * p2p/watch_peers.go: trusted peer makes AddrInfo structure by itself instead of using MakePeer(). p2p/service.go: add connectWithAllTrustedPeers function, before connectWithPeer, add trusted peer info into peer status. p2p/peers/status.go: trusted peers are not included, when pruning outdated and disconnected peers. * use readlock for GetTrustedPeers and IsTrustedPeers --------- Co-authored-by: simon Co-authored-by: Radosław Kapka Co-authored-by: Nishant Das --- beacon-chain/p2p/peers/peerdata/store.go | 19 ++ beacon-chain/p2p/peers/peerdata/store_test.go | 12 + beacon-chain/p2p/peers/status.go | 38 ++- beacon-chain/p2p/peers/status_test.go | 37 ++- beacon-chain/p2p/service.go | 27 +- beacon-chain/p2p/watch_peers.go | 44 ++- beacon-chain/rpc/BUILD.bazel | 1 + beacon-chain/rpc/prysm/node/BUILD.bazel | 49 ++++ beacon-chain/rpc/prysm/node/handlers.go | 177 +++++++++++++ beacon-chain/rpc/prysm/node/handlers_test.go | 250 ++++++++++++++++++ beacon-chain/rpc/prysm/node/server.go | 21 ++ beacon-chain/rpc/prysm/node/server_test.go | 1 + beacon-chain/rpc/prysm/node/structs.go | 17 ++ beacon-chain/rpc/service.go | 17 ++ 14 files changed, 691 insertions(+), 19 deletions(-) create mode 100644 beacon-chain/rpc/prysm/node/BUILD.bazel create mode 100644 beacon-chain/rpc/prysm/node/handlers.go create mode 100644 beacon-chain/rpc/prysm/node/handlers_test.go create mode 100644 beacon-chain/rpc/prysm/node/server.go create mode 100644 beacon-chain/rpc/prysm/node/server_test.go create mode 100644 beacon-chain/rpc/prysm/node/structs.go diff --git a/beacon-chain/p2p/peers/peerdata/store.go b/beacon-chain/p2p/peers/peerdata/store.go index 08203ac4a1..294cb449e5 100644 --- a/beacon-chain/p2p/peers/peerdata/store.go +++ b/beacon-chain/p2p/peers/peerdata/store.go @@ -108,12 +108,31 @@ func (s *Store) DeletePeerData(pid peer.ID) { } // SetTrustedPeers sets our desired trusted peer set. +// Important: it is assumed that store mutex is locked when calling this method. func (s *Store) SetTrustedPeers(peers []peer.ID) { for _, p := range peers { s.trustedPeers[p] = true } } +// GetTrustedPeers gets our desired trusted peer ids. +// Important: it is assumed that store mutex is locked when calling this method. +func (s *Store) GetTrustedPeers() []peer.ID { + peers := []peer.ID{} + for p := range s.trustedPeers { + peers = append(peers, p) + } + return peers +} + +// DeleteTrustedPeers removes peers from trusted peer set. +// Important: it is assumed that store mutex is locked when calling this method. +func (s *Store) DeleteTrustedPeers(peers []peer.ID) { + for _, p := range peers { + delete(s.trustedPeers, p) + } +} + // Peers returns map of peer data objects. // Important: it is assumed that store mutex is locked when calling this method. func (s *Store) Peers() map[peer.ID]*PeerData { diff --git a/beacon-chain/p2p/peers/peerdata/store_test.go b/beacon-chain/p2p/peers/peerdata/store_test.go index 421ebd6115..9dc975fc98 100644 --- a/beacon-chain/p2p/peers/peerdata/store_test.go +++ b/beacon-chain/p2p/peers/peerdata/store_test.go @@ -96,4 +96,16 @@ func TestStore_TrustedPeers(t *testing.T) { assert.Equal(t, true, store.IsTrustedPeer(pid1)) assert.Equal(t, true, store.IsTrustedPeer(pid2)) assert.Equal(t, true, store.IsTrustedPeer(pid3)) + + tPeers = store.GetTrustedPeers() + assert.Equal(t, 3, len(tPeers)) + + store.DeleteTrustedPeers(tPeers) + tPeers = store.GetTrustedPeers() + assert.Equal(t, 0, len(tPeers)) + + assert.Equal(t, false, store.IsTrustedPeer(pid1)) + assert.Equal(t, false, store.IsTrustedPeer(pid2)) + assert.Equal(t, false, store.IsTrustedPeer(pid3)) + } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index e09b1ff95d..76a7dfe929 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -560,6 +560,9 @@ func (p *Status) Prune() { notBadPeer := func(pid peer.ID) bool { return !p.isBad(pid) } + notTrustedPeer := func(pid peer.ID) bool { + return !p.isTrustedPeers(pid) + } type peerResp struct { pid peer.ID score float64 @@ -567,7 +570,8 @@ func (p *Status) Prune() { peersToPrune := make([]*peerResp, 0) // Select disconnected peers with a smaller bad response count. for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerDisconnected && notBadPeer(pid) { + // Should not prune trusted peer or prune the peer dara and unset trusted peer. + if peerData.ConnState == PeerDisconnected && notBadPeer(pid) && notTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, score: p.Scorers().ScoreNoLock(pid), @@ -608,6 +612,9 @@ func (p *Status) deprecatedPrune() { notBadPeer := func(peerData *peerdata.PeerData) bool { return peerData.BadResponses < p.scorers.BadResponsesScorer().Params().Threshold } + notTrustedPeer := func(pid peer.ID) bool { + return !p.isTrustedPeers(pid) + } type peerResp struct { pid peer.ID badResp int @@ -615,7 +622,8 @@ func (p *Status) deprecatedPrune() { peersToPrune := make([]*peerResp, 0) // Select disconnected peers with a smaller bad response count. for pid, peerData := range p.store.Peers() { - if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) { + // Should not prune trusted peer or prune the peer dara and unset trusted peer. + if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) && notTrustedPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, badResp: peerData.BadResponses, @@ -912,6 +920,32 @@ func (p *Status) SetTrustedPeers(peers []peer.ID) { p.store.SetTrustedPeers(peers) } +// GetTrustedPeers returns a list of all trusted peers' ids +func (p *Status) GetTrustedPeers() []peer.ID { + p.store.RLock() + defer p.store.RUnlock() + return p.store.GetTrustedPeers() +} + +// DeleteTrustedPeers removes peers from trusted peer set +func (p *Status) DeleteTrustedPeers(peers []peer.ID) { + p.store.Lock() + defer p.store.Unlock() + p.store.DeleteTrustedPeers(peers) +} + +// IsTrustedPeers returns if given peer is a Trusted peer +func (p *Status) IsTrustedPeers(pid peer.ID) bool { + p.store.RLock() + defer p.store.RUnlock() + return p.isTrustedPeers(pid) +} + +// isTrustedPeers is the lock-free version of IsTrustedPeers. +func (p *Status) isTrustedPeers(pid peer.ID) bool { + return p.store.IsTrustedPeer(pid) +} + // this method assumes the store lock is acquired before // executing the method. func (p *Status) isfromBadIP(pid peer.ID) bool { diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index 398e542aae..28d940dc30 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -802,6 +802,11 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { } } p.SetTrustedPeers(trustedPeers) + + // Assert we have correct trusted peers + trustedPeers = p.GetTrustedPeers() + assert.Equal(t, 6, len(trustedPeers)) + // Assert all peers more than max are prunable. peersToPrune = p.PeersToPrune() assert.Equal(t, 16, len(peersToPrune)) @@ -812,6 +817,34 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { assert.NotEqual(t, pid.String(), tPid.String()) } } + + // Add more peers to check if trusted peers can be pruned after they are deleted from trusted peer set. + for i := 0; i < 9; i++ { + // Peer added to peer handler. + createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + } + + // Delete trusted peers. + p.DeleteTrustedPeers(trustedPeers) + + peersToPrune = p.PeersToPrune() + assert.Equal(t, 25, len(peersToPrune)) + + // Check that trusted peers are pruned. + for _, tPid := range trustedPeers { + pruned := false + for _, pid := range peersToPrune { + if pid.String() == tPid.String() { + pruned = true + } + } + assert.Equal(t, true, pruned) + } + + // Assert have zero trusted peers + trustedPeers = p.GetTrustedPeers() + assert.Equal(t, 0, len(trustedPeers)) + for _, pid := range peersToPrune { dir, err := p.Direction(pid) require.NoError(t, err) @@ -821,8 +854,8 @@ func TestPrunePeers_TrustedPeers(t *testing.T) { // Ensure it is in the descending order. currScore := p.Scorers().Score(peersToPrune[0]) for _, pid := range peersToPrune { - score := p.Scorers().BadResponsesScorer().Score(pid) - assert.Equal(t, true, currScore >= score) + score := p.Scorers().Score(pid) + assert.Equal(t, true, currScore <= score) currScore = score } } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 8d7f0ba939..8a883b0285 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -174,9 +174,9 @@ func (s *Service) Start() { s.awaitStateInitialized() s.isPreGenesis = false - var peersToWatch []string + var relayNodes []string if s.cfg.RelayNodeAddr != "" { - peersToWatch = append(peersToWatch, s.cfg.RelayNodeAddr) + relayNodes = append(relayNodes, s.cfg.RelayNodeAddr) if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil { log.WithError(err).Errorf("Could not dial relay node") } @@ -213,8 +213,7 @@ func (s *Service) Start() { // Set trusted peers for those that are provided as static addresses. pids := peerIdsFromMultiAddrs(addrs) s.peers.SetTrustedPeers(pids) - peersToWatch = append(peersToWatch, s.cfg.StaticPeers...) - s.connectWithAllPeers(addrs) + s.connectWithAllTrustedPeers(addrs) } // Initialize metadata according to the // current epoch. @@ -226,7 +225,7 @@ func (s *Service) Start() { // Periodic functions. async.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() { - ensurePeerConnections(s.ctx, s.host, peersToWatch...) + ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...) }) async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune) async.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics) @@ -399,6 +398,24 @@ func (s *Service) awaitStateInitialized() { } } +func (s *Service) connectWithAllTrustedPeers(multiAddrs []multiaddr.Multiaddr) { + addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...) + if err != nil { + log.WithError(err).Error("Could not convert to peer address info's from multiaddresses") + return + } + for _, info := range addrInfos { + // add peer into peer status + s.peers.Add(nil, info.ID, info.Addrs[0], network.DirUnknown) + // make each dial non-blocking + go func(info peer.AddrInfo) { + if err := s.connectWithPeer(s.ctx, info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + }(info) + } +} + func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) { addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...) if err != nil { diff --git a/beacon-chain/p2p/watch_peers.go b/beacon-chain/p2p/watch_peers.go index 0766ddbeda..d34807b00a 100644 --- a/beacon-chain/p2p/watch_peers.go +++ b/beacon-chain/p2p/watch_peers.go @@ -5,28 +5,52 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" ) // ensurePeerConnections will attempt to reestablish connection to the peers // if there are currently no connections to that peer. -func ensurePeerConnections(ctx context.Context, h host.Host, peers ...string) { - if len(peers) == 0 { - return - } - for _, p := range peers { - if p == "" { +func ensurePeerConnections(ctx context.Context, h host.Host, peers *peers.Status, relayNodes ...string) { + // every time reset peersToWatch, add RelayNodes and trust peers + var peersToWatch []*peer.AddrInfo + + // add RelayNodes + for _, node := range relayNodes { + if node == "" { continue } - peerInfo, err := MakePeer(p) + peerInfo, err := MakePeer(node) if err != nil { log.WithError(err).Error("Could not make peer") continue } + peersToWatch = append(peersToWatch, peerInfo) + } - c := h.Network().ConnsToPeer(peerInfo.ID) + // add trusted peers + trustedPeers := peers.GetTrustedPeers() + for _, trustedPeer := range trustedPeers { + maddr, err := peers.Address(trustedPeer) + + // avoid invalid trusted peers + if err != nil || maddr == nil { + log.WithField("peer", trustedPeers).WithError(err).Error("Could not get peer address") + continue + } + peerInfo := &peer.AddrInfo{ID: trustedPeer} + peerInfo.Addrs = []ma.Multiaddr{maddr} + peersToWatch = append(peersToWatch, peerInfo) + } + + if len(peersToWatch) == 0 { + return + } + for _, p := range peersToWatch { + c := h.Network().ConnsToPeer(p.ID) if len(c) == 0 { - if err := connectWithTimeout(ctx, h, peerInfo); err != nil { - log.WithField("peer", peerInfo.ID).WithField("addrs", peerInfo.Addrs).WithError(err).Errorf("Failed to reconnect to peer") + if err := connectWithTimeout(ctx, h, p); err != nil { + log.WithField("peer", p.ID).WithField("addrs", p.Addrs).WithError(err).Errorf("Failed to reconnect to peer") continue } } diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel index 4a3ddc9c1a..0d63ef84f7 100644 --- a/beacon-chain/rpc/BUILD.bazel +++ b/beacon-chain/rpc/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//beacon-chain/rpc/eth/rewards:go_default_library", "//beacon-chain/rpc/eth/validator:go_default_library", "//beacon-chain/rpc/lookup:go_default_library", + "//beacon-chain/rpc/prysm/node:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/beacon:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/debug:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/node:go_default_library", diff --git a/beacon-chain/rpc/prysm/node/BUILD.bazel b/beacon-chain/rpc/prysm/node/BUILD.bazel new file mode 100644 index 0000000000..d786484940 --- /dev/null +++ b/beacon-chain/rpc/prysm/node/BUILD.bazel @@ -0,0 +1,49 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "handlers.go", + "server.go", + "structs.go", + ], + importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/node", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/db:go_default_library", + "//beacon-chain/execution:go_default_library", + "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/peers:go_default_library", + "//beacon-chain/p2p/peers/peerdata:go_default_library", + "//beacon-chain/sync:go_default_library", + "//network:go_default_library", + "//proto/prysm/v1alpha1:go_default_library", + "@com_github_libp2p_go_libp2p//core/network:go_default_library", + "@com_github_libp2p_go_libp2p//core/peer:go_default_library", + "@com_github_pkg_errors//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "handlers_test.go", + "server_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/peers:go_default_library", + "//beacon-chain/p2p/testing:go_default_library", + "//network:go_default_library", + "//testing/assert:go_default_library", + "//testing/require:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", + "@com_github_libp2p_go_libp2p//core/network:go_default_library", + "@com_github_libp2p_go_libp2p//core/peer:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/host/peerstore/test:go_default_library", + "@com_github_multiformats_go_multiaddr//:go_default_library", + ], +) diff --git a/beacon-chain/rpc/prysm/node/handlers.go b/beacon-chain/rpc/prysm/node/handlers.go new file mode 100644 index 0000000000..ecc8416663 --- /dev/null +++ b/beacon-chain/rpc/prysm/node/handlers.go @@ -0,0 +1,177 @@ +package node + +import ( + "encoding/json" + "io" + "net/http" + "strings" + + corenet "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers/peerdata" + "github.com/prysmaticlabs/prysm/v4/network" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" +) + +// ListTrustedPeer retrieves data about the node's trusted peers. +func (s *Server) ListTrustedPeer(w http.ResponseWriter, r *http.Request) { + peerStatus := s.PeersFetcher.Peers() + allIds := s.PeersFetcher.Peers().GetTrustedPeers() + allPeers := make([]*Peer, 0, len(allIds)) + for _, id := range allIds { + p, err := httpPeerInfo(peerStatus, id) + if err != nil { + errJson := &network.DefaultErrorJson{ + Message: errors.Wrapf(err, "Could not get peer info").Error(), + Code: http.StatusInternalServerError, + } + network.WriteError(w, errJson) + return + } + // peers added into trusted set but never connected should also be listed + if p == nil { + p = &Peer{ + PeerID: id.String(), + Enr: "", + LastSeenP2PAddress: "", + State: eth.ConnectionState(corenet.NotConnected).String(), + Direction: eth.PeerDirection(corenet.DirUnknown).String(), + } + } + allPeers = append(allPeers, p) + } + response := &PeersResponse{Peers: allPeers} + network.WriteJson(w, response) +} + +// AddTrustedPeer adds a new peer into node's trusted peer set by Multiaddr +func (s *Server) AddTrustedPeer(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + errJson := &network.DefaultErrorJson{ + Message: errors.Wrapf(err, "Could not read request body").Error(), + Code: http.StatusInternalServerError, + } + network.WriteError(w, errJson) + return + } + var addrRequest *AddrRequest + err = json.Unmarshal(body, &addrRequest) + if err != nil { + errJson := &network.DefaultErrorJson{ + Message: errors.Wrapf(err, "Could not decode request body into peer address").Error(), + Code: http.StatusBadRequest, + } + network.WriteError(w, errJson) + return + } + info, err := peer.AddrInfoFromString(addrRequest.Addr) + if err != nil { + errJson := &network.DefaultErrorJson{ + Message: errors.Wrapf(err, "Could not derive peer info from multiaddress").Error(), + Code: http.StatusBadRequest, + } + network.WriteError(w, errJson) + return + } + + // also add new peerdata to peers + direction, err := s.PeersFetcher.Peers().Direction(info.ID) + if err != nil { + s.PeersFetcher.Peers().Add(nil, info.ID, info.Addrs[0], corenet.DirUnknown) + } else { + s.PeersFetcher.Peers().Add(nil, info.ID, info.Addrs[0], direction) + } + + peers := []peer.ID{} + peers = append(peers, info.ID) + s.PeersFetcher.Peers().SetTrustedPeers(peers) + w.WriteHeader(http.StatusOK) +} + +// RemoveTrustedPeer removes peer from our trusted peer set but does not close connection. +func (s *Server) RemoveTrustedPeer(w http.ResponseWriter, r *http.Request) { + segments := strings.Split(r.URL.Path, "/") + id := segments[len(segments)-1] + peerId, err := peer.Decode(id) + if err != nil { + errJson := &network.DefaultErrorJson{ + Message: errors.Wrapf(err, "Could not decode peer id").Error(), + Code: http.StatusBadRequest, + } + network.WriteError(w, errJson) + return + } + + // if the peer is not a trusted peer, do nothing but return 200 + if !s.PeersFetcher.Peers().IsTrustedPeers(peerId) { + w.WriteHeader(http.StatusOK) + return + } + + peers := []peer.ID{} + peers = append(peers, peerId) + s.PeersFetcher.Peers().DeleteTrustedPeers(peers) + w.WriteHeader(http.StatusOK) +} + +// httpPeerInfo does the same thing as peerInfo function in node.go but returns the +// http peer response. +func httpPeerInfo(peerStatus *peers.Status, id peer.ID) (*Peer, error) { + enr, err := peerStatus.ENR(id) + if err != nil { + if errors.Is(err, peerdata.ErrPeerUnknown) { + return nil, nil + } + return nil, errors.Wrap(err, "could not obtain ENR") + } + var serializedEnr string + if enr != nil { + serializedEnr, err = p2p.SerializeENR(enr) + if err != nil { + return nil, errors.Wrap(err, "could not serialize ENR") + } + } + address, err := peerStatus.Address(id) + if err != nil { + if errors.Is(err, peerdata.ErrPeerUnknown) { + return nil, nil + } + return nil, errors.Wrap(err, "could not obtain address") + } + connectionState, err := peerStatus.ConnectionState(id) + if err != nil { + if errors.Is(err, peerdata.ErrPeerUnknown) { + return nil, nil + } + return nil, errors.Wrap(err, "could not obtain connection state") + } + direction, err := peerStatus.Direction(id) + if err != nil { + if errors.Is(err, peerdata.ErrPeerUnknown) { + return nil, nil + } + return nil, errors.Wrap(err, "could not obtain direction") + } + if eth.PeerDirection(direction) == eth.PeerDirection_UNKNOWN { + return nil, nil + } + v1ConnState := eth.ConnectionState(connectionState).String() + v1PeerDirection := eth.PeerDirection(direction).String() + p := Peer{ + PeerID: id.String(), + State: v1ConnState, + Direction: v1PeerDirection, + } + if address != nil { + p.LastSeenP2PAddress = address.String() + } + if serializedEnr != "" { + p.Enr = "enr:" + serializedEnr + } + + return &p, nil +} diff --git a/beacon-chain/rpc/prysm/node/handlers_test.go b/beacon-chain/rpc/prysm/node/handlers_test.go new file mode 100644 index 0000000000..946fc058cf --- /dev/null +++ b/beacon-chain/rpc/prysm/node/handlers_test.go @@ -0,0 +1,250 @@ +package node + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + corenet "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + libp2ptest "github.com/libp2p/go-libp2p/p2p/host/peerstore/test" + ma "github.com/multiformats/go-multiaddr" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" + mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/network" + "github.com/prysmaticlabs/prysm/v4/testing/assert" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +type testIdentity enode.ID + +func (_ testIdentity) Verify(_ *enr.Record, _ []byte) error { return nil } +func (id testIdentity) NodeAddr(_ *enr.Record) []byte { return id[:] } + +func TestListTrustedPeer(t *testing.T) { + ids := libp2ptest.GeneratePeerIDs(9) + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + peerStatus := peerFetcher.Peers() + + for i, id := range ids { + if i == len(ids)-1 { + var p2pAddr = "/ip4/127.0.0." + strconv.Itoa(i) + "/udp/12000/p2p/16Uiu2HAm7yD5fhhw1Kihg5pffaGbvKV3k7sqxRGHMZzkb7u9UUxQ" + p2pMultiAddr, err := ma.NewMultiaddr(p2pAddr) + require.NoError(t, err) + peerStatus.Add(nil, id, p2pMultiAddr, corenet.DirUnknown) + continue + } + enrRecord := &enr.Record{} + err := enrRecord.SetSig(testIdentity{1}, []byte{42}) + require.NoError(t, err) + enrRecord.Set(enr.IPv4{127, 0, 0, byte(i)}) + err = enrRecord.SetSig(testIdentity{}, []byte{}) + require.NoError(t, err) + var p2pAddr = "/ip4/127.0.0." + strconv.Itoa(i) + "/udp/12000/p2p/16Uiu2HAm7yD5fhhw1Kihg5pffaGbvKV3k7sqxRGHMZzkb7u9UUxQ" + p2pMultiAddr, err := ma.NewMultiaddr(p2pAddr) + require.NoError(t, err) + + var direction corenet.Direction + if i%2 == 0 { + direction = corenet.DirInbound + } else { + direction = corenet.DirOutbound + } + peerStatus.Add(enrRecord, id, p2pMultiAddr, direction) + + switch i { + case 0, 1: + peerStatus.SetConnectionState(id, peers.PeerConnecting) + case 2, 3: + peerStatus.SetConnectionState(id, peers.PeerConnected) + case 4, 5: + peerStatus.SetConnectionState(id, peers.PeerDisconnecting) + case 6, 7: + peerStatus.SetConnectionState(id, peers.PeerDisconnected) + default: + t.Fatalf("Failed to set connection state for peer") + } + } + + s := Server{PeersFetcher: peerFetcher} + // set all peers as trusted peers + s.PeersFetcher.Peers().SetTrustedPeers(ids) + + t.Run("Peer data OK", func(t *testing.T) { + url := "http://anything.is.fine" + request := httptest.NewRequest("GET", url, nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.ListTrustedPeer(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + resp := &PeersResponse{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp)) + peers := resp.Peers + // assert number of trusted peer is right + assert.Equal(t, 9, len(peers)) + + for i := 0; i < 9; i++ { + pid, err := peer.Decode(peers[i].PeerID) + require.NoError(t, err) + if pid == ids[8] { + assert.Equal(t, "", peers[i].Enr) + assert.Equal(t, "", peers[i].LastSeenP2PAddress) + assert.Equal(t, "DISCONNECTED", peers[i].State) + assert.Equal(t, "UNKNOWN", peers[i].Direction) + continue + } + expectedEnr, err := peerStatus.ENR(pid) + require.NoError(t, err) + serializeENR, err := p2p.SerializeENR(expectedEnr) + require.NoError(t, err) + assert.Equal(t, "enr:"+serializeENR, peers[i].Enr) + expectedP2PAddr, err := peerStatus.Address(pid) + require.NoError(t, err) + assert.Equal(t, expectedP2PAddr.String(), peers[i].LastSeenP2PAddress) + switch pid { + case ids[0]: + assert.Equal(t, "CONNECTING", peers[i].State) + assert.Equal(t, "INBOUND", peers[i].Direction) + case ids[1]: + assert.Equal(t, "CONNECTING", peers[i].State) + assert.Equal(t, "OUTBOUND", peers[i].Direction) + case ids[2]: + assert.Equal(t, "CONNECTED", peers[i].State) + assert.Equal(t, "INBOUND", peers[i].Direction) + case ids[3]: + assert.Equal(t, "CONNECTED", peers[i].State) + assert.Equal(t, "OUTBOUND", peers[i].Direction) + case ids[4]: + assert.Equal(t, "DISCONNECTING", peers[i].State) + assert.Equal(t, "INBOUND", peers[i].Direction) + case ids[5]: + assert.Equal(t, "DISCONNECTING", peers[i].State) + assert.Equal(t, "OUTBOUND", peers[i].Direction) + case ids[6]: + assert.Equal(t, "DISCONNECTED", peers[i].State) + assert.Equal(t, "INBOUND", peers[i].Direction) + case ids[7]: + assert.Equal(t, "DISCONNECTED", peers[i].State) + assert.Equal(t, "OUTBOUND", peers[i].Direction) + default: + t.Fatalf("Failed to get connection state and direction for peer") + } + } + }) +} + +func TestListTrustedPeers_NoPeersReturnsEmptyArray(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine" + request := httptest.NewRequest("GET", url, nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.ListTrustedPeer(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + resp := &PeersResponse{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp)) + peers := resp.Peers + assert.Equal(t, 0, len(peers)) +} + +func TestAddTrustedPeer(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine" + addr := &AddrRequest{ + Addr: "/ip4/127.0.0.1/tcp/30303/p2p/16Uiu2HAm1n583t4huDMMqEUUBuQs6bLts21mxCfX3tiqu9JfHvRJ", + } + addrJson, err := json.Marshal(addr) + require.NoError(t, err) + var body bytes.Buffer + _, err = body.Write(addrJson) + require.NoError(t, err) + request := httptest.NewRequest("POST", url, &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.AddTrustedPeer(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) +} + +func TestAddTrustedPeer_EmptyBody(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine" + request := httptest.NewRequest("POST", url, nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.AddTrustedPeer(writer, request) + e := &network.DefaultErrorJson{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, writer.Code) + assert.Equal(t, "Could not decode request body into peer address: unexpected end of JSON input", e.Message) + +} + +func TestAddTrustedPeer_BadAddress(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine" + addr := &AddrRequest{ + Addr: "anything/but/not/an/address", + } + addrJson, err := json.Marshal(addr) + require.NoError(t, err) + var body bytes.Buffer + _, err = body.Write(addrJson) + require.NoError(t, err) + request := httptest.NewRequest("POST", url, &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.AddTrustedPeer(writer, request) + e := &network.DefaultErrorJson{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, writer.Code) + assert.StringContains(t, "Could not derive peer info from multiaddress", e.Message) +} + +func TestRemoveTrustedPeer(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine.but.last.is.important/16Uiu2HAm1n583t4huDMMqEUUBuQs6bLts21mxCfX3tiqu9JfHvRJ" + request := httptest.NewRequest("DELETE", url, nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.RemoveTrustedPeer(writer, request) + assert.Equal(t, http.StatusOK, writer.Code) + +} + +func TestRemoveTrustedPeer_EmptyParameter(t *testing.T) { + peerFetcher := &mockp2p.MockPeersProvider{} + peerFetcher.ClearPeers() + s := Server{PeersFetcher: peerFetcher} + + url := "http://anything.is.fine" + request := httptest.NewRequest("DELETE", url, nil) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + s.RemoveTrustedPeer(writer, request) + e := &network.DefaultErrorJson{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusBadRequest, writer.Code) + assert.Equal(t, "Could not decode peer id: failed to parse peer ID: invalid cid: cid too short", e.Message) +} diff --git a/beacon-chain/rpc/prysm/node/server.go b/beacon-chain/rpc/prysm/node/server.go new file mode 100644 index 0000000000..53de7ae07e --- /dev/null +++ b/beacon-chain/rpc/prysm/node/server.go @@ -0,0 +1,21 @@ +package node + +import ( + "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/execution" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" +) + +type Server struct { + SyncChecker sync.Checker + OptimisticModeFetcher blockchain.OptimisticModeFetcher + BeaconDB db.ReadOnlyDatabase + PeersFetcher p2p.PeersProvider + PeerManager p2p.PeerManager + MetadataProvider p2p.MetadataProvider + GenesisTimeFetcher blockchain.TimeFetcher + HeadFetcher blockchain.HeadFetcher + ExecutionChainInfoFetcher execution.ChainInfoFetcher +} diff --git a/beacon-chain/rpc/prysm/node/server_test.go b/beacon-chain/rpc/prysm/node/server_test.go new file mode 100644 index 0000000000..2b4023a62f --- /dev/null +++ b/beacon-chain/rpc/prysm/node/server_test.go @@ -0,0 +1 @@ +package node diff --git a/beacon-chain/rpc/prysm/node/structs.go b/beacon-chain/rpc/prysm/node/structs.go new file mode 100644 index 0000000000..7dbe3c8d46 --- /dev/null +++ b/beacon-chain/rpc/prysm/node/structs.go @@ -0,0 +1,17 @@ +package node + +type AddrRequest struct { + Addr string `json:"addr"` +} + +type PeersResponse struct { + Peers []*Peer `json:"Peers"` +} + +type Peer struct { + PeerID string `json:"peer_id"` + Enr string `json:"enr"` + LastSeenP2PAddress string `json:"last_seen_p2p_address"` + State string `json:"state"` + Direction string `json:"direction"` +} diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index f47ecffeb6..9ecb2b95e0 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -37,6 +37,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/rewards" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/validator" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup" + nodeprysm "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/node" beaconv1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/beacon" debugv1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/debug" nodev1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/node" @@ -307,6 +308,22 @@ func (s *Service) Start() { ExecutionChainInfoFetcher: s.cfg.ExecutionChainInfoFetcher, } + nodeServerPrysm := &nodeprysm.Server{ + BeaconDB: s.cfg.BeaconDB, + SyncChecker: s.cfg.SyncService, + OptimisticModeFetcher: s.cfg.OptimisticModeFetcher, + GenesisTimeFetcher: s.cfg.GenesisTimeFetcher, + PeersFetcher: s.cfg.PeersFetcher, + PeerManager: s.cfg.PeerManager, + MetadataProvider: s.cfg.MetadataProvider, + HeadFetcher: s.cfg.HeadFetcher, + ExecutionChainInfoFetcher: s.cfg.ExecutionChainInfoFetcher, + } + + s.cfg.Router.HandleFunc("/prysm/node/trusted_peers", nodeServerPrysm.ListTrustedPeer).Methods("GET") + s.cfg.Router.HandleFunc("/prysm/node/trusted_peers", nodeServerPrysm.AddTrustedPeer).Methods("POST") + s.cfg.Router.HandleFunc("/prysm/node/trusted_peers/{peer_id}", nodeServerPrysm.RemoveTrustedPeer).Methods("Delete") + beaconChainServer := &beaconv1alpha1.Server{ Ctx: s.ctx, BeaconDB: s.cfg.BeaconDB,