Append Dynamic Addinng Trusted Peer Apis (#12531)

* Append Dynamic Addinng Trusted Peer Apis

* Append unit tests for Dynamic Addinng Trusted Peer Apis

* Update beacon-chain/p2p/peers/peerdata/store.go

* Update beacon-chain/p2p/peers/peerdata/store_test.go

* Update beacon-chain/p2p/peers/peerdata/store_test.go

* Update beacon-chain/p2p/peers/peerdata/store_test.go

* Update beacon-chain/p2p/peers/status.go

* Update beacon-chain/p2p/peers/status_test.go

* Update beacon-chain/p2p/peers/status_test.go

* Update beacon-chain/rpc/eth/node/handlers.go

* Update beacon-chain/rpc/eth/node/handlers.go

* Update beacon-chain/rpc/eth/node/handlers.go

* Update beacon-chain/rpc/eth/node/handlers.go

* Move trusted peer apis from rpc/eth/v1/node to rpc/prysm/node; move peersToWatch into ensurePeerConnections function;

* Update beacon-chain/rpc/prysm/node/server.go

* Update beacon-chain/rpc/prysm/node/server.go

* fix go lint problem

* p2p/watch_peers.go: trusted peer makes AddrInfo structure by itself instead of using MakePeer().

p2p/service.go: add connectWithAllTrustedPeers function, before connectWithPeer, add trusted peer info into peer status.

p2p/peers/status.go: trusted peers are not included, when pruning outdated and disconnected peers.

* use readlock for GetTrustedPeers and IsTrustedPeers

---------

Co-authored-by: simon <sunminghui2@huawei.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
This commit is contained in:
Simon
2023-07-11 17:26:08 +08:00
committed by GitHub
parent d56a530c86
commit cec32cb996
14 changed files with 691 additions and 19 deletions

View File

@@ -108,12 +108,31 @@ func (s *Store) DeletePeerData(pid peer.ID) {
}
// SetTrustedPeers sets our desired trusted peer set.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) SetTrustedPeers(peers []peer.ID) {
for _, p := range peers {
s.trustedPeers[p] = true
}
}
// GetTrustedPeers gets our desired trusted peer ids.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) GetTrustedPeers() []peer.ID {
peers := []peer.ID{}
for p := range s.trustedPeers {
peers = append(peers, p)
}
return peers
}
// DeleteTrustedPeers removes peers from trusted peer set.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) DeleteTrustedPeers(peers []peer.ID) {
for _, p := range peers {
delete(s.trustedPeers, p)
}
}
// Peers returns map of peer data objects.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) Peers() map[peer.ID]*PeerData {

View File

@@ -96,4 +96,16 @@ func TestStore_TrustedPeers(t *testing.T) {
assert.Equal(t, true, store.IsTrustedPeer(pid1))
assert.Equal(t, true, store.IsTrustedPeer(pid2))
assert.Equal(t, true, store.IsTrustedPeer(pid3))
tPeers = store.GetTrustedPeers()
assert.Equal(t, 3, len(tPeers))
store.DeleteTrustedPeers(tPeers)
tPeers = store.GetTrustedPeers()
assert.Equal(t, 0, len(tPeers))
assert.Equal(t, false, store.IsTrustedPeer(pid1))
assert.Equal(t, false, store.IsTrustedPeer(pid2))
assert.Equal(t, false, store.IsTrustedPeer(pid3))
}

View File

@@ -560,6 +560,9 @@ func (p *Status) Prune() {
notBadPeer := func(pid peer.ID) bool {
return !p.isBad(pid)
}
notTrustedPeer := func(pid peer.ID) bool {
return !p.isTrustedPeers(pid)
}
type peerResp struct {
pid peer.ID
score float64
@@ -567,7 +570,8 @@ func (p *Status) Prune() {
peersToPrune := make([]*peerResp, 0)
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnected && notBadPeer(pid) {
// Should not prune trusted peer or prune the peer dara and unset trusted peer.
if peerData.ConnState == PeerDisconnected && notBadPeer(pid) && notTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
score: p.Scorers().ScoreNoLock(pid),
@@ -608,6 +612,9 @@ func (p *Status) deprecatedPrune() {
notBadPeer := func(peerData *peerdata.PeerData) bool {
return peerData.BadResponses < p.scorers.BadResponsesScorer().Params().Threshold
}
notTrustedPeer := func(pid peer.ID) bool {
return !p.isTrustedPeers(pid)
}
type peerResp struct {
pid peer.ID
badResp int
@@ -615,7 +622,8 @@ func (p *Status) deprecatedPrune() {
peersToPrune := make([]*peerResp, 0)
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) {
// Should not prune trusted peer or prune the peer dara and unset trusted peer.
if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) && notTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: peerData.BadResponses,
@@ -912,6 +920,32 @@ func (p *Status) SetTrustedPeers(peers []peer.ID) {
p.store.SetTrustedPeers(peers)
}
// GetTrustedPeers returns a list of all trusted peers' ids
func (p *Status) GetTrustedPeers() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
return p.store.GetTrustedPeers()
}
// DeleteTrustedPeers removes peers from trusted peer set
func (p *Status) DeleteTrustedPeers(peers []peer.ID) {
p.store.Lock()
defer p.store.Unlock()
p.store.DeleteTrustedPeers(peers)
}
// IsTrustedPeers returns if given peer is a Trusted peer
func (p *Status) IsTrustedPeers(pid peer.ID) bool {
p.store.RLock()
defer p.store.RUnlock()
return p.isTrustedPeers(pid)
}
// isTrustedPeers is the lock-free version of IsTrustedPeers.
func (p *Status) isTrustedPeers(pid peer.ID) bool {
return p.store.IsTrustedPeer(pid)
}
// this method assumes the store lock is acquired before
// executing the method.
func (p *Status) isfromBadIP(pid peer.ID) bool {

View File

@@ -802,6 +802,11 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
}
}
p.SetTrustedPeers(trustedPeers)
// Assert we have correct trusted peers
trustedPeers = p.GetTrustedPeers()
assert.Equal(t, 6, len(trustedPeers))
// Assert all peers more than max are prunable.
peersToPrune = p.PeersToPrune()
assert.Equal(t, 16, len(peersToPrune))
@@ -812,6 +817,34 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
assert.NotEqual(t, pid.String(), tPid.String())
}
}
// Add more peers to check if trusted peers can be pruned after they are deleted from trusted peer set.
for i := 0; i < 9; i++ {
// Peer added to peer handler.
createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
}
// Delete trusted peers.
p.DeleteTrustedPeers(trustedPeers)
peersToPrune = p.PeersToPrune()
assert.Equal(t, 25, len(peersToPrune))
// Check that trusted peers are pruned.
for _, tPid := range trustedPeers {
pruned := false
for _, pid := range peersToPrune {
if pid.String() == tPid.String() {
pruned = true
}
}
assert.Equal(t, true, pruned)
}
// Assert have zero trusted peers
trustedPeers = p.GetTrustedPeers()
assert.Equal(t, 0, len(trustedPeers))
for _, pid := range peersToPrune {
dir, err := p.Direction(pid)
require.NoError(t, err)
@@ -821,8 +854,8 @@ func TestPrunePeers_TrustedPeers(t *testing.T) {
// Ensure it is in the descending order.
currScore := p.Scorers().Score(peersToPrune[0])
for _, pid := range peersToPrune {
score := p.Scorers().BadResponsesScorer().Score(pid)
assert.Equal(t, true, currScore >= score)
score := p.Scorers().Score(pid)
assert.Equal(t, true, currScore <= score)
currScore = score
}
}

View File

@@ -174,9 +174,9 @@ func (s *Service) Start() {
s.awaitStateInitialized()
s.isPreGenesis = false
var peersToWatch []string
var relayNodes []string
if s.cfg.RelayNodeAddr != "" {
peersToWatch = append(peersToWatch, s.cfg.RelayNodeAddr)
relayNodes = append(relayNodes, s.cfg.RelayNodeAddr)
if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil {
log.WithError(err).Errorf("Could not dial relay node")
}
@@ -213,8 +213,7 @@ func (s *Service) Start() {
// Set trusted peers for those that are provided as static addresses.
pids := peerIdsFromMultiAddrs(addrs)
s.peers.SetTrustedPeers(pids)
peersToWatch = append(peersToWatch, s.cfg.StaticPeers...)
s.connectWithAllPeers(addrs)
s.connectWithAllTrustedPeers(addrs)
}
// Initialize metadata according to the
// current epoch.
@@ -226,7 +225,7 @@ func (s *Service) Start() {
// Periodic functions.
async.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
})
async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
async.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
@@ -399,6 +398,24 @@ func (s *Service) awaitStateInitialized() {
}
}
func (s *Service) connectWithAllTrustedPeers(multiAddrs []multiaddr.Multiaddr) {
addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
if err != nil {
log.WithError(err).Error("Could not convert to peer address info's from multiaddresses")
return
}
for _, info := range addrInfos {
// add peer into peer status
s.peers.Add(nil, info.ID, info.Addrs[0], network.DirUnknown)
// make each dial non-blocking
go func(info peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(info)
}
}
func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
if err != nil {

View File

@@ -5,28 +5,52 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
)
// ensurePeerConnections will attempt to reestablish connection to the peers
// if there are currently no connections to that peer.
func ensurePeerConnections(ctx context.Context, h host.Host, peers ...string) {
if len(peers) == 0 {
return
}
for _, p := range peers {
if p == "" {
func ensurePeerConnections(ctx context.Context, h host.Host, peers *peers.Status, relayNodes ...string) {
// every time reset peersToWatch, add RelayNodes and trust peers
var peersToWatch []*peer.AddrInfo
// add RelayNodes
for _, node := range relayNodes {
if node == "" {
continue
}
peerInfo, err := MakePeer(p)
peerInfo, err := MakePeer(node)
if err != nil {
log.WithError(err).Error("Could not make peer")
continue
}
peersToWatch = append(peersToWatch, peerInfo)
}
c := h.Network().ConnsToPeer(peerInfo.ID)
// add trusted peers
trustedPeers := peers.GetTrustedPeers()
for _, trustedPeer := range trustedPeers {
maddr, err := peers.Address(trustedPeer)
// avoid invalid trusted peers
if err != nil || maddr == nil {
log.WithField("peer", trustedPeers).WithError(err).Error("Could not get peer address")
continue
}
peerInfo := &peer.AddrInfo{ID: trustedPeer}
peerInfo.Addrs = []ma.Multiaddr{maddr}
peersToWatch = append(peersToWatch, peerInfo)
}
if len(peersToWatch) == 0 {
return
}
for _, p := range peersToWatch {
c := h.Network().ConnsToPeer(p.ID)
if len(c) == 0 {
if err := connectWithTimeout(ctx, h, peerInfo); err != nil {
log.WithField("peer", peerInfo.ID).WithField("addrs", peerInfo.Addrs).WithError(err).Errorf("Failed to reconnect to peer")
if err := connectWithTimeout(ctx, h, p); err != nil {
log.WithField("peer", p.ID).WithField("addrs", p.Addrs).WithError(err).Errorf("Failed to reconnect to peer")
continue
}
}

View File

@@ -32,6 +32,7 @@ go_library(
"//beacon-chain/rpc/eth/rewards:go_default_library",
"//beacon-chain/rpc/eth/validator:go_default_library",
"//beacon-chain/rpc/lookup:go_default_library",
"//beacon-chain/rpc/prysm/node:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/beacon:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/debug:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/node:go_default_library",

View File

@@ -0,0 +1,49 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"handlers.go",
"server.go",
"structs.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/node",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/execution:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/sync:go_default_library",
"//network:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"handlers_test.go",
"server_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//network:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/host/peerstore/test:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
],
)

View File

@@ -0,0 +1,177 @@
package node
import (
"encoding/json"
"io"
"net/http"
"strings"
corenet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/v4/network"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
// ListTrustedPeer retrieves data about the node's trusted peers.
func (s *Server) ListTrustedPeer(w http.ResponseWriter, r *http.Request) {
peerStatus := s.PeersFetcher.Peers()
allIds := s.PeersFetcher.Peers().GetTrustedPeers()
allPeers := make([]*Peer, 0, len(allIds))
for _, id := range allIds {
p, err := httpPeerInfo(peerStatus, id)
if err != nil {
errJson := &network.DefaultErrorJson{
Message: errors.Wrapf(err, "Could not get peer info").Error(),
Code: http.StatusInternalServerError,
}
network.WriteError(w, errJson)
return
}
// peers added into trusted set but never connected should also be listed
if p == nil {
p = &Peer{
PeerID: id.String(),
Enr: "",
LastSeenP2PAddress: "",
State: eth.ConnectionState(corenet.NotConnected).String(),
Direction: eth.PeerDirection(corenet.DirUnknown).String(),
}
}
allPeers = append(allPeers, p)
}
response := &PeersResponse{Peers: allPeers}
network.WriteJson(w, response)
}
// AddTrustedPeer adds a new peer into node's trusted peer set by Multiaddr
func (s *Server) AddTrustedPeer(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
errJson := &network.DefaultErrorJson{
Message: errors.Wrapf(err, "Could not read request body").Error(),
Code: http.StatusInternalServerError,
}
network.WriteError(w, errJson)
return
}
var addrRequest *AddrRequest
err = json.Unmarshal(body, &addrRequest)
if err != nil {
errJson := &network.DefaultErrorJson{
Message: errors.Wrapf(err, "Could not decode request body into peer address").Error(),
Code: http.StatusBadRequest,
}
network.WriteError(w, errJson)
return
}
info, err := peer.AddrInfoFromString(addrRequest.Addr)
if err != nil {
errJson := &network.DefaultErrorJson{
Message: errors.Wrapf(err, "Could not derive peer info from multiaddress").Error(),
Code: http.StatusBadRequest,
}
network.WriteError(w, errJson)
return
}
// also add new peerdata to peers
direction, err := s.PeersFetcher.Peers().Direction(info.ID)
if err != nil {
s.PeersFetcher.Peers().Add(nil, info.ID, info.Addrs[0], corenet.DirUnknown)
} else {
s.PeersFetcher.Peers().Add(nil, info.ID, info.Addrs[0], direction)
}
peers := []peer.ID{}
peers = append(peers, info.ID)
s.PeersFetcher.Peers().SetTrustedPeers(peers)
w.WriteHeader(http.StatusOK)
}
// RemoveTrustedPeer removes peer from our trusted peer set but does not close connection.
func (s *Server) RemoveTrustedPeer(w http.ResponseWriter, r *http.Request) {
segments := strings.Split(r.URL.Path, "/")
id := segments[len(segments)-1]
peerId, err := peer.Decode(id)
if err != nil {
errJson := &network.DefaultErrorJson{
Message: errors.Wrapf(err, "Could not decode peer id").Error(),
Code: http.StatusBadRequest,
}
network.WriteError(w, errJson)
return
}
// if the peer is not a trusted peer, do nothing but return 200
if !s.PeersFetcher.Peers().IsTrustedPeers(peerId) {
w.WriteHeader(http.StatusOK)
return
}
peers := []peer.ID{}
peers = append(peers, peerId)
s.PeersFetcher.Peers().DeleteTrustedPeers(peers)
w.WriteHeader(http.StatusOK)
}
// httpPeerInfo does the same thing as peerInfo function in node.go but returns the
// http peer response.
func httpPeerInfo(peerStatus *peers.Status, id peer.ID) (*Peer, error) {
enr, err := peerStatus.ENR(id)
if err != nil {
if errors.Is(err, peerdata.ErrPeerUnknown) {
return nil, nil
}
return nil, errors.Wrap(err, "could not obtain ENR")
}
var serializedEnr string
if enr != nil {
serializedEnr, err = p2p.SerializeENR(enr)
if err != nil {
return nil, errors.Wrap(err, "could not serialize ENR")
}
}
address, err := peerStatus.Address(id)
if err != nil {
if errors.Is(err, peerdata.ErrPeerUnknown) {
return nil, nil
}
return nil, errors.Wrap(err, "could not obtain address")
}
connectionState, err := peerStatus.ConnectionState(id)
if err != nil {
if errors.Is(err, peerdata.ErrPeerUnknown) {
return nil, nil
}
return nil, errors.Wrap(err, "could not obtain connection state")
}
direction, err := peerStatus.Direction(id)
if err != nil {
if errors.Is(err, peerdata.ErrPeerUnknown) {
return nil, nil
}
return nil, errors.Wrap(err, "could not obtain direction")
}
if eth.PeerDirection(direction) == eth.PeerDirection_UNKNOWN {
return nil, nil
}
v1ConnState := eth.ConnectionState(connectionState).String()
v1PeerDirection := eth.PeerDirection(direction).String()
p := Peer{
PeerID: id.String(),
State: v1ConnState,
Direction: v1PeerDirection,
}
if address != nil {
p.LastSeenP2PAddress = address.String()
}
if serializedEnr != "" {
p.Enr = "enr:" + serializedEnr
}
return &p, nil
}

View File

@@ -0,0 +1,250 @@
package node
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
corenet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2ptest "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
type testIdentity enode.ID
func (_ testIdentity) Verify(_ *enr.Record, _ []byte) error { return nil }
func (id testIdentity) NodeAddr(_ *enr.Record) []byte { return id[:] }
func TestListTrustedPeer(t *testing.T) {
ids := libp2ptest.GeneratePeerIDs(9)
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
peerStatus := peerFetcher.Peers()
for i, id := range ids {
if i == len(ids)-1 {
var p2pAddr = "/ip4/127.0.0." + strconv.Itoa(i) + "/udp/12000/p2p/16Uiu2HAm7yD5fhhw1Kihg5pffaGbvKV3k7sqxRGHMZzkb7u9UUxQ"
p2pMultiAddr, err := ma.NewMultiaddr(p2pAddr)
require.NoError(t, err)
peerStatus.Add(nil, id, p2pMultiAddr, corenet.DirUnknown)
continue
}
enrRecord := &enr.Record{}
err := enrRecord.SetSig(testIdentity{1}, []byte{42})
require.NoError(t, err)
enrRecord.Set(enr.IPv4{127, 0, 0, byte(i)})
err = enrRecord.SetSig(testIdentity{}, []byte{})
require.NoError(t, err)
var p2pAddr = "/ip4/127.0.0." + strconv.Itoa(i) + "/udp/12000/p2p/16Uiu2HAm7yD5fhhw1Kihg5pffaGbvKV3k7sqxRGHMZzkb7u9UUxQ"
p2pMultiAddr, err := ma.NewMultiaddr(p2pAddr)
require.NoError(t, err)
var direction corenet.Direction
if i%2 == 0 {
direction = corenet.DirInbound
} else {
direction = corenet.DirOutbound
}
peerStatus.Add(enrRecord, id, p2pMultiAddr, direction)
switch i {
case 0, 1:
peerStatus.SetConnectionState(id, peers.PeerConnecting)
case 2, 3:
peerStatus.SetConnectionState(id, peers.PeerConnected)
case 4, 5:
peerStatus.SetConnectionState(id, peers.PeerDisconnecting)
case 6, 7:
peerStatus.SetConnectionState(id, peers.PeerDisconnected)
default:
t.Fatalf("Failed to set connection state for peer")
}
}
s := Server{PeersFetcher: peerFetcher}
// set all peers as trusted peers
s.PeersFetcher.Peers().SetTrustedPeers(ids)
t.Run("Peer data OK", func(t *testing.T) {
url := "http://anything.is.fine"
request := httptest.NewRequest("GET", url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.ListTrustedPeer(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
resp := &PeersResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
peers := resp.Peers
// assert number of trusted peer is right
assert.Equal(t, 9, len(peers))
for i := 0; i < 9; i++ {
pid, err := peer.Decode(peers[i].PeerID)
require.NoError(t, err)
if pid == ids[8] {
assert.Equal(t, "", peers[i].Enr)
assert.Equal(t, "", peers[i].LastSeenP2PAddress)
assert.Equal(t, "DISCONNECTED", peers[i].State)
assert.Equal(t, "UNKNOWN", peers[i].Direction)
continue
}
expectedEnr, err := peerStatus.ENR(pid)
require.NoError(t, err)
serializeENR, err := p2p.SerializeENR(expectedEnr)
require.NoError(t, err)
assert.Equal(t, "enr:"+serializeENR, peers[i].Enr)
expectedP2PAddr, err := peerStatus.Address(pid)
require.NoError(t, err)
assert.Equal(t, expectedP2PAddr.String(), peers[i].LastSeenP2PAddress)
switch pid {
case ids[0]:
assert.Equal(t, "CONNECTING", peers[i].State)
assert.Equal(t, "INBOUND", peers[i].Direction)
case ids[1]:
assert.Equal(t, "CONNECTING", peers[i].State)
assert.Equal(t, "OUTBOUND", peers[i].Direction)
case ids[2]:
assert.Equal(t, "CONNECTED", peers[i].State)
assert.Equal(t, "INBOUND", peers[i].Direction)
case ids[3]:
assert.Equal(t, "CONNECTED", peers[i].State)
assert.Equal(t, "OUTBOUND", peers[i].Direction)
case ids[4]:
assert.Equal(t, "DISCONNECTING", peers[i].State)
assert.Equal(t, "INBOUND", peers[i].Direction)
case ids[5]:
assert.Equal(t, "DISCONNECTING", peers[i].State)
assert.Equal(t, "OUTBOUND", peers[i].Direction)
case ids[6]:
assert.Equal(t, "DISCONNECTED", peers[i].State)
assert.Equal(t, "INBOUND", peers[i].Direction)
case ids[7]:
assert.Equal(t, "DISCONNECTED", peers[i].State)
assert.Equal(t, "OUTBOUND", peers[i].Direction)
default:
t.Fatalf("Failed to get connection state and direction for peer")
}
}
})
}
func TestListTrustedPeers_NoPeersReturnsEmptyArray(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine"
request := httptest.NewRequest("GET", url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.ListTrustedPeer(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
resp := &PeersResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
peers := resp.Peers
assert.Equal(t, 0, len(peers))
}
func TestAddTrustedPeer(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine"
addr := &AddrRequest{
Addr: "/ip4/127.0.0.1/tcp/30303/p2p/16Uiu2HAm1n583t4huDMMqEUUBuQs6bLts21mxCfX3tiqu9JfHvRJ",
}
addrJson, err := json.Marshal(addr)
require.NoError(t, err)
var body bytes.Buffer
_, err = body.Write(addrJson)
require.NoError(t, err)
request := httptest.NewRequest("POST", url, &body)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.AddTrustedPeer(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
}
func TestAddTrustedPeer_EmptyBody(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine"
request := httptest.NewRequest("POST", url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.AddTrustedPeer(writer, request)
e := &network.DefaultErrorJson{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, writer.Code)
assert.Equal(t, "Could not decode request body into peer address: unexpected end of JSON input", e.Message)
}
func TestAddTrustedPeer_BadAddress(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine"
addr := &AddrRequest{
Addr: "anything/but/not/an/address",
}
addrJson, err := json.Marshal(addr)
require.NoError(t, err)
var body bytes.Buffer
_, err = body.Write(addrJson)
require.NoError(t, err)
request := httptest.NewRequest("POST", url, &body)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.AddTrustedPeer(writer, request)
e := &network.DefaultErrorJson{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, writer.Code)
assert.StringContains(t, "Could not derive peer info from multiaddress", e.Message)
}
func TestRemoveTrustedPeer(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine.but.last.is.important/16Uiu2HAm1n583t4huDMMqEUUBuQs6bLts21mxCfX3tiqu9JfHvRJ"
request := httptest.NewRequest("DELETE", url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.RemoveTrustedPeer(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
}
func TestRemoveTrustedPeer_EmptyParameter(t *testing.T) {
peerFetcher := &mockp2p.MockPeersProvider{}
peerFetcher.ClearPeers()
s := Server{PeersFetcher: peerFetcher}
url := "http://anything.is.fine"
request := httptest.NewRequest("DELETE", url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.RemoveTrustedPeer(writer, request)
e := &network.DefaultErrorJson{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, writer.Code)
assert.Equal(t, "Could not decode peer id: failed to parse peer ID: invalid cid: cid too short", e.Message)
}

View File

@@ -0,0 +1,21 @@
package node
import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/execution"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
)
type Server struct {
SyncChecker sync.Checker
OptimisticModeFetcher blockchain.OptimisticModeFetcher
BeaconDB db.ReadOnlyDatabase
PeersFetcher p2p.PeersProvider
PeerManager p2p.PeerManager
MetadataProvider p2p.MetadataProvider
GenesisTimeFetcher blockchain.TimeFetcher
HeadFetcher blockchain.HeadFetcher
ExecutionChainInfoFetcher execution.ChainInfoFetcher
}

View File

@@ -0,0 +1 @@
package node

View File

@@ -0,0 +1,17 @@
package node
type AddrRequest struct {
Addr string `json:"addr"`
}
type PeersResponse struct {
Peers []*Peer `json:"Peers"`
}
type Peer struct {
PeerID string `json:"peer_id"`
Enr string `json:"enr"`
LastSeenP2PAddress string `json:"last_seen_p2p_address"`
State string `json:"state"`
Direction string `json:"direction"`
}

View File

@@ -37,6 +37,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/rewards"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/validator"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup"
nodeprysm "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/node"
beaconv1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/beacon"
debugv1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/debug"
nodev1alpha1 "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/prysm/v1alpha1/node"
@@ -307,6 +308,22 @@ func (s *Service) Start() {
ExecutionChainInfoFetcher: s.cfg.ExecutionChainInfoFetcher,
}
nodeServerPrysm := &nodeprysm.Server{
BeaconDB: s.cfg.BeaconDB,
SyncChecker: s.cfg.SyncService,
OptimisticModeFetcher: s.cfg.OptimisticModeFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
PeersFetcher: s.cfg.PeersFetcher,
PeerManager: s.cfg.PeerManager,
MetadataProvider: s.cfg.MetadataProvider,
HeadFetcher: s.cfg.HeadFetcher,
ExecutionChainInfoFetcher: s.cfg.ExecutionChainInfoFetcher,
}
s.cfg.Router.HandleFunc("/prysm/node/trusted_peers", nodeServerPrysm.ListTrustedPeer).Methods("GET")
s.cfg.Router.HandleFunc("/prysm/node/trusted_peers", nodeServerPrysm.AddTrustedPeer).Methods("POST")
s.cfg.Router.HandleFunc("/prysm/node/trusted_peers/{peer_id}", nodeServerPrysm.RemoveTrustedPeer).Methods("Delete")
beaconChainServer := &beaconv1alpha1.Server{
Ctx: s.ctx,
BeaconDB: s.cfg.BeaconDB,