diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index ebf4e8c8ee..30c56fa682 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -21,7 +21,10 @@ func TestPeer_AtMaxLimit(t *testing.T) { listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{} - s.peers = peers.NewStatus(3, 0) + s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 0, + MaxBadResponses: 3, + }) s.cfg = &Config{MaxPeers: 0} s.addrFilter, err = configureFilter(&Config{}) require.NoError(t, err) @@ -57,7 +60,10 @@ func TestPeer_BelowMaxLimit(t *testing.T) { listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000)) require.NoError(t, err, "Failed to p2p listen") s := &Service{} - s.peers = peers.NewStatus(3, 1) + s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 1, + MaxBadResponses: 3, + }) s.cfg = &Config{MaxPeers: 1} s.addrFilter, err = configureFilter(&Config{}) require.NoError(t, err) diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index afd59b4008..49709923fb 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_library( name = "go_default_library", - srcs = ["status.go"], + srcs = [ + "status.go", + "store.go", + ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers", visibility = ["//beacon-chain:__subpackages__"], deps = [ diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 460bb26104..fca42565fc 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -20,9 +20,9 @@ package peers import ( + "context" "errors" "sort" - "sync" "time" "github.com/ethereum/go-ethereum/p2p/enr" @@ -62,79 +62,76 @@ var ( // Status is the structure holding the peer status information. type Status struct { - lock sync.RWMutex - maxBadResponses int - status map[peer.ID]*peerStatus - maxLimit int + ctx context.Context + store *peerDataStore + config *StatusConfig } -// peerStatus is the status of an individual peer at the protocol level. -type peerStatus struct { - address ma.Multiaddr - direction network.Direction - peerState PeerConnectionState - chainState *pb.Status - enr *enr.Record - metaData *pb.MetaData - chainStateLastUpdated time.Time - badResponses int +// StatusConfig represents peer status service params. +type StatusConfig struct { + // PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node. + PeerLimit int + // MaxBadResponses specifies number of bad responses tolerated, before peer is banned. + MaxBadResponses int } // NewStatus creates a new status entity. -func NewStatus(maxBadResponses int, peerLimit int) *Status { +func NewStatus(ctx context.Context, config *StatusConfig) *Status { + store := newPeerDataStore(ctx, &peerDataStoreConfig{ + maxPeers: maxLimitBuffer + config.PeerLimit, + }) return &Status{ - maxBadResponses: maxBadResponses, - status: make(map[peer.ID]*peerStatus), - maxLimit: maxLimitBuffer + peerLimit, + ctx: ctx, + store: store, + config: config, } } // MaxBadResponses returns the maximum number of bad responses a peer can provide before it is considered bad. func (p *Status) MaxBadResponses() int { - return p.maxBadResponses + return p.config.MaxBadResponses } -// MaxPeerLimit returns the max peer limit stored in -// the current peer store. +// MaxPeerLimit returns the max peer limit stored in the current peer store. func (p *Status) MaxPeerLimit() int { - return p.maxLimit + return p.store.config.maxPeers } // Add adds a peer. // If a peer already exists with this ID its address and direction are updated with the supplied data. func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) { - p.lock.Lock() - defer p.lock.Unlock() + p.store.Lock() + defer p.store.Unlock() - if status, ok := p.status[pid]; ok { + if peerData, ok := p.store.peers[pid]; ok { // Peer already exists, just update its address info. - status.address = address - status.direction = direction + peerData.address = address + peerData.direction = direction if record != nil { - status.enr = record + peerData.enr = record } return } - status := &peerStatus{ + peerData := &peerData{ address: address, direction: direction, // Peers start disconnected; state will be updated when the handshake process begins. - peerState: PeerDisconnected, + connState: PeerDisconnected, } if record != nil { - status.enr = record + peerData.enr = record } - p.status[pid] = status + p.store.peers[pid] = peerData } // Address returns the multiaddress of the given remote peer. // This will error if the peer does not exist. func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.address, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.address, nil } return nil, ErrPeerUnknown } @@ -142,89 +139,89 @@ func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) { // Direction returns the direction of the given remote peer. // This will error if the peer does not exist. func (p *Status) Direction(pid peer.ID) (network.Direction, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.direction, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.direction, nil } return network.DirUnknown, ErrPeerUnknown } // ENR returns the enr for the corresponding peer id. func (p *Status) ENR(pid peer.ID) (*enr.Record, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.enr, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.enr, nil } return nil, ErrPeerUnknown } // SetChainState sets the chain state of the given remote peer. func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) { - p.lock.Lock() - defer p.lock.Unlock() + p.store.Lock() + defer p.store.Unlock() - status := p.fetch(pid) - status.chainState = chainState - status.chainStateLastUpdated = roughtime.Now() + peerData := p.fetch(pid) + peerData.chainState = chainState + peerData.chainStateLastUpdated = roughtime.Now() } // ChainState gets the chain state of the given remote peer. // This can return nil if there is no known chain state for the peer. // This will error if the peer does not exist. func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.chainState, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.chainState, nil } return nil, ErrPeerUnknown } // IsActive checks if a peers is active and returns the result appropriately. func (p *Status) IsActive(pid peer.ID) bool { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - status, ok := p.status[pid] - return ok && (status.peerState == PeerConnected || status.peerState == PeerConnecting) + peerData, ok := p.store.peers[pid] + return ok && (peerData.connState == PeerConnected || peerData.connState == PeerConnecting) } // SetMetadata sets the metadata of the given remote peer. func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) { - p.lock.Lock() - defer p.lock.Unlock() + p.store.Lock() + defer p.store.Unlock() - status := p.fetch(pid) - status.metaData = metaData + peerData := p.fetch(pid) + peerData.metaData = metaData } // Metadata returns a copy of the metadata corresponding to the provided // peer id. func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return proto.Clone(status.metaData).(*pb.MetaData), nil + if peerData, ok := p.store.peers[pid]; ok { + return proto.Clone(peerData.metaData).(*pb.MetaData), nil } return nil, ErrPeerUnknown } // CommitteeIndices retrieves the committee subnets the peer is subscribed to. func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - if status.enr == nil || status.metaData == nil { + if peerData, ok := p.store.peers[pid]; ok { + if peerData.enr == nil || peerData.metaData == nil { return []uint64{}, nil } - return retrieveIndicesFromBitfield(status.metaData.Attnets), nil + return retrieveIndicesFromBitfield(peerData.metaData.Attnets), nil } return nil, ErrPeerUnknown } @@ -232,15 +229,15 @@ func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) { // SubscribedToSubnet retrieves the peers subscribed to the given // committee subnet. func (p *Status) SubscribedToSubnet(index uint64) []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { + for pid, peerData := range p.store.peers { // look at active peers - connectedStatus := status.peerState == PeerConnecting || status.peerState == PeerConnected - if connectedStatus && status.metaData != nil && status.metaData.Attnets != nil { - indices := retrieveIndicesFromBitfield(status.metaData.Attnets) + connectedStatus := peerData.connState == PeerConnecting || peerData.connState == PeerConnected + if connectedStatus && peerData.metaData != nil && peerData.metaData.Attnets != nil { + indices := retrieveIndicesFromBitfield(peerData.metaData.Attnets) for _, idx := range indices { if idx == index { peers = append(peers, pid) @@ -254,21 +251,21 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID { // SetConnectionState sets the connection state of the given remote peer. func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) { - p.lock.Lock() - defer p.lock.Unlock() + p.store.Lock() + defer p.store.Unlock() - status := p.fetch(pid) - status.peerState = state + peerData := p.fetch(pid) + peerData.connState = state } // ConnectionState gets the connection state of the given remote peer. // This will error if the peer does not exist. func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.peerState, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.connState, nil } return PeerDisconnected, ErrPeerUnknown } @@ -276,32 +273,32 @@ func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) { // ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated. // This will error if the peer does not exist. func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.chainStateLastUpdated, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.chainStateLastUpdated, nil } return roughtime.Now(), ErrPeerUnknown } // IncrementBadResponses increments the number of bad responses we have received from the given remote peer. func (p *Status) IncrementBadResponses(pid peer.ID) { - p.lock.Lock() - defer p.lock.Unlock() + p.store.Lock() + defer p.store.Unlock() - status := p.fetch(pid) - status.badResponses++ + peerData := p.fetch(pid) + peerData.badResponsesCount++ } // BadResponses obtains the number of bad responses we have received from the given remote peer. // This will error if the peer does not exist. func (p *Status) BadResponses(pid peer.ID) (int, error) { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.badResponses, nil + if peerData, ok := p.store.peers[pid]; ok { + return peerData.badResponsesCount, nil } return -1, ErrPeerUnknown } @@ -309,22 +306,22 @@ func (p *Status) BadResponses(pid peer.ID) (int, error) { // IsBad states if the peer is to be considered bad. // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. func (p *Status) IsBad(pid peer.ID) bool { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() - if status, ok := p.status[pid]; ok { - return status.badResponses >= p.maxBadResponses + if peerData, ok := p.store.peers[pid]; ok { + return peerData.badResponsesCount >= p.config.MaxBadResponses } return false } // Connecting returns the peers that are connecting. func (p *Status) Connecting() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerConnecting { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerConnecting { peers = append(peers, pid) } } @@ -333,11 +330,11 @@ func (p *Status) Connecting() []peer.ID { // Connected returns the peers that are connected. func (p *Status) Connected() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerConnected { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerConnected { peers = append(peers, pid) } } @@ -346,11 +343,11 @@ func (p *Status) Connected() []peer.ID { // Active returns the peers that are connecting or connected. func (p *Status) Active() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerConnecting || status.peerState == PeerConnected { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerConnecting || peerData.connState == PeerConnected { peers = append(peers, pid) } } @@ -359,11 +356,11 @@ func (p *Status) Active() []peer.ID { // Disconnecting returns the peers that are disconnecting. func (p *Status) Disconnecting() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerDisconnecting { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerDisconnecting { peers = append(peers, pid) } } @@ -372,11 +369,11 @@ func (p *Status) Disconnecting() []peer.ID { // Disconnected returns the peers that are disconnected. func (p *Status) Disconnected() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerDisconnected { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerDisconnected { peers = append(peers, pid) } } @@ -385,11 +382,11 @@ func (p *Status) Disconnected() []peer.ID { // Inactive returns the peers that are disconnecting or disconnected. func (p *Status) Inactive() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.peerState == PeerDisconnecting || status.peerState == PeerDisconnected { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerDisconnecting || peerData.connState == PeerDisconnected { peers = append(peers, pid) } } @@ -398,11 +395,11 @@ func (p *Status) Inactive() []peer.ID { // Bad returns the peers that are bad. func (p *Status) Bad() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() peers := make([]peer.ID, 0) - for pid, status := range p.status { - if status.badResponses >= p.maxBadResponses { + for pid, peerData := range p.store.peers { + if peerData.badResponsesCount >= p.config.MaxBadResponses { peers = append(peers, pid) } } @@ -411,10 +408,10 @@ func (p *Status) Bad() []peer.ID { // All returns all the peers regardless of state. func (p *Status) All() []peer.ID { - p.lock.RLock() - defer p.lock.RUnlock() - pids := make([]peer.ID, 0, len(p.status)) - for pid := range p.status { + p.store.RLock() + defer p.store.RUnlock() + pids := make([]peer.ID, 0, len(p.store.peers)) + for pid := range p.store.peers { pids = append(pids, pid) } return pids @@ -424,42 +421,40 @@ func (p *Status) All() []peer.ID { // This can be run periodically, although note that each time it runs it does give all bad peers another chance as well to clog up // the network with bad responses, so should not be run too frequently; once an hour would be reasonable. func (p *Status) Decay() { - p.lock.Lock() - defer p.lock.Unlock() - for _, status := range p.status { - if status.badResponses > 0 { - status.badResponses-- + p.store.Lock() + defer p.store.Unlock() + for _, peerData := range p.store.peers { + if peerData.badResponsesCount > 0 { + peerData.badResponsesCount-- } } } // Prune clears out and removes outdated and disconnected peers. func (p *Status) Prune() { - currSize := p.totalSize() - // Exit early if there is nothing - // to prune. - if currSize <= p.maxLimit { + p.store.Lock() + defer p.store.Unlock() + + // Exit early if there is nothing to prune. + if len(p.store.peers) <= p.store.config.maxPeers { return } - disconnected := p.Disconnected() type peerResp struct { pid peer.ID badResp int } - peersToPrune := make([]*peerResp, 0, len(disconnected)) - p.lock.RLock() + peersToPrune := make([]*peerResp, 0) // Select disconnected peers with a smaller // bad response count. - for _, pid := range disconnected { - if p.status[pid].badResponses < p.maxBadResponses { + for pid, peerData := range p.store.peers { + if peerData.connState == PeerDisconnected && peerData.badResponsesCount < p.config.MaxBadResponses { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, - badResp: p.status[pid].badResponses, + badResp: p.store.peers[pid].badResponsesCount, }) } } - p.lock.RUnlock() // Sort peers in ascending order, so the peers with the // least amount of bad responses are pruned first. This @@ -469,7 +464,7 @@ func (p *Status) Prune() { return peersToPrune[i].badResp < peersToPrune[j].badResp }) - limitDiff := currSize - p.maxLimit + limitDiff := len(p.store.peers) - p.store.config.maxPeers if limitDiff > len(peersToPrune) { limitDiff = len(peersToPrune) @@ -477,11 +472,9 @@ func (p *Status) Prune() { peersToPrune = peersToPrune[:limitDiff] - p.lock.Lock() - defer p.lock.Unlock() // Delete peers from map. - for _, peerRes := range peersToPrune { - delete(p.status, peerRes.pid) + for _, peerData := range peersToPrune { + delete(p.store.peers, peerData.pid) } } @@ -536,32 +529,26 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) (uint64, } // fetch is a helper function that fetches a peer status, possibly creating it. -func (p *Status) fetch(pid peer.ID) *peerStatus { - if _, ok := p.status[pid]; !ok { - p.status[pid] = &peerStatus{} +func (p *Status) fetch(pid peer.ID) *peerData { + if _, ok := p.store.peers[pid]; !ok { + p.store.peers[pid] = &peerData{} } - return p.status[pid] + return p.store.peers[pid] } // HighestEpoch returns the highest epoch reported epoch amongst peers. func (p *Status) HighestEpoch() uint64 { - p.lock.RLock() - defer p.lock.RUnlock() + p.store.RLock() + defer p.store.RUnlock() var highestSlot uint64 - for _, ps := range p.status { - if ps != nil && ps.chainState != nil && ps.chainState.HeadSlot > highestSlot { - highestSlot = ps.chainState.HeadSlot + for _, peerData := range p.store.peers { + if peerData != nil && peerData.chainState != nil && peerData.chainState.HeadSlot > highestSlot { + highestSlot = peerData.chainState.HeadSlot } } return helpers.SlotToEpoch(highestSlot) } -func (p *Status) totalSize() int { - p.lock.RLock() - defer p.lock.RUnlock() - return len(p.status) -} - func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 { committeeIdxs := make([]uint64, 0, bitV.Count()) for i := uint64(0); i < 64; i++ { diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index f785942249..5603dd854a 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -1,6 +1,7 @@ package peers_test import ( + "context" "crypto/rand" "testing" @@ -18,14 +19,20 @@ import ( func TestStatus(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) require.NotNil(t, p, "p not created") assert.Equal(t, maxBadResponses, p.MaxBadResponses(), "maxBadResponses incorrect value") } func TestPeerExplicitAdd(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err, "Failed to create ID") @@ -59,7 +66,10 @@ func TestPeerExplicitAdd(t *testing.T) { func TestPeerNoENR(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err, "Failed to create ID") @@ -76,7 +86,10 @@ func TestPeerNoENR(t *testing.T) { func TestPeerNoOverwriteENR(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err, "Failed to create ID") @@ -96,7 +109,10 @@ func TestPeerNoOverwriteENR(t *testing.T) { func TestErrUnknownPeer(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err) @@ -122,7 +138,10 @@ func TestErrUnknownPeer(t *testing.T) { func TestPeerCommitteeIndices(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err, "Failed to create ID") @@ -152,7 +171,10 @@ func TestPeerCommitteeIndices(t *testing.T) { func TestPeerSubscribedToSubnet(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Add some peers with different states numPeers := 2 @@ -189,7 +211,10 @@ func TestPeerSubscribedToSubnet(t *testing.T) { func TestPeerImplicitAdd(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err) @@ -205,7 +230,10 @@ func TestPeerImplicitAdd(t *testing.T) { func TestPeerChainState(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err) @@ -233,7 +261,10 @@ func TestPeerChainState(t *testing.T) { func TestPeerBadResponses(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") require.NoError(t, err) @@ -275,7 +306,10 @@ func TestPeerBadResponses(t *testing.T) { func TestAddMetaData(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Add some peers with different states numPeers := 5 @@ -297,7 +331,10 @@ func TestAddMetaData(t *testing.T) { func TestPeerConnectionStatuses(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Add some peers with different states numPeersDisconnected := 11 @@ -332,7 +369,10 @@ func TestPeerConnectionStatuses(t *testing.T) { func TestDecay(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Peer 1 has 0 bad responses. pid1 := addPeer(t, p, peers.PeerConnected) @@ -361,7 +401,10 @@ func TestDecay(t *testing.T) { func TestPrune(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) for i := 0; i < p.MaxPeerLimit()+100; i++ { if i%7 == 0 { @@ -403,7 +446,10 @@ func TestPrune(t *testing.T) { } func TestTrimmedOrderedPeers(t *testing.T) { - p := peers.NewStatus(1, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: 1, + }) expectedTarget := uint64(2) maxPeers := 3 @@ -461,7 +507,10 @@ func TestBestPeer(t *testing.T) { expectedFinEpoch := uint64(4) expectedRoot := [32]byte{'t', 'e', 's', 't'} junkRoot := [32]byte{'j', 'u', 'n', 'k'} - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Peer 1 pid1 := addPeer(t, p, peers.PeerConnected) @@ -506,7 +555,10 @@ func TestBestPeer(t *testing.T) { func TestBestFinalized_returnsMaxValue(t *testing.T) { maxBadResponses := 2 maxPeers := 10 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) for i := 0; i <= maxPeers+100; i++ { p.Add(new(enr.Record), peer.ID(i), nil, network.DirOutbound) @@ -522,7 +574,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) { func TestStatus_CurrentEpoch(t *testing.T) { maxBadResponses := 2 - p := peers.NewStatus(maxBadResponses, 30) + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: maxBadResponses, + }) // Peer 1 pid1 := addPeer(t, p, peers.PeerConnected) p.SetChainState(pid1, &pb.Status{ diff --git a/beacon-chain/p2p/peers/store.go b/beacon-chain/p2p/peers/store.go new file mode 100644 index 0000000000..4c676bca50 --- /dev/null +++ b/beacon-chain/p2p/peers/store.go @@ -0,0 +1,49 @@ +package peers + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" +) + +// peerDataStore is a container for various peer related data (both protocol and app level). +// Container implements RWMutex, so data access can be restricted on the container level. This allows +// different components rely on the very same peer map container. +type peerDataStore struct { + sync.RWMutex + ctx context.Context + config *peerDataStoreConfig + peers map[peer.ID]*peerData +} + +// peerDataStoreConfig holds peer store parameters. +type peerDataStoreConfig struct { + maxPeers int +} + +// peerData aggregates protocol and application level info about a single peer. +type peerData struct { + address ma.Multiaddr + direction network.Direction + connState PeerConnectionState + chainState *pb.Status + enr *enr.Record + metaData *pb.MetaData + chainStateLastUpdated time.Time + badResponsesCount int +} + +// newPeerDataStore creates peer store. +func newPeerDataStore(ctx context.Context, config *peerDataStoreConfig) *peerDataStore { + return &peerDataStore{ + ctx: ctx, + config: config, + peers: make(map[peer.ID]*peerData), + } +} diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 503b7b0638..e309a9d26e 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -149,7 +149,10 @@ func NewService(cfg *Config) (*Service, error) { } s.pubsub = gs - s.peers = peers.NewStatus(maxBadResponses, int(s.cfg.MaxPeers)) + s.peers = peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: int(s.cfg.MaxPeers), + MaxBadResponses: maxBadResponses, + }) return s, nil } diff --git a/beacon-chain/p2p/testing/mock_peersprovider.go b/beacon-chain/p2p/testing/mock_peersprovider.go index 3517abf3fc..2b31a701d3 100644 --- a/beacon-chain/p2p/testing/mock_peersprovider.go +++ b/beacon-chain/p2p/testing/mock_peersprovider.go @@ -1,6 +1,7 @@ package testing import ( + "context" "sync" "github.com/ethereum/go-ethereum/crypto" @@ -25,7 +26,10 @@ func (m *MockPeersProvider) Peers() *peers.Status { m.lock.Lock() defer m.lock.Unlock() if m.peers == nil { - m.peers = peers.NewStatus(5, 30) + m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: 5, + }) // Pretend we are connected to two peers id0, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") if err != nil { diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index ca0f32cf72..5feeef6bf8 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -53,12 +53,16 @@ func NewTestP2P(t *testing.T) *TestP2P { t.Fatal(err) } + peerStatuses := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + MaxBadResponses: 5, + }) return &TestP2P{ t: t, BHost: h, pubsub: ps, joinedTopics: map[string]*pubsub.Topic{}, - peers: peers.NewStatus(5, 30), + peers: peerStatuses, } }