finish draft

This commit is contained in:
aarshkshah1992
2025-11-04 20:08:11 +04:00
parent 7853cb9db0
commit ce3660d2e7
16 changed files with 341 additions and 259 deletions

View File

@@ -131,7 +131,8 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return attestationToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -187,7 +188,8 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return syncCommitteeToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -252,7 +254,8 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return blobSubnetToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -392,7 +395,10 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
wrappedSubIdx := subnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
builder := func(idx uint64) string {
return dataColumnSubnetToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix()
}
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, builder, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
@@ -487,8 +493,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
func (s *Service) findPeersIfNeeded(
ctx context.Context,
wrappedSubIdx uint64,
topicFormat string,
forkDigest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
subnet uint64,
) error {
// Sending a data column sidecar to only one peer is not ideal,
@@ -497,7 +502,7 @@ func (s *Service) findPeersIfNeeded(
defer s.subnetLocker(wrappedSubIdx).Unlock()
// No peers found, attempt to find peers with this subnet.
if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.FindAndDialPeersWithSubnets(ctx, fullTopicForSubnet, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnet")
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -35,7 +34,6 @@ type (
PeersProvider
MetadataProvider
CustodyManager
SubnetFilter
}
// Accessor provides access to the Broadcaster, PeerManager and CustodyManager interfaces.
@@ -102,7 +100,7 @@ type (
NodeID() enode.ID
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshPersistentSubnets()
FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error
FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}
@@ -130,11 +128,4 @@ type (
UpdateEarliestAvailableSlot(earliestAvailableSlot primitives.Slot) error
CustodyGroupCountFromPeer(peer.ID) uint64
}
// SubnetFilter provides methods for extracting subnet information from a peer's ENR records.
SubnetFilter interface {
AttestationSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error)
SyncSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error)
DataColumnSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error)
}
)

View File

@@ -558,14 +558,14 @@ func (s *Service) downscorePeer(peerID peer.ID, reason string) {
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}
func (s *Service) AttestationSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
func AttestationSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return attestationSubnets(record)
}
func (s *Service) SyncSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
func SyncSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return syncSubnets(record)
}
func (s *Service) DataColumnSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
func DataColumnSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return dataColumnSubnets(nodeID, record)
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
@@ -84,8 +83,7 @@ func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *e
// In this case, the function returns an error.
func (s *Service) FindAndDialPeersWithSubnets(
ctx context.Context,
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
subnets map[uint64]bool,
) error {
@@ -103,7 +101,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
defectiveSubnets := s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
defectiveSubnets := s.defectiveSubnets(fullTopicForSubnet, minimumPeersPerSubnet, subnets)
for len(defectiveSubnets) > 0 {
// Stop the search/dialing loop if the context is canceled.
if err := ctx.Err(); err != nil {
@@ -114,7 +112,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
ctx, cancel := context.WithTimeout(ctx, batchPeriod)
defer cancel()
peersToDial, err := s.findPeersWithSubnets(ctx, topicFormat, digest, minimumPeersPerSubnet, defectiveSubnets)
peersToDial, err := s.findPeersWithSubnets(ctx, fullTopicForSubnet, minimumPeersPerSubnet, defectiveSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, errors.Wrap(err, "find peers with subnets")
}
@@ -129,7 +127,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
// Dial new peers in batches.
s.dialPeers(s.ctx, maxConcurrentDials, peersToDial)
defectiveSubnets = s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
defectiveSubnets = s.defectiveSubnets(fullTopicForSubnet, minimumPeersPerSubnet, subnets)
}
return nil
@@ -158,8 +156,7 @@ func updateDefectiveSubnets(
// It returns new peers found during the search.
func (s *Service) findPeersWithSubnets(
ctx context.Context,
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
defectiveSubnetsOrigin map[uint64]int,
) ([]*enode.Node, error) {
@@ -181,7 +178,13 @@ func (s *Service) findPeersWithSubnets(
}()
// Retrieve the filter function that will be used to filter nodes based on the defective subnets.
filter, err := s.nodeFilter(topicFormat, defectiveSubnets)
// Use any subnet's full topic to infer the family type from the topic string.
var sampleTopic string
for k := range defectiveSubnets {
sampleTopic = fullTopicForSubnet(k)
break
}
filter, err := s.nodeFilter(sampleTopic, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
@@ -225,8 +228,8 @@ func (s *Service) findPeersWithSubnets(
nodeSubnets, err := filter(node)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"nodeID": node.ID(),
"topicFormat": topicFormat,
"nodeID": node.ID(),
"topic": sampleTopic,
}).Debug("Could not get needed subnets from peer")
continue
@@ -241,7 +244,7 @@ func (s *Service) findPeersWithSubnets(
nodeByNodeID[node.ID()] = node
updateDefectiveSubnets(nodeSubnets, defectiveSubnets)
filter, err = s.nodeFilter(topicFormat, defectiveSubnets)
filter, err = s.nodeFilter(sampleTopic, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
@@ -258,14 +261,13 @@ func (s *Service) findPeersWithSubnets(
// defectiveSubnets returns a map of subnets that have fewer than the minimum peer count.
func (s *Service) defectiveSubnets(
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
subnets map[uint64]bool,
) map[uint64]int {
missingCountPerSubnet := make(map[uint64]int, len(subnets))
for subnet := range subnets {
topic := fmt.Sprintf(topicFormat, digest, subnet) + s.Encoding().ProtocolSuffix()
topic := fullTopicForSubnet(subnet)
peers := s.pubsub.ListPeers(topic)
peerCount := len(peers)
if peerCount < minimumPeersPerSubnet {

View File

@@ -168,16 +168,19 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
}()
subnets := map[uint64]bool{1: true, 2: true, 3: true}
defectiveSubnets := service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, idx) + service.Encoding().ProtocolSuffix()
}
defectiveSubnets := service.defectiveSubnets(builder, minimumPeersPerSubnet, subnets)
require.Equal(t, subnetCount, len(defectiveSubnets))
ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, builder, minimumPeersPerSubnet, subnets)
require.NoError(t, err)
defectiveSubnets = service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
defectiveSubnets = service.defectiveSubnets(builder, minimumPeersPerSubnet, subnets)
require.Equal(t, 0, len(defectiveSubnets))
}
@@ -762,10 +765,12 @@ func TestFindPeersWithSubnets_NodeDeduplication(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + s.Encoding().ProtocolSuffix()
}
result, err := s.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
tt.defectiveSubnets,
)
@@ -982,10 +987,12 @@ func TestFindPeersWithSubnets_FilterPeerRemoval(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + s.Encoding().ProtocolSuffix()
}
result, err := s.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
tt.defectiveSubnets,
)
@@ -1105,10 +1112,12 @@ func TestFindPeersWithSubnets_received_bad_existing_node(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + service.Encoding().ProtocolSuffix()
}
result, err := service.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
map[uint64]int{1: 2}, // Need 2 peers for subnet 1
)

View File

@@ -5,7 +5,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -71,7 +70,7 @@ func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}
// FindAndDialPeersWithSubnets mocks the p2p func.
func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/host"
@@ -58,7 +57,7 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (*MockPeerManager) RefreshPersistentSubnets() {}
// FindAndDialPeersWithSubnet .
func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -183,11 +182,7 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
if _, err := p.Encoding().EncodeGossip(buf, castedMsg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
digest, err := p.ForkDigest()
if err != nil {
p.t.Fatal(err)
}
topicHandle, err := ps.Join(fmt.Sprintf(topic, digest) + p.Encoding().ProtocolSuffix())
topicHandle, err := ps.Join(topic)
if err != nil {
p.t.Fatal(err)
}
@@ -420,7 +415,7 @@ func (p *TestP2P) Peers() *peers.Status {
}
// FindAndDialPeersWithSubnets mocks the p2p func.
func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}

View File

@@ -2,22 +2,18 @@ package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/genesis"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func defaultClockWithTimeAtEpoch(epoch primitives.Epoch) *startup.Clock {
@@ -50,6 +46,8 @@ func testForkWatcherService(t *testing.T, current primitives.Epoch) *Service {
return r
}
// TODO: Move to gossip controller test
/*
func TestRegisterSubscriptions_Idempotent(t *testing.T) {
params.SetupTestConfigCleanup(t)
genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName)
@@ -71,9 +69,11 @@ func TestRegisterSubscriptions_Idempotent(t *testing.T) {
}
// the goal of this callback is just to assert that spawn is never called.
s.subscriptionSpawner = func(func()) { t.Error("registration routines spawned twice for the same digest") }
require.NoError(t, s.ensureRegistrationsForEpoch(fulu))
}
require.NoError(t, s.ensureRPCRegistrationsForEpoch(fulu))
}*/
// TODO: Move to gossip controller test
/*
func TestService_CheckForNextEpochFork(t *testing.T) {
closedChan := make(chan struct{})
close(closedChan)
@@ -121,8 +121,8 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
expected := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, s.subHandler.topicExists(expected), fmt.Sprintf("subnet topic %s doesn't exist", expected))
}
*/
},
*/
/*},
},
{
name: "capella fork in the next epoch",
@@ -195,7 +195,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
current := tt.epochAtRegistration(tt.forkEpoch)
s := testForkWatcherService(t, current)
wg := attachSpawner(s)
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
require.NoError(t, s.ensureRPCRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
tt.checkRegistration(t, s)
@@ -217,18 +217,18 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
// Move the clock to just before the next fork epoch and ensure deregistration is correct
wg = attachSpawner(s)
s.cfg.clock = defaultClockWithTimeAtEpoch(tt.nextForkEpoch - 1)
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
require.NoError(t, s.ensureRPCRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch))
require.NoError(t, s.ensureRPCDeregistrationForEpoch(tt.nextForkEpoch))
assert.Equal(t, true, s.subHandler.digestExists(digest))
// deregister as if it is the epoch after the next fork epoch
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch+1))
require.NoError(t, s.ensureRPCDeregistrationForEpoch(tt.nextForkEpoch+1))
assert.Equal(t, false, s.subHandler.digestExists(digest))
assert.Equal(t, true, s.subHandler.digestExists(nextDigest))
})
}
}
}*/
func attachSpawner(s *Service) *sync.WaitGroup {
wg := new(sync.WaitGroup)

View File

@@ -44,6 +44,10 @@ func NewGossipsubController(ctx context.Context, s *Service) *GossipsubControlle
func (g *GossipsubController) Start() {
currentEpoch := g.syncService.cfg.clock.CurrentEpoch()
if err := g.syncService.waitForInitialSync(g.ctx); err != nil {
log.WithError(err).Debug("Context cancelled while waiting for initial sync, not starting GossipsubController")
return
}
g.updateActiveTopicFamilies(currentEpoch)
g.wg.Go(func() { g.controlLoop() })

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
@@ -67,8 +69,9 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
chainStarted: abool.New(),
clockWaiter: gs,
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
topic := "/eth2/%x/beacon_block"
topicFmt := "/eth2/%x/beacon_block"
go r.startDiscoveryAndSubscriptions()
time.Sleep(100 * time.Millisecond)
@@ -82,7 +85,10 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
msg := util.NewBeaconBlock()
msg.Block.ParentRoot = util.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.ReceivePubSub(topic, msg)
// Build full topic using current fork digest
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
fullTopic := fmt.Sprintf(topicFmt, nse.ForkDigest) + p2p.Encoding().ProtocolSuffix()
p2p.ReceivePubSub(fullTopic, msg)
// wait for chainstart to be sent
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
@@ -137,6 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
clockWaiter: gs,
initialSyncComplete: make(chan struct{}),
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
r.initCaches()
var vr [32]byte
@@ -169,14 +176,16 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
// Save block into DB so that validateBeaconBlockPubSub() process gets short cut.
util.SaveBlock(t, ctx, r.cfg.beaconDB, msg)
topic := "/eth2/%x/beacon_block"
p2p.ReceivePubSub(topic, msg)
topicFmt := "/eth2/%x/beacon_block"
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
fullTopic := fmt.Sprintf(topicFmt, nse.ForkDigest) + p2p.Encoding().ProtocolSuffix()
p2p.ReceivePubSub(fullTopic, msg)
assert.Equal(t, 0, len(blockChan), "block was received by sync service despite not being fully synced")
close(r.initialSyncComplete)
<-syncCompleteCh
p2p.ReceivePubSub(topic, msg)
p2p.ReceivePubSub(fullTopic, msg)
select {
case <-blockChan:
@@ -206,6 +215,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
clockWaiter: gs,
initialSyncComplete: make(chan struct{}),
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
markInitSyncComplete(t, &r)
go r.startDiscoveryAndSubscriptions()
@@ -265,6 +275,7 @@ func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
@@ -343,6 +354,7 @@ func TestService_Stop_TimeoutHandling(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
@@ -404,6 +416,7 @@ func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)

View File

@@ -38,53 +38,27 @@ const pubsubMessageTimeout = 30 * time.Second
var errInvalidDigest = errors.New("invalid digest")
// subscribeParameters holds the parameters that are needed to construct a set of subscriptions topics for a given
// set of gossipsub subnets.
type subscribeParameters struct {
topicFormat string
validate wrappedVal
handle subHandler
nse params.NetworkScheduleEntry
// getSubnetsToJoin is a function that returns all subnets the node should join.
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
}
// shortTopic is a less verbose version of topic strings used for logging.
func (p subscribeParameters) shortTopic() string {
short := p.topicFormat
fmtLen := len(short)
if fmtLen >= 3 && short[fmtLen-3:] == "_%d" {
short = short[:fmtLen-3]
}
return fmt.Sprintf(short, p.nse.ForkDigest)
}
func (p subscribeParameters) logFields() logrus.Fields {
func familyLogFields(tf GossipsubTopicFamilyWithDynamicSubnets) logrus.Fields {
nse := tf.NetworkScheduleEntry()
return logrus.Fields{
"topic": p.shortTopic(),
"topicFamily": fmt.Sprintf("%T", tf),
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
}
}
// fullTopic is the fully qualified topic string, given to gossipsub.
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
}
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
// possible subnets described by a `subscribeParameters`.
// subnetTracker keeps track of which subnets we are subscribed to for a given
// dynamic topic family (attestations, sync-committee, data-column, etc.).
type subnetTracker struct {
subscribeParameters
family GossipsubTopicFamilyWithDynamicSubnets
mu sync.RWMutex
subscriptions map[uint64]*pubsub.Subscription
}
func newSubnetTracker(p subscribeParameters) *subnetTracker {
func newSubnetTracker(tf GossipsubTopicFamilyWithDynamicSubnets) *subnetTracker {
return &subnetTracker{
subscribeParameters: p,
subscriptions: make(map[uint64]*pubsub.Subscription),
family: tf,
subscriptions: make(map[uint64]*pubsub.Subscription),
}
}
@@ -181,50 +155,12 @@ func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint6
return mapFromSlice(subscriptions)
}
// spawn allows the Service to use a custom function for launching goroutines.
// This is useful in tests where we can set spawner to a sync.WaitGroup and
// wait for the spawned goroutines to finish.
func (s *Service) spawn(f func()) {
if s.subscriptionSpawner != nil {
s.subscriptionSpawner(f)
} else {
go f()
}
}
func (s *Service) subscriptionRequestExpired(nse params.NetworkScheduleEntry) bool {
next := params.NextNetworkScheduleEntry(nse.Epoch)
return next.Epoch != nse.Epoch && s.cfg.clock.CurrentEpoch() > next.Epoch
}
func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEntry) logrus.Fields {
return logrus.Fields{
"topic": topic,
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}
}
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(tf GossipsubTopicFamilyWithoutDynamicSubnets) {
topic := tf.GetFullTopicString()
nse := tf.NetworkScheduleEntry()
if err := s.waitForInitialSync(s.ctx); err != nil {
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
return
}
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
return
}
s.subscribeWithBase(topic, tf.Validator(), tf.Handler())
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
@@ -387,36 +323,38 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
s.unSubscribeFromTopic(t.family.GetFullTopicString(subnet))
}
}
// subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters(p subscribeParameters) {
// subscribeToDynamicSubnetFamily subscribes to a list of subnets.
func (s *Service) subscribeToDynamicSubnetFamily(tf GossipsubTopicFamilyWithDynamicSubnets) *subnetTracker {
tracker := newSubnetTracker(tf)
go s.subscribeToSubnets(tf, tracker)
return tracker
}
func (s *Service) subscribeToSubnets(tf GossipsubTopicFamilyWithDynamicSubnets, tracker *subnetTracker) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracker := newSubnetTracker(p)
go s.ensurePeers(ctx, tracker)
go s.logMinimumPeersPerSubnet(ctx, p)
go s.logMinimumPeersPerSubnet(ctx, tf)
if err := s.waitForInitialSync(ctx); err != nil {
log.WithFields(p.logFields()).WithError(err).Debug("Could not subscribe to subnets as initial sync failed")
return
}
s.trySubscribeSubnets(tracker)
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(p.nse) {
if s.subscriptionRequestExpired(tf.NetworkScheduleEntry()) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(logrus.Fields{
"topic": p.shortTopic(),
"digest": p.nse.ForkDigest,
"epoch": p.nse.Epoch,
"topicFamily": fmt.Sprintf("%T", tf),
"digest": tf.NetworkScheduleEntry().ForkDigest,
"epoch": tf.NetworkScheduleEntry().Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}).Debug("Exiting topic subnet subscription loop")
return
@@ -431,12 +369,11 @@ func (s *Service) subscribeWithParameters(p subscribeParameters) {
// trySubscribeSubnets attempts to subscribe to any missing subnets that we should be subscribed to.
// Only if initial sync is complete.
func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
subnetsToJoin := t.family.GetSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
topic := t.family.GetFullTopicString(subnet)
t.track(subnet, s.subscribe(topic, t.family.Validator(), t.family.Handler()))
}
}
@@ -459,17 +396,18 @@ func (s *Service) ensurePeers(ctx context.Context, tracker *subnetTracker) {
func (s *Service) tryEnsurePeers(ctx context.Context, tracker *subnetTracker) {
timeout := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - 100*time.Millisecond
minPeers := flags.Get().MinimumPeersPerSubnet
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers)
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.family)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.nse.ForkDigest, minPeers, neededSubnets)
builder := func(idx uint64) string { return tracker.family.GetFullTopicString(idx) }
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, builder, minPeers, neededSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithFields(tracker.logFields()).WithError(err).Debug("Could not find peers with subnets")
log.WithFields(familyLogFields(tracker.family)).WithError(err).Debug("Could not find peers with subnets")
}
}
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParameters) {
logFields := p.logFields()
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, tf GossipsubTopicFamilyWithDynamicSubnets) {
logFields := familyLogFields(tf)
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Warn the user if we are not subscribed to enough peers in the subnets.
log := log.WithField("minimum", minimumPeersPerSubnet)
@@ -480,12 +418,12 @@ func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParam
select {
case <-logTicker.C:
currentSlot := s.cfg.clock.CurrentSlot()
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers)
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, tf)
isSubnetWithMissingPeers := false
// Find new peers for wanted subnets if needed.
for index := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index)
topic := tf.GetFullTopicString(index)
// Check if we have enough peers in the subnet. Skip if we do.
if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
@@ -521,9 +459,8 @@ func (s *Service) unSubscribeFromTopic(topic string) {
}
// connectedPeersCount counts how many peer for a given topic are connected to the node.
func (s *Service) connectedPeersCount(subnetTopic string) int {
topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic)
func (s *Service) connectedPeersCount(fullTopic string) int {
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(fullTopic)
return len(peersWithSubnet)
}
@@ -673,17 +610,13 @@ func isDigestValid(digest [4]byte, clock *startup.Clock) (bool, error) {
// and the subnets for which we want to find peers.
func computeAllNeededSubnets(
currentSlot primitives.Slot,
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool,
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool,
dtf GossipsubTopicFamilyWithDynamicSubnets,
) map[uint64]bool {
// Retrieve the subnets we want to join.
subnetsToJoin := getSubnetsToJoin(currentSlot)
subnetsToJoin := dtf.GetSubnetsToJoin(currentSlot)
// Retrieve the subnets we want to find peers into.
subnetsRequiringPeers := make(map[uint64]bool)
if getSubnetsRequiringPeers != nil {
subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot)
}
subnetsRequiringPeers := dtf.GetSubnetsForBroadcast(currentSlot)
// Combine the two maps to get all needed subnets.
neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers))

View File

@@ -30,6 +30,7 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
@@ -37,6 +38,111 @@ import (
"google.golang.org/protobuf/proto"
)
// testStaticFamily implements a minimal static topic family for tests.
type testStaticFamily struct {
nse params.NetworkScheduleEntry
topicFmt string
protocolSuffix string
validator wrappedVal
handler subHandler
}
func (t testStaticFamily) Validator() wrappedVal {
return t.validator
}
func (t testStaticFamily) Handler() subHandler {
return t.handler
}
func (t testStaticFamily) NetworkScheduleEntry() params.NetworkScheduleEntry {
return t.nse
}
func (t testStaticFamily) GetFullTopicString() string {
return fmt.Sprintf(t.topicFmt, t.nse.ForkDigest) + t.protocolSuffix
}
func (t testStaticFamily) Subscribe() {}
func (t testStaticFamily) Unsubscribe() {}
func makeTestFamily(nse params.NetworkScheduleEntry, topicFmt, suffix string, validator wrappedVal, handler subHandler) testStaticFamily {
return testStaticFamily{
nse: nse,
topicFmt: topicFmt,
protocolSuffix: suffix,
validator: validator,
handler: handler,
}
}
func makeFullTopic(topicFmt string, nse params.NetworkScheduleEntry, suffix string) string {
return fmt.Sprintf(topicFmt, nse.ForkDigest) + suffix
}
// testDynamicFamily implements a minimal dynamic topic family for tests.
type testDynamicFamily struct {
nse params.NetworkScheduleEntry
topicFmt string
protocolSuffix string
validator wrappedVal
handler subHandler
subnetsToJoin func(primitives.Slot) map[uint64]bool
subnetsForCast func(primitives.Slot) map[uint64]bool
}
func (t *testDynamicFamily) Validator() wrappedVal {
return t.validator
}
func (t *testDynamicFamily) Handler() subHandler {
return t.handler
}
func (t *testDynamicFamily) NetworkScheduleEntry() params.NetworkScheduleEntry {
return t.nse
}
func (t *testDynamicFamily) GetFullTopicString(subnet uint64) string {
return fmt.Sprintf(t.topicFmt, t.nse.ForkDigest, subnet) + t.protocolSuffix
}
func (t *testDynamicFamily) GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool {
if t.subnetsToJoin != nil {
return t.subnetsToJoin(slot)
}
return nil
}
func (t *testDynamicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool {
if t.subnetsForCast != nil {
return t.subnetsForCast(slot)
}
return nil
}
func (t *testDynamicFamily) Subscribe() {}
func (t *testDynamicFamily) Unsubscribe() {}
func (t *testDynamicFamily) GetTopicsForNode(_ *enode.Node) ([]string, error) {
return nil, nil
}
func makeTestDynamicFamily(nse params.NetworkScheduleEntry, topicFmt, suffix string, validator wrappedVal, handler subHandler,
getJoin func(primitives.Slot) map[uint64]bool, getCast func(primitives.Slot) map[uint64]bool) *testDynamicFamily {
return &testDynamicFamily{
nse: nse,
topicFmt: topicFmt,
protocolSuffix: suffix,
validator: validator,
handler: handler,
subnetsToJoin: getJoin,
subnetsForCast: getCast,
}
}
func TestSubscribe_ReceivesValidMessage(t *testing.T) {
p2pService := p2ptest.NewTestP2P(t)
gt := time.Now()
@@ -64,7 +170,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error {
m, ok := msg.(*pb.SignedVoluntaryExit)
assert.Equal(t, true, ok, "Object is not of type *pb.SignedVoluntaryExit")
if m.Exit == nil || m.Exit.Epoch != 55 {
@@ -72,10 +178,10 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
}
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
p2pService.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
p2pService.ReceivePubSub(tf.GetFullTopicString(), &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -110,12 +216,10 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) {
p2pService.Digest = nse.ForkDigest
topic := "/eth2/%x/voluntary_exit"
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
return nil
}, nse)
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error { return nil })
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix()
fullTopic := tf.GetFullTopicString()
assert.Equal(t, true, r.subHandler.topicExists(fullTopic))
topics := p2pService.PubSub().GetTopics()
assert.Equal(t, fullTopic, topics[0])
@@ -162,11 +266,12 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
wg.Add(1)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.attesterSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -178,7 +283,7 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
require.NoError(t, err, "Error generating attester slashing")
err = r.cfg.beaconDB.SaveState(ctx, beaconState, bytesutil.ToBytes32(attesterSlashing.FirstAttestation().GetData().BeaconBlockRoot))
require.NoError(t, err)
p2pService.ReceivePubSub(topic, attesterSlashing)
p2pService.ReceivePubSub(tf.GetFullTopicString(), attesterSlashing)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -217,11 +322,12 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
params.OverrideBeaconConfig(params.MainnetConfig())
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.proposerSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -232,7 +338,7 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
)
require.NoError(t, err, "Error generating proposer slashing")
p2pService.ReceivePubSub(topic, proposerSlashing)
p2pService.ReceivePubSub(tf.GetFullTopicString(), proposerSlashing)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -266,12 +372,13 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error {
defer wg.Done()
panic("bad")
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
p.ReceivePubSub(tf.GetFullTopicString(), &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -297,15 +404,12 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
}
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
params := subscribeParameters{
topicFormat: "/eth2/testing/%#x/committee%d",
nse: nse,
}
tracker := newSubnetTracker(params)
tfDyn := makeTestDynamicFamily(nse, "/eth2/testing/%#x/committee%d", r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, nil, nil)
tracker := newSubnetTracker(tfDyn)
// committee index 1
c1 := uint64(1)
fullTopic := params.fullTopic(c1, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic := tfDyn.GetFullTopicString(c1)
_, topVal := r.wrapAndReportValidation(fullTopic, r.noopValidator)
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
@@ -314,7 +418,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
// committee index 2
c2 := uint64(2)
fullTopic = params.fullTopic(c2, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic = tfDyn.GetFullTopicString(c2)
_, topVal = r.wrapAndReportValidation(fullTopic, r.noopValidator)
err = r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal)
require.NoError(t, err)
@@ -552,11 +656,8 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
currEpoch := slots.ToEpoch(slot)
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
go r.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
tfDyn := makeTestDynamicFamily(nse, p2p.SyncCommitteeSubnetTopicFormat, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, r.activeSyncSubnetIndices, nil)
go r.subscribeToDynamicSubnetFamily(tfDyn)
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -601,11 +702,8 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().DenebForkEpoch, nse.Epoch)
sp := newSubnetTracker(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
tfDyn2 := makeTestDynamicFamily(nse, p2p.SyncCommitteeSubnetTopicFormat, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, r.activeSyncSubnetIndices, nil)
sp := newSubnetTracker(tfDyn2)
r.trySubscribeSubnets(sp)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -625,7 +723,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().ElectraForkEpoch, nse.Epoch)
sp.nse = nse
tfDyn2.nse = nse
// clear the cache and re-subscribe to subnets.
// this should result in the subscriptions being removed
cache.SyncSubnetIDs.EmptyAllCaches()

View File

@@ -2,6 +2,7 @@ package sync
import (
"fmt"
"sync"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -9,24 +10,57 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// AttestationTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*AttestationTopicFamily)(nil)
type AttestationTopicFamily struct {
type baseGossipsubTopicFamilyWithDynamicSubnets struct {
baseGossipsubTopicFamily
mu sync.Mutex
tracker *subnetTracker
unsubscribed bool
}
func (b *baseGossipsubTopicFamilyWithDynamicSubnets) Subscribe(tf GossipsubTopicFamilyWithDynamicSubnets) {
b.mu.Lock()
defer b.mu.Unlock()
if b.unsubscribed {
log.WithFields(logrus.Fields{
"topicFamily": fmt.Sprintf("%T", tf),
"digest": b.nse.ForkDigest,
"epoch": b.nse.Epoch,
}).Error("Cannot subscribe after unsubscribing")
return
}
b.tracker = b.syncService.subscribeToDynamicSubnetFamily(tf)
}
func (b *baseGossipsubTopicFamilyWithDynamicSubnets) Unsubscribe() {
b.mu.Lock()
defer b.mu.Unlock()
b.unsubscribed = true
b.syncService.pruneNotWanted(b.tracker, nil) // unsubscribe from all subnets
}
type AttestationTopicFamily struct {
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewAttestationTopicFamily creates a new AttestationTopicFamily.
func NewAttestationTopicFamily(s *Service, nse params.NetworkScheduleEntry) *AttestationTopicFamily {
return &AttestationTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
attestationTopicFamily := &AttestationTopicFamily{
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
return attestationTopicFamily
}
// Validator returns the validator function for attestation subnets.
@@ -56,32 +90,33 @@ func (a *AttestationTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot) ma
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (a *AttestationTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(a.syncService, a, node, a.syncService.cfg.p2p.AttestationSubnets)
return getTopicsForNode(a.syncService, a, node, p2p.AttestationSubnets)
}
// TODO
func (a *AttestationTopicFamily) Subscribe() {
a.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(a)
}
func (a *AttestationTopicFamily) Unsubscribe() {
a.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
// SyncCommitteeTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*SyncCommitteeTopicFamily)(nil)
type SyncCommitteeTopicFamily struct {
baseGossipsubTopicFamily
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewSyncCommitteeTopicFamily creates a new SyncCommitteeTopicFamily.
func NewSyncCommitteeTopicFamily(s *Service, nse params.NetworkScheduleEntry) *SyncCommitteeTopicFamily {
return &SyncCommitteeTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
}
@@ -113,32 +148,33 @@ func (s *SyncCommitteeTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot)
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (s *SyncCommitteeTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(s.syncService, s, node, s.syncService.cfg.p2p.SyncSubnets)
return getTopicsForNode(s.syncService, s, node, p2p.SyncSubnets)
}
// TODO
func (s *SyncCommitteeTopicFamily) Subscribe() {
s.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(s)
}
func (s *SyncCommitteeTopicFamily) Unsubscribe() {
s.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
// DataColumnTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*DataColumnTopicFamily)(nil)
type DataColumnTopicFamily struct {
baseGossipsubTopicFamily
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewDataColumnTopicFamily creates a new DataColumnTopicFamily.
func NewDataColumnTopicFamily(s *Service, nse params.NetworkScheduleEntry) *DataColumnTopicFamily {
return &DataColumnTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
}
@@ -170,16 +206,15 @@ func (d *DataColumnTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (d *DataColumnTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(d.syncService, d, node, d.syncService.cfg.p2p.DataColumnSubnets)
return getTopicsForNode(d.syncService, d, node, p2p.DataColumnSubnets)
}
// TODO
func (d *DataColumnTopicFamily) Subscribe() {
d.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(d)
}
func (d *DataColumnTopicFamily) Unsubscribe() {
d.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
type nodeSubnetExtractor func(id enode.ID, n *enode.Node, r *enr.Record) (map[uint64]bool, error)
@@ -196,8 +231,7 @@ func getTopicsForNode(
currentSlot := s.cfg.clock.CurrentSlot()
neededSubnets := computeAllNeededSubnets(
currentSlot,
tf.GetSubnetsToJoin,
tf.GetSubnetsForBroadcast,
tf,
)
nodeSubnets, err := extractor(node.ID(), node, node.Record())

View File

@@ -39,7 +39,7 @@ func (b *BlobTopicFamily) GetFullTopicString() string {
}
func (b *BlobTopicFamily) Subscribe() {
b.syncService.subscribe(b)
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlobTopicFamily) Unsubscribe() {

View File

@@ -36,7 +36,7 @@ func (b *BlockTopicFamily) GetFullTopicString() string {
}
func (b *BlockTopicFamily) Subscribe() {
b.syncService.subscribe(b)
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlockTopicFamily) Unsubscribe() {
@@ -72,7 +72,7 @@ func (a *AggregateAndProofTopicFamily) GetFullTopicString() string {
}
func (a *AggregateAndProofTopicFamily) Subscribe() {
a.syncService.subscribe(a)
a.syncService.subscribe(a.GetFullTopicString(), a.Validator(), a.Handler())
}
func (a *AggregateAndProofTopicFamily) Unsubscribe() {
@@ -108,7 +108,7 @@ func (v *VoluntaryExitTopicFamily) GetFullTopicString() string {
}
func (v *VoluntaryExitTopicFamily) Subscribe() {
v.syncService.subscribe(v)
v.syncService.subscribe(v.GetFullTopicString(), v.Validator(), v.Handler())
}
func (v *VoluntaryExitTopicFamily) Unsubscribe() {
@@ -144,7 +144,7 @@ func (p *ProposerSlashingTopicFamily) GetFullTopicString() string {
}
func (p *ProposerSlashingTopicFamily) Subscribe() {
p.syncService.subscribe(p)
p.syncService.subscribe(p.GetFullTopicString(), p.Validator(), p.Handler())
}
func (p *ProposerSlashingTopicFamily) Unsubscribe() {
@@ -181,7 +181,7 @@ func (a *AttesterSlashingTopicFamily) GetFullTopicString() string {
// TODO: Do we really need to spawn go-routines here ?
func (a *AttesterSlashingTopicFamily) Subscribe() {
a.syncService.subscribe(a)
a.syncService.subscribe(a.GetFullTopicString(), a.Validator(), a.Handler())
}
func (a *AttesterSlashingTopicFamily) Unsubscribe() {
@@ -215,7 +215,7 @@ func (sc *SyncContributionAndProofTopicFamily) GetFullTopicString() string {
}
func (sc *SyncContributionAndProofTopicFamily) Subscribe() {
sc.syncService.subscribe(sc)
sc.syncService.subscribe(sc.GetFullTopicString(), sc.Validator(), sc.Handler())
}
func (sc *SyncContributionAndProofTopicFamily) Unsubscribe() {
@@ -251,7 +251,7 @@ func (l *LightClientOptimisticUpdateTopicFamily) GetFullTopicString() string {
}
func (l *LightClientOptimisticUpdateTopicFamily) Subscribe() {
l.syncService.subscribe(l)
l.syncService.subscribe(l.GetFullTopicString(), l.Validator(), l.Handler())
}
func (l *LightClientOptimisticUpdateTopicFamily) Unsubscribe() {
@@ -287,7 +287,7 @@ func (l *LightClientFinalityUpdateTopicFamily) GetFullTopicString() string {
}
func (l *LightClientFinalityUpdateTopicFamily) Subscribe() {
l.syncService.subscribe(l)
l.syncService.subscribe(l.GetFullTopicString(), l.Validator(), l.Handler())
}
func (l *LightClientFinalityUpdateTopicFamily) Unsubscribe() {
l.syncService.unSubscribeFromTopic(l.GetFullTopicString())
@@ -322,7 +322,7 @@ func (b *BlsToExecutionChangeTopicFamily) GetFullTopicString() string {
}
func (b *BlsToExecutionChangeTopicFamily) Subscribe() {
b.syncService.subscribe(b)
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlsToExecutionChangeTopicFamily) Unsubscribe() {