mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
Wait for subnet peers before broadcasting onto attestation subnet topic (#6893)
* Initial pass * Add metric to measure success * Use a subnet RWLock to prevent duplicate requests, give up after 3 attempts * push latest commented code * try with non-blocking broadcast * Add feature flag, ignore parent deadline if any * Add slot as metadata * add tests * gaz Co-authored-by: nisdas <nishdas93@gmail.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -118,6 +118,7 @@ go_test(
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//proto/beacon/p2p/v1:go_default_library",
|
||||
"//proto/testing:go_default_library",
|
||||
"//shared/featureconfig:go_default_library",
|
||||
"//shared/iputils:go_default_library",
|
||||
"//shared/p2putils:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
|
||||
@@ -5,11 +5,14 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/pkg/errors"
|
||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/hashutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -18,10 +21,18 @@ import (
|
||||
// GossipTypeMapping.
|
||||
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
|
||||
|
||||
// Max number of attempts to search the network for a specific subnet.
|
||||
const maxSubnetDiscoveryAttempts = 1
|
||||
|
||||
// Broadcast a message to the p2p network.
|
||||
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
|
||||
defer span.End()
|
||||
|
||||
twoSlots := time.Duration(2*params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, twoSlots)
|
||||
defer cancel()
|
||||
|
||||
forkDigest, err := s.forkDigest()
|
||||
if err != nil {
|
||||
err := errors.Wrap(err, "could not retrieve fork digest")
|
||||
@@ -47,7 +58,67 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
|
||||
traceutil.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
return s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest))
|
||||
|
||||
if featureconfig.Get().EnableAttBroadcastDiscoveryAttempts {
|
||||
// Non-blocking broadcast.
|
||||
go s.broadcastAttestation(ctx, subnet, att, forkDigest)
|
||||
} else {
|
||||
return s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *eth.Attestation, forkDigest [4]byte) {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.broadcastAttestation")
|
||||
defer span.End()
|
||||
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
|
||||
|
||||
oneEpoch := time.Duration(1*params.BeaconConfig().SlotsPerEpoch*params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, oneEpoch)
|
||||
defer cancel()
|
||||
|
||||
// Ensure we have peers with this subnet.
|
||||
s.subnetLocker(subnet).RLock()
|
||||
hasPeer := s.hasPeerWithSubnet(subnet)
|
||||
s.subnetLocker(subnet).RUnlock()
|
||||
|
||||
span.AddAttributes(
|
||||
trace.BoolAttribute("hasPeer", hasPeer),
|
||||
trace.Int64Attribute("slot", int64(att.Data.Slot)),
|
||||
trace.Int64Attribute("subnet", int64(subnet)),
|
||||
)
|
||||
|
||||
attestationBroadcastAttempts.Inc()
|
||||
|
||||
if !hasPeer {
|
||||
if err := func() error {
|
||||
s.subnetLocker(subnet).Lock()
|
||||
defer s.subnetLocker(subnet).Unlock()
|
||||
for i := 0; i < maxSubnetDiscoveryAttempts; i++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
ok, err := s.FindPeersWithSubnet(ctx, subnet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
savedAttestationBroadcasts.Inc()
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
log.WithError(err).Error("Failed to find peers")
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast attestation")
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
}
|
||||
|
||||
// method to broadcast messages to other peers in our gossip mesh.
|
||||
|
||||
@@ -3,17 +3,25 @@ package p2p
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
testpb "github.com/prysmaticlabs/prysm/proto/testing"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
@@ -129,3 +137,261 @@ func TestService_Attestation_Subnet(t *testing.T) {
|
||||
assert.Equal(t, tt.topic, attestationToTopic(subnet, [4]byte{} /* fork digest */), "Wrong topic")
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_BroadcastAttestation(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
if len(p1.BHost.Network().Peers()) == 0 {
|
||||
t.Fatal("No peers")
|
||||
}
|
||||
|
||||
p := &Service{
|
||||
host: p1.BHost,
|
||||
pubsub: p1.PubSub(),
|
||||
joinedTopics: map[string]*pubsub.Topic{},
|
||||
cfg: &Config{},
|
||||
genesisTime: time.Now(),
|
||||
genesisValidatorsRoot: []byte{'A'},
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
subnetsLockLock: sync.Mutex{},
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
ScorerParams: &peers.PeerScorerConfig{},
|
||||
}),
|
||||
}
|
||||
|
||||
msg := ð.Attestation{
|
||||
AggregationBits: bitfield.NewBitlist(7),
|
||||
Data: ð.AttestationData{
|
||||
Slot: 0,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Source: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: make([]byte, 32),
|
||||
},
|
||||
Target: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: make([]byte, 32),
|
||||
},
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
|
||||
subnet := uint64(5)
|
||||
|
||||
topic := AttestationSubnetTopicFormat
|
||||
GossipTypeMapping[reflect.TypeOf(msg)] = topic
|
||||
digest, err := p.forkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = fmt.Sprintf(topic, digest, subnet)
|
||||
|
||||
// External peer subscribes to the topic.
|
||||
topic += p.Encoding().ProtocolSuffix()
|
||||
sub, err := p2.SubscribeToTopic(topic)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
|
||||
|
||||
// Async listen for the pubsub, must be before the broadcast.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func(tt *testing.T) {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
incomingMessage, err := sub.Next(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
result := ð.Attestation{}
|
||||
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
|
||||
if !proto.Equal(result, msg) {
|
||||
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
|
||||
}
|
||||
}(t)
|
||||
|
||||
// Broadcast to peers and wait.
|
||||
require.NoError(t, p.BroadcastAttestation(context.Background(), subnet, msg))
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Error("Failed to receive pubsub within 1s")
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
|
||||
// Setup bootnode.
|
||||
cfg := &Config{}
|
||||
port := 2000
|
||||
cfg.UDPPort = uint(port)
|
||||
_, pkey := createAddrAndPrivKey(t)
|
||||
ipAddr := net.ParseIP("127.0.0.1")
|
||||
genesisTime := time.Now()
|
||||
genesisValidatorsRoot := make([]byte, 32)
|
||||
s := &Service{
|
||||
cfg: cfg,
|
||||
genesisTime: genesisTime,
|
||||
genesisValidatorsRoot: genesisValidatorsRoot,
|
||||
}
|
||||
bootListener, err := s.createListener(ipAddr, pkey)
|
||||
require.NoError(t, err)
|
||||
defer bootListener.Close()
|
||||
|
||||
// Use shorter period for testing.
|
||||
currentPeriod := pollingPeriod
|
||||
pollingPeriod = 1 * time.Second
|
||||
defer func() {
|
||||
pollingPeriod = currentPeriod
|
||||
}()
|
||||
|
||||
bootNode := bootListener.Self()
|
||||
subnet := uint64(5)
|
||||
|
||||
var listeners []*discover.UDPv5
|
||||
var hosts []host.Host
|
||||
// setup other nodes.
|
||||
cfg = &Config{
|
||||
BootstrapNodeAddr: []string{bootNode.String()},
|
||||
Discv5BootStrapAddr: []string{bootNode.String()},
|
||||
MaxPeers: 30,
|
||||
}
|
||||
// Setup 2 different hosts
|
||||
for i := 1; i <= 2; i++ {
|
||||
h, pkey, ipAddr := createHost(t, port+i)
|
||||
cfg.UDPPort = uint(port + i)
|
||||
cfg.TCPPort = uint(port + i)
|
||||
s := &Service{
|
||||
cfg: cfg,
|
||||
genesisTime: genesisTime,
|
||||
genesisValidatorsRoot: genesisValidatorsRoot,
|
||||
}
|
||||
listener, err := s.startDiscoveryV5(ipAddr, pkey)
|
||||
// Set for 2nd peer
|
||||
if i == 2 {
|
||||
s.dv5Listener = listener
|
||||
s.metaData = new(pb.MetaData)
|
||||
bitV := bitfield.NewBitvector64()
|
||||
bitV.SetBitAt(subnet, true)
|
||||
s.updateSubnetRecordWithMetadata(bitV)
|
||||
}
|
||||
assert.NoError(t, err, "Could not start discovery for node")
|
||||
listeners = append(listeners, listener)
|
||||
hosts = append(hosts, h)
|
||||
}
|
||||
defer func() {
|
||||
// Close down all peers.
|
||||
for _, listener := range listeners {
|
||||
listener.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// close peers upon exit of test
|
||||
defer func() {
|
||||
for _, h := range hosts {
|
||||
if err := h.Close(); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
f := featureconfig.Get()
|
||||
f.EnableAttBroadcastDiscoveryAttempts = true
|
||||
rst := featureconfig.InitWithReset(f)
|
||||
defer rst()
|
||||
|
||||
ps1, err := pubsub.NewFloodSub(context.Background(), hosts[0],
|
||||
pubsub.WithMessageSigning(false),
|
||||
pubsub.WithStrictSignatureVerification(false),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
ps2, err := pubsub.NewFloodSub(context.Background(), hosts[1],
|
||||
pubsub.WithMessageSigning(false),
|
||||
pubsub.WithStrictSignatureVerification(false),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
p := &Service{
|
||||
host: hosts[0],
|
||||
pubsub: ps1,
|
||||
dv5Listener: listeners[0],
|
||||
joinedTopics: map[string]*pubsub.Topic{},
|
||||
cfg: &Config{},
|
||||
genesisTime: time.Now(),
|
||||
genesisValidatorsRoot: []byte{'A'},
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
subnetsLockLock: sync.Mutex{},
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
ScorerParams: &peers.PeerScorerConfig{},
|
||||
}),
|
||||
}
|
||||
|
||||
p2 := &Service{
|
||||
host: hosts[1],
|
||||
pubsub: ps2,
|
||||
dv5Listener: listeners[1],
|
||||
joinedTopics: map[string]*pubsub.Topic{},
|
||||
cfg: &Config{},
|
||||
genesisTime: time.Now(),
|
||||
genesisValidatorsRoot: []byte{'A'},
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
subnetsLockLock: sync.Mutex{},
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
ScorerParams: &peers.PeerScorerConfig{},
|
||||
}),
|
||||
}
|
||||
|
||||
msg := ð.Attestation{
|
||||
AggregationBits: bitfield.NewBitlist(7),
|
||||
Data: ð.AttestationData{
|
||||
Slot: 0,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Source: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: make([]byte, 32),
|
||||
},
|
||||
Target: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: make([]byte, 32),
|
||||
},
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
|
||||
topic := AttestationSubnetTopicFormat
|
||||
GossipTypeMapping[reflect.TypeOf(msg)] = topic
|
||||
digest, err := p.forkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = fmt.Sprintf(topic, digest, subnet)
|
||||
|
||||
// External peer subscribes to the topic.
|
||||
topic += p.Encoding().ProtocolSuffix()
|
||||
sub, err := p2.SubscribeToTopic(topic)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
|
||||
|
||||
// Async listen for the pubsub, must be before the broadcast.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func(tt *testing.T) {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
incomingMessage, err := sub.Next(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
result := ð.Attestation{}
|
||||
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
|
||||
if !proto.Equal(result, msg) {
|
||||
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
|
||||
}
|
||||
}(t)
|
||||
|
||||
// Broadcast to peers and wait.
|
||||
require.NoError(t, p.BroadcastAttestation(context.Background(), subnet, msg))
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Error("Failed to receive pubsub within 5s")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ func (s *Service) listenForNewNodes() {
|
||||
continue
|
||||
}
|
||||
go func(info *peer.AddrInfo) {
|
||||
if err := s.connectWithPeer(*info); err != nil {
|
||||
if err := s.connectWithPeer(s.ctx, *info); err != nil {
|
||||
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
|
||||
}
|
||||
}(peerInfo)
|
||||
|
||||
@@ -73,7 +73,7 @@ type PeerManager interface {
|
||||
Host() host.Host
|
||||
ENR() *enr.Record
|
||||
RefreshENR()
|
||||
FindPeersWithSubnet(index uint64) (bool, error)
|
||||
FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error)
|
||||
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,16 @@ var (
|
||||
Name: "p2p_repeat_attempts",
|
||||
Help: "The number of repeat attempts the connection handler is triggered for a peer.",
|
||||
})
|
||||
savedAttestationBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "p2p_attestation_subnet_recovered_broadcasts",
|
||||
Help: "The number of attestations that were attempted to be broadcast with no peers on " +
|
||||
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
|
||||
"until a subnet peer can be found.",
|
||||
})
|
||||
attestationBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "p2p_attestation_subnet_attempted_broadcasts",
|
||||
Help: "The number of attestations that were attempted to be broadcast.",
|
||||
})
|
||||
)
|
||||
|
||||
func (s *Service) updateMetrics() {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/hashutil"
|
||||
)
|
||||
|
||||
@@ -47,7 +48,24 @@ func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return topicHandle.Publish(ctx, data, opts...)
|
||||
|
||||
// If feature flag isn't enabled, don't wait for peers to be present.
|
||||
if !featureconfig.Get().EnableAttBroadcastDiscoveryAttempts {
|
||||
return topicHandle.Publish(ctx, data, opts...)
|
||||
}
|
||||
|
||||
// Wait for at least 1 peer to be available to receive the published message.
|
||||
for {
|
||||
if len(topicHandle.ListPeers()) > 0 {
|
||||
return topicHandle.Publish(ctx, data, opts...)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
|
||||
|
||||
@@ -23,6 +23,8 @@ import (
|
||||
filter "github.com/multiformats/go-multiaddr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/pkg/errors"
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
@@ -73,6 +75,8 @@ type Service struct {
|
||||
pubsub *pubsub.PubSub
|
||||
joinedTopics map[string]*pubsub.Topic
|
||||
joinedTopicsLock sync.Mutex
|
||||
subnetsLock map[uint64]*sync.RWMutex
|
||||
subnetsLockLock sync.Mutex // Lock access to subnetsLock
|
||||
dv5Listener Listener
|
||||
startupErr error
|
||||
stateNotifier statefeed.Notifier
|
||||
@@ -105,6 +109,7 @@ func NewService(cfg *Config) (*Service, error) {
|
||||
exclusionList: cache,
|
||||
isPreGenesis: true,
|
||||
joinedTopics: make(map[string]*pubsub.Topic, len(GossipTopicMappings)),
|
||||
subnetsLock: make(map[uint64]*sync.RWMutex),
|
||||
}
|
||||
|
||||
dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr)
|
||||
@@ -390,21 +395,24 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
|
||||
for _, info := range addrInfos {
|
||||
// make each dial non-blocking
|
||||
go func(info peer.AddrInfo) {
|
||||
if err := s.connectWithPeer(info); err != nil {
|
||||
if err := s.connectWithPeer(s.ctx, info); err != nil {
|
||||
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
|
||||
}
|
||||
}(info)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) connectWithPeer(info peer.AddrInfo) error {
|
||||
func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.connectWithPeer")
|
||||
defer span.End()
|
||||
|
||||
if info.ID == s.host.ID() {
|
||||
return nil
|
||||
}
|
||||
if s.Peers().IsBad(info.ID) {
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(s.ctx, maxDialTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
|
||||
defer cancel()
|
||||
if err := s.host.Connect(ctx, info); err != nil {
|
||||
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
)
|
||||
@@ -16,7 +21,12 @@ var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
|
||||
// FindPeersWithSubnet performs a network search for peers
|
||||
// subscribed to a particular subnet. Then we try to connect
|
||||
// with those peers.
|
||||
func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
|
||||
defer span.End()
|
||||
|
||||
span.AddAttributes(trace.Int64Attribute("index", int64(index)))
|
||||
|
||||
if s.dv5Listener == nil {
|
||||
// return if discovery isn't set
|
||||
return false, nil
|
||||
@@ -25,6 +35,9 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
nodes := enode.ReadNodes(iterator, lookupLimit)
|
||||
exists := false
|
||||
for _, node := range nodes {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if node.IP() == nil {
|
||||
continue
|
||||
}
|
||||
@@ -55,7 +68,7 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
continue
|
||||
}
|
||||
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
|
||||
if err := s.connectWithPeer(*info); err != nil {
|
||||
if err := s.connectWithPeer(ctx, *info); err != nil {
|
||||
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
|
||||
continue
|
||||
}
|
||||
@@ -66,6 +79,10 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
func (s *Service) hasPeerWithSubnet(subnet uint64) bool {
|
||||
return len(s.Peers().SubscribedToSubnet(subnet)) > 0
|
||||
}
|
||||
|
||||
// Updates the service's discv5 listener record's attestation subnet
|
||||
// with a new value for a bitfield of subnets tracked. It also updates
|
||||
// the node's metadata by increasing the sequence number and the
|
||||
@@ -115,3 +132,14 @@ func retrieveBitvector(record *enr.Record) (bitfield.Bitvector64, error) {
|
||||
}
|
||||
return bitV, nil
|
||||
}
|
||||
|
||||
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
|
||||
s.subnetsLockLock.Lock()
|
||||
defer s.subnetsLockLock.Unlock()
|
||||
l, ok := s.subnetsLock[i]
|
||||
if !ok {
|
||||
l = &sync.RWMutex{}
|
||||
s.subnetsLock[i] = l
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -100,11 +101,12 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
|
||||
time.Sleep(6 * discoveryWaitTime)
|
||||
|
||||
// look up 3 different subnets
|
||||
exists, err := s.FindPeersWithSubnet(1)
|
||||
ctx := context.Background()
|
||||
exists, err := s.FindPeersWithSubnet(ctx, 1)
|
||||
require.NoError(t, err)
|
||||
exists2, err := s.FindPeersWithSubnet(2)
|
||||
exists2, err := s.FindPeersWithSubnet(ctx, 2)
|
||||
require.NoError(t, err)
|
||||
exists3, err := s.FindPeersWithSubnet(3)
|
||||
exists3, err := s.FindPeersWithSubnet(ctx, 3)
|
||||
require.NoError(t, err)
|
||||
if !exists || !exists2 || !exists3 {
|
||||
t.Fatal("Peer with subnet doesn't exist")
|
||||
@@ -119,7 +121,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
|
||||
testService.RefreshENR()
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
exists, err = s.FindPeersWithSubnet(2)
|
||||
exists, err = s.FindPeersWithSubnet(ctx, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, true, exists, "Peer with subnet doesn't exist")
|
||||
|
||||
@@ -41,7 +41,7 @@ func (m MockPeerManager) RefreshENR() {
|
||||
}
|
||||
|
||||
// FindPeersWithSubnet .
|
||||
func (m MockPeerManager) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
func (m MockPeerManager) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -313,7 +313,7 @@ func (p *TestP2P) Peers() *peers.Status {
|
||||
}
|
||||
|
||||
// FindPeersWithSubnet mocks the p2p func.
|
||||
func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) {
|
||||
func (p *TestP2P) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -214,7 +214,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
|
||||
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
|
||||
"committee index %d. Searching network for peers subscribed to the subnet.", i)
|
||||
go func(idx uint64) {
|
||||
_, err := s.p2p.FindPeersWithSubnet(idx)
|
||||
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
|
||||
if err != nil {
|
||||
log.Debugf("Could not search for peers: %v", err)
|
||||
return
|
||||
@@ -320,7 +320,7 @@ func (s *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Sub
|
||||
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
|
||||
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
|
||||
go func(idx uint64) {
|
||||
_, err := s.p2p.FindPeersWithSubnet(idx)
|
||||
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
|
||||
if err != nil {
|
||||
log.Debugf("Could not search for peers: %v", err)
|
||||
return
|
||||
@@ -339,7 +339,7 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
|
||||
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
|
||||
go func(idx uint64) {
|
||||
// perform a search for peers with the desired committee index.
|
||||
_, err := s.p2p.FindPeersWithSubnet(idx)
|
||||
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
|
||||
if err != nil {
|
||||
log.Debugf("Could not search for peers: %v", err)
|
||||
return
|
||||
|
||||
@@ -60,6 +60,7 @@ type Flags struct {
|
||||
InitSyncVerbose bool // InitSyncVerbose logs every processed block during initial syncing.
|
||||
EnableFinalizedDepositsCache bool // EnableFinalizedDepositsCache enables utilization of cached finalized deposits.
|
||||
EnableEth1DataMajorityVote bool // EnableEth1DataMajorityVote uses the Voting With The Majority algorithm to vote for eth1data.
|
||||
EnableAttBroadcastDiscoveryAttempts bool // EnableAttBroadcastDiscoveryAttempts allows the p2p service to attempt to ensure a subnet peer is present before broadcasting an attestation.
|
||||
|
||||
// DisableForkChoice disables using LMD-GHOST fork choice to update
|
||||
// the head of the chain based on attestations and instead accepts any valid received block
|
||||
@@ -254,6 +255,9 @@ func ConfigureBeaconChain(ctx *cli.Context) {
|
||||
log.Warn("Enabling eth1data majority vote")
|
||||
cfg.EnableEth1DataMajorityVote = true
|
||||
}
|
||||
if ctx.Bool(enableAttBroadcastDiscoveryAttempts.Name) {
|
||||
cfg.EnableAttBroadcastDiscoveryAttempts = true
|
||||
}
|
||||
Init(cfg)
|
||||
}
|
||||
|
||||
|
||||
@@ -166,12 +166,17 @@ var (
|
||||
Name: "disable-accounts-v2",
|
||||
Usage: "Disables usage of v2 for Prysm validator accounts",
|
||||
}
|
||||
enableAttBroadcastDiscoveryAttempts = &cli.BoolFlag{
|
||||
Name: "enable-att-broadcast-discovery-attempts",
|
||||
Usage: "Enable experimental attestation subnet discovery before broadcasting.",
|
||||
}
|
||||
)
|
||||
|
||||
// devModeFlags holds list of flags that are set when development mode is on.
|
||||
var devModeFlags = []cli.Flag{
|
||||
forceMaxCoverAttestationAggregation,
|
||||
batchBlockVerify,
|
||||
enableAttBroadcastDiscoveryAttempts,
|
||||
}
|
||||
|
||||
// Deprecated flags list.
|
||||
@@ -648,6 +653,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
|
||||
initSyncVerbose,
|
||||
enableFinalizedDepositsCache,
|
||||
enableEth1DataMajorityVote,
|
||||
enableAttBroadcastDiscoveryAttempts,
|
||||
}...)
|
||||
|
||||
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
||||
|
||||
Reference in New Issue
Block a user